# Arrow Columnar Data Processing

This document explains the Apache Arrow implementation introduced for efficient batch aggregation of oracle observations in the LLO (Low-Latency Oracle) system.

## Overview

### Why Arrow?

The Arrow implementation addresses three key performance challenges:

1. **Memory Efficiency** - Replaces the 1GB static memory ballast with controlled, bounded allocation
2. **Batch Processing** - Enables efficient columnar operations on thousands of observations simultaneously
3. **Reduced Allocations** - Builder pooling minimizes GC pressure during repeated aggregation cycles

### Dependencies

```go
github.com/apache/arrow-go/v18 v18.3.1
```
## Architecture

```
                       Arrow Data Pipeline

  Node Observations
  ┌──────────┐    ┌──────────┐    ┌──────────┐
  │ Observer │    │ Observer │    │ Observer │    ...
  │    0     │    │    1     │    │    N     │
  └────┬─────┘    └────┬─────┘    └────┬─────┘
       │               │               │
       └───────────────┼───────────────┘
                       ▼
          ┌────────────────────────┐
          │ ArrowObservationMerger │
          │  MergeObservations()   │
          └───────────┬────────────┘
                      ▼
          ┌────────────────────────┐
          │      Arrow Record      │  (ObservationSchema: observer_id,
          └───────────┬────────────┘   stream_id, value_type, values...)
                      ▼
          ┌────────────────────────┐
          │    ArrowAggregator     │
          │ AggregateObservations()│
          └───────────┬────────────┘
                      ▼
          ┌────────────────────────┐
          │    StreamAggregates    │  (per-stream aggregated values)
          └────────────────────────┘
```

## Schemas

Four Arrow schemas are defined in `llo/arrow_schemas.go`:

### 1. ObservationSchema

Stores merged observations from all nodes for batch aggregation.

| Column | Type | Description |
|--------|------|-------------|
| `observer_id` | uint8 | Node that produced the observation (0-255) |
| `stream_id` | uint32 | Stream identifier |
| `value_type` | uint8 | Type discriminator (see Value Types) |
| `decimal_value` | binary | Encoded decimal value |
| `quote_bid` | binary | Quote bid component |
| `quote_benchmark` | binary | Quote benchmark component |
| `quote_ask` | binary | Quote ask component |
| `observed_at_ns` | uint64 | Provider timestamp (nanoseconds) |
| `timestamp_ns` | uint64 | Node observation timestamp (nanoseconds) |

### 2. StreamAggregatesSchema

Output from aggregation; input to report generation.

| Column | Type | Description |
|--------|------|-------------|
| `stream_id` | uint32 | Stream identifier |
| `aggregator` | uint32 | Aggregator type used |
| `value_type` | uint8 | Type discriminator |
| `decimal_value` | binary | Aggregated decimal |
| `quote_*` | binary | Aggregated quote components |
| `observed_at_ns` | uint64 | Observation timestamp |

### 3. CacheSchema

Observation cache with TTL-based expiration.

| Column | Type | Description |
|--------|------|-------------|
| `stream_id` | uint32 | Stream identifier |
| `value_type` | uint8 | Type discriminator |
| `decimal_value` | binary | Cached decimal |
| `quote_*` | binary | Cached quote components |
| `observed_at_ns` | uint64 | Observation timestamp |
| `expires_at_ns` | int64 | TTL expiration timestamp |

### 4. TransmissionSchema

Batched report transmissions with Arrow IPC compression.

| Column | Type | Description |
|--------|------|-------------|
| `server_url` | string | Destination server |
| `config_digest` | fixed[32] | Configuration hash |
| `seq_nr` | uint64 | Sequence number |
| `report_data` | large_binary | Encoded report |
| `lifecycle_stage` | string | Report lifecycle stage |
| `report_format` | uint32 | Format identifier |
| `signatures` | list<binary> | Report signatures |
| `signers` | list<uint8> | Signer indices |
| `transmission_hash` | fixed[32] | Transmission hash |
| `created_at_ns` | timestamp[ns] | Creation time |

## Value Types

Three value types are supported, identified by the `value_type` column:

```go
const (
    StreamValueTypeDecimal    uint8 = 0 // Single decimal value
    StreamValueTypeQuote      uint8 = 1 // Quote with bid/benchmark/ask
    StreamValueTypeTimestampd uint8 = 2 // Decimal with observation timestamp
)
```

### Decimal Encoding

Values use `shopspring/decimal` binary encoding for a precise representation:

```go
// Encode (d is a decimal.Decimal)
b, err := d.MarshalBinary()

// Decode
var out decimal.Decimal
err = out.UnmarshalBinary(b)
```

## Core Components

### arrow_schemas.go

Defines all four Arrow schemas and column index constants for type-safe access:

```go
// Column indices for ObservationSchema
const (
    ObsColObserverID = iota
    ObsColStreamID
    ObsColValueType
    // ...
)
```

