Skip to content

Commit 5f4ab65

Browse files
committed
Don't clone the schema in logical2physical
1 parent 05ec0ae commit 5f4ab65

File tree

9 files changed

+203
-160
lines changed

9 files changed

+203
-160
lines changed

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ mod tests {
159159
let predicate = self
160160
.predicate
161161
.as_ref()
162-
.map(|p| logical2physical(p, &table_schema));
162+
.map(|p| logical2physical(p, Arc::clone(&table_schema)));
163163

164164
let mut source = ParquetSource::new(table_schema);
165165
if let Some(predicate) = predicate {

datafusion/datasource-parquet/src/opener.rs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1350,7 +1350,7 @@ mod test {
13501350

13511351
// A filter on "a" should not exclude any rows even if it matches the data
13521352
let expr = col("a").eq(lit(1));
1353-
let predicate = logical2physical(&expr, &schema);
1353+
let predicate = logical2physical(&expr, Arc::clone(&schema));
13541354
let opener = make_opener(predicate);
13551355
let stream = opener.open(file.clone()).unwrap().await.unwrap();
13561356
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1359,7 +1359,7 @@ mod test {
13591359

13601360
// A filter on `b = 5.0` should exclude all rows
13611361
let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0))));
1362-
let predicate = logical2physical(&expr, &schema);
1362+
let predicate = logical2physical(&expr, Arc::clone(&schema));
13631363
let opener = make_opener(predicate);
13641364
let stream = opener.open(file).unwrap().await.unwrap();
13651365
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1405,7 +1405,8 @@ mod test {
14051405
let expr = col("part").eq(lit(1));
14061406
// Mark the expression as dynamic even if it's not to force partition pruning to happen
14071407
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
1408-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1408+
let predicate =
1409+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
14091410
let opener = make_opener(predicate);
14101411
let stream = opener.open(file.clone()).unwrap().await.unwrap();
14111412
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1416,7 +1417,7 @@ mod test {
14161417
let expr = col("part").eq(lit(2));
14171418
// Mark the expression as dynamic even if it's not to force partition pruning to happen
14181419
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
1419-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1420+
let predicate = make_dynamic_expr(logical2physical(&expr, table_schema));
14201421
let opener = make_opener(predicate);
14211422
let stream = opener.open(file).unwrap().await.unwrap();
14221423
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1472,7 +1473,7 @@ mod test {
14721473

14731474
// Filter should match the partition value and file statistics
14741475
let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0)));
1475-
let predicate = logical2physical(&expr, &table_schema);
1476+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
14761477
let opener = make_opener(predicate);
14771478
let stream = opener.open(file.clone()).unwrap().await.unwrap();
14781479
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1481,7 +1482,7 @@ mod test {
14811482

14821483
// Should prune based on partition value but not file statistics
14831484
let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0)));
1484-
let predicate = logical2physical(&expr, &table_schema);
1485+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
14851486
let opener = make_opener(predicate);
14861487
let stream = opener.open(file.clone()).unwrap().await.unwrap();
14871488
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1490,7 +1491,7 @@ mod test {
14901491

14911492
// Should prune based on file statistics but not partition value
14921493
let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0)));
1493-
let predicate = logical2physical(&expr, &table_schema);
1494+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
14941495
let opener = make_opener(predicate);
14951496
let stream = opener.open(file.clone()).unwrap().await.unwrap();
14961497
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1499,7 +1500,7 @@ mod test {
14991500

15001501
// Should prune based on both partition value and file statistics
15011502
let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0)));
1502-
let predicate = logical2physical(&expr, &table_schema);
1503+
let predicate = logical2physical(&expr, table_schema);
15031504
let opener = make_opener(predicate);
15041505
let stream = opener.open(file).unwrap().await.unwrap();
15051506
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1545,7 +1546,7 @@ mod test {
15451546

15461547
// Filter should match the partition value and data value
15471548
let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
1548-
let predicate = logical2physical(&expr, &table_schema);
1549+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15491550
let opener = make_opener(predicate);
15501551
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15511552
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1554,7 +1555,7 @@ mod test {
15541555

15551556
// Filter should match the partition value but not the data value
15561557
let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
1557-
let predicate = logical2physical(&expr, &table_schema);
1558+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15581559
let opener = make_opener(predicate);
15591560
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15601561
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1563,7 +1564,7 @@ mod test {
15631564

15641565
// Filter should not match the partition value but match the data value
15651566
let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
1566-
let predicate = logical2physical(&expr, &table_schema);
1567+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15671568
let opener = make_opener(predicate);
15681569
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15691570
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1572,7 +1573,7 @@ mod test {
15721573

15731574
// Filter should not match the partition value or the data value
15741575
let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3)));
1575-
let predicate = logical2physical(&expr, &table_schema);
1576+
let predicate = logical2physical(&expr, table_schema);
15761577
let opener = make_opener(predicate);
15771578
let stream = opener.open(file).unwrap().await.unwrap();
15781579
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1625,7 +1626,7 @@ mod test {
16251626
// This filter could prune based on statistics, but since it's not dynamic it's not applied for pruning
16261627
// (the assumption is this happened already at planning time)
16271628
let expr = col("a").eq(lit(42));
1628-
let predicate = logical2physical(&expr, &table_schema);
1629+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
16291630
let opener = make_opener(predicate);
16301631
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16311632
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1634,7 +1635,8 @@ mod test {
16341635

16351636
// If we make the filter dynamic, it should prune.
16361637
// This allows dynamic filters to prune partitions/files even if they are populated late into execution.
1637-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1638+
let predicate =
1639+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
16381640
let opener = make_opener(predicate);
16391641
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16401642
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1644,7 +1646,8 @@ mod test {
16441646
// If we have a filter that touches partition columns only and is dynamic, it should prune even if there are no stats.
16451647
file.statistics = Some(Arc::new(Statistics::new_unknown(&file_schema)));
16461648
let expr = col("part").eq(lit(2));
1647-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1649+
let predicate =
1650+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
16481651
let opener = make_opener(predicate);
16491652
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16501653
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1653,7 +1656,8 @@ mod test {
16531656

16541657
// Similarly a filter that combines partition and data columns should prune even if there are no stats.
16551658
let expr = col("part").eq(lit(2)).and(col("a").eq(lit(42)));
1656-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1659+
let predicate =
1660+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
16571661
let opener = make_opener(predicate);
16581662
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16591663
let (num_batches, num_rows) = count_batches_and_rows(stream).await;

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -487,15 +487,13 @@ mod test {
487487

488488
let metadata = reader.metadata();
489489

490-
let table_schema =
490+
let table_schema = Arc::new(
491491
parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
492-
.expect("parsing schema");
492+
.expect("parsing schema"),
493+
);
493494

494495
let expr = col("int64_list").is_not_null();
495-
let expr = logical2physical(&expr, &table_schema);
496-
497-
let table_schema = Arc::new(table_schema.clone());
498-
496+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
499497
let candidate = FilterCandidateBuilder::new(expr, table_schema)
500498
.build(metadata)
501499
.expect("building candidate");
@@ -516,23 +514,23 @@ mod test {
516514

517515
// This is the schema we would like to coerce to,
518516
// which is different from the physical schema of the file.
519-
let table_schema = Schema::new(vec![Field::new(
517+
let table_schema = Arc::new(Schema::new(vec![Field::new(
520518
"timestamp_col",
521519
DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
522520
false,
523-
)]);
521+
)]));
524522

525523
// Test all should fail
526524
let expr = col("timestamp_col").lt(Expr::Literal(
527525
ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
528526
None,
529527
));
530-
let expr = logical2physical(&expr, &table_schema);
528+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
531529
let expr = DefaultPhysicalExprAdapterFactory {}
532-
.create(Arc::new(table_schema.clone()), Arc::clone(&file_schema))
530+
.create(Arc::clone(&table_schema), Arc::clone(&file_schema))
533531
.rewrite(expr)
534532
.expect("rewriting expression");
535-
let candidate = FilterCandidateBuilder::new(expr, file_schema.clone())
533+
let candidate = FilterCandidateBuilder::new(expr, Arc::clone(&file_schema))
536534
.build(&metadata)
537535
.expect("building candidate")
538536
.expect("candidate expected");
@@ -565,10 +563,10 @@ mod test {
565563
ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
566564
None,
567565
));
568-
let expr = logical2physical(&expr, &table_schema);
566+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
569567
// Rewrite the expression to add CastExpr for type coercion
570568
let expr = DefaultPhysicalExprAdapterFactory {}
571-
.create(Arc::new(table_schema), Arc::clone(&file_schema))
569+
.create(table_schema, Arc::clone(&file_schema))
572570
.rewrite(expr)
573571
.expect("rewriting expression");
574572
let candidate = FilterCandidateBuilder::new(expr, file_schema)
@@ -594,7 +592,7 @@ mod test {
594592
let table_schema = Arc::new(get_lists_table_schema());
595593

596594
let expr = col("utf8_list").is_not_null();
597-
let expr = logical2physical(&expr, &table_schema);
595+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
598596
check_expression_can_evaluate_against_schema(&expr, &table_schema);
599597

600598
assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
@@ -612,22 +610,22 @@ mod test {
612610

613611
#[test]
614612
fn basic_expr_doesnt_prevent_pushdown() {
615-
let table_schema = get_basic_table_schema();
613+
let table_schema = Arc::new(get_basic_table_schema());
616614

617615
let expr = col("string_col").is_null();
618-
let expr = logical2physical(&expr, &table_schema);
616+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
619617

620618
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
621619
}
622620

623621
#[test]
624622
fn complex_expr_doesnt_prevent_pushdown() {
625-
let table_schema = get_basic_table_schema();
623+
let table_schema = Arc::new(get_basic_table_schema());
626624

627625
let expr = col("string_col")
628626
.is_not_null()
629627
.or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)), None)));
630-
let expr = logical2physical(&expr, &table_schema);
628+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
631629

632630
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
633631
}

0 commit comments

Comments
 (0)