Summary
Implement a new aws_s3_stream output that provides true streaming S3 writes with proper partition routing, constant memory usage, and multipart upload support. This addresses critical memory scalability and data correctness issues in the existing aws_s3 output when writing partitioned data.
Motivation
The current aws_s3 output with batching has fundamental limitations that prevent it from being used in production for partitioned data pipelines:
1. Memory Scalability Issues
The batch approach buffers entire batches in memory before writing, causing memory usage to scale linearly with data volume:
| Dataset Size | Batch Memory | Streaming Memory | Reduction |
|---|---|---|---|
| 50K events | 1.73 GB | 0.17 GB | 90% |
| 100K events | 3.31 GB | 0.18 GB | 95% |
| 500K events | FAILED | 0.17 GB | 93%+ |
Impact:
- Cannot process large datasets without massive memory allocation
- Requires expensive, over-provisioned infrastructure
- Fails completely at scale (tested: 500K events crashed after 15+ minutes)
2. Partition Routing Bug (Data Correctness)
The batch approach has a critical bug when batch boundaries cross partition boundaries - events are written to the wrong partitions:
Test: 10K events, alternating accounts, batch_size=1000

Batch result (BROKEN):

```
account=353785743975/:
  499 events from account 353785743975 ✓
  501 events from account 999888777666 ✗ WRONG PARTITION
account=999888777666/:
  500 events from account 999888777666 ✓
  500 events from account 353785743975 ✗ WRONG PARTITION
```

Streaming result (CORRECT):

