@triddell

Overview

This pull request introduces aws_s3_stream, a new output plugin designed to stream generic text-based files (JSON, NDJSON, CSV, logs) directly to Amazon S3 using multipart uploads. The implementation addresses the critical issues of the existing batch approach: excessive memory consumption, incorrect partition routing, and the resulting small-file problem.

Key Problems Addressed

Memory Efficiency: The current aws_s3 output with batching buffers entire batches in memory before writing, consuming approximately 3.31 GB for 100K events. This solution reduces memory usage to ~180 MB—a 95% improvement. For 500K events, the batch approach fails completely after 15+ minutes, while streaming succeeds in 38.56 seconds with only 170 MB of memory.

Partition Routing: The batch approach evaluates path expressions once per batch rather than per message, so when a single batch contains events for multiple partitions, some events are written to the wrong partition. Testing with 10K interleaved events showed ~50% of events ending up in incorrect partitions, causing silent data corruption for Iceberg/Hive/Delta tables and multi-tenant data leakage.

Small File Problem: Batch processing creates many small files instead of fewer, larger files per partition. For example, 500K events produce 13+ incomplete files with the batch approach versus a single, optimally sized 788 MB file with streaming, which hurts both query performance and S3 costs.

Solution Architecture

The implementation uses:

  • Direct streaming to S3 multipart uploads without batch buffering
  • Per-message partition routing via partition_by expressions
  • Per-partition writer pooling with automatic lifecycle management
  • Configurable flush triggers: size (5MB), count (10K), or time (10s)

Memory footprint: 170–260 MB (constant regardless of dataset size).
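
The per-partition writer pool is what makes correct routing possible with constant memory. As a rough, hypothetical sketch of the idea (partitionWriter, writerPool, and route are illustrative names, not the PR's actual types), each message's partition key is evaluated individually and the message is dispatched to a lazily created writer for that key:

```go
// Illustrative sketch only; not the PR's actual code.
package main

import (
	"fmt"
	"strings"
)

// partitionWriter stands in for the streaming multipart writer owned by one partition.
type partitionWriter struct {
	key string
	buf strings.Builder
}

func (w *partitionWriter) Write(msg []byte) {
	w.buf.Write(msg)
	w.buf.WriteByte('\n')
}

// writerPool routes each message to the writer for its partition key,
// creating writers lazily so memory tracks the number of open partitions,
// not the size of the dataset.
type writerPool struct {
	writers map[string]*partitionWriter
}

func (p *writerPool) route(partitionKey string, msg []byte) {
	w, ok := p.writers[partitionKey]
	if !ok {
		w = &partitionWriter{key: partitionKey}
		p.writers[partitionKey] = w
	}
	w.Write(msg)
}

func main() {
	pool := &writerPool{writers: map[string]*partitionWriter{}}
	// The partition key is evaluated per message (e.g. from metadata),
	// so interleaved events still land in the right partition.
	pool.route("date=2024-01-01/service=auth", []byte(`{"event":1}`))
	pool.route("date=2024-01-01/service=web", []byte(`{"event":2}`))
	pool.route("date=2024-01-01/service=auth", []byte(`{"event":3}`))
	fmt.Println(len(pool.writers), "partition writers open")
}
```

In the actual plugin each writer streams its buffered data to an S3 multipart upload whenever a flush trigger fires, which is why memory stays proportional to the number of open partitions rather than to the dataset size.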

Files Added

Core implementation (5 files):

  • output_aws_s3_stream.go (438 lines): Plugin configuration and partition routing logic
  • s3_streaming_writer.go (397 lines): Generic S3 streaming writer with multipart upload management
  • output_aws_s3_stream_test.go (373 lines): Unit tests for output configuration and partition logic
  • output_aws_s3_stream_integration_test.go (371 lines): LocalStack integration tests
  • s3_streaming_writer_test.go (405 lines): Unit tests for streaming writer with mock S3 client

Files Modified

None. This PR only adds new files without modifying any existing code.

Configuration Example

```yaml
output:
  aws_s3_stream:
    bucket: my-logs-bucket
    path: 'logs/date=${! meta("date") }/service=${! meta("service") }/${! uuid_v4() }.json'
    partition_by:
      - '${! meta("date") }'
      - '${! meta("service") }'
    max_buffer_bytes: 5242880  # 5MB (S3 multipart optimal)
    max_buffer_count: 10000
    max_buffer_period: 10s
    content_type: 'application/x-ndjson'
```
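
The three max_buffer_* settings act as independent flush triggers: whichever threshold is crossed first causes the current buffer to be uploaded as a multipart part. A minimal sketch of that check, assuming the plugin evaluates the triggers roughly like this (flushConfig, bufferState, and shouldFlush are hypothetical names, not the plugin's API):

```go
// Hypothetical illustration of the three flush triggers (bytes, count, time).
package main

import (
	"fmt"
	"time"
)

type flushConfig struct {
	maxBytes  int
	maxCount  int
	maxPeriod time.Duration
}

type bufferState struct {
	bytes     int
	count     int
	lastFlush time.Time
}

// shouldFlush returns true when any configured threshold is crossed,
// mirroring the size (5MB), count (10K), and time (10s) settings above.
func shouldFlush(cfg flushConfig, st bufferState, now time.Time) bool {
	return st.bytes >= cfg.maxBytes ||
		st.count >= cfg.maxCount ||
		now.Sub(st.lastFlush) >= cfg.maxPeriod
}

func main() {
	cfg := flushConfig{maxBytes: 5 * 1024 * 1024, maxCount: 10000, maxPeriod: 10 * time.Second}
	st := bufferState{bytes: 6 * 1024 * 1024, count: 1200, lastFlush: time.Now()}
	fmt.Println(shouldFlush(cfg, st, time.Now())) // true: size threshold exceeded
}
```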

With Compression

```yaml
pipeline:
  processors:
    - bloblang: |
        root = content().string() + "\n"
    - compress:
        algorithm: gzip

output:
  aws_s3_stream:
    bucket: my-logs-bucket
    path: 'logs/date=${! meta("date") }/${! uuid_v4() }.json.gz'
    partition_by:
      - '${! meta("date") }'
    content_encoding: 'gzip'
```

Testing

The PR includes 30 unit tests and 4 LocalStack-based integration tests validating:

Unit tests (18 tests for output plugin):

  • Configuration parsing (basic, partition_by, buffer settings, content type/encoding)
  • Error cases (missing bucket, missing path, invalid period)
  • Path interpolation with dynamic metadata
  • Partition grouping logic
  • Backwards compatibility (behavior without partition_by)

Unit tests (12 tests for streaming writer):

  • Writer creation with defaults and custom config
  • Initialization lifecycle (single init, double-init error, uninitialized write error)
  • Buffer flushing on size threshold (>5MB), count threshold, and stats tracking
  • Close behavior (completes upload, write-after-close errors)
  • Multipart uploads (multiple parts, proper part numbering)
  • Content encoding metadata propagation
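
To make the flush-on-size case above concrete, a size-threshold test can be sketched against a fake uploader like this (hypothetical shape only; the PR's actual tests exercise the real writer through a mock S3 client):

```go
// Sketch of a flush-on-size unit test; would live in a *_test.go file.
package example

import (
	"bytes"
	"testing"
)

// fakeUploader records how many parts were "uploaded", standing in for a mocked S3 client.
type fakeUploader struct {
	parts int
}

func (f *fakeUploader) uploadPart(p []byte) {
	f.parts++
}

// streamingBuffer flushes to the uploader whenever buffered bytes reach max.
type streamingBuffer struct {
	max      int
	buf      bytes.Buffer
	uploader *fakeUploader
}

func (s *streamingBuffer) write(msg []byte) {
	s.buf.Write(msg)
	if s.buf.Len() >= s.max {
		s.uploader.uploadPart(s.buf.Bytes())
		s.buf.Reset()
	}
}

func TestFlushOnSizeThreshold(t *testing.T) {
	up := &fakeUploader{}
	sb := &streamingBuffer{max: 16, uploader: up}

	sb.write([]byte("0123456789"))
	if up.parts != 0 {
		t.Fatalf("expected no flush below threshold, got %d parts", up.parts)
	}
	sb.write([]byte("0123456789"))
	if up.parts != 1 {
		t.Fatalf("expected one flush at threshold, got %d parts", up.parts)
	}
}
```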

Integration tests (4 tests with LocalStack):

  • Basic write operations with 150 messages
  • Partition routing with 4 partitions (2 dates × 2 regions), verifying separate files
  • Content type and encoding metadata validation
  • Edge case: single message handling

Manual testing with OCSF security event data (complex nested JSON):

  • 50K, 100K, 500K event validation
  • Partition correctness: 100% accuracy with interleaved test data (alternating accounts)
  • Memory profiling: Constant usage across all dataset sizes
  • Compression trade-offs: GZIP adds 31% processing time, saves 54% storage

Performance Results

| Dataset | Batch Memory | Batch Time | Stream Memory | Stream Time | Memory Reduction |
|---------|--------------|------------|---------------|-------------|------------------|
| 50K     | 1.73 GB      | 2.88s      | 0.17 GB       | 4.80s       | 90%              |
| 100K    | 3.31 GB      | 6.13s      | 0.18 GB       | 11.19s      | 95%              |
| 500K    | FAILED       | >900s      | 0.17 GB       | 38.56s      | 93%+             |

At small scales the batch approach is faster; at production scales it fails outright, while streaming succeeds with constant memory.

Why No Built-in Compression?

Unlike aws_s3, this output does not provide a compress parameter. S3 multipart uploads require non-final parts to be ≥5MB. After compressing a 5MB buffer, the compressed size is unpredictable (often <5MB), causing S3 to reject the upload.

Solution: Use pipeline-level compression via the compress processor (see example above). This works reliably because concatenated gzip streams are valid, and each compressed message can be decompressed independently.
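
That property is easy to verify independently: a standard gzip reader consumes concatenated gzip members as a single stream, so an object assembled from independently compressed messages decompresses cleanly. A small standalone Go check, not part of the PR:

```go
// Demonstrates that concatenated gzip members decompress as one stream.
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"log"
)

func gzipMessage(msg string) []byte {
	var b bytes.Buffer
	zw := gzip.NewWriter(&b)
	if _, err := zw.Write([]byte(msg)); err != nil {
		log.Fatal(err)
	}
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}
	return b.Bytes()
}

func main() {
	// Each message is gzipped independently, as the compress processor would do.
	combined := append(gzipMessage(`{"a":1}`+"\n"), gzipMessage(`{"b":2}`+"\n")...)

	// A standard gzip reader consumes all concatenated members by default.
	zr, err := gzip.NewReader(bytes.NewReader(combined))
	if err != nil {
		log.Fatal(err)
	}
	out, err := io.ReadAll(zr)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Print(string(out)) // {"a":1} and {"b":2}, each on its own line
}
```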

Migration Path

Before (Broken)

```yaml
output:
  aws_s3:
    path: 'partition=${! meta("key") }/${! uuid_v4() }.json'
    batching:
      count: 10000
      processors:
        - archive:
            format: lines
```

After (Fixed)

```yaml
pipeline:
  processors:
    - bloblang: |
        root = content().string() + "\n"

output:
  aws_s3_stream:
    path: 'partition=${! meta("key") }/${! uuid_v4() }.json'
    partition_by: ['${! meta("key") }']
    max_buffer_bytes: 5242880
```

Key changes:

  1. Move archive: lines logic to pipeline (add \n to each message)
  2. Add partition_by parameter for correct routing
  3. Remove batching section from output
  4. Use aws_s3_stream instead of aws_s3

Backwards Compatibility

No breaking changes. This PR introduces a new plugin (aws_s3_stream) alongside the existing aws_s3 output. Users can migrate at their own pace. All code is new with zero modifications to existing files.

Related Work

This PR follows the same pattern as PR #661 (aws_s3_parquet_stream), which addresses identical partition routing and memory issues for Parquet format. Both implementations use:

  • Per-message partition evaluation
  • Per-partition writer pooling
  • Streaming multipart uploads
  • Constant memory usage

Documentation

The plugin includes comprehensive inline documentation:

  • Detailed config spec with examples
  • Description of partition routing behavior
  • Buffer tuning guidelines
  • Compression recommendations
  • Migration guide from aws_s3

Adds new streaming S3 output that uses S3 multipart uploads with
per-partition writer pooling for constant memory usage and correct
partition routing.

## Added Files

- output_aws_s3_stream.go: Main output plugin with partition routing
- output_aws_s3_stream_test.go: Unit tests (18 tests)
- output_aws_s3_stream_integration_test.go: Integration tests (4 tests)
- s3_streaming_writer.go: S3 multipart streaming writer with S3API interface
- s3_streaming_writer_test.go: Writer unit tests (12 tests)

## Key Features

- partition_by parameter for per-message partition evaluation
- Configurable buffer thresholds (bytes, count, time)
- S3 multipart upload with automatic part management
- Content-Type and Content-Encoding metadata support
- Graceful shutdown with upload completion
@triddell

CI Fixes

Fixed all linting errors and CI check failures:

  • Fixed documentation generation by removing a YAML example from the description that was causing validation errors
  • Fixed Bloblang syntax in examples (timestamp().format() -> now().ts_format())
  • Replaced fmt.Errorf with errors.New for static error strings
  • Updated deprecated AWS SDK endpoint resolver to use BaseEndpoint
  • Fixed testify verbose assertions and removed unnecessary type conversions
  • Fixed errcheck error in test
  • Applied gofmt formatting

All tests and lint checks now pass. I will review the generated documentation output more closely for any additional improvements needed.

Rename S3API -> s3StreamingAPI, WriterStats -> S3StreamWriterStats,
and mockS3Client -> mockS3StreamClient to prevent type conflicts when
both streaming output implementations are merged to upstream.