So you have installed StackStorm and the kafka pack, but now we need to configure the pack to consume and produce messages on Kafka.
The first thing you should check out is the StackStorm documentation on how packs are configured.
After reading that, you will know that the pack’s repo contains a schema file describing what the configuration should look like, called: config.schema.yaml
The kafka pack repo also has an example configuration: kafka.yaml.example
The example config file looks like this:
---
# Used by the produce action
client_id: st2-kafka-producer
hosts: 172.16.1.10:9092
# Used for the kafka sensor
message_sensor:
  client_id: testclientID
  group_id: testgroupID
  hosts: 172.16.1.10:9092
  topics:
    - test
# Used by the gcp kafka sensor
gcp_message_sensor:
  client_id: st2gcpclient
  group_id: st2gcpgroup
  hosts: 172.16.1.10:9092
  topics:
    - gcp_stackdriver_topic
Now put this file in: /opt/stackstorm/configs
Call it: kafka.yaml
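Before registering the file, it can be worth sanity-checking its shape. The sketch below is only an illustration: it hard-codes part of the example config as a Python dict (rather than parsing the YAML) and uses a hypothetical helper, missing_sensor_keys, that is not part of StackStorm or the kafka pack. The list of required keys is my assumption, taken from the example file and not from config.schema.yaml.

```python
# Hypothetical sanity check; not part of StackStorm or the kafka pack.
# The dict mirrors kafka.yaml.example; REQUIRED_SENSOR_KEYS is an assumption
# based on that example, not on config.schema.yaml.
REQUIRED_SENSOR_KEYS = ('client_id', 'group_id', 'hosts', 'topics')

config = {
    'client_id': 'st2-kafka-producer',
    'hosts': '172.16.1.10:9092',
    'message_sensor': {
        'client_id': 'testclientID',
        'group_id': 'testgroupID',
        'hosts': '172.16.1.10:9092',
        'topics': ['test'],
    },
}


def missing_sensor_keys(cfg, section='message_sensor'):
    """Return the keys from REQUIRED_SENSOR_KEYS that the section lacks."""
    sensor = cfg.get(section) or {}
    return [key for key in REQUIRED_SENSOR_KEYS if key not in sensor]


print(missing_sensor_keys(config))
print(missing_sensor_keys({'message_sensor': {'hosts': 'x:9092'}}))
```

An empty list means the sensor section has every key the example file uses.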
No Auth?
I’m a bit staggered that there are no authentication fields. I suppose I will find out more about that later.
Now let us test whether we are getting any messages.
Configuration files are not read dynamically at run-time. Instead, they must be registered, and values are then loaded into the StackStorm DB.
You can register the new config with:
st2ctl reload --register-configs
Remember that values you don’t want to keep in the config file can be referenced from the datastore with:
private_key_path: "{{st2kv.system.private_key_path}}"
and stored with:
st2 key set private_key_path "/home/myuser/.ssh/my_private_rsa_key"
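To make the idea of a datastore reference concrete, here is a toy sketch of the lookup. It is not how StackStorm implements it: DATASTORE is a plain dict standing in for the real datastore, and resolve is a hypothetical helper. I am assuming the key was stored in the system scope, which is what st2 key set uses when no scope is given.

```python
import re

# Toy illustration only: StackStorm resolves these references itself.
# DATASTORE is a stand-in for the real datastore, keyed by (scope, name).
DATASTORE = {
    ('system', 'private_key_path'): '/home/myuser/.ssh/my_private_rsa_key',
}

# Matches references such as {{st2kv.system.private_key_path}}
REFERENCE = re.compile(r'\{\{\s*st2kv\.(system|user)\.([\w.]+)\s*\}\}')


def resolve(value, datastore):
    """Replace each {{st2kv.<scope>.<name>}} reference with its stored value."""
    def lookup(match):
        return datastore[(match.group(1), match.group(2))]
    return REFERENCE.sub(lookup, value)


print(resolve('{{st2kv.system.private_key_path}}', DATASTORE))
```

The point is simply that the config file only carries the reference, while the value itself lives in the datastore.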
Let’s test it
To test producing a message use:
st2 run kafka.produce topic=test message='StackStorm meets Apache Kafka'
For me, it failed with:
............
id: 5cd3cffe9dc6d63a924c0635
status: failed
parameters:
message: StackStorm meets Apache Kafka
topic: test
result:
exit_code: 1
result: None
stderr: "No handlers could be found for logger "kafka.conn"
...
So there was no handler for the logger, which I assumed meant that no logger had been set up because of the mode I was running in. I switched debug mode on. That didn’t work…
So I ended up editing the file and adding logging to the stackstorm-kafka pack, as per this stackoverflow answer.
I added this to the top of /opt/stackstorm/packs/kafka/actions/produce.py:
import logging
# Send DEBUG and above from every logger (including kafka.conn) to stderr
logging.basicConfig(
    format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
    level=logging.DEBUG
)
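For context, "No handlers could be found for logger" is what Python 2's logging module prints when a logger emits a record and no handler is attached to it or to any of its parents. The sketch below (written for Python 3) shows the general fix in isolation: once a handler is attached, records from kafka.conn have somewhere to go. The helper capture_logger_output and the test message are made up for illustration; the pack itself only needed the basicConfig call.

```python
import io
import logging


def capture_logger_output(logger_name, message):
    """Attach a handler to the named logger and return what it emits."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter('%(name)s:%(levelname)s:%(message)s'))

    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    try:
        logger.debug(message)
    finally:
        # Clean up so the sketch leaves the logger as it found it
        logger.removeHandler(handler)
    return stream.getvalue()


output = capture_logger_output('kafka.conn', 'connecting to broker')
print(output, end='')
```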
With that in place, I saw that it was querying for brokers and topics and getting a response. However, the host part of the response was kafka and not an IP address, so the client didn’t know where to go.
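So the Kafka server needs to be configured so that the broker config advertises an address that clients can reach, for example (assuming a plaintext listener on port 9092):
advertised.listeners=PLAINTEXT://xxx.xx.xxx.xxx:9092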
So I fixed it in the interim by adding this to /etc/hosts:
xxx.xxx.xx.xxx kafka
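A quick way to confirm that the StackStorm host can now resolve the advertised name is to ask the local resolver, which also consults /etc/hosts. This is a minimal sketch using only the standard library; can_resolve is a hypothetical helper, and kafka is the hostname the broker advertised in my case.

```python
import socket


def can_resolve(hostname):
    """Return True if the local resolver (including /etc/hosts) knows hostname."""
    try:
        socket.gethostbyname(hostname)
    except socket.gaierror:
        return False
    return True


if __name__ == '__main__':
    # "kafka" is the hostname the broker advertised in my case
    for name in ('kafka', '127.0.0.1'):
        print(name, can_resolve(name))
```

If kafka prints False, the client has no way of reaching the broker at the address it was handed back.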
Still an error
The kafka pack is old and still gave an error when returning the response, so the action was marked as failed (yet the message was sent to Kafka).
Testing receiving Triggers
Let’s test that StackStorm can receive messages from Kafka, run criteria on them and send another message out.
How about we say hello and have StackStorm say hello back to us.
We have the trigger and action parts; we just need to bind them together using a StackStorm rule.
List available triggers:
st2 trigger list
Get info about the trigger we want to use and take note of the payload parameters:
st2 trigger get kafka.new_message
The rule I created:
[cent@st2 rules]$ cat repeat_message.yaml
---
name: "repeat_kafka_messages"
pack: "custom"
description: "Repeat the previous message received"
enabled: true
trigger:
  type: "kafka.new_message"
criteria: # optional
  trigger.topic:
    type: "equals"
    pattern: "test"
  trigger.message:
    type: "iequals"
    pattern: "hello"
action: # required
  ref: "kafka.produce"
  parameters:
    topic: "test" # optional
    message: "topic: {{ trigger.topic }}\nmessage: {{ trigger.message }}\npartition: {{ trigger.partition }}\nkey: {{ trigger.key }}\noffset:{{ trigger.offset }}"
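To spell out what the criteria in this rule are meant to do, here is a toy evaluator. It is only a sketch of the semantics as I understand them (equals is an exact match, iequals is a case-insensitive match) and is not StackStorm's rules engine; OPERATORS and rule_matches are hypothetical names.

```python
# Toy evaluator for the two criteria types used in the rule above.
# Not StackStorm's implementation, just an illustration of the semantics.
OPERATORS = {
    'equals': lambda value, pattern: value == pattern,
    'iequals': lambda value, pattern: str(value).lower() == str(pattern).lower(),
}

CRITERIA = {
    'trigger.topic': {'type': 'equals', 'pattern': 'test'},
    'trigger.message': {'type': 'iequals', 'pattern': 'hello'},
}


def rule_matches(payload, criteria):
    """Return True only if every criterion matches the trigger payload."""
    for key, check in criteria.items():
        field = key.split('.', 1)[1]  # drop the "trigger." prefix
        value = payload.get(field)
        if not OPERATORS[check['type']](value, check['pattern']):
            return False
    return True


print(rule_matches({'topic': 'test', 'message': 'HELLO'}, CRITERIA))
print(rule_matches({'topic': 'other', 'message': 'hello'}, CRITERIA))
```

All criteria have to match, so a hello on any topic other than test should not fire the action.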
Now create the rule from that file:
st2 rule create repeat_message.yaml
then reload the rules:
st2ctl reload --register-rules
I tested the rule and it worked well, but the sensor wasn’t actually picking up messages added to the topic. Now the struggle of finding out why…
Traces
Traces track all triggers, actions and rules, so this will help me debug.
If I check the traces with st2 trace list, there are no trigger instances in the last hour, even though I just sent some messages to the test topic.
Logs
I had more luck looking through the source code, where I found this log statement:
self._logger.debug('[KafkaMessageSensor]: Initializing consumer ...')
It logs at the DEBUG log level, but the default level is INFO in /etc/st2/logging.sensorcontainer.conf.
So I changed the level entries in these sections from INFO to DEBUG (the listing shows the defaults):
[logger_root]
level=INFO
handlers=consoleHandler, fileHandler, auditHandler
[handler_consoleHandler]
class=StreamHandler
level=INFO
formatter=simpleConsoleFormatter
args=(sys.stdout,)
[handler_fileHandler]
class=handlers.RotatingFileHandler
level=INFO
formatter=verboseConsoleFormatter
args=("/var/log/st2/st2sensorcontainer.log",)
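If you would rather script that change than edit the file by hand, something like the following could work. It is a sketch under assumptions: SAMPLE_CONF is a cut-down copy of the listing above, not the real file, and set_log_level is a hypothetical helper. Note that configparser writes entries back as "key = value" with spaces and drops any comments in the file.

```python
import configparser
import io

# Cut-down copy of the listing above, used here instead of the real file
SAMPLE_CONF = """\
[logger_root]
level=INFO
handlers=consoleHandler, fileHandler, auditHandler

[handler_consoleHandler]
class=StreamHandler
level=INFO
formatter=simpleConsoleFormatter
args=(sys.stdout,)
"""


def set_log_level(conf_text, level):
    """Return conf_text with every section's level option set to level."""
    # interpolation=None keeps any "%" characters in values untouched
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(conf_text)
    for section in parser.sections():
        if parser.has_option(section, 'level'):
            parser.set(section, 'level', level)
    out = io.StringIO()
    parser.write(out)
    return out.getvalue()


updated = set_log_level(SAMPLE_CONF, 'DEBUG')
print(updated)
```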
Then I saw in the logs that the payload validation was failing:
2019-05-10 05:42:03,589 140451038122704 WARNING trigger_dispatcher [-] Failed to validate payload ({'topic': 'test', 'message': 'hello', 'partition': 0, 'key': None, 'offset': 52}) for trigger "kafka.new_message": 'hello' is not of type u'object', 'null'
Failed validating u'type' in schema[u'properties'][u'message']:
{u'description': u'Captured message. JSON-serialized messages are automatically parsed',
u'type': [u'object', 'null']}
On instance[u'message']:
'hello'
2019-05-10 05:42:03,590 140451038122704 WARNING trigger_dispatcher [-] Trigger payload validation failed and validation is enabled, not dispatching a trigger "kafka.new_message" ({'topic': 'test', 'message': 'hello', 'partition': 0, 'key': None, 'offset': 52}): 'hello' is not of type u'object', 'null'
Failed validating u'type' in schema[u'properties'][u'message']:
{u'description': u'Captured message. JSON-serialized messages are automatically parsed',
u'type': [u'object', 'null']}
On instance[u'message']:
'hello'
I raised this as an issue on their GitHub.
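The failure is easier to see in isolation. The trigger schema only allows message to be an object or null, and the schema description says JSON-serialized messages are parsed automatically, so a bare string like hello stays a string and is rejected. The sketch below approximates that behaviour; parse_message and is_valid_message are my own hypothetical helpers, not the pack's code.

```python
import json


def parse_message(raw):
    """Parse raw as JSON if possible, otherwise keep the original string."""
    try:
        return json.loads(raw)
    except ValueError:
        return raw


def is_valid_message(value):
    """Mirror the schema type ['object', 'null']: only dicts and None pass."""
    return value is None or isinstance(value, dict)


for raw in ('hello', '{"greeting": "hello"}'):
    print(repr(raw), is_valid_message(parse_message(raw)))
```

Under these assumptions, a message that is a JSON object would pass validation, while any plain-text message would be dropped before the rule ever sees it.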