```
account=353785743975/: 5,000 events from 353785743975 (100%)
account=999888777666/: 5,000 events from 999888777666 (100%)
```
Impact:
- Data corruption: Events appear in wrong partitions
- Security issue: Multi-tenant data leakage (account A's data in account B's partition)
- Compliance violation: Data residency requirements broken
- Query errors: Partition pruning fails, aggregations return incorrect results
- Note: a similar partition routing bug is addressed in the parallel PR #661 (Add aws_s3_parquet_stream output for memory-efficient Parquet streaming to S3) for `aws_s3_parquet_stream`
3. Performance Degradation at Scale
Batch performance becomes non-linear and eventually fails:
| Dataset | Batch Time | Streaming Time | Streaming vs. Batch |
|---|---|---|---|
| 50K | 2.88s | 4.80s | 67% slower |
| 100K | 6.13s | 11.19s | 82% slower |
| 500K | >900s (FAILED) | 38.56s | 24× faster |
At small scales, batch is faster. At production scales, batch fails completely while streaming succeeds.
4. Small File Problem
Batch creates many small files instead of optimal large files per partition:
- 50K: 5 files @ 15.8 MB each (batch) vs 1 file @ 78.9 MB (streaming)
- 100K: 9 files @ 15.8 MB each (batch) vs 1 file @ 157.6 MB (streaming)
- 500K: 13+ incomplete files (batch) vs 1 file @ 788 MB (streaming)
Impact:
- Poor query performance (more S3 LIST operations, more file opens)
- Data lake "small file problem" (Iceberg/Hive/Delta tables)
- Higher S3 costs (more API calls)
Proposed Solution
Implement aws_s3_stream output that:
- Streams data directly to S3 multipart uploads without buffering entire batches
- Maintains separate writers per partition using the `partition_by` parameter
- Evaluates partition routing per message instead of per batch
- Uses constant memory (~170-260 MB regardless of dataset size)
Design
Similar in approach to the aws_s3_parquet_stream implementation (PR #661), but adapted for generic text-based S3 writes:
```yaml
output:
  aws_s3_stream:
    bucket: my-bucket
    region: us-west-2
    path: 'logs/date=${! meta("date") }/account=${! meta("account") }/${! uuid_v4() }.json'
    # Partition routing - creates a separate writer per partition
    partition_by:
      - '${! meta("date") }'
      - '${! meta("account") }'
    # Buffer settings - flushes when ANY condition is met
    max_buffer_bytes: 5242880   # 5 MB (S3 multipart optimum)
    max_buffer_count: 10000     # Safety valve for tiny messages
    max_buffer_period: 10s      # Flushes low-volume partitions
    # Optional compression (applied in the pipeline, not here)
    content_type: 'application/x-ndjson'
    content_encoding: 'gzip'    # If using the compress processor
```

Key Features
- Per-partition writer pooling: Maintains active multipart upload per partition
- Automatic buffer flushing: Triggers on size, count, or time (whichever comes first)
- Graceful shutdown: Completes all multipart uploads on pipeline termination
- S3 multipart optimization: 5MB parts for optimal throughput
- Content type flexibility: Supports JSON, NDJSON, CSV, or any text format
Why No Built-in Compression?
Unlike aws_s3, this output does not provide a compress parameter. Compression should be handled in the pipeline using the compress processor instead:
Reason: S3 multipart uploads require non-final parts to be ≥5MB. After compressing a 5MB buffer, the compressed size is unpredictable (often <5MB), causing S3 to reject the upload. Handling this correctly would require complex logic to either:
- Buffer much larger amounts (15-20MB+) uncompressed before compressing, negating memory benefits
- Implement adaptive logic to skip compression on undersized parts, creating inconsistent compression within files
- Fall back to uploading uncompressed data when compressed size is too small, adding complexity
Solution: Use pipeline-level compression which operates on messages before buffering:
```yaml
pipeline:
  processors:
    - compress:
        algorithm: gzip
output:
  aws_s3_stream:
    content_encoding: 'gzip'  # Set metadata for S3
```

This approach works reliably because concatenated gzip streams are valid: each compressed message can be decompressed independently. Testing shows a 54% storage reduction for ~31% performance overhead.
Implementation
Core Components
- `output_aws_s3_stream.go`: main output plugin
  - Registers the `aws_s3_stream` output type
  - Manages the writer pool (map of partition key → writer)
  - Routes messages to the correct partition writer
  - Handles graceful shutdown
- `s3_streaming_writer.go`: generic S3 streaming writer
  - Manages the S3 multipart upload for one partition
  - Buffers data until a flush threshold is met
  - Handles part uploads and upload completion
  - Thread-safe for concurrent access
- Configuration parameters:
  - All standard S3 auth/config options (reuses the existing `aws_s3` config)
  - `partition_by`: array of interpolated strings forming the partition key
  - `max_buffer_bytes`: buffer size threshold
  - `max_buffer_count`: message count threshold
  - `max_buffer_period`: time threshold
Pipeline Integration
Works seamlessly with existing pipeline processors:
```yaml
pipeline:
  processors:
    # Extract partition metadata
    - bloblang: |
        root = this
        meta date = (this.timestamp / 1000).ts_unix().format_timestamp("2006-01-02")
        meta account = this.account_id
    # Optional: add newlines for NDJSON
    - bloblang: |
        root = content().string() + "\n"
    # Optional: compress in the pipeline
    - compress:
        algorithm: gzip
output:
  aws_s3_stream:
    path: 'logs/date=${! meta("date") }/account=${! meta("account") }/${! uuid_v4() }.json.gz'
    partition_by: ['${! meta("date") }', '${! meta("account") }']
    content_encoding: 'gzip'
```

Testing Results
Comprehensive testing with OCSF security event data (complex nested JSON):
Memory Efficiency
- Constant memory: 170-260 MB regardless of dataset size
- 90-95% reduction vs batch approach
- Works in constrained environments: Lambda (512 MB), containers (<1 GB)
Partition Correctness
- 100% correct routing: All events in proper partitions
- Zero data leakage: Multi-tenant isolation maintained
- Validated with interleaved data: Alternating accounts every message
Performance
- Linear scaling: Time increases predictably with data size
- Reliable at scale: 500K events in 38.56s (batch failed)
- Optional compression: GZIP adds 31% time, saves 54% storage
Compression Trade-offs (500K events)
| Metric | Uncompressed | GZIP | Change |
|---|---|---|---|
| Time | 38.56s | 50.56s | +31% slower |
| Memory | 0.17 GB | 0.26 GB | +90 MB |
| Size | 788 MB | 360 MB | -54% |
Benefits
For Users
- Correct data: No more partition routing bugs
- Lower costs: 90% less memory = smaller instances
- Better performance: Handles datasets batch approach cannot
- Predictable scaling: Linear time/memory characteristics
For Platform Teams
- Production-ready: Thoroughly tested with 500K+ events
- Drop-in replacement: same config structure as `aws_s3`
- Flexible: works with any text format (JSON, CSV, logs, etc.)
- Debuggable: Clear writer lifecycle logging
For Data Lakes
- Optimal file sizes: One large file per partition
- Better query performance: Fewer files to scan
- Partition pruning works: Correct data in correct partitions
- Standards compliant: Works with Iceberg, Hive, Delta Lake
Migration Path
Current (Broken)
```yaml
output:
  aws_s3:
    path: 'partition=${! meta("key") }/${! uuid_v4() }.json'
    batching:
      count: 10000
      processors:
        - archive:
            format: lines
```

Migrated (Fixed)

```yaml
pipeline:
  processors:
    - bloblang: |
        root = content().string() + "\n"
output:
  aws_s3_stream:
    path: 'partition=${! meta("key") }/${! uuid_v4() }.json'
    partition_by: ['${! meta("key") }']
    max_buffer_bytes: 5242880
```

Key changes:
- Move the `archive: lines` logic into the pipeline (append `\n`)
- Add the `partition_by` parameter for routing
- Remove the `batching` section from the output
- Use `aws_s3_stream` instead of `aws_s3`
Related Work
- PR #661 (Add aws_s3_parquet_stream output for memory-efficient Parquet streaming to S3): `aws_s3_parquet_stream`, a parallel PR solving the same partition routing bug for the Parquet format
- This PR: `aws_s3_stream`, the streaming approach for generic text formats (JSON, CSV, logs, etc.)
- Common pattern: both use per-message partition evaluation and writer pooling