Overview

This collector writes messages to a single Kafka cluster. It is a Message Collector with the module name “msg_kafka”.

Stack Configuration

This is a message collector with module name “msg_kafka”.

Parameters

Parameters for stack configuration

Tables

global_configuration

Key value pairs for configuring librdkafka. See https://github.com/edenhill/librdkafka/blob/v1.9.0/CONFIGURATION.md

Key: global_configuration

field_definitions

If a field is named topic , it will be the Kafka message topic. This field name is required.

If a field is named key, it will be the Kafka message key.

If a field is named timestamp, it will be the Kafka message timestamp (note that Kafka only allows millisecond precision timestamps).

The field_definitions section also allows you to change the output field name or data type. See Stack Message Collector Common Field Definitions Configuration for more detail and configuration examples.

Example

{
"probe": {
"parameters": {
"name": "CDFM_example_stack_probe",
"protocols": [
{
"type": "module",
"value": "ethernet"
}
],
"msg_collector": [
{
"type": "module",
"value": "msg_kafka",
"id": "collMsgKafka"
}
]
},
"tables": {
"static_datafields": [
{
"name": "fixed_topic",
"type": "string",
"value": "com.tipoff.coredatafeed.message.client_integrated.port1"
}
]
}
},
"collMsgKafka": {
"parameters": {
"payload_format": "JSON",
"large_numeric_as_string": true
},
"tables": {
"global_configuration": [
{
"key": "bootstrap.servers",
"value": "kafka-cluster:9092"
}
],
"field_definitions": [
{
"field_name": "topic",
"df_name": "fixed_topic"
},
{
"field_name": "timestamp",
"field_type": "UINT64",
"df_name": "packet_timestamp"
},
{
"field_name": "packet_length",
"df_name": "eth.len"
}
]
}
}
}

Example Output

Based on this configuration, the Kafka message payload will be similar to the example below:

{"packet_length":"499"}