
Configuring Kafka and the StackStorm Kafka pack

So you have installed StackStorm and the Kafka pack, but now we need to configure the pack to consume and produce messages on Kafka.


The first thing that you should check out is the StackStorm documentation on how packs are configured.

After you have read that, you will know that the pack's repo contains a schema file, config.schema.yaml, describing what the configuration should look like.

The Kafka pack repo also includes an example configuration, kafka.yaml.example, which looks like this:


---
# Used by the produce action
client_id: st2-kafka-producer
hosts: 172.16.1.10:9092
# Used for the kafka sensor
message_sensor:
  client_id: testclientID
  group_id: testgroupID
  hosts: 172.16.1.10:9092
  topics:
  - test
# Used by the gcp kafka sensor
gcp_message_sensor:
  client_id: st2gcpclient
  group_id: st2gcpgroup
  hosts: 172.16.1.10:9092
  topics:
  - gcp_stackdriver_topic

Now put this file in /opt/stackstorm/configs and call it kafka.yaml.
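
A quick way to do that (assuming the pack was installed to the default /opt/stackstorm/packs location) is to copy the pack's example file into place and then edit it:

cp /opt/stackstorm/packs/kafka/kafka.yaml.example /opt/stackstorm/configs/kafka.yaml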

No Auth?

I'm a bit staggered that there are no authentication fields. I suppose I will find out a bit later.

Now let's test if we are getting any messages.

Configuration files are not read dynamically at run-time. Instead, they must be registered, and values are then loaded into the StackStorm DB.

You can register the new config with:

st2ctl reload --register-configs

 

Remember, static values you don't want stored in the database as part of the config can instead be referenced from the datastore:

api_secret: "{{st2kv.user.api_secret}}"

and datastore values are set with, for example:

st2 key set private_key_path "/home/myuser/.ssh/my_private_rsa_key"
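
You can check what was stored with:

st2 key get private_key_path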

Let's test it

To test producing a message, use:

st2 run kafka.produce topic=test message='StackStorm meets Apache Kafka'

For me, it failed with:


............
id: 5cd3cffe9dc6d63a924c0635
status: failed
parameters: 
  message: StackStorm meets Apache Kafka
  topic: test
result: 
  exit_code: 1
  result: None
  stderr: 'No handlers could be found for logger "kafka.conn"'
...

So there was no handler for the logger, which I assumed was because no logger was being set up due to the mode it runs in. I switched debug mode on. That didn't work…

So I ended up editing the file and adding logging to the stackstorm-kafka pack, as per this Stack Overflow answer.

I added this to the top of /opt/stackstorm/packs/kafka/actions/produce.py:


import logging

logging.basicConfig(
    format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
    level=logging.DEBUG
)

and saw that it was querying for brokers and topics and getting a response; however, the host in the response was kafka rather than an IP address, so the client didn't know where to connect.
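
If you want to see that metadata for yourself without patching the pack, kafkacat (assuming you have it installed; it is not part of StackStorm or the pack) will dump the broker metadata, including the advertised hostname:

kafkacat -L -b 172.16.1.10:9092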
So the Kafka server needs to be configured so that the broker config contains:


advertised.listeners=PLAINTEXT://xxx.xx.xxx.xxx:9092

So I fixed it in the interim by adding this to /etc/hosts:


xxx.xxx.xx.xxx kafka
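
A quick sanity check that the name now resolves:

getent hosts kafka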

Still an error

The Kafka pack is old and still gave an error when returning the response, so the action failed (yet the message was sent to Kafka).
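
You can confirm the message really did reach the topic by consuming it with the console consumer that ships with Kafka (assuming the CLI tools are available on a broker host):

kafka-console-consumer.sh --bootstrap-server 172.16.1.10:9092 --topic test --from-beginning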

Testing receiving triggers

Let's test that StackStorm can receive messages from Kafka, run criteria on them, and send another message out.
How about we say hello and have StackStorm say hello back to us?

We have the trigger and action parts; we just need to bind them together using a StackStorm rule.

List available triggers:

st2 trigger list

Get info about the trigger we want to use and take note of the payload parameters:

 

st2 trigger get kafka.new_message

The rule I created:


[cent@st2 rules]$ cat repeat_message.yaml 
---

name: "repeat_kafka_messages"
pack: "custom"
description: "Repeat the previous message received"
enabled: true

trigger:
    type: "kafka.new_message"

criteria:                              # optional
    trigger.topic:
        type: "equals"
        pattern: "test"
    trigger.message:
        type: "iequals"
        pattern: "hello"

action:                                # required
    ref: "kafka.produce"
    parameters:
        topic: "test"                        # optional
        message: "topic: {{ trigger.topic }}\nmessage: {{ trigger.message }}\npartition: {{ trigger.partition }}\nkey: {{ trigger.key }}\noffset: {{ trigger.offset }}"

Now to create the rule:

st2 rule create repeat_message.yaml

then reload the rules:

st2ctl reload --register-rules
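
To exercise the rule end to end, say hello on the topic using the same produce action we tested earlier:

st2 run kafka.produce topic=test message=hello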

I tested the rule and it looked good, but it wasn't actually firing when a message was added to the topic. Now the struggle of finding out why…

Traces

Traces track all triggers, actions, and rules, which will help me debug.

If I check the traces with st2 trace list, there are no trigger instances in the last hour, even though I just sent some messages into the test topic.

Logs

I had more luck looking through the source code and finding a log line:

self._logger.debug('[KafkaMessageSensor]: Initializing consumer ...')

but it logs at the DEBUG level, while the default level is INFO in /etc/st2/logging.sensorcontainer.conf.

So I changed it to DEBUG:


[logger_root]
level=DEBUG
handlers=consoleHandler, fileHandler, auditHandler

[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=simpleConsoleFormatter
args=(sys.stdout,)

[handler_fileHandler]
class=handlers.RotatingFileHandler
level=DEBUG
formatter=verboseConsoleFormatter
args=("/var/log/st2/st2sensorcontainer.log",)
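
For the new level to take effect, the sensor container needs restarting (assuming a standard st2ctl-managed install):

sudo st2ctl restart st2sensorcontainer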

Then I saw in the logs that the payload validation was failing:


2019-05-10 05:42:03,589 140451038122704 WARNING trigger_dispatcher [-] Failed to validate payload ({'topic': 'test', 'message': 'hello', 'partition': 0, 'key': None, 'offset': 52}) for trigger "kafka.new_message": 'hello' is not of type u'object', 'null'

Failed validating u'type' in schema[u'properties'][u'message']:
    {u'description': u'Captured message. JSON-serialized messages are automatically parsed',
     u'type': [u'object', 'null']}

On instance[u'message']:
    'hello'
2019-05-10 05:42:03,590 140451038122704 WARNING trigger_dispatcher [-] Trigger payload validation failed and validation is enabled, not dispatching a trigger "kafka.new_message" ({'topic': 'test', 'message': 'hello', 'partition': 0, 'key': None, 'offset': 52}): 'hello' is not of type u'object', 'null'

Failed validating u'type' in schema[u'properties'][u'message']:
    {u'description': u'Captured message. JSON-serialized messages are automatically parsed',
     u'type': [u'object', 'null']}

On instance[u'message']:
    'hello'

which I raised on their GitHub.
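
In the meantime, since the schema above says message must be an object or null (with JSON-serialized messages parsed automatically), a plausible workaround is to produce a JSON object rather than a bare string. The greeting key below is just an arbitrary example, and the rule criteria would need updating to match a field of the parsed object instead of the string "hello":

st2 run kafka.produce topic=test message='{"greeting": "hello"}'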