Overview

This collector writes aggregated statistics to a Kafka cluster.

The message timestamp is set to the time at which the message was published.

Stack Configuration

In this example, we have a statistics collector with the module name “kafka”. The example configuration explicitly defines the topic name in a topic_mapping table, rather than using the topic_prefix parameter to derive the topic name from the aggregation name.

Parameters

Parameters for all instances of the kafka module.
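
For orientation, a typical parameters block for a kafka collector instance looks like the following; the values shown are the ones used in the full example later on this page:

"collKafka": {
    "parameters": {
        "payload_format": "JSON",
        "publish_interval_ns": 1000000000,
        "aggregator_json_filename": "/tmp/example.stack.agg.json",
        "write_when_no_new_values": false
    }
}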

Tables

global_configuration

Key-value pairs for configuring librdkafka. See https://github.com/edenhill/librdkafka/blob/v1.9.0/CONFIGURATION.md for the full list of supported properties.

Key: global_configuration.
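
For instance, a deployment that authenticates to the cluster over SASL/SSL might populate this table with entries such as the following sketch. The broker address and credentials are placeholders, and the keys are standard librdkafka global properties:

"global_configuration": [
    {
        "key": "bootstrap.servers",
        "value": "kafka-cluster:9092"
    },
    {
        "key": "security.protocol",
        "value": "SASL_SSL"
    },
    {
        "key": "sasl.mechanisms",
        "value": "PLAIN"
    },
    {
        "key": "sasl.username",
        "value": "example-user"
    },
    {
        "key": "sasl.password",
        "value": "example-password"
    }
]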

topic_mapping

This table is optional. If configured, it specifies the topic to use for each aggregation, instead of deriving the topic name from the topic_prefix parameter.

Key: topic_mapping.
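
For comparison, when topic_mapping is omitted, the topic_prefix parameter derives the topic name from the aggregation name. A minimal sketch, assuming an illustrative prefix value of com.example.telemetry.:

"parameters": {
    "topic_prefix": "com.example.telemetry."
}

With the srcDst aggregation from the example below, this would be expected to yield a topic name combining the prefix and the aggregation name; the exact form of the derived name depends on the collector.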

Example

{
    "probe": {
        "parameters": {
            "name": "CDFT_example_stack_probe",
            "protocols": [
                {
                    "type": "module",
                    "value": "ethernet"
                },
                {
                    "type": "module",
                    "value": "ip"
                }
            ],
            "stat_collector": [
                {
                    "type": "module",
                    "value": "kafka",
                    "id": "collKafka"
                }
            ]
        }
    },
    "collKafka": {
        "parameters": {
            "payload_format": "JSON",
            "publish_interval_ns": 1000000000,
            "aggregator_json_filename": "/tmp/example.stack.agg.json",
            "write_when_no_new_values": false
        },
        "tables": {
            "global_configuration": [
                {
                    "key": "bootstrap.servers",
                    "value": "kafka-cluster:9092"
                }
            ],
            "topic_mapping": [
                {
                    "aggregation": "srcDst",
                    "topic": "com.tipoff.coredatafeed.telemetry.MD_stats"
                }
            ]
        }
    }
}

Example Output

Based on this configuration, and a suitable aggregator configuration, the payload of the Kafka message will be similar to the example below:

{
    "srcIp": "10.1.1.1",
    "dstIp": "10.1.1.2",
    "name": "srcDstType",
    "packet_bytes": 700.0,
    "packet_bytes_1ms": 420000.0,
    "tcp_conversations": 1.0
}

Aggregator Configuration

The following aggregator configuration could be expected to produce the output above:

{
    "fieldDefinitions": {
        "packet_bytes": {
            "statistic": {
                "name": "ip.packet_bytes"
            }
        },
        "packet_bytes_1ms": {
            "statistic": {
                "name": "ip.packet_bytes",
                "microburstNS": 1000000
            }
        },
        "tcp_conversations": {
            "activeStreams": {
                "type": "tcp"
            }
        }
    },
    "fieldSets": {
        "allFields": [
            "packet_bytes",
            "packet_bytes_1ms",
            "tcp_conversations"
        ]
    },
    "expressionDefinitions": {
        "srcDstTypeNameExpression": {
            "static": {
                "value": "srcDstType"
            }
        },
        "srcIpExpression": {
            "datafield": {
                "name": "ip.src_host"
            }
        },
        "dstIpExpression": {
            "datafield": {
                "name": "ip.dst_host"
            }
        }
    },
    "propertySets": {
        "srcDstTypeProperties": {
            "name": "srcDstTypeNameExpression"
        }
    },
    "aggregations": {
        "srcDst": {
            "keys": [
                {
                    "srcIp": "srcIpExpression"
                },
                {
                    "dstIp": "dstIpExpression"
                }
            ],
            "fieldSets": [
                "allFields"
            ],
            "propertySets": [
                "srcDstTypeProperties"
            ]
        }
    }
}

The market data worked example in the Configuration Guide for VMX-Capture shows more examples of commonly used stack probe functions that can be used in aggregations produced by Stats Collectors. It illustrates that the microburstNS statistic and the activeStreams function are commonly used, and it also lists other statistics that VMX-Capture stack probes can calculate, which could easily be added to the CDF-T output (a configuration sketch follows this list), for example:

  • stack.msg_count

  • stack.gap_count (for market data)

  • stack.missed_msg_count (for missed data)

  • stack.out_of_order_msg_count

  • beeks.wiretime (for wiretime latency)
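
As a sketch of how one of these might be wired in, the fieldDefinitions pattern from the aggregator configuration above can be extended as below. The field names msg_count and wiretime are illustrative, and whether each underlying statistic is available depends on the stack probe functions configured:

"fieldDefinitions": {
    "msg_count": {
        "statistic": {
            "name": "stack.msg_count"
        }
    },
    "wiretime": {
        "statistic": {
            "name": "beeks.wiretime"
        }
    }
}

Each new field would then also need to be added to a fieldSet that the aggregation references, as with allFields in the example above.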