Skip to content

Commit 83be5a6

Browse files
Merge branch 'main' into aleksandarskrbic/refactor/scan-tests
2 parents 313c948 + 1c29adf commit 83be5a6

File tree

11 files changed

+1223
-34
lines changed

11 files changed

+1223
-34
lines changed

kernel/src/actions/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ static LOG_ADD_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
9797
)]))
9898
});
9999

100+
static LOG_REMOVE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
101+
Arc::new(StructType::new_unchecked([StructField::nullable(
102+
REMOVE_NAME,
103+
Remove::to_schema(),
104+
)]))
105+
});
106+
100107
static LOG_COMMIT_INFO_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
101108
Arc::new(StructType::new_unchecked([StructField::nullable(
102109
COMMIT_INFO_NAME,
@@ -137,6 +144,10 @@ pub(crate) fn get_log_add_schema() -> &'static SchemaRef {
137144
&LOG_ADD_SCHEMA
138145
}
139146

147+
pub(crate) fn get_log_remove_schema() -> &'static SchemaRef {
148+
&LOG_REMOVE_SCHEMA
149+
}
150+
140151
pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef {
141152
&LOG_COMMIT_INFO_SCHEMA
142153
}

kernel/src/engine/arrow_expression/evaluate_expression.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,9 @@ pub fn evaluate_expression(
225225
(Struct(fields), Some(DataType::Struct(output_schema))) => {
226226
evaluate_struct_expression(fields, batch, output_schema)
227227
}
228-
(Struct(_), _) => Err(Error::generic(
229-
"Data type is required to evaluate struct expressions",
230-
)),
228+
(Struct(_), dt) => Err(Error::Generic(format!(
229+
"Struct expression expects a DataType::Struct result, but got {dt:?}"
230+
))),
231231
(Transform(transform), Some(DataType::Struct(output_schema))) => {
232232
evaluate_transform_expression(transform, batch, output_schema)
233233
}

kernel/src/engine/default/json.rs

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
126126
while let Some(item) = stream.next().await {
127127
if tx.send(item).is_err() {
128128
warn!("read_json receiver end of channel dropped before sending completed");
129+
break;
129130
}
130131
}
131132
});
@@ -199,7 +200,19 @@ impl JsonOpener {
199200
let reader = ReaderBuilder::new(schema)
200201
.with_batch_size(batch_size)
201202
.build(BufReader::new(file))?;
202-
Ok(futures::stream::iter(reader).map_err(Error::from).boxed())
203+
204+
let mut seen_error = false;
205+
Ok(futures::stream::iter(reader)
206+
.map_err(Error::from)
207+
.take_while(move |result| {
208+
// Emit exactly one error, then stop the stream. We check seen_error BEFORE
209+
// updating it so the first error passes through, but subsequent items don't.
210+
// This is necessary because Arrow's Reader loops the same error indefinitely.
211+
let return_this = !seen_error;
212+
seen_error = seen_error || result.is_err();
213+
futures::future::ready(return_this)
214+
})
215+
.boxed())
203216
}
204217
GetResultPayload::Stream(s) => {
205218
let mut decoder = ReaderBuilder::new(schema)
@@ -271,6 +284,7 @@ mod tests {
271284
PutPayload, PutResult, Result,
272285
};
273286
use serde_json::json;
287+
use tracing::info;
274288

275289
// TODO: should just use the one from test_utils, but running into dependency issues
276290
fn into_record_batch(engine_data: Box<dyn EngineData>) -> RecordBatch {
@@ -623,6 +637,67 @@ mod tests {
623637
);
624638
}
625639

640+
use crate::engine::default::DefaultEngine;
641+
use crate::schema::StructType;
642+
use crate::Engine;
643+
use std::io::Write;
644+
use tempfile::NamedTempFile;
645+
646+
/// Creates a temp file whose contents are not valid JSON, returning the file
/// handle (must be kept alive so the file isn't deleted) plus its `file://` URL.
fn make_invalid_named_temp() -> (NamedTempFile, Url) {
    let mut temp_file = NamedTempFile::new().expect("Failed to create temp file");
    write!(temp_file, r#"this is not valid json"#).expect("Failed to write to temp file");
    let file_url = Url::from_file_path(temp_file.path()).expect("Failed to create file URL");

    info!("Created temporary malformed file at: {file_url}");
    (temp_file, file_url)
}
655+
656+
#[test]
fn test_read_invalid_json() -> Result<(), Box<dyn std::error::Error>> {
    let _ = tracing_subscriber::fmt().try_init();
    // Hold the NamedTempFile handles so the files outlive the reads below.
    let (_keep_alive1, url1) = make_invalid_named_temp();
    let (_keep_alive2, url2) = make_invalid_named_temp();

    let bool_field = StructField::nullable("name", crate::schema::DataType::BOOLEAN);
    let schema = Arc::new(StructType::try_new(vec![bool_field]).unwrap());
    let engine = DefaultEngine::new(Arc::new(LocalFileSystem::new()));

    // Reads the given files and verifies we observe exactly `expected_errors`
    // Err items, after which the stream must terminate (rather than yielding
    // the same error forever).
    let expect_errors_then_end = |urls: Vec<_>, expected_errors: usize| {
        let metas: Vec<_> = urls.into_iter().map(|url| FileMeta::new(url, 1, 1)).collect();

        let mut results = engine
            .json_handler()
            .read_json_files(&metas, schema.clone(), None)
            .unwrap();

        for _ in 0..expected_errors {
            assert!(
                results.next().unwrap().is_err(),
                "Read succeeded unexpectedly. The JSON should have been invalid."
            );
        }

        assert!(
            results.next().is_none(),
            "The stream should end once the read result fails"
        );
    };

    // CASE 1: Single failing file
    info!("\nAttempting to read single malformed JSON file...");
    expect_errors_then_end(vec![url1.clone()], 1);

    // CASE 2: Two failing files
    info!("\nAttempting to read two malformed JSON files...");
    expect_errors_then_end(vec![url1, url2], 2);

    Ok(())
}
700+
626701
#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
627702
async fn test_read_json_files_ordering() {
628703
// this test checks that the read_json_files method returns the files in order in the

kernel/src/scan/log_replay.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,11 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
272272
}
273273
}
274274

275+
/// Field name of the struct holding per-file constant values in the scan row schema.
pub(crate) const FILE_CONSTANT_VALUES_NAME: &str = "fileConstantValues";
/// Field name for a file's base row ID (row tracking).
pub(crate) const BASE_ROW_ID_NAME: &str = "baseRowId";
/// Field name for a file's default row commit version (row tracking).
pub(crate) const DEFAULT_ROW_COMMIT_VERSION_NAME: &str = "defaultRowCommitVersion";
/// Field name for a file's free-form tags map.
pub(crate) const TAGS_NAME: &str = "tags";
279+
275280
// NB: If you update this schema, ensure you update the comment describing it in the doc comment
276281
// for `scan_row_schema` in scan/mod.rs! You'll also need to update ScanFileVisitor as the
277282
// indexes will be off, and [`get_add_transform_expr`] below to match it.
@@ -280,15 +285,24 @@ pub(crate) static SCAN_ROW_SCHEMA: LazyLock<Arc<StructType>> = LazyLock::new(||
280285
let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
281286
let file_constant_values = StructType::new_unchecked([
282287
StructField::nullable("partitionValues", partition_values),
283-
StructField::nullable("baseRowId", DataType::LONG),
288+
StructField::nullable(BASE_ROW_ID_NAME, DataType::LONG),
289+
StructField::nullable(DEFAULT_ROW_COMMIT_VERSION_NAME, DataType::LONG),
290+
StructField::nullable(
291+
"tags",
292+
MapType::new(
293+
DataType::STRING,
294+
DataType::STRING,
295+
/*valueContainsNull*/ true,
296+
),
297+
),
284298
]);
285299
Arc::new(StructType::new_unchecked([
286300
StructField::nullable("path", DataType::STRING),
287301
StructField::nullable("size", DataType::LONG),
288302
StructField::nullable("modificationTime", DataType::LONG),
289303
StructField::nullable("stats", DataType::STRING),
290304
StructField::nullable("deletionVector", DeletionVectorDescriptor::to_schema()),
291-
StructField::nullable("fileConstantValues", file_constant_values),
305+
StructField::nullable(FILE_CONSTANT_VALUES_NAME, file_constant_values),
292306
]))
293307
});
294308

@@ -307,6 +321,8 @@ fn get_add_transform_expr() -> ExpressionRef {
307321
Arc::new(Expression::Struct(vec![
308322
column_expr_ref!("add.partitionValues"),
309323
column_expr_ref!("add.baseRowId"),
324+
column_expr_ref!("add.defaultRowCommitVersion"),
325+
column_expr_ref!("add.tags"),
310326
])),
311327
]))
312328
});
@@ -325,8 +341,10 @@ pub(crate) fn get_scan_metadata_transform_expr() -> ExpressionRef {
325341
column_expr_ref!("size"),
326342
column_expr_ref!("modificationTime"),
327343
column_expr_ref!("stats"),
344+
column_expr_ref!("fileConstantValues.tags"),
328345
column_expr_ref!("deletionVector"),
329346
column_expr_ref!("fileConstantValues.baseRowId"),
347+
column_expr_ref!("fileConstantValues.defaultRowCommitVersion"),
330348
],
331349
))]))
332350
});

