Commit 4fa2932

feat: parquet reader
1 parent 61f23be commit 4fa2932

File tree

16 files changed: +1680 −117 lines

dev/build_counter.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -29,7 +29,7 @@ class VersionStatus(Enum):
 __major_version__ = 0
 __minor_version__ = 6
-__revision_version__ = 30
+__revision_version__ = 31
 __author__ = "@joocer"
 __status__ = VersionStatus.RELEASE
```

Lines changed: 295 additions & 0 deletions
(new file)
# Parquet Row-Group Priority Read Design

## Objective

Prioritize fast completion of full row groups so execution can be fed with
ready morsels as early and as steadily as possible.

The optimization targets are:

- time to first morsel
- sustained morsels per second

Not total scan completion time alone.

---

## Why This Design

In the split Parquet read path, a row group is the minimum useful execution
unit. Reading fewer bytes is not enough if I/O is fragmented into many
high-latency requests that delay completion of full row groups.

This design makes the scheduler row-group completion-first.

---

## Scheduling Principles

1. Keep a small active file window.
2. Keep a bounded number of in-flight row groups per active file.
3. Prioritize finishing already-started row groups before admitting new ones.
4. Bound parallel column range reads to avoid request explosion.
5. Enforce a global request cap across all files/row groups.
6. Yield row groups immediately on completion.

---

## Concurrency Model

Use a hierarchical set of caps:

- `files_in_flight`: maximum active files.
- `rowgroups_per_file_in_flight`: maximum active row groups per active file.
- `ranges_per_rowgroup_in_flight`: maximum in-flight range reads per row group.
- `global_ranges_in_flight`: hard cap across the whole query.

Suggested starting values for larger files (for example ~512 MB):

- `files_in_flight = 2`
- `rowgroups_per_file_in_flight = 2`
- `ranges_per_rowgroup_in_flight = 10`
- `global_ranges_in_flight = 24`

Baseline startup allocation at `t=0`:

- first two row groups receive `10 + 10` readers
- the next queued row group receives the remaining `4` readers

This creates a stable "2 almost-completing + 1 warming" pattern that favors
consistent morsel throughput.

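The `10 + 10 + 4` startup split falls out of granting credits greedily in admission order under the two caps. A minimal sketch (the helper name and signature are hypothetical, not part of the actual scheduler API):

```python
def startup_allocation(queued_rowgroups: int,
                       global_cap: int = 24,
                       per_rowgroup_cap: int = 10) -> list[int]:
    """Grant range-reader credits to admitted row groups in order,
    capping each row group and stopping when the global cap is spent."""
    grants, remaining = [], global_cap
    for _ in range(queued_rowgroups):
        grant = min(per_rowgroup_cap, remaining)
        if grant == 0:
            break
        grants.append(grant)
        remaining -= grant
    return grants
```

With three queued row groups this reproduces the `10 + 10 + 4` baseline; with more queued work, later row groups simply wait for released credits.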
---

## Execution Pipeline

### Stage 1: Footer Planning

- Fetch footer bytes in parallel across files.
- Parse footer metadata safely in the caller thread.
- Build per-file row group work queues.

### Stage 2: Row Group Assembly

For each admitted row group:

- Plan projected column chunk ranges.
- Fetch ranges with bounded parallelism.
- Decode projected columns.
- Assemble complete row group payload.

### Stage 3: Emit

- Convert assembled row group into a morsel.
- Yield immediately to downstream operators.
- Release scheduler credits.

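The emit-on-completion shape of Stages 2 and 3 can be sketched with a worker pool that yields each row group the moment it finishes, regardless of submission order. `assemble_rowgroup` and `iter_morsels` here are illustrative stand-ins, not the actual reader code:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def assemble_rowgroup(rg):
    # stand-in for: plan ranges, fetch, decode, assemble payload
    return {"rowgroup": rg, "columns": ["A", "B"]}

def iter_morsels(rowgroups, max_workers=4):
    """Assemble admitted row groups concurrently and emit each one as
    soon as it completes, not in submission order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(assemble_rowgroup, rg) for rg in rowgroups]
        for done in as_completed(futures):
            yield done.result()  # emit morsel; credits are then released
```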
---

## Admission And Priority Policy

Row-group completion-first policy:

1. Admit files until `files_in_flight` is reached.
2. For each active file, admit row groups until
   `rowgroups_per_file_in_flight` is reached.
3. For admitted row groups, grant range read credits up to
   `ranges_per_rowgroup_in_flight`, constrained by `global_ranges_in_flight`.
4. If credits are limited, favor row groups already in progress over new ones.
5. When a row group completes, emit it and then admit the next queued row group.
6. Burn through files and row groups in order, but with bounded overlap from
   the active windows above.

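One way to enforce step 3's two caps is a pair of bounded semaphores, acquired non-blockingly in a fixed order so a saturated global cap never strands a per-rowgroup credit. This is a sketch of the invariant, not the actual scheduler implementation:

```python
import threading

GLOBAL_RANGES = threading.BoundedSemaphore(24)  # global_ranges_in_flight

class RowGroupCredits:
    """Per-rowgroup range credits, also bounded by the global cap."""

    def __init__(self, per_rowgroup: int = 10):  # ranges_per_rowgroup_in_flight
        self.local = threading.BoundedSemaphore(per_rowgroup)

    def acquire(self) -> bool:
        # take the per-rowgroup credit first, then the global one;
        # return the local credit if the global cap is saturated
        if not self.local.acquire(blocking=False):
            return False
        if not GLOBAL_RANGES.acquire(blocking=False):
            self.local.release()
            return False
        return True

    def release(self) -> None:
        GLOBAL_RANGES.release()
        self.local.release()
```

A single row group can hold at most 10 credits, and all row groups together at most 24, matching the suggested starting values.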
This avoids a broad fanout pattern like:

`files * rowgroups * columns`

which can overwhelm object storage and increase latency.

---

## Backpressure And Cancellation

On limit reached or early stop:

- stop admitting new files/row groups
- cancel pending, not-yet-started range requests
- drain essential completions only
- exit quickly after the current critical section

This keeps the system focused on useful units of work.

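A minimal sketch of the cancel-not-started step, relying on the standard-library guarantee that `Future.cancel()` only succeeds for tasks still waiting in the queue; `cancel_pending` is a hypothetical helper, and reads already on the wire are drained rather than interrupted:

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor

def cancel_pending(futures: list[Future]) -> int:
    """Cancel every range-read future that has not started running."""
    return sum(1 for f in futures if f.cancel())

# demonstration: one worker is pinned on a blocking task, so the five
# tasks queued behind it are still cancellable
gate = threading.Event()
with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(gate.wait)
    queued = [pool.submit(int, "1") for _ in range(5)]
    cancelled = cancel_pending(queued)
    gate.set()  # release the running task so shutdown can proceed
```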
---

## Telemetry Requirements

The scheduler should be evaluated with rowgroup-focused telemetry:

- `time_to_first_rowgroup_ns`
- `rowgroups_completed`
- `rowgroups_completed_per_s`
- `rowgroup_completion_latency_ns` (p50/p95)
- `ranges_in_flight_peak`
- `scheduler_credit_wait_ns`
- `files_active_peak`
- `rowgroups_active_peak`

Complement with existing metrics already in place:

- `time_parquet_read_ranges_ns`
- `time_parquet_task_queue_wait_ns`
- `time_parquet_decode_columns_ns`
- `time_parquet_footer_fetch_ns`
- `parquet_range_request_count`

---

## Expected Outcome

With bounded hierarchical concurrency and completion-first priority:

- lower time to first morsel
- steadier flow of complete morsels
- reduced latency spikes from request stampedes
- more predictable behavior as file size grows

---

## Rollout Plan

1. Introduce scheduler caps and defaults behind config flags.
2. Implement admission/credit logic in `iter_row_groups`.
3. Add rowgroup-priority queueing policy.
4. Add cancellation behavior for early termination.
5. Benchmark against current path using first-morsel and morsel-rate metrics.

---

## Implementation TODO

1. [x] Add config knobs in `opteryx/config.py`:
   `PARQUET_FILES_IN_FLIGHT=2`, `PARQUET_ROWGROUPS_PER_FILE_IN_FLIGHT=2`,
   `PARQUET_GLOBAL_RANGE_READERS=24`, `PARQUET_RANGE_READERS_PER_ROWGROUP=10`.
2. [x] Add feature flag `FEATURE_PARQUET_ROWGROUP_SCHEDULER_V2` and keep current scheduler as fallback.
3. [x] Refactor `iter_row_groups` in `opteryx/parquet_io/reader.py` into explicit stages:
   footer planning, admission, range dispatch, completion emit.
4. [x] Implement `FileState` and `RowGroupState` structs/classes for scheduler state.
5. [x] Implement bounded file window admission (`files_in_flight <= 2`) in file order.
6. [x] Implement bounded rowgroup admission per file (`rowgroups_per_file_in_flight <= 2`) in rowgroup order.
7. [x] Implement global range credit cap (`global_in_flight_ranges <= 24`) with semaphore/counter enforcement.
8. [x] Implement per-rowgroup range credit cap (`ranges_per_rowgroup_in_flight <= 10`) with enforcement.
9. [x] Implement startup allocation target (`10 + 10 + 4`) across first three admitted rowgroups.
10. [x] Dispatch range work at scheduler layer as single-range tasks so global caps are authoritative.
11. [x] Prioritize reads for in-progress rowgroups before admitting/feeding new rowgroups.
12. [x] Within a rowgroup, sort pending column reads by descending remaining bytes.
13. [x] Emit rowgroup immediately when complete; do not wait for submission order.
14. [x] Add early termination behavior: stop admission, cancel not-started futures, fast-drain completions.
15. [x] Add scheduler telemetry:
    `time_to_first_rowgroup_ns`, `ranges_in_flight_peak`, `active_files_peak`,
    `active_rowgroups_peak`, `scheduler_wait_ns`, `rowgroups_completed_per_s`.
16. [x] Keep existing parquet telemetry and ensure new telemetry appears in `ReadRel` operation stats.
17. [x] Add unit tests for cap invariants (2/2/24/10), startup distribution, and completion-first behavior.
18. [x] Add integration tests for correctness parity vs current path and LIMIT/early-stop cancellation.
19. [x] Add benchmark script/profile for first-morsel latency and morsel throughput under mixed file sizes.
20. [x] Run A/B benchmark with `FEATURE_PARQUET_ROWGROUP_SCHEDULER_V2` on/off and record results in docs.

## A/B Benchmark (2026-02-27)

Benchmark script: `tests/performance/benchmarks/bench_parquet_rowgroup_scheduler.py`
Dataset sample: 8 local parquet files, 8 projected columns.

- v1:
  - rowgroups: 349
  - first morsel: 249.18 ms
  - elapsed: 460.99 ms
  - rowgroups/s: 757.07
- v2:
  - rowgroups: 349
  - first morsel: 22.55 ms
  - elapsed: 466.34 ms
  - rowgroups/s: 748.38

Observed behavior:

- Major reduction in time to first morsel (249.18 ms → 22.55 ms, roughly 11×).
- Similar total throughput and scan completion time in this profile.

---

## Decision Summary

Use a row-group completion-first scheduler with bounded hierarchical
parallelism, rather than unconstrained fanout or strict one-file-at-a-time
execution.

---

## Alternative Reviewed: Column-Worker Pool (Usenet-style)

### Proposal Summary

- Use `N` workers reading column chunks.
- Prioritize larger columns first.
- Workers take the next chunk from whichever row group has pending work.
- Emit any row group immediately when all projected columns are complete.

### What Is Strong About This

- Naturally caps request fanout to `N`.
- Good utilization under variable object-store latency.
- Largest-first can reduce long-tail straggler effects for row-group completion.
- Simple operational mental model.

### Scalability Nuance

Strict single-file processing is usually not the most scalable policy for
remote object storage:

- one cold or throttled file can head-of-line block the entire pipeline
- no cross-file latency hiding
- weaker throughput when each file has few row groups

Strict single-rowgroup processing is also suboptimal:

- can underutilize workers if one row group has a low projected-column count
- can inflate time to first morsel if one large column dominates

### Revised Recommendation (Hybrid)

Keep the column-worker pool idea, but apply it over a bounded multi-file
window:

- `file_window = 2` (or a small bounded value)
- row groups from active files share a global worker queue
- work item = `(file, row_group, column_chunk)`
- per-rowgroup in-flight cap to avoid over-fragmenting completion
- global in-flight cap for all reads

This preserves the column-worker model while avoiding single-file bottlenecks.

### Priority Rule

Use completion-first ordering with a size-aware tie-break:

1. Prefer row groups already in progress.
2. Among them, prioritize row groups with the least remaining bytes to completion.
3. Within a row group, read the largest remaining column first.

This yields fast morsel completion while still preventing stragglers.

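The first two rules compose naturally into a single sort key; `RowGroupView` and `priority_key` are illustrative names, not the actual scheduler types:

```python
from dataclasses import dataclass

@dataclass
class RowGroupView:
    """Illustrative scheduler-side view of one row group's read state."""
    started: bool            # has at least one range read been dispatched?
    remaining_bytes: int     # projected column bytes still to fetch

def priority_key(rg: RowGroupView) -> tuple[int, int]:
    # rule 1: in-progress row groups first (0 sorts before 1)
    # rule 2: among those, least remaining bytes to completion
    return (0 if rg.started else 1, rg.remaining_bytes)

# rule 3 applies inside a row group: fetch the largest remaining column first
queue = [
    RowGroupView(started=False, remaining_bytes=5_000),
    RowGroupView(started=True, remaining_bytes=9_000),
    RowGroupView(started=True, remaining_bytes=2_000),
]
queue.sort(key=priority_key)
# the nearest-to-complete in-progress row group is now scheduled first
```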
### Practical Initial Settings

- `range_workers = 24`
- `file_window = 2`
- `rowgroups_per_file_in_flight = 2`
- `ranges_per_rowgroup_in_flight = 10`
- `global_in_flight_ranges = 24`

Expected first-wave distribution:

- rowgroup A: 10 reads
- rowgroup B: 10 reads
- rowgroup C: 4 reads

These settings are safer than unconstrained fanout and more scalable than
strict single-file or strict single-rowgroup modes.

opteryx/__version__.py

Lines changed: 3 additions & 3 deletions

```diff
@@ -1,11 +1,11 @@
 # THIS FILE IS AUTOMATICALLY UPDATED DURING THE BUILD PROCESS
 # DO NOT EDIT THIS FILE DIRECTLY
 
-__build__ = 366
+__build__ = 369
 __author__ = "@joocer"
-__version__ = "0.6.30"
+__version__ = "0.6.31"
 __lib__ = "opteryx-core"
-__build_date__ = "2026-02-27T00:28:15.473177+00:00Z"
+__build_date__ = "2026-02-27T17:38:41.874681+00:00Z"
 
 # Store the version here so:
 # 1) we don't load dependencies by storing it in __init__.py
```

opteryx/compiled/structures/shuffle_partition.pyx

Lines changed: 20 additions & 12 deletions

```diff
@@ -62,12 +62,16 @@ cpdef tuple row_indexes_by_bin_flat(const uint64_t[::1] hashes,
 
     # ----- handle empty input -----
     if n_rows == 0:
-        flat_arr = view.array(shape=(0,),
-                              itemsize=sizeof(Py_ssize_t),
-                              format="l")
-        offsets_arr = view.array(shape=(num_bins + 1,),
-                                 itemsize=sizeof(Py_ssize_t),
-                                 format="l")
+        flat_arr = view.array(
+            shape=(0,),
+            itemsize=sizeof(Py_ssize_t),
+            format="l",
+        )
+        offsets_arr = view.array(
+            shape=(num_bins + 1,),
+            itemsize=sizeof(Py_ssize_t),
+            format="l",
+        )
         offsets = offsets_arr
         for i in range(num_bins + 1):
             offsets[i] = 0
@@ -99,12 +103,16 @@ cpdef tuple row_indexes_by_bin_flat(const uint64_t[::1] hashes,
             counts_p[bin_id] += 1
 
     # ----- allocate the result arrays (Python objects, need GIL) -----
-    flat_arr = view.array(shape=(n_rows,),
-                          itemsize=sizeof(Py_ssize_t),
-                          format="l")
-    offsets_arr = view.array(shape=(num_bins + 1,),
-                             itemsize=sizeof(Py_ssize_t),
-                             format="l")
+    flat_arr = view.array(
+        shape=(n_rows,),
+        itemsize=sizeof(Py_ssize_t),
+        format="l",
+    )
+    offsets_arr = view.array(
+        shape=(num_bins + 1,),
+        itemsize=sizeof(Py_ssize_t),
+        format="l",
+    )
     flat = flat_arr
     offsets = offsets_arr
```
