# Kafka Stack Statistics Collector

## Overview
This collector writes aggregated statistics to a Kafka cluster.
The message timestamp records the time at which the message was published.
## Stack Configuration
In this example, we have a statistics collector with the module name “kafka”. The example configuration explicitly defines the topic name in a topic_mapping table, rather than using the topic_prefix parameter to derive the topic name from the aggregation name.
### Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| payload_format | enum (JSON) | Y | | Encoding of the Kafka message payload |
| topic_prefix | string | N | <none> | Prefix prepended to the aggregation name to form the Kafka topic when the aggregation name is not in the topic_mapping table |
| publish_interval_ns | uint64 | N | 10000000000 | Nanoseconds between publishing statistics |
| batch_size | uint64 | N | Not specified | Maximum batch size to insert |
| batch_timeout_ms | uint64 | N | Not specified | Timeout in milliseconds before inserting a batch |
| aggregator_json_filename | path | Y | | Filename of the aggregator JSON configuration |
| timestamp | string | N | packet_timestamp | Data field used as the timestamp |
| write_when_no_new_values | bool | N | true | Write an update with zero statistics for an aggregation instance before it expires |
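Note that publish_interval_ns is specified in nanoseconds, so the default corresponds to one publish every 10 seconds. A quick check of the conversion:

```python
# publish_interval_ns is expressed in nanoseconds; the default of
# 10000000000 ns corresponds to publishing statistics every 10 seconds.
default_interval_ns = 10_000_000_000
seconds_between_publishes = default_interval_ns / 1_000_000_000
print(seconds_between_publishes)  # 10.0
```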
### Tables

#### global_configuration

Key/value pairs for configuring librdkafka. See https://github.com/edenhill/librdkafka/blob/v1.9.0/CONFIGURATION.md
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| key | string | Y | | Configuration key |
| value | string | Y | | Configuration value |
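Conceptually, the rows of this table form a flat librdkafka property map. A minimal sketch of that correspondence (the row data here is hypothetical example input, not output of the collector):

```python
# Illustrative only: the global_configuration table's key/value rows
# behave like a flat librdkafka-style configuration dictionary.
# "bootstrap.servers" and "compression.type" are real librdkafka keys;
# the values are made-up examples.
rows = [
    {"key": "bootstrap.servers", "value": "kafka-cluster:9092"},
    {"key": "compression.type", "value": "lz4"},
]

config = {row["key"]: row["value"] for row in rows}
print(config["bootstrap.servers"])  # kafka-cluster:9092
```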
#### topic_mapping

This table is optional. If configured, it specifies the topic used for each listed aggregation, instead of the topic derived from the topic_prefix parameter.
| Parameter | Type | Required | Description |
|---|---|---|---|
| aggregation | string | Y | Aggregation name |
| topic | string | Y | Kafka topic |
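The interaction between topic_mapping and topic_prefix can be sketched as follows. This is an illustrative sketch, not the collector's actual code; the function name and data shapes are assumptions:

```python
def resolve_topic(aggregation, topic_mapping, topic_prefix=None):
    """Pick the Kafka topic for an aggregation: an explicit
    topic_mapping entry wins; otherwise fall back to
    topic_prefix + aggregation name."""
    if aggregation in topic_mapping:
        return topic_mapping[aggregation]
    if topic_prefix is not None:
        return topic_prefix + aggregation
    return None  # no topic can be derived for this aggregation

# Hypothetical data mirroring the example topic_mapping table below.
mapping = {"summary": "com.tipoff.coredatafeed.telemetry.MD_stats"}
print(resolve_topic("summary", mapping))          # explicit mapping
print(resolve_topic("flows", mapping, "stats."))  # prefix fallback
```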
### Example
```json
{
  "probe": {
    "parameters": {
      "name": "CDFT_example_stack_probe",
      "protocols": [
        { "type": "module", "value": "ethernet" },
        { "type": "module", "value": "ip" }
      ],
      "stat_collector": [
        { "type": "module", "value": "kafka", "id": "collKafka" }
      ]
    }
  },
  "collKafka": {
    "parameters": {
      "payload_format": "JSON",
      "publish_interval_ns": 1000000000,
      "aggregator_json_filename": "/tmp/example.stack.agg.json",
      "write_when_no_new_values": false
    },
    "tables": {
      "global_configuration": [
        { "key": "bootstrap.servers", "value": "kafka-cluster:9092" }
      ],
      "topic_mapping": [
        { "aggregation": "summary", "topic": "com.tipoff.coredatafeed.telemetry.MD_stats" }
      ]
    }
  }
}
```
### Example Output
Based on this configuration and a suitable aggregator configuration, the payload of the Kafka message will be similar to the example below:
```json
{
  "srcIp": "10.1.1.1",
  "dstIp": "10.1.1.2",
  "name": "srcDstType",
  "packet_bytes": 700.0,
  "packet_bytes_1ms": 420000.0,
  "tcp_conversations": 1.0
}
```
## Aggregator Configuration

See Stack Aggregator Configuration.