Skip to content

Commit f61be97

Browse files
author
wangzheyan
committed
fix: chunk JSON by byte size
1 parent fb5ef0c commit f61be97

File tree

7 files changed

+418
-215
lines changed

7 files changed

+418
-215
lines changed

src/daft-json/src/lib.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use common_error::DaftError;
2-
use futures::stream::TryChunksError;
32
use snafu::Snafu;
43

54
mod decoding;
@@ -17,7 +16,7 @@ pub mod schema;
1716
pub use options::{JsonConvertOptions, JsonParseOptions, JsonReadOptions};
1817
#[cfg(feature = "python")]
1918
use pyo3::prelude::*;
20-
pub use read::{read_json, read_json_bulk};
19+
pub use read::read_json;
2120

2221
#[derive(Debug, Snafu)]
2322
pub enum Error {
@@ -29,10 +28,6 @@ pub enum Error {
2928
ArrowError { source: arrow::error::ArrowError },
3029
#[snafu(display("JSON deserialization error: {}", string))]
3130
JsonDeserializationError { string: String },
32-
#[snafu(display("Error chunking: {}", source))]
33-
ChunkError {
34-
source: TryChunksError<String, std::io::Error>,
35-
},
3631
#[snafu(display("Error joining spawned task: {}", source))]
3732
JoinError { source: tokio::task::JoinError },
3833
#[snafu(display(

src/daft-json/src/local.rs

Lines changed: 79 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{borrow::Cow, collections::HashSet, num::NonZeroUsize, sync::Arc};
22

33
use arrow::array::builder::ArrayBuilder;
4-
use common_error::DaftResult;
4+
use common_error::{DaftError, DaftResult};
55
use daft_core::prelude::*;
66
use daft_dsl::{Expr, ExprRef, expr::bound_expr::BoundExpr};
77
use daft_recordbatch::RecordBatch;
@@ -18,34 +18,69 @@ use crate::{
1818
decoding::{allocate_array, deserialize_into},
1919
deserializer::Value,
2020
inference::{column_types_map_to_fields, infer_records_schema},
21-
read::tables_concat,
21+
read::{tables_concat, truncate_tables_to_limit},
2222
};
2323

2424
const JSON_NULL_VALUE: Value<'static> = Value::Static(StaticNode::Null);
2525

2626
const NEWLINE: u8 = b'\n';
2727
const CLOSING_BRACKET: u8 = b'}';
2828

29+
/// Backward-compatible wrapper that concatenates all chunks into a single RecordBatch.
30+
///
31+
/// Only used by legacy test paths (via `read_json_single_into_table`).
32+
/// The production path uses [`read_json_local_into_tables_with_range`] directly via `stream_json`.
2933
pub fn read_json_local(
3034
uri: &str,
3135
convert_options: Option<JsonConvertOptions>,
3236
parse_options: Option<JsonParseOptions>,
3337
read_options: Option<JsonReadOptions>,
3438
max_chunks_in_flight: Option<usize>,
3539
) -> DaftResult<RecordBatch> {
40+
let schema_hint = convert_options
41+
.as_ref()
42+
.and_then(|c| c.schema.as_ref())
43+
.map_or_else(|| Schema::empty().into(), |s| s.clone());
44+
let tables = read_json_local_into_tables_with_range(
45+
uri,
46+
convert_options,
47+
parse_options,
48+
read_options,
49+
max_chunks_in_flight,
50+
None,
51+
)?;
52+
if tables.is_empty() {
53+
return Ok(RecordBatch::empty(Some(schema_hint)));
54+
}
55+
tables_concat(tables)
56+
}
57+
58+
pub fn read_json_local_into_tables_with_range(
59+
uri: &str,
60+
convert_options: Option<JsonConvertOptions>,
61+
parse_options: Option<JsonParseOptions>,
62+
read_options: Option<JsonReadOptions>,
63+
max_chunks_in_flight: Option<usize>,
64+
range: Option<daft_io::GetRange>,
65+
) -> DaftResult<Vec<RecordBatch>> {
3666
let uri = uri.trim_start_matches("file://");
3767
let file = std::fs::File::open(uri)?;
3868
// SAFETY: mmapping is inherently unsafe.
3969
// We are trusting that the file is not modified or accessed by other systems while we are reading it.
4070
let mmap = unsafe { memmap2::Mmap::map(&file) }.context(StdIOSnafu)?;
71+
let full_bytes = &mmap[..];
72+
let has_range = range.is_some();
73+
let bytes = if let Some(range) = range {
74+
let slice_range = range
75+
.as_range(full_bytes.len())
76+
.map_err(|e| DaftError::ValueError(e.to_string()))?;
77+
&full_bytes[slice_range]
78+
} else {
79+
full_bytes
80+
};
4181

42-
let bytes = &mmap[..];
4382
if parse_options.as_ref().is_some_and(|p| p.skip_empty_files) && bytes.is_empty() {
44-
let schema = convert_options
45-
.as_ref()
46-
.and_then(|c| c.schema.as_ref())
47-
.map_or_else(|| Schema::empty().into(), |s| s.clone());
48-
return Ok(RecordBatch::empty(Some(schema)));
83+
return Ok(Vec::new());
4984
}
5085

5186
if bytes.is_empty() {
@@ -54,23 +89,25 @@ pub fn read_json_local(
5489
}
5590
.into());
5691
}
57-
if bytes[0] == b'[' {
58-
let schema: Schema = infer_schema(bytes, None, None)?.try_into()?;
5992

93+
// JSON array format (e.g. `[{...}, {...}]`) is only possible when reading the full file.
94+
// Byte-range sub-slices from scan_task_split_and_merge are always JSONL, so skip this check.
95+
if !has_range && bytes[0] == b'[' {
96+
let schema: Schema = infer_schema(bytes, None, None)?.try_into()?;
6097
let predicate = convert_options
6198
.as_ref()
6299
.and_then(|options| options.predicate.clone());
63-
read_json_array_impl(bytes, schema, predicate)
64-
} else {
65-
let reader = JsonReader::try_new(
66-
bytes,
67-
convert_options,
68-
parse_options,
69-
read_options,
70-
max_chunks_in_flight,
71-
)?;
72-
reader.finish()
100+
return Ok(vec![read_json_array_impl(bytes, schema, predicate)?]);
73101
}
102+
103+
let reader = JsonReader::try_new(
104+
bytes,
105+
convert_options,
106+
parse_options,
107+
read_options,
108+
max_chunks_in_flight,
109+
)?;
110+
reader.finish_into_tables()
74111
}
75112

76113
pub fn read_json_array_impl(
@@ -227,7 +264,7 @@ impl<'a> JsonReader<'a> {
227264
})
228265
}
229266

230-
pub fn finish(&self) -> DaftResult<RecordBatch> {
267+
pub fn finish_into_tables(&self) -> DaftResult<Vec<RecordBatch>> {
231268
let mut bytes = self.bytes;
232269
let mut n_threads = self.n_threads;
233270
let mut total_rows = 128;
@@ -254,31 +291,25 @@ impl<'a> JsonReader<'a> {
254291
}
255292

256293
let total_len = bytes.len();
257-
let chunk_size = self.chunk_size.unwrap_or_else(|| total_len / n_threads);
258-
let file_chunks = self.get_file_chunks(bytes, n_threads, total_len, chunk_size);
294+
let chunk_size = self
295+
.chunk_size
296+
.unwrap_or_else(|| total_len / n_threads.max(1));
297+
let file_chunks = self.get_file_chunks(bytes, total_len, chunk_size);
259298

260299
let tbls = self.pool.install(|| {
261300
file_chunks
262301
.into_par_iter()
263302
.map(|(start, stop)| {
264303
let chunk = &bytes[start..stop];
265-
self.parse_json_chunk(chunk, chunk_size)
304+
self.parse_json_chunk(chunk)
266305
})
267306
.collect::<DaftResult<Vec<RecordBatch>>>()
268307
})?;
269308

270-
let tbl = tables_concat(tbls)?;
271-
272-
// The `limit` is not guaranteed to be fully applied from the byte slice, so we need to properly apply the limit after concatenating the tables
273-
if let Some(limit) = self.n_rows
274-
&& tbl.len() > limit
275-
{
276-
return tbl.head(limit);
277-
}
278-
Ok(tbl)
309+
truncate_tables_to_limit(tbls, self.n_rows)
279310
}
280311

281-
fn parse_json_chunk(&self, bytes: &[u8], chunk_size: usize) -> DaftResult<RecordBatch> {
312+
fn parse_json_chunk(&self, bytes: &[u8]) -> DaftResult<RecordBatch> {
282313
let mut scratch = vec![];
283314
let scratch = &mut scratch;
284315

@@ -294,9 +325,15 @@ impl<'a> JsonReader<'a> {
294325
let iter =
295326
serde_json::Deserializer::from_slice(bytes).into_iter::<&serde_json::value::RawValue>();
296327

328+
let estimated_rows = std::cmp::max(1, memchr::memchr_iter(NEWLINE, bytes).count());
297329
let mut columns: IndexMap<Cow<str>, Box<dyn ArrayBuilder>> = arrow_fields
298330
.iter()
299-
.map(|f| (Cow::Owned(f.name().clone()), allocate_array(f, chunk_size)))
331+
.map(|f| {
332+
(
333+
Cow::Owned(f.name().clone()),
334+
allocate_array(f, estimated_rows),
335+
)
336+
})
300337
.collect();
301338

302339
let mut num_rows = 0;
@@ -350,17 +387,14 @@ impl<'a> JsonReader<'a> {
350387
fn get_file_chunks(
351388
&self,
352389
bytes: &[u8],
353-
n_threads: usize,
354390
total_len: usize,
355391
chunk_size: usize,
356392
) -> Vec<(usize, usize)> {
357393
let mut last_pos = 0;
358394

359-
let (n_chunks, chunk_size) = calculate_chunks_and_size(n_threads, chunk_size, total_len);
360-
361-
let mut offsets = Vec::with_capacity(n_chunks);
395+
let mut offsets = Vec::new();
362396

363-
for _ in 0..n_chunks {
397+
loop {
364398
let search_pos = last_pos + chunk_size;
365399

366400
if search_pos >= bytes.len() {
@@ -480,38 +514,6 @@ fn get_line_stats_json(bytes: &[u8], n_lines: usize) -> Option<(f32, f32)> {
480514
Some((mean, std))
481515
}
482516

483-
/// Calculate the max number of chunks to split the file into
484-
/// It looks for the largest number divisible by `n_threads` and less than `chunk_size`
485-
/// It has an arbitrary limit of `n_threads * n_threads`, which seems to work well in practice.
486-
///
487-
/// Example:
488-
///
489-
/// ```text
490-
/// n_threads = 4
491-
/// chunk_size = 2048
492-
/// calculate_chunks_and_size(n_threads, chunk_size) = (16, 128)
493-
/// ```
494-
fn calculate_chunks_and_size(n_threads: usize, chunk_size: usize, total: usize) -> (usize, usize) {
495-
let mut max_divisible_chunks = n_threads;
496-
497-
// The maximum number of chunks is n_threads * n_threads
498-
// This was chosen based on some crudely done benchmarks. It seems to work well in practice.
499-
// The idea is to have a number of chunks that is a divisible by the number threads to maximize parallelism.
500-
// But we dont want to have too small chunks, as that would increase the overhead of the parallelism.
501-
// This is a heuristic and could be improved.
502-
let max_chunks = n_threads * n_threads;
503-
504-
while max_divisible_chunks <= chunk_size && max_divisible_chunks < max_chunks {
505-
let md = max_divisible_chunks + n_threads;
506-
if md > chunk_size || md > max_chunks {
507-
break;
508-
}
509-
max_divisible_chunks = md;
510-
}
511-
let chunk_size = total / n_threads;
512-
(max_divisible_chunks, chunk_size)
513-
}
514-
515517
#[inline(always)]
516518
fn parse_raw_value<'a>(
517519
raw_value: &'a RawValue,
@@ -527,6 +529,9 @@ fn parse_raw_value<'a>(
527529
})
528530
}
529531

532+
// TODO: only recognises `}\n` as a line boundary; JSONL rows that are JSON arrays
533+
// (e.g. `[1,2,3]\n`) will not be detected. This is fine for now because Daft only
534+
// supports object-per-line JSONL, but should be revisited if array rows are needed.
530535
fn next_line_position(input: &[u8]) -> Option<usize> {
531536
let pos = memchr::memchr(NEWLINE, input)?;
532537
if pos == 0 {
@@ -581,6 +586,7 @@ mod tests {
581586
{"floats": 3.0, "utf8": "!\\n", "bools": true}
582587
"#;
583588
let reader = JsonReader::try_new(json.as_bytes(), None, None, None, None).unwrap();
584-
let _result = reader.finish();
589+
let tables = reader.finish_into_tables().unwrap();
590+
let _result = tables_concat(tables);
585591
}
586592
}

src/daft-json/src/options.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,11 @@ impl_bincode_py_state_serialization!(JsonParseOptions);
169169
#[cfg_attr(feature = "python", pyclass(module = "daft.daft", get_all))]
170170
pub struct JsonReadOptions {
171171
pub buffer_size: Option<usize>,
172+
/// Target chunk size in bytes (approximate).
173+
///
174+
/// Controls how data is split into RecordBatches for parallel processing.
175+
/// Each chunk may be slightly larger than this value because splits are aligned
176+
/// to line boundaries.
172177
pub chunk_size: Option<usize>,
173178
}
174179

0 commit comments

Comments (0)