Skip to content

Commit 8252b62

Browse files
committed
add more tests
1 parent 57c5c46 commit 8252b62

File tree

2 files changed

+183
-1
lines changed

2 files changed

+183
-1
lines changed

datafusion-testing

Submodule datafusion-testing updated 259 files

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,188 @@ mod tests {
282282
assert_eq!(metric, 2, "Expected all rows to be pruned");
283283
}
284284

285+
/// Predicate pushdown against a file that lacks a column declared in the table schema.
///
/// The file holds only `c1` (Int32) with rows 1, 2, 3; the table schema additionally
/// declares `c2` (Utf8). Two predicates are exercised: one on the missing column
/// alone, and one combining `c2 IS NULL` with a filter on `c1`.
#[tokio::test]
286+
async fn test_pushdown_with_missing_column_in_file_multiple_types() {
287+
let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
288+
289+
let file_schema =
290+
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
291+
292+
let table_schema = Arc::new(Schema::new(vec![
293+
Field::new("c1", DataType::Int32, true),
294+
Field::new("c2", DataType::Utf8, true),
295+
]));
296+
297+
let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();
298+
299+
// Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
300+
// the default behavior is to fill in missing columns with nulls.
301+
// Thus this predicate will come back as false.
302+
let filter = col("c2").eq(lit("abc"));
303+
let rt = RoundTrip::new()
304+
.with_schema(table_schema.clone())
305+
.with_predicate(filter.clone())
306+
.with_pushdown_predicate()
307+
.round_trip(vec![batch.clone()])
308+
.await;
309+
let total_rows = rt
310+
.batches
311+
.unwrap()
312+
.iter()
313+
.map(|b| b.num_rows())
314+
.sum::<usize>();
315+
assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
316+
let metrics = rt.parquet_exec.metrics().unwrap();
317+
let metric = get_value(&metrics, "pushdown_rows_pruned");
318+
assert_eq!(metric, 3, "Expected all rows to be pruned");
319+
320+
// If we explicitly allow nulls the rest of the predicate should work
321+
let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
322+
let rt = RoundTrip::new()
323+
.with_schema(table_schema.clone())
324+
.with_predicate(filter.clone())
325+
.with_pushdown_predicate()
326+
.round_trip(vec![batch.clone()])
327+
.await;
328+
let batches = rt.batches.unwrap();
329+
#[rustfmt::skip]
330+
let expected = [
331+
"+----+----+",
332+
"| c1 | c2 |",
333+
"+----+----+",
334+
"| 1 | |",
335+
"+----+----+",
336+
];
337+
assert_batches_sorted_eq!(expected, &batches);
338+
let metrics = rt.parquet_exec.metrics().unwrap();
339+
let metric = get_value(&metrics, "pushdown_rows_pruned");
340+
// Only the row with c1 = 1 is expected back, so 2 of the 3 input rows should be
341+
// counted as pruned; the "all rows" wording in the message below does not match
342+
// this expectation.
assert_eq!(metric, 2, "Expected all rows to be pruned");
341+
}
342+
343+
/// Predicate pushdown when the missing column sits between two columns the file has.
///
/// The file holds `c1` and `c3` (both Int32, three rows each); the table schema
/// declares `c1`, `c2` (Utf8), `c3` in that order. Two predicates are exercised:
/// one on the missing `c2` alone, and one combining `c2 IS NULL` with a filter on `c1`.
#[tokio::test]
344+
async fn test_pushdown_with_missing_middle_column() {
345+
let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
346+
let c3 = Arc::new(Int32Array::from(vec![7, 8, 9]));
347+
348+
let file_schema = Arc::new(Schema::new(vec![
349+
Field::new("c1", DataType::Int32, true),
350+
Field::new("c3", DataType::Int32, true),
351+
]));
352+
353+
let table_schema = Arc::new(Schema::new(vec![
354+
Field::new("c1", DataType::Int32, true),
355+
Field::new("c2", DataType::Utf8, true),
356+
Field::new("c3", DataType::Int32, true),
357+
]));
358+
359+
let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap();
360+
361+
// Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
362+
// the default behavior is to fill in missing columns with nulls.
363+
// Thus this predicate will come back as false.
364+
let filter = col("c2").eq(lit("abc"));
365+
let rt = RoundTrip::new()
366+
.with_schema(table_schema.clone())
367+
.with_predicate(filter.clone())
368+
.with_pushdown_predicate()
369+
.round_trip(vec![batch.clone()])
370+
.await;
371+
let total_rows = rt
372+
.batches
373+
.unwrap()
374+
.iter()
375+
.map(|b| b.num_rows())
376+
.sum::<usize>();
377+
assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
378+
let metrics = rt.parquet_exec.metrics().unwrap();
379+
let metric = get_value(&metrics, "pushdown_rows_pruned");
380+
assert_eq!(metric, 3, "Expected all rows to be pruned");
381+
382+
// If we explicitly allow nulls the rest of the predicate should work
383+
let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
384+
let rt = RoundTrip::new()
385+
.with_schema(table_schema.clone())
386+
.with_predicate(filter.clone())
387+
.with_pushdown_predicate()
388+
.round_trip(vec![batch.clone()])
389+
.await;
390+
let batches = rt.batches.unwrap();
391+
#[rustfmt::skip]
392+
let expected = [
393+
"+----+----+----+",
394+
"| c1 | c2 | c3 |",
395+
"+----+----+----+",
396+
"| 1 | | 7 |",
397+
"+----+----+----+",
398+
];
399+
assert_batches_sorted_eq!(expected, &batches);
400+
let metrics = rt.parquet_exec.metrics().unwrap();
401+
let metric = get_value(&metrics, "pushdown_rows_pruned");
402+
// Only the row with c1 = 1 is expected back, so 2 of the 3 input rows should be
403+
// counted as pruned; the "all rows" wording in the message below does not match
404+
// this expectation.
assert_eq!(metric, 2, "Expected all rows to be pruned");
403+
}
404+
405+
/// Predicate pushdown when the file's columns do not line up with the table schema.
///
/// The table schema declares `c1` (Int32), `c2` (Utf8), `c3` (Int32). The file batch
/// is built from the `c3` array twice. Two predicates are exercised: one on the
/// missing `c2` alone, and one combining `c2 IS NULL` with a filter on `c3`.
#[tokio::test]
406+
async fn test_pushdown_with_file_column_order_mismatch() {
407+
let c3 = Arc::new(Int32Array::from(vec![7, 8, 9]));
408+
409+
// NOTE(review): both file fields are named "c3" and no file column is named "c1".
410+
// The test name suggests a column-order mismatch (e.g. c3 before c1) — confirm the
411+
// duplicate field name is intentional; the expected output below shows c1 as empty.
let file_schema = Arc::new(Schema::new(vec![
410+
Field::new("c3", DataType::Int32, true),
411+
Field::new("c3", DataType::Int32, true),
412+
]));
413+
414+
let table_schema = Arc::new(Schema::new(vec![
415+
Field::new("c1", DataType::Int32, true),
416+
Field::new("c2", DataType::Utf8, true),
417+
Field::new("c3", DataType::Int32, true),
418+
]));
419+
420+
let batch =
421+
RecordBatch::try_new(file_schema.clone(), vec![c3.clone(), c3]).unwrap();
422+
423+
// Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
424+
// the default behavior is to fill in missing columns with nulls.
425+
// Thus this predicate will come back as false.
426+
let filter = col("c2").eq(lit("abc"));
427+
let rt = RoundTrip::new()
428+
.with_schema(table_schema.clone())
429+
.with_predicate(filter.clone())
430+
.with_pushdown_predicate()
431+
.round_trip(vec![batch.clone()])
432+
.await;
433+
let total_rows = rt
434+
.batches
435+
.unwrap()
436+
.iter()
437+
.map(|b| b.num_rows())
438+
.sum::<usize>();
439+
assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
440+
let metrics = rt.parquet_exec.metrics().unwrap();
441+
let metric = get_value(&metrics, "pushdown_rows_pruned");
442+
assert_eq!(metric, 3, "Expected all rows to be pruned");
443+
444+
// If we explicitly allow nulls the rest of the predicate should work
445+
let filter = col("c2").is_null().and(col("c3").eq(lit(7_i32)));
446+
let rt = RoundTrip::new()
447+
.with_schema(table_schema.clone())
448+
.with_predicate(filter.clone())
449+
.with_pushdown_predicate()
450+
.round_trip(vec![batch.clone()])
451+
.await;
452+
let batches = rt.batches.unwrap();
453+
#[rustfmt::skip]
454+
let expected = [
455+
"+----+----+----+",
456+
"| c1 | c2 | c3 |",
457+
"+----+----+----+",
458+
"| | | 7 |",
459+
"+----+----+----+",
460+
];
461+
assert_batches_sorted_eq!(expected, &batches);
462+
let metrics = rt.parquet_exec.metrics().unwrap();
463+
let metric = get_value(&metrics, "pushdown_rows_pruned");
464+
// Only the row with c3 = 7 is expected back, so 2 of the 3 input rows should be
465+
// counted as pruned; the "all rows" wording in the message below does not match
466+
// this expectation.
assert_eq!(metric, 2, "Expected all rows to be pruned");
465+
}
466+
285467
#[tokio::test]
286468
async fn evolved_schema() {
287469
let c1: ArrayRef =

0 commit comments

Comments
 (0)