Skip to content

Commit 743018b

Browse files
fix: Eliminate endless busy looping in read_json_files on failed read (#1489)
## What changes are proposed in this pull request?

This PR ensures that the Arrow JSON reader stream emits at most one error. Previously, a failed read would produce error variants endlessly, because the reader's iterator only terminates upon EOF and a failed read never reaches EOF. See the Arrow [`read` function](https://arrow.apache.org/rust/arrow_json/reader/struct.Reader.html#method.read), which shows that it only terminates on an EOF.

Fixes: #1050

<!-- Uncomment this section if there are any changes affecting public APIs:
### This PR affects the following public APIs

If there are breaking changes, please ensure the `breaking-changes` label gets added by CI, and describe why the changes are needed. Note that _new_ public APIs are not considered breaking.
-->

## How was this change tested?

The test takes the repro from the issue and ensures that shutting down the tokio runtime never times out. A tokio runtime shutdown timeout indicates that one of the threads never stops.
1 parent 5f45d9d commit 743018b

File tree

1 file changed

+76
-1
lines changed

1 file changed

+76
-1
lines changed

kernel/src/engine/default/json.rs

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
126126
while let Some(item) = stream.next().await {
127127
if tx.send(item).is_err() {
128128
warn!("read_json receiver end of channel dropped before sending completed");
129+
break;
129130
}
130131
}
131132
});
@@ -199,7 +200,19 @@ impl JsonOpener {
199200
let reader = ReaderBuilder::new(schema)
200201
.with_batch_size(batch_size)
201202
.build(BufReader::new(file))?;
202-
Ok(futures::stream::iter(reader).map_err(Error::from).boxed())
203+
204+
let mut seen_error = false;
205+
Ok(futures::stream::iter(reader)
206+
.map_err(Error::from)
207+
.take_while(move |result| {
208+
// Emit exactly one error, then stop the stream. We check seen_error BEFORE
209+
// updating it so the first error passes through, but subsequent items don't.
210+
// This is necessary because Arrow's Reader loops the same error indefinitely.
211+
let return_this = !seen_error;
212+
seen_error = seen_error || result.is_err();
213+
futures::future::ready(return_this)
214+
})
215+
.boxed())
203216
}
204217
GetResultPayload::Stream(s) => {
205218
let mut decoder = ReaderBuilder::new(schema)
@@ -271,6 +284,7 @@ mod tests {
271284
PutPayload, PutResult, Result,
272285
};
273286
use serde_json::json;
287+
use tracing::info;
274288

275289
// TODO: should just use the one from test_utils, but running into dependency issues
276290
fn into_record_batch(engine_data: Box<dyn EngineData>) -> RecordBatch {
@@ -623,6 +637,67 @@ mod tests {
623637
);
624638
}
625639

640+
use crate::engine::default::DefaultEngine;
641+
use crate::schema::StructType;
642+
use crate::Engine;
643+
use std::io::Write;
644+
use tempfile::NamedTempFile;
645+
646+
/// Creates a temporary file containing text that is not valid JSON.
///
/// Returns the temp-file handle together with the `file://` URL that points at it, so
/// the caller can hold on to the handle while the file is being read through the URL.
///
/// # Panics
///
/// Panics if the file cannot be created or written, or if its path cannot be turned
/// into a file URL.
fn make_invalid_named_temp() -> (NamedTempFile, Url) {
    let mut malformed = NamedTempFile::new().expect("Failed to create temp file");
    write!(malformed, r#"this is not valid json"#).expect("Failed to write to temp file");

    let file_url = Url::from_file_path(malformed.path()).expect("Failed to create file URL");
    info!("Created temporary malformed file at: {file_url}");

    (malformed, file_url)
}
655+
656+
/// Reading malformed JSON files must produce exactly one error per file, after which the
/// result iterator ends instead of repeating the same error forever.
#[test]
fn test_read_invalid_json() -> Result<(), Box<dyn std::error::Error>> {
    let _ = tracing_subscriber::fmt().try_init();

    // Keep both temp-file handles bound for the whole test so they outlive the reads.
    let (_temp_file1, file_url1) = make_invalid_named_temp();
    let (_temp_file2, file_url2) = make_invalid_named_temp();

    let schema = Arc::new(
        StructType::try_new(vec![StructField::nullable(
            "name",
            crate::schema::DataType::BOOLEAN,
        )])
        .unwrap(),
    );
    let default_engine = DefaultEngine::new(Arc::new(LocalFileSystem::new()));

    // Reads the given files and verifies that exactly `expected_errors` error items are
    // produced before the iterator is exhausted.
    let check_errors = |file_urls: Vec<_>, expected_errors: usize| {
        let files: Vec<_> = file_urls
            .into_iter()
            .map(|url| FileMeta::new(url, 1, 1))
            .collect();

        let handler = default_engine.json_handler();
        let mut results = handler
            .read_json_files(&files, schema.clone(), None)
            .unwrap();

        for _ in 0..expected_errors {
            let item = results.next().unwrap();
            assert!(
                item.is_err(),
                "Read succeeded unexpectedly. The JSON should have been invalid."
            );
        }

        // Nothing may follow the expected errors.
        assert!(
            results.next().is_none(),
            "The stream should end once the read result fails"
        );
    };

    // CASE 1: a single malformed file yields a single error.
    info!("\nAttempting to read single malformed JSON file...");
    check_errors(vec![file_url1.clone()], 1);

    // CASE 2: two malformed files yield one error each.
    info!("\nAttempting to read two malformed JSON files...");
    check_errors(vec![file_url1, file_url2], 2);

    Ok(())
}
700+
626701
#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
627702
async fn test_read_json_files_ordering() {
628703
// this test checks that the read_json_files method returns the files in order in the

0 commit comments

Comments
 (0)