aws_kinesis_streams: Support KPL Style Aggregation #24395

Description

@lht

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Use Cases

When sending high volumes of small records to AWS Kinesis Data Streams using Vector's aws_kinesis_streams sink, users face several challenges:

Throughput limitations

Kinesis shards have hard limits of 1,000 records/second and 1 MB/second per shard. Without aggregation, small records quickly hit the 1,000 records/sec limit while vastly underutilizing the 1 MB/sec bandwidth capacity. For example (see the sketch after this list):

  • Ingesting sensor data from thousands of devices, each sending 100B payloads every second
  • Without aggregation: 10,000 records/sec requires 10 shards to stay under the 1,000 records/sec limit, even though the total data rate is only ~1 MB/sec
  • With aggregation: the same throughput could use 1 shard by packing ~100 user records into each Kinesis record
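
For concreteness, here is a back-of-the-envelope version of that shard math, as an illustrative Rust sketch. The figures are the ones quoted above; this is not Vector code.

fn main() {
    let records_per_sec: u32 = 10_000;     // 10k devices, 1 record/sec each
    let record_bytes: u32 = 100;           // ~100 B payloads
    let shard_record_limit: u32 = 1_000;   // per-shard records/sec limit
    let shard_byte_limit: u32 = 1_000_000; // per-shard ~1 MB/sec limit

    // Without aggregation, the record-count limit dominates: 10 shards.
    let shards_unaggregated = records_per_sec.div_ceil(shard_record_limit);

    // Packing ~100 user records per Kinesis record drops the record rate
    // to 100/sec while the byte rate stays ~1 MB/sec: a single shard.
    let aggregated_per_sec = records_per_sec.div_ceil(100);
    let bytes_per_sec = records_per_sec * record_bytes;
    let shards_aggregated = aggregated_per_sec
        .div_ceil(shard_record_limit)
        .max(bytes_per_sec.div_ceil(shard_byte_limit));

    println!("{shards_unaggregated} shards vs {shards_aggregated} shard");
}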

Multi-agent environments with hot shard issues

  • Multiple logging agents (Fluent-bit, Logstash, KPL-based agents, Osquery) write to the same Kinesis stream
  • Other agents occasionally cause hot shards due to poor partition key distribution or traffic spikes
  • Vector gets throttled frequently when hitting many shards, especially during hot shard events
  • Without aggregation: Vector distributes writes across many shards, increasing exposure to throttling; the alternative of configuring a smaller batch size reduces throughput
  • With aggregation: Vector can concentrate writes to fewer shards, reducing throttling surface area and improving resilience to hot shard issues caused by other agents

Firehose

  • Vector sends logs/metrics to Kinesis Data Streams. Kinesis Data Firehose consumes from the stream and delivers to S3, Redshift, OpenSearch, etc.
  • Firehose automatically deaggregates KPL records and processes the individual user records, improving delivery efficiency and reducing buffer churn

Attempted Solutions

The aws_kinesis_streams sink currently does not support KPL aggregation. Here are the available workarounds and why they fall short:

Attempt 1: Increasing batch size

sinks:
  kinesis:
    type: aws_kinesis_streams
    stream_name: my-stream
    batch:
      max_events: 500  # Maximum for PutRecords API

Result: This helps with API call efficiency but does NOT solve the core problems:

  • Still limited to 1,000 records/sec per shard (batch size doesn't change this)
  • Still prone to throttling: a single 500-record PutRecords call can fan out across many shards, keeping exposure to per-shard limits high

Attempt 2: Using compression

sinks:
  kinesis:
    type: aws_kinesis_streams
    encoding:
      codec: json
    compression: gzip

Result: Compression reduces data size but:

  • Does NOT pack multiple records into one Kinesis record
  • Does NOT change the record count (still 1,000 records/sec limit)

Attempt 3: External KPL aggregator

  • Set up a separate service to aggregate log events
  • Vector sends to the service via an HTTP sink
  • The service aggregates and sends to Kinesis

Result: This works but introduces significant operational complexity:

  • Additional infrastructure to manage and monitor
  • Extra network hop and serialization/deserialization overhead
  • Duplicate functionality (batching, retry, etc.)
  • Defeats the purpose of using Vector as a unified observability pipeline

Proposal

Add native KPL (Kinesis Producer Library) aggregation support to the aws_kinesis_streams sink.

From the AWS documentation:

Aggregation refers to the storage of multiple records in a Kinesis Data Streams record. Aggregation allows customers to increase the number of records sent per API call, which effectively increases producer throughput.

Add two new configuration options:

Configuration

sinks:
  kinesis:
    type: aws_kinesis_streams
    stream_name: my-stream
    region: us-east-1

    aggregation:
      # Enable KPL aggregation format (default: false for backward compatibility)
      enabled: true

      # Maximum number of user records to pack into one aggregated Kinesis record
      # (default: 100, range: 1-1000)
      max_events: 100

    # Existing options continue to work as before
    encoding:
      codec: json
    compression: gzip  # Applied to individual records before aggregation
    batch:
      max_events: 500  # Still batches aggregated records for PutRecords

Behavior

When aggregation.enabled: false (default):

  • No change to existing behavior
  • Full backward compatibility
  • Each Vector event becomes one Kinesis record

When aggregation.enabled: true:

  1. Vector processes events through the standard pipeline (encoding, compression)
  2. Up to aggregation.max_events user records are packed into a single KPL-formatted record
  3. Aggregation uses the official KPL format (see the framing sketch after this list):
    • Magic bytes: 0xF3899AC2 (identifies KPL records)
    • Protobuf message with partition key table and record array
    • MD5 checksum for integrity verification
  4. Aggregated records respect a ~950 KB size limit, leaving headroom under the AWS 1 MB per-record limit
  5. Aggregated records are then batched (up to batch.max_events) for PutRecords API calls
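
To make the framing concrete, below is a minimal Rust sketch of how such a record could be assembled. It assumes the protobuf AggregatedRecord body has already been serialized (e.g. via prost) and uses the md5 crate for the trailing checksum; the names are illustrative, not Vector's actual implementation.

/// KPL magic prefix identifying an aggregated record (0xF3899AC2).
const KPL_MAGIC: [u8; 4] = [0xF3, 0x89, 0x9A, 0xC2];

/// Frame a serialized AggregatedRecord: magic || protobuf body || MD5(body).
/// `protobuf_body` is assumed to be the already-encoded protobuf message
/// holding the partition key table and the packed user records.
fn frame_kpl_record(protobuf_body: &[u8]) -> Vec<u8> {
    let digest = md5::compute(protobuf_body); // 16-byte integrity checksum
    let mut framed = Vec::with_capacity(4 + protobuf_body.len() + 16);
    framed.extend_from_slice(&KPL_MAGIC);
    framed.extend_from_slice(protobuf_body);
    framed.extend_from_slice(&digest.0);
    framed
}

Consumers such as Firehose and the KCL detect the magic prefix, verify the MD5 digest, and deaggregate the protobuf body back into the original user records.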

Partition Key Strategy with Aggregation

  • When aggregation is enabled, the partition key behavior changes to optimize for aggregation efficiency while maintaining predictable shard distribution.
  • Propose following the same strategy as the AWS Kinesis plugin for Fluent Bit: The aggregated record uses the partition key of the first user record in the batch for shard routing

Because the partition_key_field of the first record in the batch routes the entire aggregate to a single shard, using partition_key_field together with aggregation requires careful consideration, as sketched below.
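
A minimal sketch of that selection rule, with hypothetical types (a real implementation would also need to enforce Kinesis's 256-character partition key limit and the aggregate size cap):

/// Hypothetical user record as buffered by the sink before aggregation.
struct UserRecord {
    partition_key: String,
    data: Vec<u8>,
}

/// Route the whole aggregate by the first user record's partition key,
/// mirroring the Fluent Bit kinesis plugin behavior described above.
fn aggregate_partition_key(batch: &[UserRecord]) -> Option<&str> {
    batch.first().map(|r| r.partition_key.as_str())
}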

Version

0.51.1
