Overview
This collector writes messages to a single Kafka cluster. It supports protocols whose decoders create datafields dynamically (e.g. the FIX protocol decoder).
Stack Configuration
This is a message collector with module name “msg_kafka_dynamic”.
Parameters
Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
payload_format | enum (json) | Y | | Encoding of the Kafka message payload |
max_batch_size | uint64 | N | Not specified | Maximum batch size to insert |
batch_timeout_ms | uint64 | N | Not specified | Timeout before inserting a batch in milliseconds |
request_pool_initial_size | uint64 | N | Not specified | Initial size of the request pool. Each request inserts 1 message |
request_pool_maximum_size | uint64 | N | Unbounded | Maximum size of the request pool. If insertion into Kafka cannot keep up with the incoming message rate, this parameter limits how large the pool can grow |
request_pool_allocation_batch_size | uint64 | N | Not specified | Number of new requests to allocate at a time if the request pool needs to grow |
request_pool_blocking | bool | N | true | Whether to block packet decoding and wait for requests to be returned to the pool (true) or drop messages and not write them to Kafka (false) |
field_pool_initial_size | uint64 | N | Not specified | Initial size of the field pool. Fields are added to requests to write messages |
field_pool_allocation_batch_size | uint64 | N | Not specified | Number of new fields to allocate at a time if the field pool needs to grow |
kafka_payload_pool_initial_size | uint64 | N | Not specified | Initial size of the Kafka payload pool. Each Kafka payload inserts 1 message |
kafka_payload_pool_maximum_size | uint64 | N | Unbounded | Maximum size of the Kafka payload pool. If insertion into Kafka cannot keep up with the incoming message rate, this parameter limits how large the pool can grow |
kafka_payload_pool_allocation_batch_size | uint64 | N | Same as request_pool_allocation_batch_size | Number of new Kafka payloads to allocate at a time if the Kafka payload pool needs to grow |
kafka_payload_pool_blocking | bool | N | Same as request_pool_blocking | Whether to block building payloads and wait for Kafka messages to finish being sent (true) or drop messages and not write them to Kafka (false) |
topic_datafield | string | Y | | Datafield to use as the Kafka message topic |
key_datafield | string | N | No message key | Datafield to use as the Kafka message key |
timestamp_datafield | string | N | Broker generates timestamp | Datafield name to use as the Kafka message timestamp |
add_other_fields | bool | N | false | Whether to add fields that have not been explicitly configured to the message |
json_replace_dot_with_underscore | bool | N | false | Whether to replace '.' in JSON property names with '_' |
json_potentially_large_integers_as_strings | bool | N | false | Whether to store potentially large integers in JSON as strings |
json_group_list | array of string | N | No grouping | List of JSON groups. Datafields are assigned to a group either explicitly through json_property_mapping or, if add_other_fields is true, by datafield prefix (the part of the datafield name before the first '.') |
ignore_binary_datafields | bool | N | true | Whether to ignore binary datafields |
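The interaction between the request_pool_* parameters can be easier to follow in code. The following is a minimal Python sketch of the behaviour described in the table, not the collector's actual implementation. The class `RequestPool` and its methods are hypothetical names introduced for illustration, and the values passed in are arbitrary because the real defaults are not specified.

```python
import threading


class RequestPool:
    """Hypothetical pool illustrating the request_pool_* parameters."""

    def __init__(self, initial_size, maximum_size, allocation_batch_size, blocking):
        self._cond = threading.Condition()
        self._free = [object() for _ in range(initial_size)]
        self._total = initial_size
        self._maximum_size = maximum_size  # None models "Unbounded"
        self._batch = allocation_batch_size
        self._blocking = blocking

    @property
    def total(self):
        return self._total

    def _grow(self):
        # Allocate up to one batch of new requests without exceeding the maximum.
        room = self._batch
        if self._maximum_size is not None:
            room = max(0, min(room, self._maximum_size - self._total))
        self._free.extend(object() for _ in range(room))
        self._total += room

    def acquire(self):
        """Return a request, or None if the message has to be dropped."""
        with self._cond:
            if not self._free:
                self._grow()
            while not self._free:
                if not self._blocking:
                    return None  # blocking = false: drop the message
                self._cond.wait()  # blocking = true: wait for a release
            return self._free.pop()

    def release(self, request):
        # Called once Kafka has finished with the request.
        with self._cond:
            self._free.append(request)
            self._cond.notify()


# Pool of 1 request that may grow to 3, two at a time, dropping when exhausted.
pool = RequestPool(initial_size=1, maximum_size=3,
                   allocation_batch_size=2, blocking=False)
```

With blocking set to true, `acquire` would instead wait until another thread calls `release`, which corresponds to decoding being paused while Kafka catches up. The kafka_payload_pool_* parameters are described in the same terms, so the same sketch is assumed to apply to them.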
Tables
global_configuration
Key-value pairs for configuring librdkafka. For the available keys, see https://github.com/edenhill/librdkafka/blob/v1.9.0/CONFIGURATION.md
Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
key | string | Y | | Configuration key |
value | string | Y | | Configuration value |
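As an illustration of how the rows of this table relate to a librdkafka configuration, the Python sketch below collapses a list of key/value rows into a flat property dictionary. The helper `to_librdkafka_config` is a hypothetical name, and the assumption that a later row overrides an earlier row with the same key is ours, not something the collector documents. `bootstrap.servers`, `compression.type` and `linger.ms` are existing librdkafka properties; the values for the last two are arbitrary examples.

```python
def to_librdkafka_config(rows):
    """Collapse global_configuration rows into a flat key/value dict."""
    config = {}
    for row in rows:
        # Both columns are required strings; a repeated key overwrites
        # the earlier value (an assumption made for this sketch).
        config[row["key"]] = row["value"]
    return config


rows = [
    {"key": "bootstrap.servers", "value": "kafka-cluster:9092"},
    {"key": "compression.type", "value": "lz4"},
    {"key": "linger.ms", "value": "5"},
]
config = to_librdkafka_config(rows)
```

Note that every value is given as a string, including numeric settings such as `linger.ms`, because the value column is of type string.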
json_property_mapping
This table is optional. If configured, it maps datafields to the JSON properties that are written to Kafka.
Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
datafield_name | string | Y | | Datafield name |
json_property_name | string | Y | | JSON property name |
json_type | enum (boolean, number, string) | N | Suitable type for the datafield | JSON type |
json_group_name | string | N | Ungrouped | JSON group name for grouping datafields in the JSON |
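The following Python sketch shows one way the mapping table, json_group_list, add_other_fields and json_replace_dot_with_underscore could combine to shape a payload. It is an interpretation of the descriptions above rather than the collector's code. The function `build_payload` is hypothetical, the input datafield names `ip.ttl` and `tcp.window_size.value` are guesses, and placing a field whose group is not in json_group_list among the ungrouped fields is an assumption.

```python
def build_payload(datafields, mapping, group_list,
                  add_other_fields=True, replace_dot=True):
    """Arrange datafields into ungrouped fields and named groups."""
    mapped = {row["datafield_name"]: row for row in mapping}
    fields = {}
    groups = {name: {} for name in group_list}
    for name, value in datafields.items():
        if name in mapped:
            # Explicit mapping: use the configured property and group names.
            prop = mapped[name]["json_property_name"]
            group = mapped[name].get("json_group_name")
        elif add_other_fields:
            # Unmapped field: group by the prefix before the first '.'.
            prefix, dot, rest = name.partition(".")
            if dot and prefix in groups:
                prop, group = rest, prefix
            else:
                prop, group = name, None
        else:
            continue  # not configured and add_other_fields is false
        if replace_dot:
            prop = prop.replace(".", "_")
        target = groups[group] if group in groups else fields
        target[prop] = value
    return {
        "fields": fields,
        "groups": [{"name": n, "fields": f} for n, f in groups.items() if f],
    }


example = build_payload(
    datafields={
        "fixed_topic": "test.topic",
        "TIMESTAMP": 1448733575877513000,
        "ip.src_host": "127.0.0.1",
        "ip.src_port": 53867,
        "ip.ttl": 64,
        "tcp.window_size.value": 342,
    },
    mapping=[
        {"datafield_name": "ip.src_host", "json_property_name": "src_ip",
         "json_group_name": "ip"},
        {"datafield_name": "ip.src_port", "json_property_name": "src_port",
         "json_group_name": "tcp"},
        {"datafield_name": "TIMESTAMP", "json_property_name": "packet_timestamp"},
    ],
    group_list=["eth", "ip", "tcp", "fix"],
)
```

In this sketch `ip.src_port` ends up in the `tcp` group because the explicit mapping takes precedence over the datafield prefix, while `ip.ttl` falls back to its prefix and lands in the `ip` group.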
Example
Stack Configuration
{
  "probe": {
    "parameters": {
      "name": "test",
      "protocols": [
        { "type": "module", "value": "ethernet" },
        { "type": "module", "value": "ip" },
        { "type": "module", "value": "fix", "id": "decFix" }
      ],
      "msg_collector": [
        { "type": "module", "value": "msg_kafka_dynamic", "id": "collMsgKafkaDynamic" }
      ]
    },
    "tables": {
      "static_datafields": [
        { "name": "fixed_topic", "type": "string", "value": "test.topic" }
      ]
    }
  },
  "decFix": {
    "parameters": { "ddc": "*:string" }
  },
  "collMsgKafkaDynamic": {
    "parameters": {
      "payload_format": "json",
      "topic_datafield": "fixed_topic",
      "timestamp_datafield": "TIMESTAMP",
      "add_other_fields": true,
      "json_replace_dot_with_underscore": true,
      "json_group_list": [ "eth", "ip", "tcp", "fix" ]
    },
    "tables": {
      "global_configuration": [
        { "key": "bootstrap.servers", "value": "kafka-cluster:9092" }
      ],
      "json_property_mapping": [
        { "datafield_name": "ip.src_host", "json_property_name": "src_ip", "json_group_name": "ip" },
        { "datafield_name": "ip.dst_host", "json_property_name": "dst_ip", "json_group_name": "ip" },
        { "datafield_name": "ip.src_port", "json_property_name": "src_port", "json_group_name": "tcp" },
        { "datafield_name": "ip.dst_port", "json_property_name": "dst_port", "json_group_name": "tcp" },
        { "datafield_name": "TIMESTAMP", "json_property_name": "packet_timestamp" }
      ]
    }
  }
}
Output
For an example FIX message, the topic would be “test.topic” and the payload would be as follows:
{
  "fields": {
    "hostname": "beeksanalytics",
    "probe_name": "test",
    "fixed_topic": "test.topic",
    "packet_timestamp": 1448733575877513000
  },
  "groups": [
    {
      "name": "eth",
      "fields": {
        "type": 2048,
        "src": "00:00:00:00:00:00",
        "dst": "00:00:00:00:00:00",
        "ig": false,
        "len": 166
      }
    },
    {
      "name": "ip",
      "fields": {
        "id": 3653,
        "len": 152,
        "ttl": 64,
        "tos": 0,
        "checksum": 11801,
        "src_ip": "127.0.0.1",
        "dst_ip": "127.0.0.1",
        "proto": 6
      }
    },
    {
      "name": "tcp",
      "fields": {
        "window_size_value": 342,
        "window_size_scalefactor": 7,
        "window_size": 43776,
        "src_port": 53867,
        "dst_port": 11001
      }
    },
    {
      "name": "fix",
      "fields": {
        "version": "FIXT.1.1",
        "MsgSeqNum": 1,
        "MsgType": "A",
        "size": 100,
        "SenderCompID": "DLD_TEX",
        "TargetCompID": "TEX1_DLD",
        "35": "A",
        "49": "DLD_TEX",
        "56": "TEX1_DLD",
        "34": "1",
        "52": "20151128-17:59:35.877",
        "98": "0",
        "108": "10",
        "1137": "8",
        "10": "035"
      }
    }
  ]
}
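
On the consuming side, the grouped payload can be flattened back into dotted names for easier lookup. The Python sketch below uses only the standard `json` module and assumes the payload has the "fields" and "groups" layout shown in the example output. The function `flatten_payload` is a hypothetical helper, not part of the collector.

```python
import json


def flatten_payload(raw):
    """Flatten a grouped payload into a single dict of 'group.field' keys."""
    doc = json.loads(raw)
    # Ungrouped fields keep their names as they are.
    flat = dict(doc.get("fields", {}))
    for group in doc.get("groups", []):
        for name, value in group["fields"].items():
            # Grouped fields are prefixed with the group name.
            flat[f'{group["name"]}.{name}'] = value
    return flat
```

For the example output, `flatten_payload` would expose the FIX MsgType tag under the key `fix.35` and the mapped source port under `tcp.src_port`. Python parses JSON integers exactly, so a large value such as `packet_timestamp` keeps full precision; consumers in languages that read JSON numbers as doubles may prefer json_potentially_large_integers_as_strings.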