Overview
This collector writes messages to a single Kafka cluster. It is a Message Collector with the module name “msg_kafka”.
Stack Configuration
In the stack configuration, this collector is referenced as a message collector (msg_collector) with the module name “msg_kafka”.
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| payload_format | enum (JSON) | Y | | Encoding of the Kafka message payload |
| batch_size | uint64 | N | Not specified | Maximum batch size to insert |
| batch_timeout_ms | uint64 | N | Not specified | Timeout before inserting a batch in milliseconds |
| large_numeric_as_string | bool | N | false | Datafields that may contain large numeric values will be written as string fields |
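As a minimal sketch, the parameters block of a collector instance that sets the optional batching parameters might look like the fragment below. The id collMsgKafka is borrowed from the example on this page. The values 1000 and 500 are illustrative assumptions, not recommended settings.

```json
{
  "collMsgKafka": {
    "parameters": {
      "payload_format": "JSON",
      "batch_size": 1000,
      "batch_timeout_ms": 500,
      "large_numeric_as_string": false
    }
  }
}
```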
Tables
global_configuration
Key-value pairs for configuring librdkafka. For the available configuration keys, see https://github.com/edenhill/librdkafka/blob/v1.9.0/CONFIGURATION.md
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| key | string | Y | | Configuration key |
| value | string | Y | | Configuration value |
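For illustration, a global_configuration table with several entries might look like the sketch below. It would sit under the collector's "tables" section. The keys shown are listed in the librdkafka CONFIGURATION.md linked above. The host names, client id, and file path are hypothetical placeholders, and whether each setting is appropriate depends on your Kafka cluster. Every value is written as a string, matching the table definition.

```json
{
  "global_configuration": [
    { "key": "bootstrap.servers", "value": "kafka-1:9092,kafka-2:9092" },
    { "key": "client.id", "value": "stack-probe-1" },
    { "key": "compression.codec", "value": "lz4" },
    { "key": "security.protocol", "value": "ssl" },
    { "key": "ssl.ca.location", "value": "/etc/ssl/certs/kafka-ca.pem" }
  ]
}
```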
field_definitions
If a field is named topic, its value is used as the Kafka message topic. A field with this name is required.
If a field is named key, its value is used as the Kafka message key.
If a field is named timestamp, its value is used as the Kafka message timestamp (note that Kafka only allows millisecond-precision timestamps).
The field_definitions section also allows you to change the output field name or data type. See Stack Message Collector Common Field Definitions Configuration for more detail and configuration examples.
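The three special field names can be combined in one field_definitions table, as in the sketch below. It follows the static_datafields pattern used in the example on this page. The names fixed_key and example.topic and the value port1 are hypothetical, and other parts of the configuration (probe parameters, collector parameters, global_configuration) are omitted for brevity.

```json
{
  "probe": {
    "tables": {
      "static_datafields": [
        { "name": "fixed_topic", "type": "string", "value": "example.topic" },
        { "name": "fixed_key", "type": "string", "value": "port1" }
      ]
    }
  },
  "collMsgKafka": {
    "tables": {
      "field_definitions": [
        { "field_name": "topic", "df_name": "fixed_topic" },
        { "field_name": "key", "df_name": "fixed_key" },
        { "field_name": "timestamp", "field_type": "UINT64", "df_name": "packet_timestamp" }
      ]
    }
  }
}
```

A static key like this gives every message the same key. In practice, any available datafield could presumably be mapped to key in the same way.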
Example
    {
      "probe": {
        "parameters": {
          "name": "CDFM_example_stack_probe",
          "protocols": [
            { "type": "module", "value": "ethernet" }
          ],
          "msg_collector": [
            { "type": "module", "value": "msg_kafka", "id": "collMsgKafka" }
          ]
        },
        "tables": {
          "static_datafields": [
            { "name": "fixed_topic", "type": "string", "value": "com.tipoff.coredatafeed.message.client_integrated.port1" }
          ]
        }
      },
      "collMsgKafka": {
        "parameters": {
          "payload_format": "JSON",
          "large_numeric_as_string": true
        },
        "tables": {
          "global_configuration": [
            { "key": "bootstrap.servers", "value": "kafka-cluster:9092" }
          ],
          "field_definitions": [
            { "field_name": "topic", "df_name": "fixed_topic" },
            { "field_name": "timestamp", "field_type": "UINT64", "df_name": "packet_timestamp" },
            { "field_name": "packet_length", "df_name": "eth.len" }
          ]
        }
      }
    }
Example Output
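Based on this configuration, the Kafka message payload will be similar to the example below. The topic and timestamp fields set the Kafka message topic and timestamp, so the payload shown contains only packet_length.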
{"packet_length":"499"}