Commit ac99143

[otap-df-quiver] Segment file reader/writer for Quiver durable storage layer. (open-telemetry#1643)
Implements Segment file reader/writer for Quiver durable storage layer
1 parent f1fc6bd commit ac99143

17 files changed, +6379 -153 lines changed

rust/otap-dataflow/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -55,6 +55,7 @@ ahash = "0.8.11"
 arc-swap = "1.7"
 arrayvec = "0.7.6"
 arrow = { version = "57.0", features=["prettyprint"] }
+arrow-buffer = { version = "57.0" }
 arrow-ipc = { version = "57.0", features=["zstd"] }
 arrow-schema = { version = "57.0" }
 arrow-array = { version = "57.0" }
@@ -92,6 +93,7 @@ mimalloc = { version = "0.1.48", features = ["extended", "v3", "debug"] }
 libmimalloc-sys = { version = "0.1.44", features = ["extended", "v3"] }
 tikv-jemallocator = { version = "0.6.1" }
 tikv-jemalloc-ctl = { version = "0.6.1" }
+memmap2 = "0.9"
 nix = { version = "0.30.1", features = ["process", "signal", "fs"] }
 notify = "8.0" # Uses platform-native backend: inotify (Linux), kqueue (macOS), ReadDirectoryChanges (Windows)
 num_enum = "0.7"

rust/otap-dataflow/crates/quiver/ARCHITECTURE.md

Lines changed: 214 additions & 21 deletions
@@ -9,7 +9,7 @@ without data loss.
 ## Proposed Solution: Quiver

 We propose building Quiver: a standalone, embeddable Arrow-based segment store
-packaged as a reusable Rust crate. Quiver *does not exist yet*; this document
+packaged as a reusable Rust crate. Quiver *is not fully implemented yet*; this document
 defines its initial design, scope, and open questions. While it will be
 developed first for `otap-dataflow`, we intend to keep it decoupled so it can
 integrate into other telemetry pipelines or streaming systems that need durable
@@ -22,11 +22,12 @@ value: a fixed set of payload slots (`Logs`, `LogAttrs`, `ScopeAttrs`,

 ### Core Concepts

-**Segment Store**: Immutable Arrow IPC files containing batches of telemetry.
-Each segment:
+**Segment Store**: Immutable files containing multiple Arrow IPC file streams
+with batches of telemetry.
+Each segment file:

-- Groups multiple `RecordBundle` arrivals (8-64MB target size) and persists the
-  per-slot Arrow streams they reference.
+- Groups multiple `RecordBundle` arrivals (32MB default target size) and persists
+  the per-slot Arrow streams they reference.
 - Supports many payload types and evolving schemas inside the same segment via
   a stream directory + batch manifest.
 - Contains metadata: time ranges, signal type (via adapter), schema fingerprints,
@@ -458,6 +459,48 @@ Quiver segments are containers around Arrow IPC streams plus a manifest
 that describes how those streams reassemble back into the `RecordBundle`
 abstraction used by the embedding pipeline.

+#### Why a Custom Format Instead of Plain Arrow IPC?
+
+Arrow IPC (both streaming and file formats) requires all `RecordBatch`es in a
+single stream to share the same schema. This constraint conflicts with OTAP's
+data model in several ways:
+
+1. **Multiple payload types per bundle**: Each `RecordBundle` (OTAP batch)
+   contains multiple payload slots (`Logs`, `LogAttrs`, `ScopeAttrs`,
+   `ResourceAttrs`, etc.), each with a completely different schema. These
+   cannot coexist in a single Arrow IPC stream.
+
+2. **Schema evolution within a payload type**: Even for a single payload slot,
+   the schema can change from one bundle to the next:
+   - Optional columns may appear or disappear (e.g., `str` attribute column
+     omitted when no string attributes are present)
+   - Dictionary-encoded columns may switch between `Dictionary<u8, Utf8>`,
+     `Dictionary<u16, Utf8>`, or native `Utf8` based on cardinality
+
+3. **Optional payloads**: Some slots may be absent entirely for a given bundle
+   (e.g., no `ScopeAttrs` when scope attributes are empty).
+
+Alternative approaches considered:
+
+- **One Arrow IPC file per payload type**: Simple format, but explodes the
+  number of files to manage (one per slot x schema variation x segment).
+- **One Arrow IPC stream per `RecordBatch`**: Maximum flexibility, but repeats
+  schema metadata for every batch and prevents dictionary delta encoding.
+
+The Quiver segment format takes a middle path: interleave multiple Arrow IPC
+*file* streams (one per `(slot, schema_fingerprint)` pair) inside a single
+container file, with a manifest that records how to reconstruct each original
+`RecordBundle`. This preserves:
+
+- **Standard Arrow IPC reading**: Each stream is a valid Arrow IPC file that
+  can be handed directly to `arrow_ipc::FileReader` (via memory-mapped slice).
+- **Efficient storage**: Batches with the same schema share a stream, enabling
+  dictionary delta encoding and avoiding repeated schema metadata.
+- **Zero-copy access**: The entire segment can be memory-mapped; readers seek
+  to stream offsets without copying data.
+- **Bundle reconstruction**: The batch manifest records `(stream_id, chunk_index)`
+  per slot, allowing readers to reassemble the original `RecordBundle` ordering.
+
 #### Envelope Overview

 - The segment header contains two primary sections:
@@ -488,15 +531,160 @@ graph TD
 F --> |mmap| G[Segment Reader]
 ```

+#### Segment File Layout
+
+A segment file uses a variable-size footer with a fixed-size trailer, enabling
+future versions to extend the footer without breaking backwards compatibility:
+
+```text
++-------------------------------------------------------------------------+
+| Stream Data Region                                                      |
+|   Stream 0: Arrow IPC File bytes                                        |
+|   Stream 1: Arrow IPC File bytes                                        |
+|   ...                                                                   |
++-------------------------------------------------------------------------+
+| Stream Directory                                                        |
+|   Encoded as Arrow IPC (self-describing schema)                         |
+|   Columns: stream_id, slot_id, schema_fingerprint, byte_offset,         |
+|            byte_length, row_count, chunk_count                          |
++-------------------------------------------------------------------------+
+| Batch Manifest                                                          |
+|   Encoded as Arrow IPC (self-describing schema)                         |
+|   Columns: bundle_index, slot_refs (List<Struct>)                       |
++-------------------------------------------------------------------------+
+| Footer (variable size, version-dependent)                               |
+|   Version 1 (34 bytes):                                                 |
+|     - version: u16                                                      |
+|     - stream_count: u32                                                 |
+|     - bundle_count: u32                                                 |
+|     - directory_offset: u64                                             |
+|     - directory_length: u32                                             |
+|     - manifest_offset: u64                                              |
+|     - manifest_length: u32                                              |
+|   (Future versions may add fields here)                                 |
++-------------------------------------------------------------------------+
+| Trailer (fixed 16 bytes)                                                |
+|   - footer_size: u32 (size of footer, not including trailer)            |
+|   - magic: b"QUIVER\0S" (8 bytes)                                       |
+|   - crc32: u32 (covers entire file from start through trailer,          |
+|                 except the CRC field itself)                            |
++-------------------------------------------------------------------------+
+```
+
+**Reading a segment file:**
+
+1. Seek to end of file, read the fixed 16-byte trailer
+2. Validate magic bytes (`QUIVER\0S`)
+3. Read `footer_size` to determine footer location
+4. Seek back `footer_size` bytes, read the variable-size footer
+5. Parse version from footer to determine how to interpret remaining fields
+6. Use directory/manifest offsets to locate metadata sections
+
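Editorial sketch (not part of the commit): the six reading steps can be illustrated with a small, std-only parser over an in-memory byte slice. Everything here is an assumption made for illustration: the names `FooterV1`, `ParseError`, and `parse_footer` are hypothetical, the fields are assumed to be packed in the order the layout diagram lists them, and little-endian byte order is assumed because the document does not state one. CRC validation is left out of this sketch.

```rust
use std::convert::TryInto;

// Sizes taken from the layout diagram; names are hypothetical.
const TRAILER_LEN: usize = 16; // footer_size (4) + magic (8) + crc32 (4)
const FOOTER_V1_LEN: usize = 34;
const MAGIC: &[u8; 8] = b"QUIVER\0S";

/// Version 1 footer fields, in the order the diagram lists them.
#[derive(Debug, PartialEq)]
pub struct FooterV1 {
    pub version: u16,
    pub stream_count: u32,
    pub bundle_count: u32,
    pub directory_offset: u64,
    pub directory_length: u32,
    pub manifest_offset: u64,
    pub manifest_length: u32,
}

#[derive(Debug, PartialEq)]
pub enum ParseError {
    Truncated,
    InvalidMagic,
    UnsupportedVersion(u16),
}

// Little-endian readers. Byte order is an ASSUMPTION of this sketch.
fn u16_at(b: &[u8], i: usize) -> u16 {
    u16::from_le_bytes(b[i..i + 2].try_into().unwrap())
}
fn u32_at(b: &[u8], i: usize) -> u32 {
    u32::from_le_bytes(b[i..i + 4].try_into().unwrap())
}
fn u64_at(b: &[u8], i: usize) -> u64 {
    u64::from_le_bytes(b[i..i + 8].try_into().unwrap())
}

pub fn parse_footer(file: &[u8]) -> Result<FooterV1, ParseError> {
    // Step 1: the trailer is the last 16 bytes of the file.
    if file.len() < TRAILER_LEN {
        return Err(ParseError::Truncated);
    }
    let footer_end = file.len() - TRAILER_LEN;
    let trailer = &file[footer_end..];

    // Step 2: validate the magic bytes (assumed to follow footer_size).
    if trailer[4..12] != MAGIC[..] {
        return Err(ParseError::InvalidMagic);
    }

    // Steps 3-4: footer_size tells us how far to step back from the trailer.
    let footer_size = u32_at(trailer, 0) as usize;
    if footer_size < 2 || footer_size > footer_end {
        return Err(ParseError::Truncated);
    }
    let footer = &file[footer_end - footer_size..footer_end];

    // Step 5: the version decides how the remaining fields are interpreted.
    let version = u16_at(footer, 0);
    if version != 1 {
        return Err(ParseError::UnsupportedVersion(version));
    }
    if footer.len() < FOOTER_V1_LEN {
        return Err(ParseError::Truncated);
    }

    // Step 6: the offsets and lengths locate the directory and manifest.
    Ok(FooterV1 {
        version,
        stream_count: u32_at(footer, 2),
        bundle_count: u32_at(footer, 6),
        directory_offset: u64_at(footer, 10),
        directory_length: u32_at(footer, 18),
        manifest_offset: u64_at(footer, 22),
        manifest_length: u32_at(footer, 30),
    })
}
```

Because the trailer carries `footer_size`, a reader written this way never needs to know the footer length in advance, which is what lets later versions append fields.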
+#### Segment File Naming
+
+Segment files are named using a zero-padded 16-digit sequence number with
+the `.qseg` extension:
+
+```text
+{segment_seq:016}.qseg
+```
+
+Examples:
+
+- `0000000000000000.qseg` (sequence 0)
+- `0000000000000001.qseg` (sequence 1)
+- `0000000000123456.qseg` (sequence 123456)
+
+The 16-digit zero-padding ensures lexicographic ordering matches numeric
+ordering, allowing simple directory listings to enumerate segments in order.
+The `SegmentSeq::to_filename_component()` method generates this format.
+
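Editorial sketch (not part of the commit): the naming rule maps directly onto Rust's `{:016}` format specifier. The free functions `segment_filename` and `parse_segment_filename` are hypothetical stand-ins; the real code uses `SegmentSeq::to_filename_component()`, whose signature is not shown here.

```rust
/// Formats a sequence number as `{seq:016}.qseg`.
/// Hypothetical helper, standing in for `SegmentSeq::to_filename_component()`.
/// Note: values of 10^16 or more would print more than 16 digits.
pub fn segment_filename(seq: u64) -> String {
    format!("{:016}.qseg", seq)
}

/// Recovers the sequence number from a file name, rejecting anything that
/// is not exactly 16 ASCII digits followed by `.qseg`.
pub fn parse_segment_filename(name: &str) -> Option<u64> {
    let digits = name.strip_suffix(".qseg")?;
    if digits.len() != 16 || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    digits.parse().ok()
}
```

Sorting the generated names as plain strings gives the same order as sorting the sequence numbers, which is the property the zero-padding is meant to provide.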
+#### Read-Only Enforcement
+
+Finalized segment files are immutable by design. After writing completes,
+`SegmentWriter` calls `sync_all()` (fsync) to ensure data is persisted to
+disk, then sets restrictive file permissions to prevent accidental modification:
+
+- **Unix**: Permissions are set to `0o440` (read-only for owner and group,
+  no access for others). This provides defense-in-depth against accidental
+  writes while still allowing the process and admin group to read.
+- **Non-Unix**: Uses the platform's `set_readonly(true)` mechanism.
+
+This immutability guarantee is critical for:
+
+- **CRC integrity**: Any modification would invalidate the file's checksum
+- **mmap safety**: Memory-mapped reads assume file contents don't change
+- **Concurrent readers**: Background processes can safely read segments without
+  coordination (though additional work is needed for safe deletion of segments
+  that may still be in use)
+
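Editorial sketch (not part of the commit): the fsync-then-restrict sequence can be written with the standard library alone. The function name `finalize_read_only` is hypothetical and is not claimed to match `SegmentWriter`'s internals; it only mirrors the two steps described above.

```rust
use std::fs::{self, File};
use std::io;
use std::path::Path;

/// Flushes a finished segment to disk, then makes it read-only.
/// Hypothetical helper illustrating the described sequence.
pub fn finalize_read_only(file: &File, path: &Path) -> io::Result<()> {
    // Persist file contents and metadata before tightening permissions.
    file.sync_all()?;

    let mut perms = fs::metadata(path)?.permissions();
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        // Read-only for owner and group, no access for others.
        perms.set_mode(0o440);
    }
    #[cfg(not(unix))]
    {
        perms.set_readonly(true);
    }
    fs::set_permissions(path, perms)
}
```

Permissions of this kind guard against accidental writes only; a privileged process or the file's owner can still change them back.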
+#### Slot Reference Encoding
+
+The batch manifest stores slot references using Arrow's native `List<Struct>`
+type. Each manifest entry has a `slot_refs` column containing a list of
+structs, where each struct maps a slot to a specific chunk within a stream:
+
+```text
+slot_refs: List<Struct<slot_id: UInt16, stream_id: UInt32, chunk_index: UInt32>>
+```
+
+Each struct in the list contains:
+
+- `slot_id` (UInt16): The logical payload slot (e.g., Logs=1, LogAttrs=2)
+- `stream_id` (UInt32): Index into the stream directory
+- `chunk_index` (UInt32): Which Arrow RecordBatch within that stream
+
+Example: A bundle with 4 slots would have a `slot_refs` list containing:
+
+| slot_id | stream_id | chunk_index |
+|---------|-----------|-------------|
+| 1       | 0         | 0           |
+| 2       | 1         | 0           |
+| 30      | 2         | 0           |
+| 31      | 3         | 0           |
+
+Using Arrow's nested types avoids string parsing and leverages the existing
+IPC decoder. The struct field types use the `ArrowPrimitive` trait to ensure
+type synchronization between the Rust newtypes (`SlotId`, `StreamId`,
+`ChunkIndex`) and their Arrow schema representation.
+
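Editorial sketch (not part of the commit): once a manifest entry has been decoded, resolving a slot is a lookup over its `slot_refs` list. The plain struct below models one decoded entry with the same integer widths as the Arrow struct; `SlotRef`, `LookupError`, `resolve_slot`, and `example_bundle` are hypothetical names, and the real crate uses newtypes (`SlotId`, `StreamId`, `ChunkIndex`) rather than bare integers.

```rust
/// One decoded element of `slot_refs`, using the widths from the schema.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SlotRef {
    pub slot_id: u16,
    pub stream_id: u32,
    pub chunk_index: u32,
}

/// Hypothetical error, loosely modelled on `SegmentError::SlotNotInBundle`.
#[derive(Debug, PartialEq, Eq)]
pub enum LookupError {
    SlotNotInBundle(u16),
}

/// Finds which stream and which chunk hold the payload for `slot_id`.
pub fn resolve_slot(slot_refs: &[SlotRef], slot_id: u16) -> Result<(u32, u32), LookupError> {
    slot_refs
        .iter()
        .find(|r| r.slot_id == slot_id)
        .map(|r| (r.stream_id, r.chunk_index))
        .ok_or(LookupError::SlotNotInBundle(slot_id))
}

/// The four-slot bundle from the example table.
pub fn example_bundle() -> Vec<SlotRef> {
    vec![
        SlotRef { slot_id: 1, stream_id: 0, chunk_index: 0 },
        SlotRef { slot_id: 2, stream_id: 1, chunk_index: 0 },
        SlotRef { slot_id: 30, stream_id: 2, chunk_index: 0 },
        SlotRef { slot_id: 31, stream_id: 3, chunk_index: 0 },
    ]
}
```

A slot that is absent from the list is a normal outcome for optional payloads, so callers would typically treat the error case as "payload not present" rather than as corruption.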
+#### Error Handling and Recovery
+
+Segment files are designed to be safely detectable as corrupt or incomplete:
+
+| Error Condition | Detection Mechanism | Recovery Action |
+|-----------------|---------------------|-----------------|
+| Truncated file | File too short for trailer (< 16 bytes) | `SegmentError::Truncated` - skip file |
+| Invalid magic | Trailer magic bytes mismatch | `SegmentError::InvalidFormat` - skip file |
+| CRC mismatch | Computed CRC != stored CRC | `SegmentError::ChecksumMismatch` - skip file |
+| Partial write | CRC mismatch (write interrupted) | `SegmentError::ChecksumMismatch` - skip file |
+| Invalid IPC | Arrow decoder failure | `SegmentError::Arrow` - skip file |
+| Missing stream | Stream ID not in directory | `SegmentError::StreamNotFound` |
+| Missing slot | Slot not in manifest entry | `SegmentError::SlotNotInBundle` |
+
+**Partial write safety**: The CRC32 at the end of the file is written last.
+If a write is interrupted (crash, power loss), one of three outcomes occurs:
+
+1. File is too short to contain a valid trailer -> detected as truncated
+2. File has garbage at the end -> CRC mismatch
+3. File was written completely -> CRC validates
+
+This design ensures that partially written segment files are never mistaken
+for valid data. The engine can safely skip corrupt segments during startup
+and continue operating with the valid ones.
+
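Editorial sketch (not part of the commit): the first three rows of the table can be expressed as one classification function. To keep the block std-only it includes a bitwise CRC-32 (IEEE, reflected polynomial `0xEDB88320`), which is assumed to match what the `crc32fast` dependency computes; the stored CRC is assumed to be the last four bytes of the file in little-endian order. `Verdict` and `check_segment` are hypothetical names.

```rust
/// Bitwise CRC-32 (IEEE). Slow but dependency-free; for illustration only.
pub fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= byte as u32;
        for _ in 0..8 {
            // All-ones mask when the low bit is set, zero otherwise.
            let mask = (crc & 1).wrapping_neg();
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

#[derive(Debug, PartialEq, Eq)]
pub enum Verdict {
    Valid,
    Truncated,
    InvalidFormat,
    ChecksumMismatch,
}

/// Applies the checks in table order: length, magic, then CRC.
pub fn check_segment(file: &[u8]) -> Verdict {
    let n = file.len();
    // A file shorter than the 16-byte trailer cannot be a finished segment.
    if n < 16 {
        return Verdict::Truncated;
    }
    // Magic is assumed to sit between footer_size and the CRC field.
    if file[n - 12..n - 4] != b"QUIVER\0S"[..] {
        return Verdict::InvalidFormat;
    }
    // The CRC covers everything except the CRC field itself.
    let stored = u32::from_le_bytes([file[n - 4], file[n - 3], file[n - 2], file[n - 1]]);
    if crc32(&file[..n - 4]) != stored {
        return Verdict::ChecksumMismatch;
    }
    Verdict::Valid
}
```

Since the checksum is the last field written, an interrupted write leaves either a short file or a trailer whose CRC does not match, and both paths end in a non-`Valid` verdict.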
 #### Arrow IPC Encoding

 - While a segment is open, Quiver appends messages to each stream using the
   Arrow **streaming** format so we can keep adding batches without rewriting
   footers.
-- On finalize, each stream flushes any buffered messages, writes an Arrow
-  **file** footer, and aligns the slice on an 8-byte boundary. The header stores
-  the final offsets and lengths so readers can memory map the slice and hand it
-  directly to `arrow_ipc::FileReader`.
+- On finalize, each stream flushes any buffered messages and writes an Arrow
+  **file** footer. When writing to disk, each stream is aligned to a 64-byte
+  boundary. This ensures optimal cache-line alignment for zero-copy mmap reads
+  and efficient SIMD/AVX-512 access patterns. Arrow IPC uses 8-byte alignment
+  internally for data buffers; our 64-byte stream alignment ensures those
+  offsets remain optimally aligned in the mmap region for modern CPU
+  architectures.
 - During replay, the reader consults the manifest to rebuild each
   `RecordBundle`, hydrating only the payloads the consumer requested.
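Editorial sketch (not part of the commit): the 64-byte stream alignment described in the finalize step comes down to rounding each stream's start offset up to the next multiple of 64 and writing that many padding bytes first. `STREAM_ALIGNMENT`, `align_up`, and `padding_for` are hypothetical names; the writer's actual padding logic is not shown in this excerpt.

```rust
/// Alignment applied to the start of each stream (a power of two).
pub const STREAM_ALIGNMENT: u64 = 64;

/// Rounds `offset` up to the next multiple of `STREAM_ALIGNMENT`.
/// Relies on the alignment being a power of two; offsets within 63 bytes
/// of `u64::MAX` would overflow and are not handled in this sketch.
pub fn align_up(offset: u64) -> u64 {
    (offset + STREAM_ALIGNMENT - 1) & !(STREAM_ALIGNMENT - 1)
}

/// Number of padding bytes to write before a stream starting at `offset`.
pub fn padding_for(offset: u64) -> u64 {
    align_up(offset) - offset
}
```

Because 64 is a multiple of 8, any buffer that Arrow IPC places at an 8-byte-aligned offset inside a stream stays 8-byte aligned relative to the start of the mapped file.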

@@ -517,18 +705,23 @@ sequenceDiagram

 #### Dictionary Handling

-- Each `(slot, schema)` stream keeps dictionary encoding intact. While bundles
-  accumulate we capture the union of dictionary values per column. When
-  finalizing the segment we rebuild those columns against a deterministic
-  vocabulary and emit the Arrow IPC **file** with the canonical dictionary in
-  the header. Readers reopen the slice via `arrow_ipc::FileReader`, which
-  replays the seeded dictionaries before yielding the chunk referenced by the
-  manifest.
-- Dictionaries stay deterministic for the lifetime of a stream because the
-  final vocabulary is chosen from the accumulated batches. If a stream would
-  exceed configured cardinality limits we rotate to a fresh stream (resetting
-  dictionary ids) rather than serializing delta messages. That mirrors the
-  in-memory lifecycle in `otap-dataflow` and keeps chunks self-contained.
+- Each `(slot, schema)` stream preserves dictionary encoding exactly as received.
+  Quiver uses Arrow IPC's `DictionaryHandling::Resend` mode, where each batch
+  includes its full dictionary. This ensures **schema fidelity**: readers receive
+  the exact same dictionary key types (e.g., `UInt8` vs `UInt16`) that writers sent.
+- **Design rationale**: Dictionary unification (merging vocabularies across batches)
+  could widen key types when cardinality exceeds the original type's capacity.
+  For example, if batches arrive with `DictionaryArray<UInt8>` but the unified
+  vocabulary exceeds 255 values, unification would produce `DictionaryArray<UInt16>`.
+  This breaks round-trip schema guarantees, which is unacceptable for a persistence
+  layer whose job is faithful reproduction.
+- **Trade-offs**:
+  - *Pro*: Exact schema preservation - readers get back what writers sent
+  - *Pro*: Each batch is self-contained and independently readable
+  - *Con*: Larger file sizes due to duplicate dictionary values, which also
+    increases memory consumption when segments are memory-mapped for reading
+- This design decision may be revisited if future performance measurements
+  indicate that the size/memory overhead is a significant concern.

 #### DataFusion Integration

rust/otap-dataflow/crates/quiver/Cargo.toml

Lines changed: 8 additions & 0 deletions
@@ -19,16 +19,20 @@ path = "src/lib.rs"
 default = ["otap-dataflow-integrations"]
 otap-dataflow-integrations = []
 serde = ["dep:serde"]
+mmap = ["dep:memmap2"]

 [dependencies]
 arrow-array.workspace = true
+arrow-buffer.workspace = true
 arrow-schema.workspace = true
 arrow-ipc.workspace = true
+bytes.workspace = true
 crc32fast.workspace = true
 blake3.workspace = true
 parking_lot.workspace = true
 serde = { workspace = true, optional = true }
 thiserror.workspace = true
+memmap2 = { workspace = true, optional = true }

 [target.'cfg(unix)'.dependencies]
 nix.workspace = true
@@ -43,3 +47,7 @@ workspace = true
 [[bench]]
 name = "ingest"
 harness = false
+
+[[bench]]
+name = "segment"
+harness = false
