Publisher

Below you will find details that cover writing a Snap publisher plugin in Python.

Plugin Naming

The first naming decision you will need to make will be for your project/repository. For this we suggest the following format.

snap-plugin-<TYPE>-<PLUGIN_NAME>

TYPE is either collector, processor or publisher and the PLUGIN_NAME is the same name that your plugin will provide Snap when it does its initial handshake (more on the later).

Creating a publisher

In order to write a publisher plugin you will want to start by defining a class that inherits from snap_plugin.v1.publisher.Publisher.

1
2
3
import snap_plugin.v1 as snap

class Rand(snap.Publisher):
The next step is to provide an implementation for the following methods:

We’ll start with get_config_policy()

This method is called by the framework when a plugin loads. It communicates what configuration items are needed, whether they are required or optional and what default values and constraints (min, max) the values may have.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def get_config_policy(self):
    return snap.ConfigPolicy(
        [
            None,
            [
                (
                    "file",
                    snap.StringRule(default="/tmp/snap-py.out")
                )
            ]
        ],
    )

Policies are grouped by namespaces for collector plugins however since this is a publisher plugin it should be set to None. The single StringRule defines a config item “file” with the default value “/tmp/snap-py.out” defining what file metrics will be published to. We could have choosen to make this a required field and not provided a default which would have had the affect of requiring that any task that leveraged this plugin to include a config item providing the required value.

Next we have publish().

publish()

This method is called by the framework when a task manifest includes the use of a publisher plugin like in the following example task.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
---
 version: 1
 schedule:
     type: "simple"
     interval: "1s"
 workflow:
     collect:
         metrics:
             /random/float64: {}
             /random/int64: {}
             /random/string: {}
         process:
             - plugin_name: "tag-py"
               publish:
                 - plugin_name: "file-py"

Below is an example implementation of a publish() method.

1
2
3
4
5
6
7
def publish(self, metrics, config):
    if len(metrics) > 0:
            with open(config["file"], 'a') as outfile:
                for metric in metrics:
                    outfile.write(
                        json_format.MessageToJson(
                            metric._pb, including_default_value_fields=True))

This example demonstrates using the config to define the location of the file we will publish to.