### arrow_pool.go

Memory management with two pool types:

**LLOMemoryPool** - Wraps Arrow's allocator with metrics and optional bounds:

```go
pool := NewLLOMemoryPool(maxBytes) // 0 for unlimited
allocated, allocs, releases := pool.Metrics()
```

**ArrowBuilderPool** - Unified pool for all schema builders:

```go
builderPool := NewArrowBuilderPool(maxMemoryBytes)

// Get a builder for observations
builder := builderPool.GetObservationBuilder()
// ... use builder ...
builderPool.PutObservationBuilder(builder)
```

### arrow_converters.go

Type conversion between Go types and Arrow columns:

```go
// Write a StreamValue to Arrow builders
StreamValueToArrow(sv, valueTypeBuilder, decimalBuilder, bidBuilder, ...)

// Read a StreamValue from Arrow arrays
sv, _ := ArrowToStreamValue(idx, valueTypeArr, decimalArr, bidArr, ...)

// Batch conversion for cache operations
record, _ := StreamValuesToArrowRecord(values, pool)
values, _ := ArrowRecordToStreamValues(record)
```

### arrow_observation_merger.go

Merges observations from multiple nodes into a single Arrow record:

```go
merger := NewArrowObservationMerger(pool, codec)

// Merge attributed observations from consensus
record, _ := merger.MergeObservations(attributedObservations)
defer record.Release()

// Utility functions
streamCounts := CountByStreamID(record)   // map[streamID]count
observerCounts := CountByObserver(record) // map[observerID]count
```

### arrow_aggregators.go

Performs vectorized aggregation on Arrow records:

```go
aggregator := NewArrowAggregator(pool)

// Aggregate with channel definitions providing the aggregator type per
// stream; f is the fault-tolerance threshold (a stream needs more than
// f observations to produce a result).
results, _ := aggregator.AggregateObservations(record, channelDefs, f)
```

**Supported Aggregators:**

| Aggregator | Description |
|------------|-------------|
| `Median` | Sorts values and returns the middle element |
| `Mode` | Most common value (requires f+1 agreement) |
| `Quote` | Median of each quote component, computed separately |

## Data Flow Example

```go
// 1. Create the builder pool and observation codec
builderPool := NewArrowBuilderPool(0) // 0 = no memory limit
codec := &StandardObservationCodec{}

// 2. Merge observations from all nodes
merger := NewArrowObservationMerger(builderPool, codec)
record, _ := merger.MergeObservations(attributedObservations)
defer record.Release()

// 3. Aggregate using channel definitions
aggregator := NewArrowAggregator(builderPool)
streamAggregates, _ := aggregator.AggregateObservations(record, channelDefs, f)

// 4. Use aggregated values for report generation
for streamID, aggregatorValues := range streamAggregates {
    for aggregatorType, value := range aggregatorValues {
        // Build reports...
    }
}
```

## Memory Management Best Practices

1. **Always release records** when done:
   ```go
   record, _ := merger.MergeObservations(...)
   defer record.Release()
   ```

2. **Return builders to the pool** after use:
   ```go
   builder := pool.GetObservationBuilder()
   // ... use builder ...
   pool.PutObservationBuilder(builder)
   ```

3. **Set memory limits** in production:
   ```go
   pool := NewArrowBuilderPool(500 * 1024 * 1024) // 500 MiB limit
   ```

4. **Monitor allocation metrics**:
   ```go
   allocated, allocs, releases := pool.MemoryStats()
   ```

## Testing

### Unit Tests

`llo/arrow_aggregators_test.go` - Validates aggregation logic for all value-type and aggregator combinations.

### Benchmarks

`llo/arrow_bench_test.go` - Performance benchmarks for:
- Median aggregation
- Quote aggregation
- Type conversion operations
- Builder pool efficiency

Run the benchmarks:
```bash
cd llo
go test -bench=. -benchmem ./...
```

### Comparison Tests

`llo/arrow_comparison_test.go` - Compares the Arrow implementation against the original implementation at various scales:
- 10, 100, 1000, 5000, and 10000 observations

Run the comparison:
```bash
go test -run=Comparison -v ./llo/
```

## File Reference

| File | Purpose |
|------|---------|
| `llo/arrow_schemas.go` | Schema definitions and column constants |
| `llo/arrow_pool.go` | Memory pool and builder pool management |
| `llo/arrow_converters.go` | Go type <-> Arrow conversion utilities |
| `llo/arrow_observation_merger.go` | Multi-node observation merging |
| `llo/arrow_aggregators.go` | Vectorized aggregation algorithms |
| `llo/arrow_aggregators_test.go` | Unit tests |
| `llo/arrow_bench_test.go` | Performance benchmarks |
| `llo/arrow_comparison_test.go` | Before/after comparison tests |