Processor

Below you will find details that cover writing a snap processor plugin in Python.

Plugin Naming

The first naming decision you will need to make will be for your project/repository. For this we suggest the following format.

snap-plugin-<TYPE>-<PLUGIN_NAME>

TYPE is either collector, processor or publisher and the PLUGIN_NAME is the same name that your plugin will provide Snap when it does its initial handshake (more on the later).

Creating a processor

In order to write a processor plugin you will want to start by defining a class that inherits from snap_plugin.v1.processor.Processor.

1
2
3
import snap_plugin.v1 as snap

class Rand(snap.Processor):
The next step is to provide an implementation for the following methods:

We’ll start with get_config_policy()

This method is called by the framework when a plugin loads. It communicates what configuration items are needed, whether they are required or optional and what default values and constraints (min, max) the values may have.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def get_config_policy(self):
    return snap.ConfigPolicy(
        [
            None,
            [
                (
                    "instance-id",
                    snap.StringRule(default="xyz-abc-qwerty")
                )
            ]
        ]
    )

Policies are grouped by namespaces for collector plugins however since this is a processor plugin it should be set to None. The single IntegerRule defines a config item “instance-id” with the default value “xyz-abc-qwerty”. We could have choosen to make this a required field and not provided a default which would have had the affect of requiring that any task that leveraged this plugin to include a config item providing the required value.

Next we have process.

process()

This method is called by the framework when a task manifest includes the use of a processor plugin like in the following example task.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
---
 version: 1
 schedule:
     type: "simple"
     interval: "1s"
 workflow:
     collect:
         metrics:
             /random/float64: {}
             /random/int64: {}
             /random/string: {}
         process:
             - plugin_name: "tag-py"
               publish:
                 - plugin_name: "file-py"

Below is an example implementation of a process() method.

1
2
3
4
def process(self, metrics, config):
    for metric in metrics:
        metric.tags["instance-id"] = config["instance-id"]
    return metrics

This example demonstrates using the config to add a tag to the metrics before returning them. A less trivial demonstration of adding context (tags) to a metric could have involved calling out to another system to obtain additional context (the owner, datacenter, instance-id, etc..). Besides adding adding additional context to a metric processor plugins are a great place to apply functions like ‘min’, ‘max’, ‘mean’ etc.