fix(io): fix JSONL byte-range reading and make chunk_size byte-based #6374
everySympathy wants to merge 2 commits into Eventual-Inc:main
Conversation
force-pushed: bb5b7b3 → 6cbcc28
Greptile Summary
This PR changes JSON/JSONL chunking from a row-count basis to a byte-size basis, fixing issue #4496. The mmap-based local reader now respects the requested byte range; previously it ignored the range, duplicating data across split scan tasks.
Confidence Score: 2/5
Important Files Changed
Sequence Diagram
sequenceDiagram
participant Scan as Scan Operator
participant Split as split_by_jsonl_ranges
participant StreamJSON as stream_json
participant MmapReader as read_json_local_into_tables_with_range (mmap)
participant StreamReader as read_json_single_into_stream (streaming)
participant MP as MicroPartition
Scan->>Split: ScanTask (file URI)
Split-->>Scan: ScanTask × N (ChunkSpec::Bytes {start, end})
Scan->>StreamJSON: stream_json(uri, range)
alt Local, uncompressed file
StreamJSON->>MmapReader: mmap file, slice bytes[start..end]
MmapReader->>MmapReader: get_file_chunks (by byte size)
MmapReader-->>StreamJSON: Vec<RecordBatch>
StreamJSON-->>MP: Stream of RecordBatches
else Compressed or remote file
StreamJSON->>StreamReader: read_json_single_into_stream(uri, range)
StreamReader->>StreamReader: read_into_line_chunk_stream (chunk_size bytes)
StreamReader-->>StreamJSON: TableChunkStream
StreamJSON-->>MP: Stream of RecordBatches (filtered, limited)
end
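The split step in the diagram can be sketched in a few lines. This is an illustrative sketch, not Daft's actual split_by_jsonl_ranges implementation: split_into_byte_ranges and the half-open (start, end) pairs are invented names, and it assumes the reader assigned to each sub-range later snaps its start forward to a newline boundary so no row is read twice.

```rust
/// Illustrative sketch (not Daft's actual API): split a file of
/// `file_len` bytes into half-open byte ranges of at most `chunk_size`
/// bytes each. Each range becomes one ScanTask; the reader is expected
/// to snap a mid-line start forward to the next newline.
fn split_into_byte_ranges(file_len: u64, chunk_size: u64) -> Vec<(u64, u64)> {
    assert!(chunk_size > 0);
    let mut ranges = Vec::new();
    let mut start = 0;
    while start < file_len {
        let end = (start + chunk_size).min(file_len);
        ranges.push((start, end));
        start = end;
    }
    ranges
}

fn main() {
    // A 100-byte JSONL file split into 32-byte chunks:
    let ranges = split_into_byte_ranges(100, 32);
    println!("{:?}", ranges); // [(0, 32), (32, 64), (64, 96), (96, 100)]
}
```

Note that only the last range may be shorter than chunk_size; the split itself is oblivious to line boundaries, which is why the reader-side newline snapping matters.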
for uri in uris {
    if remaining_rows.is_some_and(|rows_left| rows_left <= 0) {
        break;
    }

    let convert_options_for_uri = convert_options.clone().map(|mut co| {
        if let Some(rows_left) = remaining_rows {
            co.limit = Some(usize::try_from(rows_left.max(0)).unwrap_or(0));
        }
        co
    });

    let stream = daft_json::read::stream_json(
        (*uri).to_string(),
        convert_options_for_uri,
        parse_options.clone(),
        read_options.clone(),
        io_client.clone(),
        io_stats.clone(),
        None,
        None,
    )
    .await?;

    let mut tables = stream.try_collect::<Vec<_>>().await?;
    if let Some(rows_left) = remaining_rows {
        let read_rows = tables.iter().map(|t| t.len() as i64).sum::<i64>();
        remaining_rows = Some(rows_left - read_rows);
    }
    out.append(&mut tables);
}
Sequential URI processing may regress parallelism for multi-file reads
The new implementation iterates over uris with a for loop and awaits each stream_json call one by one. The previous read_json_bulk call processed files concurrently. When convert_options.limit is None (the common case), all URIs could be streamed in parallel without correctness concerns.
Per the project's guideline, prefer futures::join_all (or FuturesUnordered) for concurrent async operations instead of sequentially awaiting each one in a loop. The limit-tracking logic makes a fully parallel approach slightly more complex, but it is still achievable — e.g. spawn all streams concurrently and then aggregate results with limit tracking in the collection phase.
// Example sketch for the no-limit path:
let streams = futures::future::try_join_all(uris.iter().map(|uri| {
    daft_json::read::stream_json((*uri).to_string(), ...)
})).await?;

Rule Used: When handling multiple async operations in Rust, p... (source)
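The guideline's point can be illustrated without the futures crate using plain threads. This is a thread-based analogue of the concurrent pattern, not Daft's code: fetch_uri and its fixed delay are hypothetical stand-ins for stream_json. The key property is that every per-URI read starts before any result is collected, so wall time tracks the slowest read rather than the sum of all reads.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a per-URI read such as `stream_json`:
/// sleeps to simulate I/O, then returns (uri, row_count).
fn fetch_uri(uri: &str, delay_ms: u64) -> (String, usize) {
    thread::sleep(Duration::from_millis(delay_ms));
    (uri.to_string(), 42)
}

fn main() {
    let uris = vec![("file_a.jsonl", 30u64), ("file_b.jsonl", 30), ("file_c.jsonl", 30)];

    // Concurrent: spawn every read first, join afterwards. Wall time is
    // roughly max(delays), unlike the sequential for-loop, where it is
    // the sum of the delays.
    let handles: Vec<_> = uris
        .iter()
        .map(|(uri, d)| {
            let (uri, d) = (uri.to_string(), *d);
            thread::spawn(move || fetch_uri(&uri, d))
        })
        .collect();

    let results: Vec<(String, usize)> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    let total_rows: usize = results.iter().map(|(_, n)| n).sum();
    println!("total_rows = {}", total_rows); // total_rows = 126
}
```

In the PR's async setting the same shape falls out of try_join_all, with the extra wrinkle the reviewer notes: when a row limit is set, aggregation must still clamp the total in the collection phase.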
Learnt From
Eventual-Inc/Daft#5210
This is only a function for testing; performance is not a concern here.
force-pushed: 26790f4 → 095d5c9
force-pushed: f61be97 → 3c4ab2c
force-pushed: 3c4ab2c → 3a21065
nit: can we add a test case for when the chunk size would start mid-line:
{"id":1,"val":"hello"}\n{"id":2,"val":"world"}\n{"id":3,"val":"foo"}\n
                              ^
                              range start
added test: test_jsonl_chunk_size_mid_line_splits_correctly.
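The boundary convention that test pins down can be sketched in plain std Rust. read_range_lines is an illustrative helper, not the actual Daft reader: each line belongs to the byte range containing its first byte, so a range that starts mid-line skips forward past the next newline (that partial row belongs to the previous range), and a range that ends mid-line finishes the row it has started.

```rust
/// Illustrative helper (not Daft's actual reader): return the complete
/// JSONL lines owned by the half-open byte range [start, end), where a
/// line belongs to the range containing its first byte.
fn read_range_lines(data: &[u8], start: usize, end: usize) -> Vec<String> {
    if start >= data.len() {
        return Vec::new();
    }
    // Snap the start forward: bytes before the next '\n' belong to the
    // previous range, unless we start at offset 0 or right after a '\n'.
    let mut pos = if start == 0 || data[start - 1] == b'\n' {
        start
    } else {
        match data[start..].iter().position(|&b| b == b'\n') {
            Some(i) => start + i + 1,
            None => return Vec::new(),
        }
    };
    let mut lines = Vec::new();
    // A row that starts before `end` is read to completion, even if its
    // trailing '\n' lies past `end`.
    while pos < end && pos < data.len() {
        let line_end = data[pos..]
            .iter()
            .position(|&b| b == b'\n')
            .map(|i| pos + i)
            .unwrap_or(data.len());
        lines.push(String::from_utf8_lossy(&data[pos..line_end]).into_owned());
        pos = line_end + 1;
    }
    lines
}

fn main() {
    let data = b"{\"id\":1,\"val\":\"hello\"}\n{\"id\":2,\"val\":\"world\"}\n{\"id\":3,\"val\":\"foo\"}\n";
    // A range starting mid-way through row 2 and ending mid-way through
    // row 3: row 2 is skipped (it belongs to the previous range) and
    // row 3 is read to completion.
    let lines = read_range_lines(data, 30, 50);
    println!("{:?}", lines);
}
```

Summing over adjacent ranges that tile the file, each row is emitted exactly once, which is the deduplication property the PR fixes.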
- Fix stream_json() ignoring the byte range for local files, which caused data duplication when scan_task_split_and_merge split a large JSONL file
- Change the chunk_size semantic from row count to byte size
- Clear stale metadata/statistics on byte-range sub-ScanTasks
- Delete dead code (read_json_bulk, ChunkError)
force-pushed: bf211ea → 6833c13
Changes Made
Related Issues
Closes #6414
Closes #4496