Skip to content

Commit 9bacaf2

Browse files
committed
Don't clone the schema in logical2physical
1 parent d51cb32 commit 9bacaf2

File tree

9 files changed: +203 −160 lines

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ mod tests {
159159
let predicate = self
160160
.predicate
161161
.as_ref()
162-
.map(|p| logical2physical(p, &table_schema));
162+
.map(|p| logical2physical(p, Arc::clone(&table_schema)));
163163

164164
let mut source = ParquetSource::new(table_schema);
165165
if let Some(predicate) = predicate {

datafusion/datasource-parquet/src/opener.rs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1356,7 +1356,7 @@ mod test {
13561356

13571357
// A filter on "a" should not exclude any rows even if it matches the data
13581358
let expr = col("a").eq(lit(1));
1359-
let predicate = logical2physical(&expr, &schema);
1359+
let predicate = logical2physical(&expr, Arc::clone(&schema));
13601360
let opener = make_opener(predicate);
13611361
let stream = opener.open(file.clone()).unwrap().await.unwrap();
13621362
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1365,7 +1365,7 @@ mod test {
13651365

13661366
// A filter on `b = 5.0` should exclude all rows
13671367
let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0))));
1368-
let predicate = logical2physical(&expr, &schema);
1368+
let predicate = logical2physical(&expr, Arc::clone(&schema));
13691369
let opener = make_opener(predicate);
13701370
let stream = opener.open(file).unwrap().await.unwrap();
13711371
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1411,7 +1411,8 @@ mod test {
14111411
let expr = col("part").eq(lit(1));
14121412
// Mark the expression as dynamic even if it's not to force partition pruning to happen
14131413
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
1414-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1414+
let predicate =
1415+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
14151416
let opener = make_opener(predicate);
14161417
let stream = opener.open(file.clone()).unwrap().await.unwrap();
14171418
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1422,7 +1423,7 @@ mod test {
14221423
let expr = col("part").eq(lit(2));
14231424
// Mark the expression as dynamic even if it's not to force partition pruning to happen
14241425
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
1425-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1426+
let predicate = make_dynamic_expr(logical2physical(&expr, table_schema));
14261427
let opener = make_opener(predicate);
14271428
let stream = opener.open(file).unwrap().await.unwrap();
14281429
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1478,7 +1479,7 @@ mod test {
14781479

14791480
// Filter should match the partition value and file statistics
14801481
let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0)));
1481-
let predicate = logical2physical(&expr, &table_schema);
1482+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
14821483
let opener = make_opener(predicate);
14831484
let stream = opener.open(file.clone()).unwrap().await.unwrap();
14841485
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1487,7 +1488,7 @@ mod test {
14871488

14881489
// Should prune based on partition value but not file statistics
14891490
let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0)));
1490-
let predicate = logical2physical(&expr, &table_schema);
1491+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
14911492
let opener = make_opener(predicate);
14921493
let stream = opener.open(file.clone()).unwrap().await.unwrap();
14931494
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1496,7 +1497,7 @@ mod test {
14961497

14971498
// Should prune based on file statistics but not partition value
14981499
let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0)));
1499-
let predicate = logical2physical(&expr, &table_schema);
1500+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15001501
let opener = make_opener(predicate);
15011502
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15021503
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1505,7 +1506,7 @@ mod test {
15051506

15061507
// Should prune based on both partition value and file statistics
15071508
let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0)));
1508-
let predicate = logical2physical(&expr, &table_schema);
1509+
let predicate = logical2physical(&expr, table_schema);
15091510
let opener = make_opener(predicate);
15101511
let stream = opener.open(file).unwrap().await.unwrap();
15111512
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1551,7 +1552,7 @@ mod test {
15511552

15521553
// Filter should match the partition value and data value
15531554
let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
1554-
let predicate = logical2physical(&expr, &table_schema);
1555+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15551556
let opener = make_opener(predicate);
15561557
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15571558
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1560,7 +1561,7 @@ mod test {
15601561

15611562
// Filter should match the partition value but not the data value
15621563
let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
1563-
let predicate = logical2physical(&expr, &table_schema);
1564+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15641565
let opener = make_opener(predicate);
15651566
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15661567
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1569,7 +1570,7 @@ mod test {
15691570

15701571
// Filter should not match the partition value but match the data value
15711572
let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
1572-
let predicate = logical2physical(&expr, &table_schema);
1573+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
15731574
let opener = make_opener(predicate);
15741575
let stream = opener.open(file.clone()).unwrap().await.unwrap();
15751576
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1578,7 +1579,7 @@ mod test {
15781579

15791580
// Filter should not match the partition value or the data value
15801581
let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3)));
1581-
let predicate = logical2physical(&expr, &table_schema);
1582+
let predicate = logical2physical(&expr, table_schema);
15821583
let opener = make_opener(predicate);
15831584
let stream = opener.open(file).unwrap().await.unwrap();
15841585
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1631,7 +1632,7 @@ mod test {
16311632
// This filter could prune based on statistics, but since it's not dynamic it's not applied for pruning
16321633
// (the assumption is this happened already at planning time)
16331634
let expr = col("a").eq(lit(42));
1634-
let predicate = logical2physical(&expr, &table_schema);
1635+
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
16351636
let opener = make_opener(predicate);
16361637
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16371638
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1640,7 +1641,8 @@ mod test {
16401641

16411642
// If we make the filter dynamic, it should prune.
16421643
// This allows dynamic filters to prune partitions/files even if they are populated late into execution.
1643-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1644+
let predicate =
1645+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
16441646
let opener = make_opener(predicate);
16451647
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16461648
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1650,7 +1652,8 @@ mod test {
16501652
// If we have a filter that touches partition columns only and is dynamic, it should prune even if there are no stats.
16511653
file.statistics = Some(Arc::new(Statistics::new_unknown(&file_schema)));
16521654
let expr = col("part").eq(lit(2));
1653-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1655+
let predicate =
1656+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
16541657
let opener = make_opener(predicate);
16551658
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16561659
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1659,7 +1662,8 @@ mod test {
16591662

16601663
// Similarly a filter that combines partition and data columns should prune even if there are no stats.
16611664
let expr = col("part").eq(lit(2)).and(col("a").eq(lit(42)));
1662-
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1665+
let predicate =
1666+
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
16631667
let opener = make_opener(predicate);
16641668
let stream = opener.open(file.clone()).unwrap().await.unwrap();
16651669
let (num_batches, num_rows) = count_batches_and_rows(stream).await;

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -487,15 +487,13 @@ mod test {
487487

488488
let metadata = reader.metadata();
489489

490-
let table_schema =
490+
let table_schema = Arc::new(
491491
parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
492-
.expect("parsing schema");
492+
.expect("parsing schema"),
493+
);
493494

494495
let expr = col("int64_list").is_not_null();
495-
let expr = logical2physical(&expr, &table_schema);
496-
497-
let table_schema = Arc::new(table_schema.clone());
498-
496+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
499497
let candidate = FilterCandidateBuilder::new(expr, table_schema)
500498
.build(metadata)
501499
.expect("building candidate");
@@ -516,23 +514,23 @@ mod test {
516514

517515
// This is the schema we would like to coerce to,
518516
// which is different from the physical schema of the file.
519-
let table_schema = Schema::new(vec![Field::new(
517+
let table_schema = Arc::new(Schema::new(vec![Field::new(
520518
"timestamp_col",
521519
DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
522520
false,
523-
)]);
521+
)]));
524522

525523
// Test all should fail
526524
let expr = col("timestamp_col").lt(Expr::Literal(
527525
ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
528526
None,
529527
));
530-
let expr = logical2physical(&expr, &table_schema);
528+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
531529
let expr = DefaultPhysicalExprAdapterFactory {}
532-
.create(Arc::new(table_schema.clone()), Arc::clone(&file_schema))
530+
.create(Arc::clone(&table_schema), Arc::clone(&file_schema))
533531
.rewrite(expr)
534532
.expect("rewriting expression");
535-
let candidate = FilterCandidateBuilder::new(expr, file_schema.clone())
533+
let candidate = FilterCandidateBuilder::new(expr, Arc::clone(&file_schema))
536534
.build(&metadata)
537535
.expect("building candidate")
538536
.expect("candidate expected");
@@ -565,10 +563,10 @@ mod test {
565563
ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
566564
None,
567565
));
568-
let expr = logical2physical(&expr, &table_schema);
566+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
569567
// Rewrite the expression to add CastExpr for type coercion
570568
let expr = DefaultPhysicalExprAdapterFactory {}
571-
.create(Arc::new(table_schema), Arc::clone(&file_schema))
569+
.create(table_schema, Arc::clone(&file_schema))
572570
.rewrite(expr)
573571
.expect("rewriting expression");
574572
let candidate = FilterCandidateBuilder::new(expr, file_schema)
@@ -594,7 +592,7 @@ mod test {
594592
let table_schema = Arc::new(get_lists_table_schema());
595593

596594
let expr = col("utf8_list").is_not_null();
597-
let expr = logical2physical(&expr, &table_schema);
595+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
598596
check_expression_can_evaluate_against_schema(&expr, &table_schema);
599597

600598
assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
@@ -612,22 +610,22 @@ mod test {
612610

613611
#[test]
614612
fn basic_expr_doesnt_prevent_pushdown() {
615-
let table_schema = get_basic_table_schema();
613+
let table_schema = Arc::new(get_basic_table_schema());
616614

617615
let expr = col("string_col").is_null();
618-
let expr = logical2physical(&expr, &table_schema);
616+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
619617

620618
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
621619
}
622620

623621
#[test]
624622
fn complex_expr_doesnt_prevent_pushdown() {
625-
let table_schema = get_basic_table_schema();
623+
let table_schema = Arc::new(get_basic_table_schema());
626624

627625
let expr = col("string_col")
628626
.is_not_null()
629627
.or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)), None)));
630-
let expr = logical2physical(&expr, &table_schema);
628+
let expr = logical2physical(&expr, Arc::clone(&table_schema));
631629

632630
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
633631
}

0 commit comments

Comments (0)