Overview
This collector writes messages to a single Kafka cluster. It supports protocols that create dynamic datafields on the fly (e.g. the FIX protocol decoder).
Stack Configuration
This is a message collector with module name “msg_kafka_dynamic”.
Parameters
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
payload_format | enum (json) | Y | | Encoding of the Kafka message payload |
max_batch_size | uint64 | N | Not specified | Maximum batch size to insert |
batch_timeout_ms | uint64 | N | Not specified | Timeout in milliseconds before a batch is inserted |
request_pool_initial_size | uint64 | N | Not specified | Initial size of the request pool. Each request inserts one message |
request_pool_maximum_size | uint64 | N | Unbounded | Maximum size of the request pool. If insertion into Kafka cannot keep up with the incoming message rate, this parameter limits how large the pool can grow |
request_pool_allocation_batch_size | uint64 | N | Not specified | Number of new requests to allocate at a time when the request pool needs to grow |
request_pool_blocking | bool | N | true | Whether to block packet decoding and wait for requests to be returned to the pool (true), or to drop messages without writing them to Kafka (false) |
field_pool_initial_size | uint64 | N | Not specified | Initial size of the field pool. Fields are added to requests to write messages |
field_pool_allocation_batch_size | uint64 | N | Not specified | Number of new fields to allocate at a time when the field pool needs to grow |
kafka_payload_pool_initial_size | uint64 | N | Not specified | Initial size of the Kafka payload pool. Each Kafka payload inserts one message |
kafka_payload_pool_maximum_size | uint64 | N | Unbounded | Maximum size of the Kafka payload pool. If insertion into Kafka cannot keep up with the incoming message rate, this parameter limits how large the pool can grow |
kafka_payload_pool_allocation_batch_size | uint64 | N | Same as request_pool_allocation_batch_size | Number of new Kafka payloads to allocate at a time when the Kafka payload pool needs to grow |
kafka_payload_pool_blocking | bool | N | Same as request_pool_blocking | Whether to block payload building and wait for Kafka messages to finish being sent (true), or to drop messages without writing them to Kafka (false) |
topic_datafield | string | Y | | Datafield to use as the Kafka message topic |
key_datafield | string | N | No message key | Datafield to use as the Kafka message key |
timestamp_datafield | string | N | Broker generates timestamp | Datafield to use as the Kafka message timestamp |
add_other_fields | bool | N | false | Whether to add datafields that have not been explicitly configured to the message |
json_replace_dot_with_underscore | bool | N | false | Whether to replace '.' in JSON property names with '_' |
json_potentially_large_integers_as_strings | bool | N | false | Whether to store potentially large integers in JSON as strings |
json_group_list | array of string | N | No grouping | List of JSON groups used to group datafields, either explicitly through json_group_name in json_property_mapping or, if add_other_fields is true, by datafield prefix (the part of the datafield name before the first '.') |
ignore_binary_datafields | bool | N | true | Whether to ignore binary datafields |
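The request pool and the Kafka payload pool follow a common pattern: each starts with *_initial_size entries, grows by *_allocation_batch_size, stops growing at *_maximum_size, and then either blocks or drops messages according to *_blocking. The Python sketch below models that pattern for a single pool. It is an illustration under those assumptions, not the module's implementation; RequestPool and its acquire/release methods are hypothetical names.

```python
import threading
from typing import Optional


class RequestPool:
    """Hypothetical model of the request_pool_* parameters (not the module's code)."""

    def __init__(self, initial_size: int, allocation_batch_size: int,
                 maximum_size: Optional[int] = None, blocking: bool = True):
        self._free = [object() for _ in range(initial_size)]  # pre-allocated requests
        self._allocated = initial_size
        self._batch = max(1, allocation_batch_size)
        self._maximum = maximum_size  # None models the "Unbounded" default
        self._blocking = blocking
        self._cond = threading.Condition()

    def _grow(self) -> None:
        # Allocate up to one batch of new requests, never exceeding the maximum size.
        count = self._batch
        if self._maximum is not None:
            count = min(count, self._maximum - self._allocated)
        self._free.extend(object() for _ in range(count))
        self._allocated += count

    def acquire(self) -> Optional[object]:
        """Return a free request, or None if the message should be dropped."""
        with self._cond:
            while not self._free:
                if self._maximum is None or self._allocated < self._maximum:
                    self._grow()
                elif self._blocking:
                    self._cond.wait()  # blocking: wait until a request is released
                else:
                    return None  # non-blocking: drop the message
            return self._free.pop()

    def release(self, request: object) -> None:
        # Return a request to the pool and wake one waiting producer, if any.
        with self._cond:
            self._free.append(request)
            self._cond.notify()
```

With blocking=False, a pool that has reached maximum_size returns None, which corresponds to a dropped message. With blocking=True, acquire() waits until another thread calls release(), which corresponds to decoding being held back until Kafka catches up.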
Tables
global_configuration
Key-value pairs for configuring librdkafka. See https://github.com/edenhill/librdkafka/blob/v1.9.0/CONFIGURATION.md
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
key | string | Y | | Configuration key |
value | string | Y | | Configuration value |
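Each global_configuration row supplies one librdkafka property, and both columns are strings, which matches librdkafka's C function rd_kafka_conf_set, whose value argument is always a string (numeric settings such as linger.ms are passed as text too). The Python sketch below flattens the rows into a single property map. to_librdkafka_config is a hypothetical helper, and rejecting duplicate keys is a choice made for this sketch, not documented module behaviour.

```python
from typing import Dict, Iterable, Mapping


def to_librdkafka_config(rows: Iterable[Mapping[str, str]]) -> Dict[str, str]:
    """Flatten global_configuration rows into one librdkafka key/value map.

    Hypothetical helper: it models the table, not the module's own parser.
    """
    config: Dict[str, str] = {}
    for row in rows:
        key, value = row["key"], row["value"]  # both columns are required
        if not isinstance(key, str) or not isinstance(value, str):
            raise TypeError(f"key and value must be strings: {row!r}")
        if key in config:
            # Assumption: treat a repeated key as a configuration mistake.
            raise ValueError(f"duplicate librdkafka key: {key}")
        config[key] = value
    return config
```

For the rows in the Example section this yields {"bootstrap.servers": "kafka-cluster:9092"}. A dictionary of this shape is also what confluent-kafka-python's Producer accepts, which can be handy for testing the same settings outside the probe.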
json_property_mapping
This table is optional. If configured, it maps datafields to the JSON properties written to the Kafka message payload, optionally setting each property's JSON type and group.
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
datafield_name | string | Y | | Datafield name |
json_property_name | string | Y | | JSON property name |
json_type | enum (boolean, number, string) | N | Suitable type for the datafield | JSON type of the property |
json_group_name | string | N | Ungrouped | JSON group name for grouping the datafield in the JSON |
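The json_property_mapping rows, json_group_list, add_other_fields and json_replace_dot_with_underscore together determine the payload layout seen in the Output example: a top-level "fields" object for ungrouped properties and a "groups" array with one entry per group. The Python sketch below reproduces that layout under stated assumptions: build_payload is a hypothetical helper, groups are emitted in json_group_list order with empty groups omitted, and a json_group_name that is not listed in json_group_list is treated as ungrouped. The module's actual rules may differ.

```python
from typing import Any, Dict, List


def build_payload(datafields: Dict[str, Any],
                  mapping: List[Dict[str, str]],
                  group_list: List[str],
                  add_other_fields: bool = False,
                  replace_dot: bool = False) -> Dict[str, Any]:
    """Hypothetical model of the grouped JSON layout (not the module's code)."""
    top: Dict[str, Any] = {}
    groups: Dict[str, Dict[str, Any]] = {name: {} for name in group_list}

    def prop(name: str) -> str:
        # json_replace_dot_with_underscore applied to a property name.
        return name.replace(".", "_") if replace_dot else name

    mapped = set()
    for row in mapping:  # explicit json_property_mapping rows
        name = row["datafield_name"]
        if name not in datafields:
            continue
        mapped.add(name)
        group = row.get("json_group_name")
        target = groups[group] if group in groups else top  # assumption: unknown group -> ungrouped
        target[prop(row["json_property_name"])] = datafields[name]

    if add_other_fields:
        for name, value in datafields.items():
            if name in mapped:
                continue
            prefix, dot, rest = name.partition(".")  # prefix = part before the first '.'
            if dot and prefix in groups:
                groups[prefix][prop(rest)] = value
            else:
                top[prop(name)] = value

    return {
        "fields": top,
        # Assumption: keep json_group_list order and omit empty groups.
        "groups": [{"name": g, "fields": f} for g, f in groups.items() if f],
    }
```

For example, with the mapping from the Example section, a datafield eth.type lands in the "eth" group as "type", ip.src_host lands in the "ip" group as "src_ip", and hostname stays in the top-level "fields" object.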
Example
Stack Configuration
{
  "probe": {
    "parameters": {
      "name": "test",
      "protocols": [
        {
          "type": "module",
          "value": "ethernet"
        },
        {
          "type": "module",
          "value": "ip"
        },
        {
          "type": "module",
          "value": "fix",
          "id": "decFix"
        }
      ],
      "msg_collector": [
        {
          "type": "module",
          "value": "msg_kafka_dynamic",
          "id": "collMsgKafkaDynamic"
        }
      ]
    },
    "tables": {
      "static_datafields": [
        {
          "name": "fixed_topic",
          "type": "string",
          "value": "test.topic"
        }
      ]
    }
  },
  "decFix": {
    "parameters": {
      "ddc": "*:string"
    }
  },
  "collMsgKafkaDynamic": {
    "parameters": {
      "payload_format": "json",
      "topic_datafield": "fixed_topic",
      "timestamp_datafield": "TIMESTAMP",
      "add_other_fields": true,
      "json_replace_dot_with_underscore": true,
      "json_group_list": ["eth", "ip", "tcp", "fix"]
    },
    "tables": {
      "global_configuration": [
        {
          "key": "bootstrap.servers",
          "value": "kafka-cluster:9092"
        }
      ],
      "json_property_mapping": [
        {
          "datafield_name": "ip.src_host",
          "json_property_name": "src_ip",
          "json_group_name": "ip"
        },
        {
          "datafield_name": "ip.dst_host",
          "json_property_name": "dst_ip",
          "json_group_name": "ip"
        },
        {
          "datafield_name": "ip.src_port",
          "json_property_name": "src_port",
          "json_group_name": "tcp"
        },
        {
          "datafield_name": "ip.dst_port",
          "json_property_name": "dst_port",
          "json_group_name": "tcp"
        },
        {
          "datafield_name": "TIMESTAMP",
          "json_property_name": "packet_timestamp"
        }
      ]
    }
  }
}
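A configuration like the one above can be checked against the required parameters and columns listed in the Parameters and Tables sections before it is deployed. The Python sketch below does this for one msg_kafka_dynamic section. check_collector and its messages are hypothetical, and flagging a json_group_name that is missing from json_group_list is an assumption rather than documented behaviour.

```python
from typing import Any, Dict, List

# Required parameters and columns, taken from the Parameters and Tables sections.
REQUIRED_PARAMETERS = ("payload_format", "topic_datafield")
REQUIRED_COLUMNS = {
    "global_configuration": ("key", "value"),
    "json_property_mapping": ("datafield_name", "json_property_name"),
}
JSON_TYPES = {"boolean", "number", "string"}


def check_collector(section: Dict[str, Any]) -> List[str]:
    """Return human-readable problems found in one msg_kafka_dynamic section."""
    problems: List[str] = []
    params = section.get("parameters", {})
    for name in REQUIRED_PARAMETERS:
        if name not in params:
            problems.append(f"missing required parameter {name}")
    if params.get("payload_format", "json") != "json":
        problems.append("payload_format must be 'json'")

    tables = section.get("tables", {})
    for table, columns in REQUIRED_COLUMNS.items():
        for i, row in enumerate(tables.get(table, [])):
            for column in columns:
                if column not in row:
                    problems.append(f"{table}[{i}] missing {column}")

    groups = set(params.get("json_group_list", []))
    for i, row in enumerate(tables.get("json_property_mapping", [])):
        if "json_type" in row and row["json_type"] not in JSON_TYPES:
            problems.append(f"json_property_mapping[{i}] has unknown json_type")
        group = row.get("json_group_name")
        if group is not None and group not in groups:  # assumption: must be listed
            problems.append(f"json_property_mapping[{i}] uses group {group} "
                            "not listed in json_group_list")
    return problems
```

Passed the "collMsgKafkaDynamic" object from the example (for instance via json.load on the configuration file), the sketch returns an empty list.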
Output
For an example FIX message, the topic would be “test.topic” and the payload would be:
{
  "fields": {
    "hostname": "beeksanalytics",
    "probe_name": "test",
    "fixed_topic": "test.topic",
    "packet_timestamp": 1448733575877513000
  },
  "groups": [
    {
      "name": "eth",
      "fields": {
        "type": 2048,
        "src": "00:00:00:00:00:00",
        "dst": "00:00:00:00:00:00",
        "ig": false,
        "len": 166
      }
    },
    {
      "name": "ip",
      "fields": {
        "id": 3653,
        "len": 152,
        "ttl": 64,
        "tos": 0,
        "checksum": 11801,
        "src_ip": "127.0.0.1",
        "dst_ip": "127.0.0.1",
        "proto": 6
      }
    },
    {
      "name": "tcp",
      "fields": {
        "window_size_value": 342,
        "window_size_scalefactor": 7,
        "window_size": 43776,
        "src_port": 53867,
        "dst_port": 11001
      }
    },
    {
      "name": "fix",
      "fields": {
        "version": "FIXT.1.1",
        "MsgSeqNum": 1,
        "MsgType": "A",
        "size": 100,
        "SenderCompID": "DLD_TEX",
        "TargetCompID": "TEX1_DLD",
        "35": "A",
        "49": "DLD_TEX",
        "56": "TEX1_DLD",
        "34": "1",
        "52": "20151128-17:59:35.877",
        "98": "0",
        "108": "10",
        "1137": "8",
        "10": "035"
      }
    }
  ]
}
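In this output, packet_timestamp (1448733575877513000) appears to be nanoseconds since the Unix epoch, since it matches the FIX SendingTime (tag 52) value 20151128-17:59:35.877. It is larger than 2^53, so a consumer that parses JSON numbers as IEEE 754 doubles, as JavaScript's JSON.parse does, cannot represent it exactly. This is the situation json_potentially_large_integers_as_strings addresses. The Python sketch below simulates such a consumer with json.loads(..., parse_int=float); the quoted-string form of the value is an assumption about what the module writes when that parameter is true.

```python
import json

# packet_timestamp from the example output above.
PAYLOAD = '{"fields": {"packet_timestamp": 1448733575877513000}}'
# Assumed shape of the same value with json_potentially_large_integers_as_strings=true.
PAYLOAD_AS_STRING = '{"fields": {"packet_timestamp": "1448733575877513000"}}'
MAX_SAFE_INTEGER = 2**53 - 1  # largest n such that every integer up to n is exact as a double


def parse_as_doubles(text: str) -> dict:
    # Simulate a consumer that stores every JSON integer as a double.
    return json.loads(text, parse_int=float)


exact = json.loads(PAYLOAD)["fields"]["packet_timestamp"]  # Python ints keep full precision
lossy = parse_as_doubles(PAYLOAD)["fields"]["packet_timestamp"]
restored = int(json.loads(PAYLOAD_AS_STRING)["fields"]["packet_timestamp"])

print(exact > MAX_SAFE_INTEGER)  # True
print(int(lossy) == exact)       # False: the double is rounded to a multiple of 256
print(restored == exact)         # True: the string form round-trips exactly
```

Consumers written in languages with 64-bit integers can usually read the numeric form safely; the string form mainly matters for double-based JSON parsers.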