Overview
This collector writes aggregated statistics to a Kafka cluster. The message timestamp is set to the time at which the message was published.
Stack Configuration
In this example, we have a statistics collector with module name “kafka”. The example configuration explicitly defines the topic name in a topic_mapping table, rather than using the topic_prefix parameter to derive the topic name from the aggregation name.
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| payload_format | enum (JSON) | Y | | Encoding of the Kafka message payload |
| topic_prefix | string | N | &lt;none&gt; | Prefix prepended to the aggregation name to form the Kafka topic, if the aggregation name is not in the topic_mapping table |
| publish_interval_ns | uint64 | N | 10000000000 | Interval between publishing statistics, in nanoseconds |
| batch_size | uint64 | N | Not specified | Maximum batch size to insert |
| batch_timeout_ms | uint64 | N | Not specified | Timeout before inserting a batch, in milliseconds |
| aggregator_json_filename | path | Y | | Filename of the aggregator JSON |
| timestamp | string | N | packet_timestamp | Data field used for the message timestamp |
| write_when_no_new_values | bool | N | true | Write an update with 0 statistics for an aggregation instance before it expires |
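Since publish_interval_ns is expressed in nanoseconds, values are easy to get wrong by a factor of 1000. A small conversion helper (purely illustrative; not part of the collector) makes the values readable:

```python
# Hypothetical helper for computing publish_interval_ns values.
# The parameter is in nanoseconds, so the default of 10000000000 is 10 seconds.
NS_PER_SECOND = 1_000_000_000

def seconds_to_publish_interval_ns(seconds: float) -> int:
    """Convert a human-friendly interval in seconds to a publish_interval_ns value."""
    return int(seconds * NS_PER_SECOND)

print(seconds_to_publish_interval_ns(10))  # -> 10000000000 (the default)
print(seconds_to_publish_interval_ns(1))   # -> 1000000000
```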
Tables
global_configuration
Key value pairs for configuring librdkafka. See https://github.com/edenhill/librdkafka/blob/v1.9.0/CONFIGURATION.md
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| key | string | Y | | Configuration key |
| value | string | Y | | Configuration value |
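The key/value rows correspond directly to librdkafka configuration properties. As an illustration (hypothetical; the collector applies these internally), the table rows assemble into the flat property-name-to-string mapping that librdkafka-based clients accept:

```python
# Rows from the global_configuration table, as the collector's JSON represents them.
rows = [
    {"key": "bootstrap.servers", "value": "kafka-cluster:9092"},
]

# librdkafka takes a flat mapping of property name -> string value.
conf = {row["key"]: row["value"] for row in rows}
print(conf)  # -> {'bootstrap.servers': 'kafka-cluster:9092'}
```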
topic_mapping
This table is optional. If configured, it can be used to specify a topic to be used for each aggregation, instead of using the topic_prefix parameter.
| Parameter | Type | Required | Description |
|---|---|---|---|
| aggregation | string | Y | Aggregation name |
| topic | string | Y | Kafka topic |
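The interaction between topic_mapping and topic_prefix can be sketched as follows. This is a plausible reading of the parameter descriptions, not a statement about the collector's internals: an explicit mapping entry wins, and topic_prefix is otherwise prepended to the aggregation name.

```python
from typing import Dict, Optional

def resolve_topic(aggregation: str,
                  topic_mapping: Dict[str, str],
                  topic_prefix: Optional[str]) -> Optional[str]:
    """Sketch of topic selection: an explicit topic_mapping entry wins,
    otherwise topic_prefix is prepended to the aggregation name."""
    if aggregation in topic_mapping:
        return topic_mapping[aggregation]
    if topic_prefix is not None:
        return topic_prefix + aggregation
    return None  # no topic can be derived

mapping = {"summary": "com.tipoff.coredatafeed.telemetry.MD_stats"}
print(resolve_topic("summary", mapping, "stats."))  # mapped topic wins
print(resolve_topic("srcDst", mapping, "stats."))   # -> stats.srcDst
```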
Example
```json
{
  "probe": {
    "parameters": {
      "name": "CDFT_example_stack_probe",
      "protocols": [
        { "type": "module", "value": "ethernet" },
        { "type": "module", "value": "ip" }
      ],
      "stat_collector": [
        { "type": "module", "value": "kafka", "id": "collKafka" }
      ]
    }
  },
  "collKafka": {
    "parameters": {
      "payload_format": "JSON",
      "publish_interval_ns": 1000000000,
      "aggregator_json_filename": "/tmp/example.stack.agg.json",
      "write_when_no_new_values": false
    },
    "tables": {
      "global_configuration": [
        { "key": "bootstrap.servers", "value": "kafka-cluster:9092" }
      ],
      "topic_mapping": [
        { "aggregation": "summary", "topic": "com.tipoff.coredatafeed.telemetry.MD_stats" }
      ]
    }
  }
}
```
Example Output
Based on this configuration, and a suitable aggregator configuration, the payload of the Kafka message will be similar to the example below:
```json
{
  "srcIp": "10.1.1.1",
  "dstIp": "10.1.1.2",
  "name": "srcDstType",
  "packet_bytes": 700.0,
  "packet_bytes_1ms": 420000.0,
  "tcp_conversations": 1.0
}
```
Aggregator Configuration
The following aggregator configuration could be expected to produce the output above:
```json
{
  "fieldDefinitions": {
    "packet_bytes": { "statistic": { "name": "ip.packet_bytes" } },
    "packet_bytes_1ms": { "statistic": { "name": "ip.packet_bytes", "microburstNS": 1000000 } },
    "tcp_conversations": { "activeStreams": { "type": "tcp" } }
  },
  "fieldSets": {
    "allFields": [ "packet_bytes", "packet_bytes_1ms", "tcp_conversations" ]
  },
  "expressionDefinitions": {
    "srcDstTypeNameExpression": { "static": { "value": "srcDstType" } },
    "srcIpExpression": { "datafield": { "name": "ip.src_host" } },
    "dstIpExpression": { "datafield": { "name": "ip.dst_host" } }
  },
  "propertySets": {
    "srcDstTypeProperties": { "name": "srcDstTypeNameExpression" }
  },
  "aggregations": {
    "srcDst": {
      "keys": [
        { "srcIp": "srcIpExpression" },
        { "dstIp": "dstIpExpression" }
      ],
      "fieldSets": [ "allFields" ],
      "propertySets": [ "srcDstTypeProperties" ]
    }
  }
}
```
The market data worked example in the Configuration Guide for VMX-Capture contains further examples of commonly used stack probe functions that can be used in aggregations produced by Stats Collectors. The microburstNS statistic and the activeStreams function are commonly used there, and other statistics calculated by VMX-Capture stack probes could easily be added to the CDF-T output, for example:
- stack.msg_count
- stack.gap_count (for market data)
- stack.missed_msg_count (for missed data)
- stack.out_of_order_msg_count
- beeks.wiretime (for wiretime latency)
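As a quick sanity check on aggregator configurations like the one shown above, a short script (hypothetical, standard library only) can verify that every field referenced in a fieldSets list is actually defined in fieldDefinitions:

```python
import json

# Trimmed copy of the example aggregator configuration above, used here only
# to illustrate the relationship between fieldDefinitions and fieldSets.
AGG_CONFIG = json.loads("""
{
  "fieldDefinitions": {
    "packet_bytes": { "statistic": { "name": "ip.packet_bytes" } },
    "packet_bytes_1ms": { "statistic": { "name": "ip.packet_bytes", "microburstNS": 1000000 } },
    "tcp_conversations": { "activeStreams": { "type": "tcp" } }
  },
  "fieldSets": {
    "allFields": [ "packet_bytes", "packet_bytes_1ms", "tcp_conversations" ]
  }
}
""")

defined = set(AGG_CONFIG["fieldDefinitions"])
for name, fields in AGG_CONFIG["fieldSets"].items():
    missing = [f for f in fields if f not in defined]
    print(name, "ok" if not missing else f"missing: {missing}")  # -> allFields ok
```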