Overview

This collector writes aggregated statistics to a Kafka cluster.

The message timestamp records the time at which the message was published.
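A consumer can recover the publish time from the message timestamp. Kafka carries message timestamps as integer milliseconds since the Unix epoch. The helper below is a minimal sketch of that conversion; the function name publish_time is hypothetical and not part of this module.

```python
from datetime import datetime, timezone


def publish_time(timestamp_ms: int) -> datetime:
    """Convert a Kafka message timestamp (ms since the Unix epoch) to UTC.

    Hypothetical helper for illustration; not part of the kafka collector.
    """
    # Split into whole seconds and leftover milliseconds so that no
    # floating-point rounding is involved.
    seconds, millis = divmod(timestamp_ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=millis * 1000
    )
```

Most Python Kafka clients expose this millisecond value on the consumed record, so the helper can be applied to it directly.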

Stack Configuration

In this example, the statistics collector uses the module name “kafka”. The example configuration explicitly defines the topic name in a topic_mapping table, rather than using the topic_prefix parameter to derive the topic name from the aggregation name.

Parameters

Parameters for all instances of the kafka module.

Tables

global_configuration

Key-value pairs for configuring librdkafka. See https://github.com/edenhill/librdkafka/blob/v1.9.0/CONFIGURATION.md

Key: global_configuration.

topic_mapping

This table is optional. If configured, it can be used to specify a topic to be used for each aggregation, instead of using the topic_prefix parameter.

Key: topic_mapping.
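The relationship between topic_mapping and topic_prefix can be sketched as a small lookup. This is an illustration only, not the collector's implementation: the function resolve_topic is hypothetical, and both the fallback for aggregations missing from the table and the way the prefix is combined with the aggregation name (plain concatenation here) are assumptions.

```python
def resolve_topic(aggregation, topic_mapping, topic_prefix=None):
    """Pick the Kafka topic for an aggregation (illustrative sketch only)."""
    # An explicit topic_mapping row for this aggregation wins.
    for row in topic_mapping:
        if row["aggregation"] == aggregation:
            return row["topic"]
    if topic_prefix is None:
        raise ValueError(f"no topic configured for aggregation {aggregation!r}")
    # ASSUMPTION: the prefix and the aggregation name are concatenated as-is.
    # The real module may use a different separator or naming rule.
    return topic_prefix + aggregation


mapping = [
    {"aggregation": "summary",
     "topic": "com.tipoff.coredatafeed.telemetry.MD_stats"},
]
mapped_topic = resolve_topic("summary", mapping)
prefixed_topic = resolve_topic("other", mapping, topic_prefix="stats.")
```

Here "other" and "stats." are made-up values used only to exercise the fallback path.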

Example

{
  "probe": {
    "parameters": {
      "name": "CDFT_example_stack_probe",
      "protocols": [
        {
          "type": "module",
          "value": "ethernet"
        },
        {
          "type": "module",
          "value": "ip"
        }
      ],
      "stat_collector": [
        {
          "type": "module",
          "value": "kafka",
          "id": "collKafka"
        }
      ]
    }
  },
  "collKafka": {
    "parameters": {
      "payload_format": "JSON",
      "publish_interval_ns": 1000000000,
      "aggregator_json_filename": "/tmp/example.stack.agg.json",
      "write_when_no_new_values": false
    },
    "tables": {
      "global_configuration": [
        {
          "key": "bootstrap.servers",
          "value": "kafka-cluster:9092"
        }
      ],
      "topic_mapping": [
        {
          "aggregation": "summary",
          "topic": "com.tipoff.coredatafeed.telemetry.MD_stats"
        }
      ]
    }
  }
}
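When checking a stack configuration before deployment, it can help to flatten the global_configuration table into the plain key/value form that librdkafka settings take. The snippet below is a minimal sketch using only the Python standard library; the function librdkafka_settings is hypothetical, and the embedded JSON is a trimmed copy of the example above.

```python
import json


def librdkafka_settings(stack_config: dict, collector_id: str) -> dict:
    """Flatten a collector's global_configuration table into a dict.

    Hypothetical helper for inspecting a stack file; not part of the module.
    """
    rows = stack_config[collector_id]["tables"]["global_configuration"]
    return {row["key"]: row["value"] for row in rows}


# Trimmed copy of the collKafka section from the example configuration.
stack = json.loads("""
{
  "collKafka": {
    "parameters": {"payload_format": "JSON"},
    "tables": {
      "global_configuration": [
        {"key": "bootstrap.servers", "value": "kafka-cluster:9092"}
      ]
    }
  }
}
""")

settings = librdkafka_settings(stack, "collKafka")
print(settings)
```

The resulting keys can then be compared against the property names listed in the librdkafka CONFIGURATION.md document.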

Example Output

Based on this configuration, and a suitable aggregator configuration, the payload of the Kafka message will be similar to the example below:

{
  "srcIp": "10.1.1.1",
  "dstIp": "10.1.1.2",
  "name": "srcDstType",
  "packet_bytes": 700.0,
  "packet_bytes_1ms": 420000.0,
  "tcp_conversations": 1.0
}
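A consumer of the topic might decode such a payload as follows. This is a minimal sketch that assumes the payload_format is JSON and that the payload bytes are UTF-8 encoded; the function decode_stats and the split into numeric metrics are illustrative choices, not behaviour defined by the module.

```python
import json


def decode_stats(payload: bytes) -> dict:
    """Decode one JSON statistics payload (assumes UTF-8 encoded bytes)."""
    return json.loads(payload.decode("utf-8"))


# Payload bytes matching the example output above.
payload = (
    b'{"srcIp": "10.1.1.1", "dstIp": "10.1.1.2", "name": "srcDstType", '
    b'"packet_bytes": 700.0, "packet_bytes_1ms": 420000.0, '
    b'"tcp_conversations": 1.0}'
)

record = decode_stats(payload)

# Keep only the numeric fields, leaving out string fields such as the
# addresses and the "name" entry.
metrics = {
    key: value
    for key, value in record.items()
    if isinstance(value, (int, float)) and not isinstance(value, bool)
}
print(record["name"], metrics)
```

Which fields appear in the payload depends on the aggregator configuration, so a real consumer should not hard-code the field names used here.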

Aggregator Configuration

See Stack Aggregator Configuration