Kafka Stack Message Collector
Overview
This collector writes messages to a single Kafka cluster. It is a Message Collector with the module name “msg_kafka”.
Stack Configuration
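To use this collector, add a module entry with the value “msg_kafka” to the probe's msg_collector list, then configure the collector under the id assigned to that entry (see the Example section).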
Parameters
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
payload_format | enum (JSON) | Y | | Encoding of the Kafka message payload |
batch_size | uint64 | N | Not specified | Maximum batch size to insert |
batch_timeout_ms | uint64 | N | Not specified | Timeout, in milliseconds, before a batch is inserted |
large_numeric_as_string | bool | N | false | When true, datafields that may contain large numeric values are written as string fields |
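As a minimal sketch of how the optional batching parameters might sit alongside the required payload_format: the collector id collMsgKafka is borrowed from the Example section, and the values 500 and 1000 are illustrative assumptions, not recommended settings. The tables section is omitted here for brevity.

```json
{
  "collMsgKafka": {
    "parameters": {
      "payload_format": "JSON",
      "batch_size": 500,
      "batch_timeout_ms": 1000
    }
  }
}
```

Because both batching parameters default to "Not specified", leaving them out keeps the collector's default behaviour.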
Tables
global_configuration
Key-value pairs for configuring librdkafka. For the available configuration properties, see https://github.com/edenhill/librdkafka/blob/v1.9.0/CONFIGURATION.md
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
key | string | Y | | Configuration key |
value | string | Y | | Configuration value |
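A global_configuration table with several librdkafka properties could look like the sketch below. The property names (bootstrap.servers, client.id, compression.type, linger.ms) are standard librdkafka configuration keys, while the broker addresses, the client id, and the chosen values are hypothetical. Every value is written as a string, matching the table's column type.

```json
{
  "collMsgKafka": {
    "tables": {
      "global_configuration": [
        { "key": "bootstrap.servers", "value": "broker1:9092,broker2:9092" },
        { "key": "client.id", "value": "example-probe" },
        { "key": "compression.type", "value": "lz4" },
        { "key": "linger.ms", "value": "5" }
      ]
    }
  }
}
```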
field_definitions
If a field is named topic, it will be the Kafka message topic. This field name is required.
If a field is named key, it will be the Kafka message key.
If a field is named timestamp, it will be the Kafka message timestamp (note that Kafka only allows millisecond precision timestamps).
The field_definitions section also allows you to change the output field name or data type. See Stack Message Collector Common Field Definitions Configuration for more detail and configuration examples.
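The three reserved field names can be combined in one field_definitions table, roughly as sketched below. The entry layout (field_name, field_type, df_name) follows the Example section; fixed_key is a hypothetical static datafield introduced only for illustration and would need to be defined in the probe's static_datafields table.

```json
{
  "collMsgKafka": {
    "tables": {
      "field_definitions": [
        { "field_name": "topic", "df_name": "fixed_topic" },
        { "field_name": "key", "df_name": "fixed_key" },
        { "field_name": "timestamp", "field_type": "UINT64", "df_name": "packet_timestamp" }
      ]
    }
  }
}
```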
Example
{
  "probe": {
    "parameters": {
      "name": "CDFM_example_stack_probe",
      "protocols": [
        {
          "type": "module",
          "value": "ethernet"
        }
      ],
      "msg_collector": [
        {
          "type": "module",
          "value": "msg_kafka",
          "id": "collMsgKafka"
        }
      ]
    },
    "tables": {
      "static_datafields": [
        {
          "name": "fixed_topic",
          "type": "string",
          "value": "com.tipoff.coredatafeed.message.client_integrated.port1"
        }
      ]
    }
  },
  "collMsgKafka": {
    "parameters": {
      "payload_format": "JSON",
      "large_numeric_as_string": true
    },
    "tables": {
      "global_configuration": [
        {
          "key": "bootstrap.servers",
          "value": "kafka-cluster:9092"
        }
      ],
      "field_definitions": [
        {
          "field_name": "topic",
          "df_name": "fixed_topic"
        },
        {
          "field_name": "timestamp",
          "field_type": "UINT64",
          "df_name": "packet_timestamp"
        },
        {
          "field_name": "packet_length",
          "df_name": "eth.len"
        }
      ]
    }
  }
}
Example Output
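Based on this configuration, the Kafka message payload will be similar to the example below. The topic and timestamp fields are used as the Kafka message topic and timestamp; the payload in this example carries only packet_length.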
{
  "packet_length": "499"
}