From bd47a13e3a9b2c804fae2ab877b95893639120c6 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Wed, 7 Jan 2026 20:14:00 +0200 Subject: [PATCH 1/7] feat: add boundary utility functions for aligned byte fetching in JSON source --- .../datasource-json/src/boundary_utils.rs | 215 ++++++++++++++++++ datafusion/datasource-json/src/mod.rs | 1 + datafusion/datasource-json/src/source.rs | 53 ++++- 3 files changed, 257 insertions(+), 12 deletions(-) create mode 100644 datafusion/datasource-json/src/boundary_utils.rs diff --git a/datafusion/datasource-json/src/boundary_utils.rs b/datafusion/datasource-json/src/boundary_utils.rs new file mode 100644 index 0000000000000..7807c0fe60e97 --- /dev/null +++ b/datafusion/datasource-json/src/boundary_utils.rs @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::Bytes; +use datafusion_common::{DataFusionError, Result}; +use object_store::{ObjectStore, path::Path}; +use std::sync::Arc; + +pub const DEFAULT_BOUNDARY_WINDOW: usize = 4096; // 4KB + +/// Fetch bytes for [start, end) and align boundaries in memory. +/// +/// Start alignment: +/// - If start == 0, use bytes as-is. +/// - Else, check byte at start-1 (included in fetch). If it is the terminator, +/// start from `start`. Otherwise scan forward in memory for the first terminator +/// and start after it. If no terminator exists in the fetched range, return None. +/// +/// End alignment: +/// - If the last byte is not the terminator and end < file_size, fetch forward in +/// chunks until the terminator is found or EOF is reached. 
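+///
+/// # Example
+///
+/// Illustrative sketch only (not compiled as a doc test); it assumes an
+/// in-memory `ObjectStore` holding two newline-delimited JSON records.
+///
+/// ```ignore
+/// use std::sync::Arc;
+/// use object_store::{memory::InMemory, path::Path, ObjectStore};
+///
+/// # async fn example() -> datafusion_common::Result<()> {
+/// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+/// let path = Path::from("data.json");
+/// // 16 bytes: two 8-byte lines, each terminated by `\n`.
+/// store.put(&path, "{\"a\":1}\n{\"a\":2}\n".into()).await.unwrap();
+///
+/// // Request [3, 10): both ends fall mid-line. Start alignment skips to the
+/// // byte after the first `\n` (offset 8); end alignment extends to the next `\n`.
+/// let bytes = get_aligned_bytes(&store, &path, 3, 10, 16, b'\n', 4096).await?;
+/// assert_eq!(bytes.unwrap().as_ref(), b"{\"a\":2}\n");
+/// # Ok(())
+/// # }
+/// ```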
+pub async fn get_aligned_bytes(
+    store: &Arc<dyn ObjectStore>,
+    location: &Path,
+    start: usize,
+    end: usize,
+    file_size: usize,
+    terminator: u8,
+    scan_window: usize,
+) -> Result<Option<Bytes>> {
+    if start >= end || start >= file_size {
+        return Ok(None);
+    }
+
+    let fetch_start = start.saturating_sub(1);
+    let fetch_end = std::cmp::min(end, file_size);
+    let bytes = store
+        .get_range(location, (fetch_start as u64)..(fetch_end as u64))
+        .await
+        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+    if bytes.is_empty() {
+        return Ok(None);
+    }
+
+    let data_offset = if start == 0 {
+        0
+    } else if bytes[0] == terminator {
+        1
+    } else {
+        match bytes[1..].iter().position(|&b| b == terminator) {
+            Some(pos) => pos + 2,
+            None => return Ok(None),
+        }
+    };
+
+    if data_offset >= bytes.len() {
+        return Ok(None);
+    }
+
+    let data = bytes.slice(data_offset..);
+
+    // Fast path: if already aligned, return zero-copy
+    if fetch_end >= file_size || data.last() == Some(&terminator) {
+        return Ok(Some(data));
+    }
+
+    // Slow path: need to extend, preallocate capacity
+    let mut buffer = Vec::with_capacity(data.len() + scan_window);
+    buffer.extend_from_slice(&data);
+    let mut cursor = fetch_end as u64;
+
+    while cursor < file_size as u64 {
+        let chunk_end = std::cmp::min(cursor + scan_window as u64, file_size as u64);
+        let chunk = store
+            .get_range(location, cursor..chunk_end)
+            .await
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        if chunk.is_empty() {
+            break;
+        }
+
+        if let Some(pos) = chunk.iter().position(|&b| b == terminator) {
+            buffer.extend_from_slice(&chunk[..=pos]);
+            return Ok(Some(Bytes::from(buffer)));
+        }
+
+        buffer.extend_from_slice(&chunk);
+        cursor = chunk_end;
+    }
+
+    Ok(Some(Bytes::from(buffer)))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use object_store::memory::InMemory;
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_start_at_beginning() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        store
+            .put(&path, "line1\nline2\nline3\n".into())
+            .await
+            .unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 0, 6, 18, b'\n', 4096)
+            .await
+            .unwrap();
+
+        assert_eq!(result.unwrap().as_ref(), b"line1\n");
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_start_aligned() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        // "line1\nline2\nline3\n"
+        // Position 6 is right after first \n
+        store
+            .put(&path, "line1\nline2\nline3\n".into())
+            .await
+            .unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 6, 12, 18, b'\n', 4096)
+            .await
+            .unwrap();
+
+        assert_eq!(result.unwrap().as_ref(), b"line2\n");
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_start_needs_alignment() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        // "line1\nline2\nline3\n"
+        // Position 8 is in the middle of "line2"
+        store
+            .put(&path, "line1\nline2\nline3\n".into())
+            .await
+            .unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 8, 18, 18, b'\n', 4096)
+            .await
+            .unwrap();
+
+        assert_eq!(result.unwrap().as_ref(), b"line3\n");
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_no_newline_in_range() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        store.put(&path, "abcdefghij\n".into()).await.unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 2, 8, 11, b'\n', 4096)
+            .await
+            .unwrap();
+
+        assert!(result.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_extend_end() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        // "line1\nline2\nline3\n"
+        store
+            .put(&path, "line1\nline2\nline3\n".into())
+            .await
+            .unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 0, 8, 18, b'\n', 2)
+            .await
+            .unwrap();
+
+        assert_eq!(result.unwrap().as_ref(), b"line1\nline2\n");
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_end_at_eof_without_newline() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        store.put(&path, "line1".into()).await.unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 0, 5, 5, b'\n', 4)
+            .await
+            .unwrap();
+
+        assert_eq!(result.unwrap().as_ref(), b"line1");
+    }
+}
diff --git a/datafusion/datasource-json/src/mod.rs b/datafusion/datasource-json/src/mod.rs
index 3d27d4cc5ef5a..1117f04bb4b38 100644
--- a/datafusion/datasource-json/src/mod.rs
+++ b/datafusion/datasource-json/src/mod.rs
@@ -21,6 +21,7 @@
 #![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
 #![deny(clippy::allow_attributes)]
 
+pub mod boundary_utils;
 pub mod file_format;
 pub mod source;
 
diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs
index 5797054f11b9c..8106b8ce6243a 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -18,10 +18,11 @@
 //! Execution plan for reading line-delimited JSON files
 
 use std::any::Any;
-use std::io::{BufReader, Read, Seek, SeekFrom};
+use std::io::{BufReader, Cursor, Read, Seek, SeekFrom};
 use std::sync::Arc;
 use std::task::Poll;
 
+use crate::boundary_utils::{DEFAULT_BOUNDARY_WINDOW, get_aligned_bytes};
 use crate::file_format::JsonDecoder;
 
 use datafusion_common::error::{DataFusionError, Result};
@@ -188,23 +189,51 @@ impl FileOpener for JsonOpener {
         let file_compression_type = self.file_compression_type.to_owned();
 
         Ok(Box::pin(async move {
-            let calculated_range =
-                calculate_range(&partitioned_file, &store, None).await?;
+            let file_size = partitioned_file.object_meta.size as usize;
+            let location = &partitioned_file.object_meta.location;
 
-            let range = match calculated_range {
-                RangeCalculation::Range(None) => None,
-                RangeCalculation::Range(Some(range)) => Some(range.into()),
-                RangeCalculation::TerminateEarly => {
+            let file_range = if file_compression_type.is_compressed() {
+                None
+            } else {
+                partitioned_file.range.clone()
+            };
+
+            if let Some(file_range) = file_range.as_ref() {
+                let raw_start = file_range.start as usize;
+                let raw_end = file_range.end as usize;
+                let aligned_bytes = get_aligned_bytes(
+                    &store,
+                    location,
+                    raw_start,
+                    raw_end,
+                    file_size,
+                    b'\n',
+                    DEFAULT_BOUNDARY_WINDOW,
+                )
+                .await?;
+
+                let Some(bytes) = aligned_bytes else {
+                    return Ok(
+                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
+                    );
+                };
+
+                if bytes.is_empty() {
                     return Ok(
                         futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                     );
                 }
-            };
 
-            let options = GetOptions {
-                range,
-                ..Default::default()
-            };
+                let reader = ReaderBuilder::new(schema)
+                    .with_batch_size(batch_size)
+                    .build(BufReader::new(Cursor::new(bytes)))?;
+
+                return Ok(futures::stream::iter(reader)
+                    .map(|r| r.map_err(Into::into))
+                    .boxed());
+            }
+
+            let options = GetOptions::default();
 
             let result = store
                 .get_opts(&partitioned_file.object_meta.location, options)

From 91e36cd5f1bd4c5a2edb3274430bb57ad55e3427 Mon Sep 17 00:00:00 2001
From: Weijun-H
Date: Wed, 7 Jan 2026 20:47:34 +0200
Subject: [PATCH 2/7] feat: add benchmark for JSON boundary data source and
 update dependencies

---
Cargo.lock | 1 + datafusion/datasource-json/Cargo.toml | 7 + .../datasource-json/benches/json_boundary.rs | 272 ++++++++++++++++++ .../datasource-json/src/boundary_utils.rs | 170 +++++++++++ datafusion/datasource-json/src/source.rs | 4 +- 5 files changed, 451 insertions(+), 3 deletions(-) create mode 100644 datafusion/datasource-json/benches/json_boundary.rs diff --git a/Cargo.lock b/Cargo.lock index 4f105dc1b4968..ca5aba8839a8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2022,6 +2022,7 @@ dependencies = [ "arrow", "async-trait", "bytes", + "criterion", "datafusion-common", "datafusion-common-runtime", "datafusion-datasource", diff --git a/datafusion/datasource-json/Cargo.toml b/datafusion/datasource-json/Cargo.toml index 37fa8d43a0816..c4a75e4eb880b 100644 --- a/datafusion/datasource-json/Cargo.toml +++ b/datafusion/datasource-json/Cargo.toml @@ -46,6 +46,9 @@ futures = { workspace = true } object_store = { workspace = true } tokio = { workspace = true } +[dev-dependencies] +criterion = { workspace = true } + # Note: add additional linter rules in lib.rs. # Rust does not support workspace + new linter rules in subcrates yet # https://github.com/rust-lang/cargo/issues/13157 @@ -55,3 +58,7 @@ workspace = true [lib] name = "datafusion_datasource_json" path = "src/mod.rs" + +[[bench]] +name = "json_boundary" +harness = false diff --git a/datafusion/datasource-json/benches/json_boundary.rs b/datafusion/datasource-json/benches/json_boundary.rs new file mode 100644 index 0000000000000..299a4049b0cbd --- /dev/null +++ b/datafusion/datasource-json/benches/json_boundary.rs @@ -0,0 +1,272 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
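+
+//! Benchmark for boundary-aligned JSON range reads.
+//!
+//! Wraps an in-memory `ObjectStore` in a `CountingObjectStore` that records how many
+//! bytes each range request asks for, then executes a `DataSourceExec` over a
+//! deliberately misaligned byte range of line-delimited JSON. The requested-byte
+//! count reported by `bench_json_boundary` is the signal for read amplification.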
+ +use arrow::datatypes::{DataType, Field, Schema}; +use async_trait::async_trait; +use bytes::Bytes; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use datafusion_datasource::file::FileSource; +use datafusion_datasource::file_groups::FileGroup; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{FileRange, PartitionedFile, TableSchema}; +use datafusion_datasource_json::source::JsonSource; +use datafusion_execution::TaskContext; +use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_physical_plan::ExecutionPlan; +use futures::StreamExt; +use futures::stream::BoxStream; +use object_store::memory::InMemory; +use object_store::path::Path; +use object_store::{ + GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectStore, + PutMultipartOptions, PutOptions, PutPayload, PutResult, +}; +use std::fmt; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use tokio::runtime::{Builder, Runtime}; + +#[derive(Debug)] +struct CountingObjectStore { + inner: Arc, + requested_bytes: AtomicU64, + requested_calls: AtomicU64, +} + +impl CountingObjectStore { + fn new(inner: Arc) -> Self { + Self { + inner, + requested_bytes: AtomicU64::new(0), + requested_calls: AtomicU64::new(0), + } + } + + fn reset(&self) { + self.requested_bytes.store(0, Ordering::Relaxed); + self.requested_calls.store(0, Ordering::Relaxed); + } + + fn requested_bytes(&self) -> u64 { + self.requested_bytes.load(Ordering::Relaxed) + } +} + +impl fmt::Display for CountingObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "CountingObjectStore({})", self.inner) + } +} + +#[async_trait] +impl ObjectStore for CountingObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> object_store::Result { + self.inner.put_opts(location, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> object_store::Result> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> object_store::Result { + if let Some(range) = options.range.as_ref() { + let requested = match range { + GetRange::Bounded(r) => r.end.saturating_sub(r.start), + GetRange::Offset(_) | GetRange::Suffix(_) => 0, + }; + self.requested_bytes.fetch_add(requested, Ordering::Relaxed); + } + self.requested_calls.fetch_add(1, Ordering::Relaxed); + self.inner.get_opts(location, options).await + } + + async fn delete(&self, location: &Path) -> object_store::Result<()> { + self.inner.delete(location).await + } + + fn list( + &self, + prefix: Option<&Path>, + ) -> BoxStream<'static, object_store::Result> { + self.inner.list(prefix) + } + + async fn list_with_delimiter( + &self, + prefix: Option<&Path>, + ) -> object_store::Result { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists( + &self, + from: &Path, + to: &Path, + ) -> object_store::Result<()> { + self.inner.copy_if_not_exists(from, to).await + } +} + +fn build_fixed_json_lines(line_len: usize, lines: usize) -> Bytes { + let prefix = r#"{"value":""#; + let suffix = "\"}\n"; + let min_len = prefix.len() + suffix.len() + 1; + assert!(line_len >= min_len, "line_len must be at least 
{min_len}"); + + let padding_len = line_len - prefix.len() - suffix.len(); + let mut line = Vec::with_capacity(line_len); + line.extend_from_slice(prefix.as_bytes()); + line.extend(std::iter::repeat(b'a').take(padding_len)); + line.extend_from_slice(suffix.as_bytes()); + + let mut data = Vec::with_capacity(line_len * lines); + for _ in 0..lines { + data.extend_from_slice(&line); + } + Bytes::from(data) +} + +struct Fixture { + store: Arc, + task_ctx: Arc, + exec: Arc, +} + +fn build_fixture(rt: &Runtime) -> Fixture { + let inner: Arc = Arc::new(InMemory::new()); + let store = Arc::new(CountingObjectStore::new(Arc::clone(&inner))); + let store_dyn: Arc = store.clone(); + let path = Path::from("bench.json"); + + let line_len = 128usize; + let lines = 65_536usize; + let data = build_fixed_json_lines(line_len, lines); + rt.block_on(inner.put(&path, data.into())).unwrap(); + let object_meta = rt.block_on(inner.head(&path)).unwrap(); + + let start = 1_000_003usize; + let raw_end = start + 256_000; + let end = (raw_end / line_len).max(1) * line_len; + + let task_ctx = Arc::new(TaskContext::default()); + let runtime_env = task_ctx.runtime_env(); + let object_store_url = ObjectStoreUrl::parse("test://bucket").unwrap(); + runtime_env.register_object_store(object_store_url.as_ref(), Arc::clone(&store_dyn)); + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Utf8, + false, + )])); + let table_schema = TableSchema::from_file_schema(schema); + let file_source: Arc = Arc::new(JsonSource::new(table_schema)); + let file = build_partitioned_file(object_meta.clone(), start, end); + let config = FileScanConfigBuilder::new(object_store_url, file_source) + .with_file_groups(vec![FileGroup::new(vec![file])]) + .build(); + let exec: Arc = DataSourceExec::from_data_source(config); + + Fixture { + store, + task_ctx, + exec, + } +} + +fn measure_datasource_exec_bytes(rt: &Runtime, fixture: &Fixture) -> u64 { + fixture.store.reset(); + let rows = rt.block_on(run_datasource_exec( + Arc::clone(&fixture.exec), + Arc::clone(&fixture.task_ctx), + )); + debug_assert!(rows > 0); + fixture.store.requested_bytes() +} + +fn build_partitioned_file( + object_meta: object_store::ObjectMeta, + start: usize, + end: usize, +) -> PartitionedFile { + PartitionedFile { + object_meta, + partition_values: vec![], + range: Some(FileRange { + start: start as i64, + end: end as i64, + }), + statistics: None, + ordering: None, + extensions: None, + metadata_size_hint: None, + } +} + +async fn run_datasource_exec( + exec: Arc, + task_ctx: Arc, +) -> usize { + let mut stream = exec.execute(0, task_ctx).unwrap(); + let mut rows = 0; + while let Some(batch) = stream.next().await { + let batch = batch.unwrap(); + rows += batch.num_rows(); + } + rows +} + +fn bench_json_boundary(c: &mut Criterion) { + let rt = Builder::new_current_thread().build().unwrap(); + let fixture = build_fixture(&rt); + + let exec_bytes = measure_datasource_exec_bytes(&rt, &fixture); + + let mut exec_group = c.benchmark_group("json_boundary_datasource_exec"); + exec_group.bench_function( + BenchmarkId::new("execute", format!("read_bytes={exec_bytes}")), + |b| { + b.iter(|| { + fixture.store.reset(); + rt.block_on(run_datasource_exec( + Arc::clone(&fixture.exec), + Arc::clone(&fixture.task_ctx), + )); + }); + }, + ); + exec_group.finish(); +} + +criterion_group!(benches, bench_json_boundary); +criterion_main!(benches); diff --git a/datafusion/datasource-json/src/boundary_utils.rs b/datafusion/datasource-json/src/boundary_utils.rs index 
7807c0fe60e97..0791bbb17a5ce 100644 --- a/datafusion/datasource-json/src/boundary_utils.rs +++ b/datafusion/datasource-json/src/boundary_utils.rs @@ -110,7 +110,16 @@ pub async fn get_aligned_bytes( #[cfg(test)] mod tests { use super::*; + use async_trait::async_trait; + use datafusion_datasource::{FileRange, PartitionedFile, calculate_range}; + use futures::stream::BoxStream; use object_store::memory::InMemory; + use object_store::{ + GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectStore, + PutMultipartOptions, PutOptions, PutPayload, PutResult, + }; + use std::fmt; + use std::sync::atomic::{AtomicU64, Ordering}; #[tokio::test] async fn test_get_aligned_bytes_start_at_beginning() { @@ -212,4 +221,165 @@ mod tests { assert_eq!(result.unwrap().as_ref(), b"line1"); } + + #[derive(Debug)] + struct CountingObjectStore { + inner: Arc, + requested_bytes: AtomicU64, + requested_calls: AtomicU64, + } + + impl CountingObjectStore { + fn new(inner: Arc) -> Self { + Self { + inner, + requested_bytes: AtomicU64::new(0), + requested_calls: AtomicU64::new(0), + } + } + + fn reset(&self) { + self.requested_bytes.store(0, Ordering::Relaxed); + self.requested_calls.store(0, Ordering::Relaxed); + } + + fn requested_bytes(&self) -> u64 { + self.requested_bytes.load(Ordering::Relaxed) + } + } + + impl fmt::Display for CountingObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "CountingObjectStore({})", self.inner) + } + } + + #[async_trait] + impl ObjectStore for CountingObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> object_store::Result { + self.inner.put_opts(location, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> object_store::Result> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> object_store::Result { + if let Some(range) = options.range.as_ref() { + let requested = match range { + GetRange::Bounded(r) => r.end.saturating_sub(r.start), + GetRange::Offset(_) | GetRange::Suffix(_) => 0, + }; + self.requested_bytes.fetch_add(requested, Ordering::Relaxed); + } + self.requested_calls.fetch_add(1, Ordering::Relaxed); + self.inner.get_opts(location, options).await + } + + async fn delete(&self, location: &Path) -> object_store::Result<()> { + self.inner.delete(location).await + } + + fn list( + &self, + prefix: Option<&Path>, + ) -> BoxStream<'static, object_store::Result> { + self.inner.list(prefix) + } + + async fn list_with_delimiter( + &self, + prefix: Option<&Path>, + ) -> object_store::Result { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists( + &self, + from: &Path, + to: &Path, + ) -> object_store::Result<()> { + self.inner.copy_if_not_exists(from, to).await + } + } + + fn build_fixed_lines(line_len: usize, lines: usize) -> Bytes { + let body_len = line_len.saturating_sub(1); + let mut data = Vec::with_capacity(line_len * lines); + for _ in 0..lines { + data.extend(std::iter::repeat(b'a').take(body_len)); + data.push(b'\n'); + } + Bytes::from(data) + } + + #[tokio::test] + async fn test_get_aligned_bytes_reduces_requested_bytes() { + let inner: Arc = Arc::new(InMemory::new()); + let store = Arc::new(CountingObjectStore::new(Arc::clone(&inner))); + let store_dyn: Arc = 
store.clone(); + let path = Path::from("amplification.json"); + + let data = build_fixed_lines(128, 16_384); + let file_size = data.len(); + inner.put(&path, data.into()).await.unwrap(); + + let start = 1_000_003usize; + let raw_end = start + 64_000; + let end = (raw_end / 128).max(1) * 128; + + let object_meta = inner.head(&path).await.unwrap(); + let file = PartitionedFile { + object_meta, + partition_values: vec![], + range: Some(FileRange { + start: start as i64, + end: end as i64, + }), + statistics: None, + ordering: None, + extensions: None, + metadata_size_hint: None, + }; + + store.reset(); + let _ = calculate_range(&file, &store_dyn, None).await.unwrap(); + let old_bytes = store.requested_bytes(); + + store.reset(); + let _ = get_aligned_bytes( + &store_dyn, + &path, + start, + end, + file_size, + b'\n', + DEFAULT_BOUNDARY_WINDOW, + ) + .await + .unwrap(); + let new_bytes = store.requested_bytes(); + + assert!( + old_bytes >= new_bytes * 10, + "expected old path to request significantly more bytes, old={old_bytes}, new={new_bytes}" + ); + } } diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 8106b8ce6243a..a03830846859c 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -31,9 +31,7 @@ use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream}; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::projection::{ProjectionOpener, SplitProjection}; -use datafusion_datasource::{ - ListingTableUrl, PartitionedFile, RangeCalculation, as_file_source, calculate_range, -}; +use datafusion_datasource::{ListingTableUrl, PartitionedFile, as_file_source}; use datafusion_physical_plan::projection::ProjectionExprs; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; From a11a7e98650b656fc57a8848d7085ca2ccb4c40a Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Wed, 7 Jan 2026 21:07:58 +0200 Subject: [PATCH 3/7] chore --- .../datasource-json/benches/json_boundary.rs | 46 ++++++++++++++++--- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/datafusion/datasource-json/benches/json_boundary.rs b/datafusion/datasource-json/benches/json_boundary.rs index 299a4049b0cbd..96307049e81f5 100644 --- a/datafusion/datasource-json/benches/json_boundary.rs +++ b/datafusion/datasource-json/benches/json_boundary.rs @@ -41,25 +41,31 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use tokio::runtime::{Builder, Runtime}; +// Add CPU cost per requested KB to make read amplification visible in timings. 
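+// Each requested KB burns `CPU_COST_PER_KB_ROUNDS` rounds of the cheap mixing loop in
+// `burn_cpu_kb`, so wall-clock time grows with the bytes fetched even though the
+// backing store is purely in memory.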
+const CPU_COST_PER_KB_ROUNDS: u32 = 64; +const BYTES_PER_KB: u64 = 1024; + #[derive(Debug)] struct CountingObjectStore { inner: Arc, requested_bytes: AtomicU64, - requested_calls: AtomicU64, + cpu_cost_per_kb_rounds: u32, } impl CountingObjectStore { - fn new(inner: Arc) -> Self { + fn new_with_cpu_cost( + inner: Arc, + cpu_cost_per_kb_rounds: u32, + ) -> Self { Self { inner, requested_bytes: AtomicU64::new(0), - requested_calls: AtomicU64::new(0), + cpu_cost_per_kb_rounds, } } fn reset(&self) { self.requested_bytes.store(0, Ordering::Relaxed); - self.requested_calls.store(0, Ordering::Relaxed); } fn requested_bytes(&self) -> u64 { @@ -97,15 +103,21 @@ impl ObjectStore for CountingObjectStore { location: &Path, options: GetOptions, ) -> object_store::Result { + let should_burn_cpu = self.cpu_cost_per_kb_rounds > 0; + let mut requested_len = 0u64; if let Some(range) = options.range.as_ref() { let requested = match range { GetRange::Bounded(r) => r.end.saturating_sub(r.start), GetRange::Offset(_) | GetRange::Suffix(_) => 0, }; + requested_len = requested; self.requested_bytes.fetch_add(requested, Ordering::Relaxed); } - self.requested_calls.fetch_add(1, Ordering::Relaxed); - self.inner.get_opts(location, options).await + let result = self.inner.get_opts(location, options).await; + if should_burn_cpu { + burn_cpu_kb(requested_len, self.cpu_cost_per_kb_rounds); + } + result } async fn delete(&self, location: &Path) -> object_store::Result<()> { @@ -158,6 +170,21 @@ fn build_fixed_json_lines(line_len: usize, lines: usize) -> Bytes { Bytes::from(data) } +fn burn_cpu_kb(bytes: u64, rounds: u32) { + if bytes == 0 || rounds == 0 { + return; + } + let kb = (bytes + BYTES_PER_KB - 1) / BYTES_PER_KB; + let mut checksum = 0u64; + let mut remaining = kb.saturating_mul(rounds as u64); + while remaining > 0 { + checksum = checksum.wrapping_add(remaining); + checksum = checksum.rotate_left(5) ^ 0x9e3779b97f4a7c15; + remaining -= 1; + } + std::hint::black_box(checksum); +} + struct Fixture { store: Arc, task_ctx: Arc, @@ -166,7 +193,10 @@ struct Fixture { fn build_fixture(rt: &Runtime) -> Fixture { let inner: Arc = Arc::new(InMemory::new()); - let store = Arc::new(CountingObjectStore::new(Arc::clone(&inner))); + let store = Arc::new(CountingObjectStore::new_with_cpu_cost( + Arc::clone(&inner), + CPU_COST_PER_KB_ROUNDS, + )); let store_dyn: Arc = store.clone(); let path = Path::from("bench.json"); @@ -183,6 +213,7 @@ fn build_fixture(rt: &Runtime) -> Fixture { let task_ctx = Arc::new(TaskContext::default()); let runtime_env = task_ctx.runtime_env(); let object_store_url = ObjectStoreUrl::parse("test://bucket").unwrap(); + // Register a CPU-costed store to approximate non-streaming remote reads. runtime_env.register_object_store(object_store_url.as_ref(), Arc::clone(&store_dyn)); let schema = Arc::new(Schema::new(vec![Field::new( "value", @@ -253,6 +284,7 @@ fn bench_json_boundary(c: &mut Criterion) { let exec_bytes = measure_datasource_exec_bytes(&rt, &fixture); let mut exec_group = c.benchmark_group("json_boundary_datasource_exec"); + // The read_bytes tag is the primary signal; time reflects simulated CPU cost. 
exec_group.bench_function( BenchmarkId::new("execute", format!("read_bytes={exec_bytes}")), |b| { From d9ec46495dabf4d290ecd50cd44b507a32e3b984 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Wed, 7 Jan 2026 21:32:00 +0200 Subject: [PATCH 4/7] chore --- .../datasource-json/benches/json_boundary.rs | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/datafusion/datasource-json/benches/json_boundary.rs b/datafusion/datasource-json/benches/json_boundary.rs index 96307049e81f5..36fda2d504970 100644 --- a/datafusion/datasource-json/benches/json_boundary.rs +++ b/datafusion/datasource-json/benches/json_boundary.rs @@ -18,7 +18,7 @@ use arrow::datatypes::{DataType, Field, Schema}; use async_trait::async_trait; use bytes::Bytes; -use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use criterion::{Criterion, Throughput, criterion_group, criterion_main}; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; @@ -284,19 +284,17 @@ fn bench_json_boundary(c: &mut Criterion) { let exec_bytes = measure_datasource_exec_bytes(&rt, &fixture); let mut exec_group = c.benchmark_group("json_boundary_datasource_exec"); - // The read_bytes tag is the primary signal; time reflects simulated CPU cost. - exec_group.bench_function( - BenchmarkId::new("execute", format!("read_bytes={exec_bytes}")), - |b| { - b.iter(|| { - fixture.store.reset(); - rt.block_on(run_datasource_exec( - Arc::clone(&fixture.exec), - Arc::clone(&fixture.task_ctx), - )); - }); - }, - ); + exec_group.throughput(Throughput::Bytes(exec_bytes)); + // Fixed benchmark id for baseline comparisons; read_bytes is reported as throughput. + exec_group.bench_function("execute", |b| { + b.iter(|| { + fixture.store.reset(); + rt.block_on(run_datasource_exec( + Arc::clone(&fixture.exec), + Arc::clone(&fixture.task_ctx), + )); + }); + }); exec_group.finish(); } From 54a6cfc784c324ff3629d7008048e4b5b917c21c Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Thu, 8 Jan 2026 10:58:51 +0200 Subject: [PATCH 5/7] feat: update boundary handling to use u64 for sizes and add error handling for zero scan window --- .../datasource-json/src/boundary_utils.rs | 54 ++++++++++++++----- datafusion/datasource-json/src/source.rs | 16 ++++-- 2 files changed, 52 insertions(+), 18 deletions(-) diff --git a/datafusion/datasource-json/src/boundary_utils.rs b/datafusion/datasource-json/src/boundary_utils.rs index 0791bbb17a5ce..6563828e7b373 100644 --- a/datafusion/datasource-json/src/boundary_utils.rs +++ b/datafusion/datasource-json/src/boundary_utils.rs @@ -20,7 +20,7 @@ use datafusion_common::{DataFusionError, Result}; use object_store::{ObjectStore, path::Path}; use std::sync::Arc; -pub const DEFAULT_BOUNDARY_WINDOW: usize = 4096; // 4KB +pub const DEFAULT_BOUNDARY_WINDOW: u64 = 4096; // 4KB /// Fetch bytes for [start, end) and align boundaries in memory. 
/// @@ -36,12 +36,18 @@ pub const DEFAULT_BOUNDARY_WINDOW: usize = 4096; // 4KB pub async fn get_aligned_bytes( store: &Arc, location: &Path, - start: usize, - end: usize, - file_size: usize, + start: u64, + end: u64, + file_size: u64, terminator: u8, - scan_window: usize, + scan_window: u64, ) -> Result> { + if scan_window == 0 { + return Err(DataFusionError::Internal( + "scan_window must be greater than 0".to_string(), + )); + } + if start >= end || start >= file_size { return Ok(None); } @@ -49,7 +55,7 @@ pub async fn get_aligned_bytes( let fetch_start = start.saturating_sub(1); let fetch_end = std::cmp::min(end, file_size); let bytes = store - .get_range(location, (fetch_start as u64)..(fetch_end as u64)) + .get_range(location, fetch_start..fetch_end) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; @@ -80,12 +86,15 @@ pub async fn get_aligned_bytes( } // Slow path: need to extend, preallocate capacity - let mut buffer = Vec::with_capacity(data.len() + scan_window); + let scan_window_usize = usize::try_from(scan_window).map_err(|_| { + DataFusionError::Internal("scan_window must fit in usize".to_string()) + })?; + let mut buffer = Vec::with_capacity(data.len().saturating_add(scan_window_usize)); buffer.extend_from_slice(&data); - let mut cursor = fetch_end as u64; + let mut cursor = fetch_end; - while cursor < file_size as u64 { - let chunk_end = std::cmp::min(cursor + scan_window as u64, file_size as u64); + while cursor < file_size { + let chunk_end = std::cmp::min(cursor.saturating_add(scan_window), file_size); let chunk = store .get_range(location, cursor..chunk_end) .await @@ -222,6 +231,23 @@ mod tests { assert_eq!(result.unwrap().as_ref(), b"line1"); } + #[tokio::test] + async fn test_get_aligned_bytes_rejects_zero_scan_window() { + let store: Arc = Arc::new(InMemory::new()); + let path = Path::from("test.json"); + + store.put(&path, "line1\n".into()).await.unwrap(); + + let err = get_aligned_bytes(&store, &path, 0, 6, 6, b'\n', 0) + .await + .unwrap_err(); + + assert!( + matches!(err, DataFusionError::Internal(ref msg) if msg.contains("scan_window")), + "unexpected error: {err}" + ); + } + #[derive(Debug)] struct CountingObjectStore { inner: Arc, @@ -338,10 +364,10 @@ mod tests { let path = Path::from("amplification.json"); let data = build_fixed_lines(128, 16_384); - let file_size = data.len(); + let file_size = data.len() as u64; inner.put(&path, data.into()).await.unwrap(); - let start = 1_000_003usize; + let start = 1_000_003u64; let raw_end = start + 64_000; let end = (raw_end / 128).max(1) * 128; @@ -350,8 +376,8 @@ mod tests { object_meta, partition_values: vec![], range: Some(FileRange { - start: start as i64, - end: end as i64, + start: i64::try_from(start).unwrap(), + end: i64::try_from(end).unwrap(), }), statistics: None, ordering: None, diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index a03830846859c..2e9de2485c404 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -187,7 +187,7 @@ impl FileOpener for JsonOpener { let file_compression_type = self.file_compression_type.to_owned(); Ok(Box::pin(async move { - let file_size = partitioned_file.object_meta.size as usize; + let file_size = partitioned_file.object_meta.size; let location = &partitioned_file.object_meta.location; let file_range = if file_compression_type.is_compressed() { @@ -197,8 +197,16 @@ impl FileOpener for JsonOpener { }; if let Some(file_range) = file_range.as_ref() { - let raw_start = 
file_range.start as usize; - let raw_end = file_range.end as usize; + let raw_start = u64::try_from(file_range.start).map_err(|_| { + DataFusionError::Internal( + "file range start must be non-negative".to_string(), + ) + })?; + let raw_end = u64::try_from(file_range.end).map_err(|_| { + DataFusionError::Internal( + "file range end must be non-negative".to_string(), + ) + })?; let aligned_bytes = get_aligned_bytes( &store, location, @@ -245,7 +253,7 @@ impl FileOpener for JsonOpener { Some(_) => { file.seek(SeekFrom::Start(result.range.start as _))?; let limit = result.range.end - result.range.start; - file_compression_type.convert_read(file.take(limit as u64))? + file_compression_type.convert_read(file.take(limit))? } }; From a9e95b0c13a8914cedc15a8de43fdff100b5766e Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Thu, 8 Jan 2026 11:57:35 +0200 Subject: [PATCH 6/7] chore: fix clippy --- datafusion/datasource-json/benches/json_boundary.rs | 4 ++-- datafusion/datasource-json/src/boundary_utils.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource-json/benches/json_boundary.rs b/datafusion/datasource-json/benches/json_boundary.rs index 36fda2d504970..6bfd329588a0a 100644 --- a/datafusion/datasource-json/benches/json_boundary.rs +++ b/datafusion/datasource-json/benches/json_boundary.rs @@ -160,7 +160,7 @@ fn build_fixed_json_lines(line_len: usize, lines: usize) -> Bytes { let padding_len = line_len - prefix.len() - suffix.len(); let mut line = Vec::with_capacity(line_len); line.extend_from_slice(prefix.as_bytes()); - line.extend(std::iter::repeat(b'a').take(padding_len)); + line.extend(std::iter::repeat_n(b'a', padding_len)); line.extend_from_slice(suffix.as_bytes()); let mut data = Vec::with_capacity(line_len * lines); @@ -174,7 +174,7 @@ fn burn_cpu_kb(bytes: u64, rounds: u32) { if bytes == 0 || rounds == 0 { return; } - let kb = (bytes + BYTES_PER_KB - 1) / BYTES_PER_KB; + let kb = bytes.div_ceil(BYTES_PER_KB); let mut checksum = 0u64; let mut remaining = kb.saturating_mul(rounds as u64); while remaining > 0 { diff --git a/datafusion/datasource-json/src/boundary_utils.rs b/datafusion/datasource-json/src/boundary_utils.rs index 6563828e7b373..9861a35f71f2e 100644 --- a/datafusion/datasource-json/src/boundary_utils.rs +++ b/datafusion/datasource-json/src/boundary_utils.rs @@ -350,7 +350,7 @@ mod tests { let body_len = line_len.saturating_sub(1); let mut data = Vec::with_capacity(line_len * lines); for _ in 0..lines { - data.extend(std::iter::repeat(b'a').take(body_len)); + data.extend(std::iter::repeat_n(b'a', body_len)); data.push(b'\n'); } Bytes::from(data) From 2adb780ae45bf1af47c3181dee0997e0e695f753 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Sat, 10 Jan 2026 14:20:13 +0200 Subject: [PATCH 7/7] feat: add unit tests for get_aligned_bytes function to validate boundary handling --- .../datasource-json/src/boundary_utils.rs | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/datafusion/datasource-json/src/boundary_utils.rs b/datafusion/datasource-json/src/boundary_utils.rs index 9861a35f71f2e..1184de9c684f6 100644 --- a/datafusion/datasource-json/src/boundary_utils.rs +++ b/datafusion/datasource-json/src/boundary_utils.rs @@ -356,6 +356,79 @@ mod tests { Bytes::from(data) } + #[tokio::test] + async fn test_get_aligned_bytes_start_equals_end() { + let store: Arc = Arc::new(InMemory::new()); + let path = Path::from("test.json"); + + store.put(&path, "line1\n".into()).await.unwrap(); + + let result = get_aligned_bytes(&store, &path, 3, 
3, 6, b'\n', 4096)
+            .await
+            .unwrap();
+
+        assert!(result.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_start_beyond_file_size() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        store.put(&path, "line1\n".into()).await.unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 10, 20, 6, b'\n', 4096)
+            .await
+            .unwrap();
+
+        assert!(result.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_multi_window_extension() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        // Create a line longer than scan_window (use scan_window=4)
+        // "aaaaaaaaaa\n" = 11 bytes, need multiple 4-byte windows to find newline
+        store.put(&path, "aaaaaaaaaa\nbbbb\n".into()).await.unwrap();
+
+        // Request [0, 5), end is in the middle of line, need to extend
+        // with scan_window=4, need 2 extensions to reach position 10 (the newline)
+        let result = get_aligned_bytes(&store, &path, 0, 5, 16, b'\n', 4)
+            .await
+            .unwrap();
+
+        assert_eq!(result.unwrap().as_ref(), b"aaaaaaaaaa\n");
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_partitions_complete_coverage() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        // 5 lines, each 10 bytes = 50 bytes total
+        let content = "aaaaaaaaa\nbbbbbbbbb\nccccccccc\nddddddddd\neeeeeeeee\n";
+        store.put(&path, content.into()).await.unwrap();
+        let file_size = content.len() as u64;
+
+        // Split at arbitrary boundaries: [0, 15), [15, 35), [35, 50)
+        let boundaries = vec![0u64, 15, 35, file_size];
+        let mut combined = Vec::new();
+
+        for window in boundaries.windows(2) {
+            let (start, end) = (window[0], window[1]);
+            let bytes =
+                get_aligned_bytes(&store, &path, start, end, file_size, b'\n', 4096)
+                    .await
+                    .unwrap()
+                    .unwrap();
+            combined.extend_from_slice(&bytes);
+        }
+
+        assert_eq!(combined, content.as_bytes());
+    }
+
     #[tokio::test]
     async fn test_get_aligned_bytes_reduces_requested_bytes() {
         let inner: Arc<dyn ObjectStore> = Arc::new(InMemory::new());