Skip to content

Commit 7fde30a

Browse files
authored
fix: handle invalid byte ranges in calculate_range for single-line files (#19607)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #19605. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> The `calculate_range` function creates invalid byte ranges (where `start > end`) when reading single-line CSV/JSON files that are split into multiple partitions. This causes an error like: ``` ObjectStore(Generic { store: "S3", source: Inconsistent { start: 1149247, end: 1149246 } }) ``` When `find_first_newline` doesn't find a newline (single-line file), it returns the remaining file length. This causes `start + start_delta` to exceed `end + end_delta`, creating an invalid range. The current check only handles `range.start == range.end`, not `range.start > range.end`. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> 1. Added an early termination check after computing `start_delta`: if the first newline after `start` is beyond the partition boundary (`start + start_delta > end`), return `TerminateEarly` since no complete records exist in this partition. 2. Changed the final range validation from `==` to `>=` as a safety net for edge cases. 3. Added a regression test that reproduces the bug with a single-line file split into partitions. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes, added `test_calculate_range_single_line_file` which: - Creates a single-line JSON file without new-lines - Simulates partition 2 (middle to end of file) - Verifies that `calculate_range` returns `TerminateEarly` instead of an invalid range ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> No
1 parent 09455f1 commit 7fde30a

File tree

1 file changed

+32
-1
lines changed
  • datafusion/datasource/src

1 file changed

+32
-1
lines changed

datafusion/datasource/src/mod.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,10 @@ pub async fn calculate_range(
350350
0
351351
};
352352

353+
if start + start_delta > end {
354+
return Ok(RangeCalculation::TerminateEarly);
355+
}
356+
353357
let end_delta = if end != file_size {
354358
find_first_newline(store, location, end - 1, file_size, newline).await?
355359
} else {
@@ -358,7 +362,7 @@ pub async fn calculate_range(
358362

359363
let range = start + start_delta..end + end_delta;
360364

361-
if range.start == range.end {
365+
if range.start >= range.end {
362366
return Ok(RangeCalculation::TerminateEarly);
363367
}
364368

@@ -723,4 +727,31 @@ mod tests {
723727
// testing an empty path with `ignore_subdirectory` set to false
724728
assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
725729
}
730+
731+
/// Regression test for <https://github.com/apache/datafusion/issues/19605>
732+
#[tokio::test]
733+
async fn test_calculate_range_single_line_file() {
734+
use super::{PartitionedFile, RangeCalculation, calculate_range};
735+
use object_store::ObjectStore;
736+
use object_store::memory::InMemory;
737+
738+
let content = r#"{"id":1,"data":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}"#;
739+
let file_size = content.len() as u64;
740+
741+
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
742+
let path = Path::from("test.json");
743+
store.put(&path, content.into()).await.unwrap();
744+
745+
let mid = file_size / 2;
746+
let partitioned_file = PartitionedFile::new_with_range(
747+
path.to_string(),
748+
file_size,
749+
mid as i64,
750+
file_size as i64,
751+
);
752+
753+
let result = calculate_range(&partitioned_file, &store, None).await;
754+
755+
assert!(matches!(result, Ok(RangeCalculation::TerminateEarly)));
756+
}
726757
}

0 commit comments

Comments
 (0)