Custom policies with OPA in Steampunk Spotter

Introduction

Rego, the policy language of Open Policy Agent (OPA), lets you write custom policies that customize and extend your Ansible automation scans.

In this document, you will learn how to write your own Rego files for use with Steampunk Spotter to further strengthen the quality of your Ansible projects.

Note

Custom policies functionality is subject to subscriptions and licensing.

To follow along with the examples, we recommend that you prepare as follows:

  • In the Steampunk Spotter App, create a project that you can use for learning and experimenting with the use of custom policies. See how.
  • Note down the --project-id <your ID> as it appears on the screen.
  • Create and edit the playbook .yml files in your working directory.
  • For creating and editing Rego files, you can use the Custom policies functionality of the Steampunk Spotter App. See how.
  • To run a scan of your playbook, execute:
spotter scan --project-id <your ID> playbook_name.yml

Hello World in Rego files

The following Rego file is an example of a minimal policy to be run as a custom check in Steampunk Spotter:

package hello_world

import rego.v1

my_first_rule contains result if {
    result := {
        "check_type": "OTHER",
        "message": "Hello world - printed for every scan and exactly once.",
    }
}

The following sections describe this policy's building blocks.

Package name

The package keyword is obligatory in Rego and must be the first statement in the file. It is followed by your package name. Steampunk Spotter does not use the package name, so you are free to choose any valid one: identifiers made of letters, digits, and underscores, optionally joined into a path with dots (for example, hello_world).

Rego v1

Adding the line import rego.v1 ensures that the policy can run on both the v0 and v1 Rego engines. The import enforces the v1 syntax, which includes the if and contains keywords used in the examples.
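
To make the difference concrete, the sketch below shows the Hello World rule in the v1 syntax next to its older equivalent, which is kept in comments. The package name syntax_demo is a placeholder chosen for illustration, and the commented form reflects general pre-v1 Rego conventions rather than anything specific to Spotter.

```rego
package syntax_demo

import rego.v1

# Pre-v1 style, shown for comparison only. With `import rego.v1` present,
# OPA rejects this form because the `contains` and `if` keywords are missing:
#
#   my_first_rule[result] {
#       result := {"check_type": "OTHER", "message": "Hello world"}
#   }

# v1 style: `contains` marks a rule that adds elements to a set,
# and `if` introduces the rule body.
my_first_rule contains result if {
    result := {
        "check_type": "OTHER",
        "message": "Hello world",
    }
}
```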

Rule

A rule is constructed from three parts:

  • The rule name – in our case my_first_rule.
  • The variable that holds the return value – result.
  • The rule's body { result := ...}.

The rule's name should describe what the rule checks, although Steampunk Spotter does not use it.

The variable that holds the return value can also have any name you want, as long as the rule body assigns a value to that same variable.

In the example, the value assigned to the return variable is a dictionary containing the following elements:

  • message: this is the message that Spotter shows in the check result.
  • check_type: one of TASK, PLAY, VAR, or OTHER. In the example, we use OTHER as the type that best matches the policy.
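
As a small illustration of the points above, the following sketch behaves like the Hello World policy but uses a different package name, rule name, and return variable. All three names (naming_demo, any_descriptive_rule_name, finding) are hypothetical and chosen only to show that Spotter does not depend on them.

```rego
package naming_demo

import rego.v1

# The rule name and the return variable are arbitrary; what matters is
# that the variable named after `contains` is assigned in the rule body.
any_descriptive_rule_name contains finding if {
    finding := {
        "check_type": "OTHER",
        "message": "Reported once per scan, like the Hello World policy.",
    }
}
```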

Writing task policies

Tasks are the most typical elements of an Ansible playbook. The following example shows an Ansible file my_playbook_task.yml that contains a single task:

---
- name: remove service now incident
  servicenow.itsm.incident:
    number: I1234
    state: absent
  register: result

A policy that evaluates this and other playbooks needs to refer to the elements of the task file as prepared by the Spotter client. You can create a JSON representation of the playbook's contents by running:

spotter scan --export-payload spotter_payload.json my_playbook_task.yml

The resulting spotter_payload.json may look like the following JSON:

{
  "tasks": [
    {
      "task_id": "15a212ee-097d-444e-98d6-4d3cf4ab2701",
      "play_id": null,
      "block_id": null,
      "task_args": {
        "name": "remove service now incident",
        "servicenow.itsm.incident": {
          "number": "I1234",
          "state": "absent"
        },
        "register": "result"
      },
      "spotter_metadata": {},
      "spotter_obfuscated": [],
      "spotter_noqa": []
    }
  ],
  "playbooks": [],
  "environment": { ... },
  "dynamic_inventories": [],
  "roles": [],
  "blocks": [],
  "plugins": [],
  "variables": [],
  "statistics": {
    "included_files_count": 1,
    "excluded_paths_count": 0
  }
}

Spotter passes this structure to the policy as the input document, which we can reference through the input keyword. For example, to get the actual arguments of the task with index i in the input, the policy might use input.tasks[i].task_args.
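
As a rough sketch of how a policy might navigate this payload, the rule below walks input.tasks together with each task's index and reads the module arguments from task_args. The package and rule names are hypothetical, and the check itself (reporting tasks that remove a ServiceNow incident) is only an illustration. Because the module key contains dots, it has to be accessed with bracket notation.

```rego
package payload_demo

import rego.v1

# Hypothetical rule: report every task that removes a ServiceNow incident.
incident_removed contains result if {
    # i is the position of the task in input.tasks, task is the element itself.
    some i, task in input.tasks

    # Keys that contain dots cannot be written as task.task_args.servicenow...;
    # bracket notation with a string key is required.
    module_args := task.task_args["servicenow.itsm.incident"]
    module_args.state == "absent"

    result := {
        "check_type": "TASK",
        "message": sprintf("Task with index %d removes incident '%s'", [i, module_args.number]),
    }
}
```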

Evaluating task arguments

In the following example, the policy requires that each task has a name. This way, Spotter helps enforce the recommended practice of describing each task with a name, which is visible while the playbook runs, rather than with a comment.

package hello_world

import rego.v1

check_name_is_set contains result if {
    some task in input.tasks
    not task.task_args.name

    result := {
        "check_type": "TASK",
        "message": "Task is missing a name",
    }
}

Here, we used the some task in input.tasks pattern where one might expect a loop and an index. In Rego, loops are implicit: an expression such as some value in list binds value to each element of the list in turn, and the rule adds a result for every element for which all the subsequent statements are true.
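
For readers who think in terms of indexes, the sketch below places the rule from this section next to a variant that uses an explicit index variable. The two rules should produce the same results; the package name and the second rule's name are hypothetical, and in practice you would keep only one of the two forms.

```rego
package iteration_demo

import rego.v1

# Form used in this document: iterate over the values directly.
check_name_is_set contains result if {
    some task in input.tasks
    not task.task_args.name

    result := {"check_type": "TASK", "message": "Task is missing a name"}
}

# Equivalent form with an explicit index variable i.
# OPA tries every i for which input.tasks[i] is defined.
check_name_is_set_indexed contains result if {
    some i
    task := input.tasks[i]
    not task.task_args.name

    result := {"check_type": "TASK", "message": "Task is missing a name"}
}
```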

Running a scan of my_playbook_task.yml does not report any errors. This is expected: our example task has a name, so the statement not task.task_args.name does not hold and the rule produces no result.

To see the effect of the policy, let's have another playbook file, no_names.yml, with the following content:

---
- ansible.builtin.command: ls /tmp
  changed_when: false

- servicenow.itsm.incident:
    number: I1234
    state: absent
  register: result

Scanning this file, we get:

:0:0: ERROR: [E2303] Task is missing a name.

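This result is useful only up to a point. Both unnamed tasks produce an identical result, and because a Rego set keeps each distinct value only once, the policy reports a single error with no location. It would be much better if Spotter could tell us exactly where it found each offending task.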

This is possible by referencing task.task_id, which contains an identifier unique to each task. By including correlation_id in the returned result, Spotter can correlate the policy outcome with a specific input element:

package hello_world

import rego.v1

check_name_is_set contains result if {
    some task in input.tasks
    not task.task_args.name

    result := {
        "check_type": "TASK",
        "correlation_id": task.task_id,
        "message": "Task is missing a name",
    }
}

Now, a scan output will look like:

no_names.yml:2:3: ERROR: [E2303] Task is missing a name.
no_names.yml:5:3: ERROR: [E2303] Task is missing a name.

If we are running scans in an IDE's terminal console window, we can then hold down the Ctrl key and click on the no_names.yml:... part of the output to have the IDE navigate directly to the identified line of the input file.

Returning custom messages

So far, the policies ensure that all tasks in the scanned playbooks have a name. But we might have further requirements for the contents of the task names. For example, we would like each name to start with an uppercase letter. For tasks that do not meet the policy, we want to display the problematic task name.

To do that, we edit our Rego file to append the following rule:

package hello_world

import rego.v1

first_letter_capital contains result if {
    some task in input.tasks
    name := task.task_args.name
    not regex.match(`^[A-Z]`, name)

    result := {
        "check_type": "TASK",
        "correlation_id": task.task_id,
        "message": sprintf("Task name should start with a capital letter. Current value is '%s'", [name]),
    }
}

Scanning my_playbook_task.yml, we obtain the following result:

my_playbook_task.yml:2:3: ERROR: [E2303] Task name should start with a capital letter. Current value is 'remove service now incident'.

More information on how the regex.match function is defined and how exactly negation with the not keyword works is available in the official Rego documentation.

With sprintf, we can construct input-specific messages: OPA replaces each %s placeholder with the corresponding element of the array passed as the second argument. The usage should be familiar to C and Go programmers.

Spotter shows any text enclosed in single quotes ' .. ' in an emphasized form. In our example, the text will appear as:

Task name should start with a capital letter. Current value is remove service now incident.
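
The sprintf call described above is not limited to a single placeholder. The following sketch, which is not part of the original policy set, combines %s and %d to report task names that exceed a length limit. The rule name, the package name, and the limit of 60 characters are assumptions made for illustration only.

```rego
package message_demo

import rego.v1

# Hypothetical limit, chosen only for this example.
max_length := 60

name_not_too_long contains result if {
    some task in input.tasks
    name := task.task_args.name

    # count returns the number of characters when given a string.
    count(name) > max_length

    result := {
        "check_type": "TASK",
        "correlation_id": task.task_id,
        # Placeholders are filled in order from the array: %s, then two %d.
        "message": sprintf(
            "Task name '%s' has %d characters, the limit is %d",
            [name, count(name), max_length],
        ),
    }
}
```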

Return value schema

The following schema describes valid return variable dictionaries:

{
    "check_type": "TASK" | "PLAY" | "VAR" | "OTHER",
    "correlation_id": task.task_id | play.play_id | variable.variable_id | null,
    "message": str,
    "subcode": str | null,
    "submessage": str | null,
    "level": "error" | "hint" | "warning" | null  # default is error
}
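
As a sketch of how the optional fields might be used, the rule below repeats the capital-letter check but reports it as a warning and fills in subcode and submessage. The package name, the rule name, and the subcode value NAME_CASE are hypothetical, and how Spotter displays subcode and submessage is not covered in this document, so treat this as an assumption-laden example rather than a reference.

```rego
package schema_demo

import rego.v1

name_starts_with_capital contains result if {
    some task in input.tasks
    name := task.task_args.name
    not regex.match(`^[A-Z]`, name)

    result := {
        "check_type": "TASK",
        "correlation_id": task.task_id,
        "message": "Task name should start with a capital letter",
        "subcode": "NAME_CASE", # hypothetical value, any string fits the schema
        "submessage": sprintf("Current value is '%s'", [name]),
        "level": "warning", # overrides the default level, which is error
    }
}
```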