kernel/src/scan/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, EmptyColumnResol
2121
use crate::listed_log_files::ListedLogFiles;
2222
use crate::log_replay::{ActionsBatch, HasSelectionVector};
2323
use crate::log_segment::LogSegment;
24+
use crate::scan::log_replay::BASE_ROW_ID_NAME;
2425
use crate::scan::state::{DvInfo, Stats};
2526
use crate::scan::state_info::StateInfo;
2627
use crate::schema::{
@@ -456,6 +457,8 @@ impl Scan {
456457
_existing_predicate: Option<PredicateRef>,
457458
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<ScanMetadata>>>> {
458459
static RESTORED_ADD_SCHEMA: LazyLock<DataType> = LazyLock::new(|| {
460+
use crate::scan::log_replay::DEFAULT_ROW_COMMIT_VERSION_NAME;
461+
459462
let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
460463
DataType::struct_type_unchecked(vec![StructField::nullable(
461464
"add",
@@ -465,8 +468,13 @@ impl Scan {
465468
StructField::not_null("size", DataType::LONG),
466469
StructField::nullable("modificationTime", DataType::LONG),
467470
StructField::nullable("stats", DataType::STRING),
471+
StructField::nullable(
472+
"tags",
473+
MapType::new(DataType::STRING, DataType::STRING, true),
474+
),
468475
StructField::nullable("deletionVector", DeletionVectorDescriptor::to_schema()),
469-
StructField::nullable("baseRowId", DataType::LONG),
476+
StructField::nullable(BASE_ROW_ID_NAME, DataType::LONG),
477+
StructField::nullable(DEFAULT_ROW_COMMIT_VERSION_NAME, DataType::LONG),
470478
]),
471479
)])
472480
});
@@ -697,7 +705,9 @@ impl Scan {
697705
/// },
698706
/// fileConstantValues: {
699707
/// partitionValues: map<string, string>,
700-
/// baseRowId: long
708+
/// tags: map<string, string>,
709+
/// baseRowId: long,
710+
/// defaultRowCommitVersion: long,
701711
/// }
702712
/// }
703713
/// ```

kernel/src/scan/state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> {
179179
}
180180
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
181181
require!(
182-
getters.len() == 11,
182+
getters.len() == 13,
183183
Error::InternalError(format!(
184184
"Wrong number of ScanFileVisitor getters: {}",
185185
getters.len()

0 commit comments

Comments (0)