Overview

This collector writes messages to a single Kafka cluster. It supports protocols that create dynamic datafields on the fly (e.g. the FIX protocol decoder).

Stack Configuration

This is a message collector with the module name “msg_kafka_dynamic”.

Parameters

Stack configuration parameters

Tables

global_configuration

Key-value pairs for configuring librdkafka. See https://github.com/edenhill/librdkafka/blob/v1.9.0/CONFIGURATION.md for the list of supported configuration properties.

Key-value pairs for librdkafka

json_property_mapping

This table is optional. If configured, it maps datafields to the JSON property names (and, optionally, the JSON group) under which they are written to Kafka.

Optional json_property_mapping parameters

Example

Stack Configuration

{
"probe": {
"parameters": {
"name": "test",
"protocols": [
{
"type": "module",
"value": "ethernet"
},
{
"type": "module",
"value": "ip"
},
{
"type": "module",
"value": "fix",
"id": "decFix"
}
],
"msg_collector": [
{
"type": "module",
"value": "msg_kafka_dynamic",
"id": "collMsgKafkaDynamic"
}
]
},
"tables": {
"static_datafields": [
{
"name": "fixed_topic",
"type": "string",
"value": "test.topic"
}
]
}
},
"decFix": {
"parameters": {
"ddc": "*:string"
}
},
"collMsgKafkaDynamic": {
"parameters": {
"payload_format": "json",
"topic_datafield": "fixed_topic",
"timestamp_datafield": "TIMESTAMP",
"add_other_fields": true,
"json_replace_dot_with_underscore": true,
"json_group_list": [
"eth",
"ip",
"tcp",
"fix"
]
},
"tables": {
"global_configuration": [
{
"key": "bootstrap.servers",
"value": "kafka-cluster:9092"
}
],
"json_property_mapping": [
{
"datafield_name": "ip.src_host",
"json_property_name": "src_ip",
"json_group_name": "ip"
},
{
"datafield_name": "ip.dst_host",
"json_property_name": "dst_ip",
"json_group_name": "ip"
},
{
"datafield_name": "ip.src_port",
"json_property_name": "src_port",
"json_group_name": "tcp"
},
{
"datafield_name": "ip.dst_port",
"json_property_name": "dst_port",
"json_group_name": "tcp"
},
{
"datafield_name": "TIMESTAMP",
"json_property_name": "packet_timestamp"
}
]
}
}
}

Output

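For an example FIX message, the topic would be “test.topic” and the payload would be as shown below. Note that “packet_timestamp” is expressed in nanoseconds since the Unix epoch and exceeds 2^53 − 1, so consumers must parse it as a 64-bit integer to avoid losing precision: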

{
"fields": {
"hostname": "beeksanalytics",
"probe_name": "test",
"fixed_topic": "test.topic",
"packet_timestamp": 1448733575877513000
},
"groups": [
{
"name": "eth",
"fields": {
"type": 2048,
"src": "00:00:00:00:00:00",
"dst": "00:00:00:00:00:00",
"ig": false,
"len": 166
}
},
{
"name": "ip",
"fields": {
"id": 3653,
"len": 152,
"ttl": 64,
"tos": 0,
"checksum": 11801,
"src_ip": "127.0.0.1",
"dst_ip": "127.0.0.1",
"proto": 6
}
},
{
"name": "tcp",
"fields": {
"window_size_value": 342,
"window_size_scalefactor": 7,
"window_size": 43776,
"src_port": 53867,
"dst_port": 11001
}
},
{
"name": "fix",
"fields": {
"version": "FIXT.1.1",
"MsgSeqNum": 1,
"MsgType": "A",
"size": 100,
"SenderCompID": "DLD_TEX",
"TargetCompID": "TEX1_DLD",
"35": "A",
"49": "DLD_TEX",
"56": "TEX1_DLD",
"34": "1",
"52": "20151128-17:59:35.877",
"98": "0",
"108": "10",
"1137": "8",
"10": "035"
}
}
]
}