Processor¶
Below you will find details on writing a Snap processor plugin in Python.
Plugin Naming¶
The first naming decision you will need to make will be for your project/repository. For this we suggest the following format.
snap-plugin-<TYPE>-<PLUGIN_NAME>
TYPE is either collector, processor, or publisher, and PLUGIN_NAME is the same name that your plugin will provide to Snap when it does its initial handshake (more on that later).
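As a small illustration of the format above, the helper below assembles a repository name and rejects unknown plugin types. The function name `repo_name` and the example plugin name `tag` are hypothetical, chosen only for this sketch; they are not part of Snap.

```python
# Hypothetical helper (not part of Snap): builds a repository name
# following the suggested snap-plugin-<TYPE>-<PLUGIN_NAME> format.
VALID_TYPES = ("collector", "processor", "publisher")


def repo_name(plugin_type, plugin_name):
    # Reject anything other than the three plugin types named above.
    if plugin_type not in VALID_TYPES:
        raise ValueError("TYPE must be one of: " + ", ".join(VALID_TYPES))
    return "snap-plugin-{}-{}".format(plugin_type, plugin_name)


print(repo_name("processor", "tag"))  # prints snap-plugin-processor-tag
```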
Creating a processor¶
To write a processor plugin, start by defining a class that inherits from snap_plugin.v1.processor.Processor.
    import snap_plugin.v1 as snap


    class Rand(snap.Processor):
        """Example processor; the methods described below belong in this class."""
The next step is to provide an implementation for the following methods: get_config_policy() and process(). We'll start with get_config_policy().
get_config_policy()¶
This method is called by the framework when a plugin loads. It communicates what configuration items are needed, whether they are required or optional and what default values and constraints (min, max) the values may have.
    def get_config_policy(self):
        return snap.ConfigPolicy(
            [
                None,
                [
                    (
                        "instance-id",
                        snap.StringRule(default="xyz-abc-qwerty")
                    )
                ]
            ]
        )
Policies are grouped by namespace for collector plugins; since this is a processor plugin, the namespace should be set to None. The single StringRule defines a config item "instance-id" with the default value "xyz-abc-qwerty". We could have chosen to make this a required field and not provided a default, which would have had the effect of requiring any task that uses this plugin to include a config item providing the required value.
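To make the difference between a default and a required item concrete, here is a minimal sketch in plain Python of how such rules could be applied to a task's config. It does not use the Snap library and does not claim to reflect how the framework enforces policies internally; the `RULES` dictionary, the `apply_rules` function, and the "owner" item are hypothetical stand-ins.

```python
# Hypothetical stand-in for a config policy: each item maps to a rule
# stating whether it is required and what its default is (if any).
RULES = {
    "instance-id": {"required": False, "default": "xyz-abc-qwerty"},
    "owner": {"required": True, "default": None},  # illustrative required item
}


def apply_rules(rules, task_config):
    """Return the effective config, or raise if a required item is missing."""
    effective = {}
    for key, rule in rules.items():
        if key in task_config:
            # A value supplied by the task always wins.
            effective[key] = task_config[key]
        elif rule["required"]:
            raise ValueError("missing required config item: " + key)
        elif rule["default"] is not None:
            effective[key] = rule["default"]
    return effective


# The task supplies only the required item; the default fills in the rest.
print(apply_rules(RULES, {"owner": "ops"}))
```

A task config that omits "owner" would raise an error in this sketch, which mirrors the idea that a required item without a default must be supplied by every task using the plugin.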
Next we have process().
process()¶
This method is called by the framework when a task manifest includes the use of a processor plugin like in the following example task.
    ---
    version: 1
    schedule:
      type: "simple"
      interval: "1s"
    workflow:
      collect:
        metrics:
          /random/float64: {}
          /random/int64: {}
          /random/string: {}
        process:
          - plugin_name: "tag-py"
            publish:
              - plugin_name: "file-py"
Below is an example implementation of a process() method.
    def process(self, metrics, config):
        for metric in metrics:
            metric.tags["instance-id"] = config["instance-id"]
        return metrics
This example demonstrates using the config to add a tag to the metrics before returning them. A less trivial demonstration of adding context (tags) to a metric could involve calling out to another system to obtain additional context (the owner, datacenter, instance-id, etc.). Besides adding context to a metric, processor plugins are a great place to apply functions like "min", "max", and "mean".
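As a rough sketch of the aggregation idea, the function below takes a batch of metrics and returns the min, max, and mean of their numeric values. To stay runnable without the Snap library it uses a hypothetical `FakeMetric` stand-in with only `namespace` and `data` fields; the real metric class, and how a plugin would emit the results as new metrics, are not shown here and should be checked against the library's documentation.

```python
from collections import namedtuple

# Hypothetical stand-in for a metric; only the fields this sketch needs.
FakeMetric = namedtuple("FakeMetric", ["namespace", "data"])


def summarize(metrics):
    """Compute min, max and mean over the numeric values in a batch."""
    values = [m.data for m in metrics if isinstance(m.data, (int, float))]
    if not values:
        return {}
    return {
        "min": min(values),
        "max": max(values),
        "mean": sum(values) / len(values),
    }


batch = [
    FakeMetric("/random/int64", 4),
    FakeMetric("/random/float64", 1.5),
    FakeMetric("/random/string", "abc"),  # non-numeric, skipped
]
print(summarize(batch))  # {'min': 1.5, 'max': 4, 'mean': 2.75}
```

Inside a real process() method, a summary like this could be computed over the incoming metrics before returning them, for example by attaching the results as tags in the same way the example above attaches "instance-id".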