Conversation

@triddell (Contributor) commented Jan 20, 2026

Overview

This PR adds a new aws_s3_parquet_stream output plugin that streams Parquet files directly to S3 using multipart uploads, addressing memory limitations and partition routing issues in the current batch approach.

Resolves: #660
Related: #659 (partition routing bug)

Motivation

Problem 1: High Memory Usage

The current approach (aws_s3 + parquet_encode processor) buffers entire Parquet files in memory before uploading to S3:

# Current approach - high memory usage
output:
  aws_s3:
    batching:
      count: 100000
      processors:
        - parquet_encode:
            schema: [...]

Memory usage for 100K events (minimal schema):

  • Current approach: ~2.2 GB
  • This PR: ~270 MB
  • Reduction: 88%

For production workloads with complex schemas (e.g., OCSF) or larger datasets, memory usage can exceed 10 GB, causing OOM errors in constrained environments.

Problem 2: Partition Routing

As documented in #659, the batch approach evaluates path expressions once per batch, causing incorrect partition routing:

path: 'data/account=${! meta("account") }/${! uuid_v4() }.parquet'

With 100K events across 2 accounts:

  • Expected: 2 files (one per account)
  • Actual: 1 file with all 100K events in the wrong partition

This breaks Iceberg/Hive/Delta table partitioning and causes silent data corruption.

Solution

This PR introduces aws_s3_parquet_stream with:

  1. Incremental row group streaming - Uploads Parquet row groups to S3 as they're generated
  2. Per-message partition routing - Routes each message to correct writer based on partition_by expressions
  3. S3 multipart uploads - Streams data without buffering complete files
  4. Thread-safe writer pooling - Manages multiple concurrent partition writers

Architecture

Input Messages
    ↓
Partition Router (evaluates partition_by per message)
    ↓
Writer Pool (one writer per partition key)
    ├─ Row Buffer (configurable size, default 10K rows)
    ├─ Row Group Encoder (Parquet columnar encoding)
    ├─ Compression (Snappy/Zstd/Gzip/Brotli/LZ4)
    └─ Upload Buffer (accumulates until 5MB for S3 part)
    ↓
S3 Multipart Upload (streaming)

Memory profile per writer: ~30-60 MB (independent of dataset size)
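
The per-partition writer pooling can be sketched roughly as follows. This is an illustrative sketch only; all names (writerPool, streamingWriter, writerFor) and the partition key format are hypothetical and not taken from the PR's actual code in output_aws_s3_parquet_stream.go and parquet_streaming_writer.go.

package main

import (
    "fmt"
    "sync"
)

// streamingWriter stands in for the plugin's per-partition Parquet writer;
// in the real plugin each writer owns a row buffer, a row group encoder,
// and an S3 multipart upload.
type streamingWriter struct {
    partitionKey string
}

// writerPool hands out exactly one writer per partition key, guarded by a
// mutex so concurrent batches can be routed safely.
type writerPool struct {
    mu      sync.Mutex
    writers map[string]*streamingWriter
}

func newWriterPool() *writerPool {
    return &writerPool{writers: map[string]*streamingWriter{}}
}

// writerFor returns the existing writer for a partition key, creating one
// (and, in the real plugin, starting its multipart upload) on first use.
func (p *writerPool) writerFor(key string) *streamingWriter {
    p.mu.Lock()
    defer p.mu.Unlock()
    if w, ok := p.writers[key]; ok {
        return w
    }
    w := &streamingWriter{partitionKey: key}
    p.writers[key] = w
    return w
}

func main() {
    pool := newWriterPool()
    // Two messages from the same partition share a writer; a message from a
    // different partition gets its own writer (and therefore its own file).
    // The "date|account" key format here is purely illustrative.
    a := pool.writerFor("2026-01-19|acct-1")
    b := pool.writerFor("2026-01-19|acct-1")
    c := pool.writerFor("2026-01-19|acct-2")
    fmt.Println(a == b, a == c) // true false
}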

Key Features

1. Partition Routing with partition_by

output:
  aws_s3_parquet_stream:
    bucket: my-bucket
    path: 'events/date=${! meta("date") }/account=${! meta("account") }/${! uuid_v4() }.parquet'

    # Messages with same partition values → same file
    partition_by:
      - '${! meta("date") }'
      - '${! meta("account") }'

    schema_file: ./schemas/events.yml
    row_group_size: 10000

  • Each message is evaluated against the partition_by expressions
  • Messages are routed to the correct writer based on their partition key
  • The path is evaluated once per partition (so the uuid_v4() component works correctly)
  • Multiple writers run concurrently for different partitions
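
For example (with illustrative values), a message with meta("date") = "2026-01-19" and meta("account") = "42" yields the partition key ("2026-01-19", "42"). Every message that evaluates to the same pair is appended by the same writer and lands in the same Parquet file, while that file's path, including its uuid_v4() component, is evaluated once when the partition's writer is first created.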

2. Schema Loading

Supports both inline and external schema definitions:

# Inline schema
schema:
  - name: id
    type: INT64
  - name: message
    type: UTF8

# External schema file
schema_file: ./schemas/ocsf_network_activity.yml
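
A rough Go-side view of one schema entry might look like the struct below. This is an assumption for illustration only; the PR's actual config structs, field names, and supported flags may differ.

// Hypothetical representation of a single schema entry; the optional and
// fields entries are assumptions used to illustrate nullable and nested
// STRUCT columns, not the PR's actual types.
type schemaField struct {
    Name     string        `yaml:"name"`               // column name, e.g. "id"
    Type     string        `yaml:"type"`               // column type, e.g. "INT64", "UTF8"
    Optional bool          `yaml:"optional,omitempty"` // assumed flag for nullable columns
    Fields   []schemaField `yaml:"fields,omitempty"`   // assumed nesting for STRUCT columns
}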

3. Compression Support

All Parquet compression codecs are supported:

  • uncompressed
  • snappy (default)
  • gzip
  • zstd
  • brotli
  • lz4raw

4. Configurable Row Groups

row_group_size: 5000  # Smaller = less memory, more row groups

Implementation Details

Files Added

  1. output_aws_s3_parquet_stream.go (543 lines)

    • Main output plugin implementation
    • Partition routing logic
    • Writer pool management
    • Configuration parsing
  2. parquet_streaming_writer.go (623 lines)

    • Streaming Parquet writer with S3 multipart uploads
    • Row group buffering and encoding
    • Parquet footer generation
    • Thread-safe operations
  3. output_aws_s3_parquet_stream_test.go (327 lines)

    • Unit tests for configuration parsing
    • Schema generation tests
    • Path interpolation tests
    • Writer pooling tests
  4. parquet_streaming_writer_test.go (216 lines)

    • Unit tests for streaming writer
    • Row group encoding tests
    • S3 multipart upload tests
  5. output_aws_s3_parquet_stream_localstack_test.go (266 lines)

    • Integration tests with LocalStack (mock S3)
    • End-to-end write verification
    • Partition routing validation
    • Parquet file validation

Files Modified

  1. internal/impl/parquet/convert.go (+17 lines)

    • Added json.Number type support for int32/int64 conversion
    • Required for JSON decoders that return json.Number values (a minimal conversion sketch follows this list)
  2. internal/impl/parquet/schema.go (+26 lines)

    • Exported SchemaOpts struct for use by output plugins
    • Maintains backward compatibility with internal usage
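
As referenced above, here is a minimal sketch of the kind of json.Number handling this requires. It is illustrative only; the real code in convert.go is integrated into the existing conversion logic and also covers int32.

package main

import (
    "encoding/json"
    "fmt"
)

// toInt64 shows why json.Number support matters: decoders configured with
// UseNumber() return json.Number instead of float64, and those values must
// be converted explicitly before they can be written to INT64 columns.
func toInt64(v any) (int64, error) {
    switch n := v.(type) {
    case json.Number:
        return n.Int64()
    case int64:
        return n, nil
    case int:
        return int64(n), nil
    case float64:
        return int64(n), nil
    default:
        return 0, fmt.Errorf("cannot convert %T to int64", v)
    }
}

func main() {
    i, err := toInt64(json.Number("12345"))
    fmt.Println(i, err) // 12345 <nil>
}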

Testing

Unit Tests

  • 22 tests covering configuration parsing, schema generation, path interpolation, Parquet serialization, and the S3 multipart upload lifecycle
  • Includes mock-based S3 client testing via an S3API interface (a possible shape is sketched after this list)
  • All passing ✅
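
One possible shape for such an interface, matching the aws-sdk-go-v2 method signatures that a *s3.Client already satisfies (the interface actually added in this PR may differ in name or scope):

package s3stream

import (
    "context"

    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// S3API captures only the multipart-upload calls the streaming writer
// needs, so tests can inject a mock while production code passes the real
// *s3.Client (which satisfies this interface).
type S3API interface {
    CreateMultipartUpload(ctx context.Context, in *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
    UploadPart(ctx context.Context, in *s3.UploadPartInput, optFns ...func(*s3.Options)) (*s3.UploadPartOutput, error)
    CompleteMultipartUpload(ctx context.Context, in *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)
    AbortMultipartUpload(ctx context.Context, in *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
}

Narrowing the dependency to an interface like this is what allows the lifecycle tests (double-init protection, part numbering, abort-on-error) to run without LocalStack.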

Integration Tests

  • LocalStack-based tests (no real AWS required)
  • Tests basic write operations with row group verification
  • Tests partition_by with multiple partitions
  • Validates Parquet file structure and correctness

Manual Testing

Extensive testing was performed and documented in the project, covering:

  • 100K+ event datasets
  • Multiple partition configurations
  • Memory profiling and benchmarking
  • Comparison with batch approach

Performance

Test: 100K events with minimal schema

Metric       | Batch Approach | Streaming (This PR) | Improvement
Memory       | 2.2 GB         | 270 MB              | 88% reduction
Time         | 4.0s           | 3.4s                | 15% faster
Partitioning | ❌ Broken      | ✅ Correct          | Fixed

Test: Multiple partitions (2 dates × 2 accounts)

Metric         | Batch Approach | Streaming (This PR)
Files created  | 1 (wrong)      | 4 (correct)
Data integrity | ❌ Corrupt     | ✅ Valid

Backwards Compatibility

  • New output plugin, no changes to existing outputs
  • Only minor additions to shared parquet package
  • No breaking changes to existing functionality

Configuration Example

output:
  aws_s3_parquet_stream:
    bucket: data-lake
    region: us-west-2
    path: 'events/date=${! meta("date") }/tenant=${! meta("tenant") }/${! uuid_v4() }.parquet'

    partition_by:
      - '${! meta("date") }'
      - '${! meta("tenant") }'

    schema_file: ./schemas/ocsf_minimal.yml
    default_compression: snappy
    row_group_size: 10000

    batching:
      count: 10000
      period: 10s

Use Cases

This output is particularly valuable for:

  • High-volume event streaming to S3 data lakes
  • OCSF or other complex schemas that otherwise require large amounts of memory
  • Iceberg/Hive/Delta tables with dynamic partitioning
  • Memory-constrained environments (containers, Lambda, ECS)
  • Production workloads requiring data integrity guarantees

Adds a new output plugin that writes Parquet files to S3 using multipart
uploads with incremental row group streaming, reducing memory usage by 88%
compared to the batch approach.

Key features:
- Streams row groups directly to S3 (no full file buffering)
- partition_by parameter for correct partition routing
- schema_file parameter for external schema definitions
- Supports all Parquet compression types (snappy, zstd, gzip, etc.)
- Added S3API interface to enable mock-based testing
- Updated StreamingParquetWriter to use S3API instead of concrete *s3.Client
- Added 8 new unit tests covering S3 multipart upload lifecycle:
  - Initialization and double-init protection
  - Write lifecycle (before init, after close)
  - Part upload tracking
  - CompleteMultipartUpload on Close
  - Multiple parts with correct numbering
  - AbortMultipartUpload on errors
Fixes "insufficient definition levels" errors by:
1. Using consistent schema/message type generation (both with struct tags + pointers)
2. Preserving actual column metadata from temp files instead of approximating
@triddell (Contributor, Author) commented:

Fixed Nested Optional Structs

Found and fixed the "insufficient definition levels" bug for nested optional STRUCT fields.

Root causes:

  1. Schema and message types had inconsistent optional field representations (struct tags vs pointers)
  2. Footer generation approximated column metadata instead of preserving it from temp files

Fix:

  • Unified type generation with consistent representation
  • Extract and preserve actual column metadata from each temp file's footer

Tests verify column metadata extraction from temp files and proper definition/repetition level encoding in streaming parquet output.
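
To make the first root cause concrete, the sketch below shows a consistent representation for a nested optional STRUCT: optional fields are pointers and are tagged optional, in both the type used to derive the schema and the type used to encode messages. The field names are hypothetical, and the tag syntax assumes a parquet-go-style `parquet:"name,optional"` convention; neither is taken from the PR.

// Hypothetical example: src/dst endpoints as nested optional structs.
// Because the same pointer-based, tagged representation is used for both
// schema generation and message encoding, the definition levels written
// for absent values match what the schema declares.
type endpoint struct {
    IP   *string `parquet:"ip,optional"`
    Port *int64  `parquet:"port,optional"`
}

type networkEvent struct {
    Time int64     `parquet:"time"`
    Src  *endpoint `parquet:"src,optional"`
    Dst  *endpoint `parquet:"dst,optional"`
}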

- Fix documentation typo: timestamp().format() → now().ts_format()
- Replace fmt.Errorf with errors.New for static error strings
- Update AWS SDK endpoint resolver to use BaseEndpoint
- Update testify assertions to use specific assertion methods
- Remove unused test struct fields
- Regenerate documentation
@triddell (Contributor, Author) commented:

CI Fixes

Fixed all linting errors and CI check failures:

  • Fixed documentation syntax errors (MDX parsing issues with < and > characters)
  • Replaced fmt.Errorf with errors.New for static error strings
  • Updated deprecated AWS SDK endpoint resolver to use BaseEndpoint
  • Fixed testify verbose assertions
  • Removed unused test struct fields
  • Applied gofmt formatting

All tests and lint checks now pass. I will review the generated documentation output more closely for any additional improvements needed.

Preserve ColumnIndex and OffsetIndex from temporary parquet files to enable
query engine optimization through page-level pruning and predicate pushdown.

- Extract and store page index data during row group flush
- Write in correct Parquet format: all ColumnIndex, then all OffsetIndex
- Track filePosition separately from uploadSize for accurate offset calculation
- Add comprehensive unit tests for page index preservation
@triddell (Contributor, Author) commented:

Page Index Preservation Fix

Issue

Further in-depth testing revealed that while page index metadata pointers (ColumnIndexOffset, OffsetIndexOffset, etc.) were being copied from temporary files, the actual ColumnIndex and OffsetIndex binary data was not preserved. This prevented query engines from utilizing page-level pruning.

Solution

  • Extract and store page index data during row group flush
  • Write in correct Parquet format: all ColumnIndex, then all OffsetIndex
  • Track filePosition separately from uploadSize for accurate offsets
  • Add 4 unit tests verifying preservation, ordering, and offset calculation
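
A simplified sketch of the required layout, using hypothetical types and names rather than the PR's actual implementation: the serialized ColumnIndex blobs for every column chunk are written first, then every OffsetIndex blob, and the absolute file offset and length of each blob are recorded so the row-group metadata in the footer can point at them.

package main

import (
    "bytes"
    "fmt"
)

// pageIndex holds the already-serialized (Thrift-encoded) index blobs
// extracted from a temporary file's footer for one column chunk.
type pageIndex struct {
    columnIndex []byte
    offsetIndex []byte
}

// indexLocation mirrors the (offset, length) pair the footer metadata needs.
type indexLocation struct {
    offset int64
    length int32
}

// writePageIndexes appends all ColumnIndex blobs, then all OffsetIndex
// blobs, to buf, starting at filePosition, and returns the recorded
// locations plus the new file position.
func writePageIndexes(buf *bytes.Buffer, filePosition int64, idx []pageIndex) (col, off []indexLocation, pos int64) {
    pos = filePosition
    for _, p := range idx {
        buf.Write(p.columnIndex)
        col = append(col, indexLocation{offset: pos, length: int32(len(p.columnIndex))})
        pos += int64(len(p.columnIndex))
    }
    for _, p := range idx {
        buf.Write(p.offsetIndex)
        off = append(off, indexLocation{offset: pos, length: int32(len(p.offsetIndex))})
        pos += int64(len(p.offsetIndex))
    }
    return col, off, pos
}

func main() {
    var buf bytes.Buffer
    col, off, pos := writePageIndexes(&buf, 1024, []pageIndex{
        {columnIndex: []byte("ci0"), offsetIndex: []byte("oi0")},
        {columnIndex: []byte("ci1"), offsetIndex: []byte("oi1")},
    })
    fmt.Println(col, off, pos) // offsets advance through all ColumnIndex first, then all OffsetIndex
}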
