Commit 7fce6ef

Don't clone the schema in logical2physical
1 parent b3fcb2a commit 7fce6ef

File tree

9 files changed: +197 -156 lines changed
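
The diffs below all follow from one signature change: the test helper logical2physical now takes the schema as an Arc<Schema> (arrow's SchemaRef) by value rather than as a &Schema that it previously had to deep-clone. Call sites therefore pass Arc::clone(&schema), a cheap reference-count bump, and the final use of a schema in each test simply moves the Arc. A minimal sketch of that calling convention, using only the arrow-schema crate; old_style and new_style are illustrative stand-ins, not DataFusion's actual helper:

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};

// Old shape (assumed): borrowing the schema forces a deep clone inside the helper.
fn old_style(schema: &Schema) -> Schema {
    schema.clone() // copies every Field plus the metadata map
}

// New shape (assumed): take the Arc by value; "cloning" it is a refcount bump.
fn new_style(schema: SchemaRef) -> SchemaRef {
    schema
}

fn main() {
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Int32,
        false,
    )]));

    // Intermediate call sites: cheap Arc::clone, no deep Schema copy.
    let first = new_style(Arc::clone(&schema));
    // Final call site: move the Arc outright, as the last use in each test now does.
    let last = new_style(schema);

    // Both handles point at the same underlying Schema allocation.
    assert!(Arc::ptr_eq(&first, &last));
    let _still_owned_copy = old_style(&first);
}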

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 1 addition & 1 deletion
@@ -159,7 +159,7 @@ mod tests {
             let predicate = self
                 .predicate
                 .as_ref()
-                .map(|p| logical2physical(p, &table_schema));
+                .map(|p| logical2physical(p, Arc::clone(&table_schema)));

             let mut source = ParquetSource::default();
             if let Some(predicate) = predicate {

datafusion/datasource-parquet/src/opener.rs

Lines changed: 17 additions & 15 deletions
@@ -876,7 +876,7 @@ mod test {

         // A filter on "a" should not exclude any rows even if it matches the data
         let expr = col("a").eq(lit(1));
-        let predicate = logical2physical(&expr, &schema);
+        let predicate = logical2physical(&expr, Arc::clone(&schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -885,7 +885,7 @@

         // A filter on `b = 5.0` should exclude all rows
         let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0))));
-        let predicate = logical2physical(&expr, &schema);
+        let predicate = logical2physical(&expr, Arc::clone(&schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -951,7 +951,8 @@
         let expr = col("part").eq(lit(1));
         // Mark the expression as dynamic even if it's not to force partition pruning to happen
         // Otherwise we assume it already happened at the planning stage and won't re-do the work here
-        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
+        let predicate =
+            make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -962,7 +963,7 @@
         let expr = col("part").eq(lit(2));
         // Mark the expression as dynamic even if it's not to force partition pruning to happen
         // Otherwise we assume it already happened at the planning stage and won't re-do the work here
-        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
+        let predicate = make_dynamic_expr(logical2physical(&expr, table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1038,7 +1039,7 @@

         // Filter should match the partition value and file statistics
         let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1047,7 +1048,7 @@

         // Should prune based on partition value but not file statistics
         let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1056,7 +1057,7 @@

         // Should prune based on file statistics but not partition value
         let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1065,7 +1066,7 @@

         // Should prune based on both partition value and file statistics
         let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, table_schema);
         let opener = make_opener(predicate);
         let stream = opener.open(file).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1130,7 +1131,7 @@

         // Filter should match the partition value and data value
         let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1139,7 +1140,7 @@

         // Filter should match the partition value but not the data value
         let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1148,7 +1149,7 @@

         // Filter should not match the partition value but match the data value
         let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1157,7 +1158,7 @@

         // Filter should not match the partition value or the data value
         let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3)));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, table_schema);
         let opener = make_opener(predicate);
         let stream = opener.open(file).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1222,15 +1223,15 @@

         // Filter should NOT match the stats but the file is never attempted to be pruned because the filters are not dynamic
         let expr = col("part").eq(lit(2));
-        let predicate = logical2physical(&expr, &table_schema);
+        let predicate = logical2physical(&expr, Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
         assert_eq!(num_batches, 1);
         assert_eq!(num_rows, 3);

         // If we make the filter dynamic, it should prune
-        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
+        let predicate = make_dynamic_expr(logical2physical(&expr, table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1369,7 +1370,8 @@
             max_predicate_cache_size: None,
         };

-        let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema);
+        let predicate =
+            logical2physical(&col("a").eq(lit(1u64)), Arc::clone(&table_schema));
         let opener = make_opener(predicate);
         let stream = opener.open(file.clone()).unwrap().await.unwrap();
         let batches = collect_batches(stream).await;

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 16 additions & 18 deletions
@@ -512,19 +512,18 @@ mod test {

         let metadata = reader.metadata();

-        let table_schema =
+        let table_schema = Arc::new(
             parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
-                .expect("parsing schema");
+                .expect("parsing schema"),
+        );

         let expr = col("int64_list").is_not_null();
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));

         let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
-        let table_schema = Arc::new(table_schema.clone());
-
         let candidate = FilterCandidateBuilder::new(
             expr,
-            table_schema.clone(),
+            Arc::clone(&table_schema),
             table_schema,
             schema_adapter_factory,
         )
@@ -547,24 +546,23 @@

         // This is the schema we would like to coerce to,
         // which is different from the physical schema of the file.
-        let table_schema = Schema::new(vec![Field::new(
+        let table_schema = Arc::new(Schema::new(vec![Field::new(
             "timestamp_col",
             DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
             false,
-        )]);
+        )]));

         // Test all should fail
         let expr = col("timestamp_col").lt(Expr::Literal(
             ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
             None,
         ));
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));
         let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
-        let table_schema = Arc::new(table_schema.clone());
         let candidate = FilterCandidateBuilder::new(
             expr,
-            file_schema.clone(),
-            table_schema.clone(),
+            Arc::clone(&file_schema),
+            Arc::clone(&table_schema),
             schema_adapter_factory,
         )
         .build(&metadata)
@@ -599,7 +597,7 @@
             ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
             None,
         ));
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));
         let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
         let candidate = FilterCandidateBuilder::new(
             expr,
@@ -629,7 +627,7 @@
         let table_schema = Arc::new(get_lists_table_schema());

         let expr = col("utf8_list").is_not_null();
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));
         check_expression_can_evaluate_against_schema(&expr, &table_schema);

         assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
@@ -647,22 +645,22 @@

     #[test]
     fn basic_expr_doesnt_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
+        let table_schema = Arc::new(get_basic_table_schema());

         let expr = col("string_col").is_null();
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));

         assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
     }

     #[test]
     fn complex_expr_doesnt_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
+        let table_schema = Arc::new(get_basic_table_schema());

         let expr = col("string_col")
             .is_not_null()
             .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)), None)));
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));

         assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
     }
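
Beyond the call sites, these row_filter.rs hunks move the Arc to where the schema is constructed: the old tests built an owned Schema and later re-wrapped it with Arc::new(table_schema.clone()), which copies the whole schema, while the new tests wrap the construction in Arc::new(...) once so every later use is either an Arc::clone or a move. A small sketch of the difference, again using only arrow-schema types; consume is a hypothetical stand-in for something like FilterCandidateBuilder::new, which wants two schema handles:

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};

// Hypothetical consumer standing in for a builder that takes two SchemaRef
// handles (e.g. a file schema and a table schema).
fn consume(_file_schema: SchemaRef, _table_schema: SchemaRef) {}

fn main() {
    // Before (assumed shape): own a Schema, then deep-clone it into an Arc later.
    let owned = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let rewrapped = Arc::new(owned.clone()); // copies fields and metadata
    consume(Arc::clone(&rewrapped), rewrapped);

    // After: build the Arc once at construction; later uses are refcount
    // bumps (Arc::clone) or moves, never deep Schema clones.
    let table_schema: SchemaRef =
        Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
    consume(Arc::clone(&table_schema), table_schema);
}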

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 22 additions & 19 deletions
@@ -501,7 +501,7 @@ mod tests {
         let schema =
             Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
         let expr = col("c1").gt(lit(15));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
@@ -546,7 +546,7 @@
         let schema =
             Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
         let expr = col("c1").gt(lit(15));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
@@ -589,7 +589,7 @@
             Field::new("c2", DataType::Int32, false),
         ]));
         let expr = col("c1").gt(lit(15)).and(col("c2").rem(lit(2)).eq(lit(0)));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

         let schema_descr = get_test_schema_descr(vec![
@@ -628,7 +628,7 @@
         // if conditions in predicate are joined with OR and an unsupported expression is used
         // this bypasses the entire predicate expression and no row groups are filtered out
         let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0)));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

         // if conditions in predicate are joined with OR and an unsupported expression is used
@@ -654,7 +654,7 @@
             Field::new("c2", DataType::Int32, false),
         ]));
         let expr = col("c1").gt(lit(0));
-        let expr = logical2physical(&expr, &table_schema);
+        let expr = logical2physical(&expr, Arc::clone(&table_schema));
         let pruning_predicate =
             PruningPredicate::try_new(expr, table_schema.clone()).unwrap();

@@ -732,7 +732,7 @@
         ]));
         let schema_descr = ArrowSchemaConverter::new().convert(&schema).unwrap();
         let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let groups = gen_row_group_meta_data_for_pruning_predicate();

@@ -763,7 +763,7 @@
         let expr = col("c1")
             .gt(lit(15))
             .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let groups = gen_row_group_meta_data_for_pruning_predicate();

@@ -801,7 +801,7 @@
             .with_precision(9);
         let schema_descr = get_test_schema_descr(vec![field]);
         let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -872,7 +872,7 @@
             lit(ScalarValue::Decimal128(Some(500), 5, 2)),
             Decimal128(11, 2),
         ));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -964,7 +964,7 @@
             .with_precision(18);
         let schema_descr = get_test_schema_descr(vec![field]);
         let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -1025,7 +1025,7 @@
         // cast the type of c1 to decimal(28,3)
         let left = cast(col("c1"), Decimal128(28, 3));
         let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         // we must use the big-endian when encode the i128 to bytes or vec[u8].
         let rgm1 = get_row_group_meta_data(
@@ -1103,7 +1103,7 @@
         // cast the type of c1 to decimal(28,3)
         let left = cast(col("c1"), Decimal128(28, 3));
         let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
-        let expr = logical2physical(&expr, &schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         // we must use the big-endian when encode the i128 to bytes or vec[u8].
         let rgm1 = get_row_group_meta_data(
@@ -1266,17 +1266,20 @@
         let data = bytes::Bytes::from(std::fs::read(path).unwrap());

         // generate pruning predicate
-        let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "String",
+            DataType::Utf8,
+            false,
+        )]));

         let expr = col(r#""String""#).in_list(
             (1..25)
                 .map(|i| lit(format!("Hello_Not_Exists{i}")))
                 .collect::<Vec<_>>(),
             false,
         );
-        let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let expr = logical2physical(&expr, Arc::clone(&schema));
+        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();

         let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
             file_name,
@@ -1494,9 +1497,9 @@
         let path = format!("{testdata}/{file_name}");
         let data = bytes::Bytes::from(std::fs::read(path).unwrap());

-        let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let schema = Arc::new(schema);
+        let expr = logical2physical(&expr, Arc::clone(&schema));
+        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();

         let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
             &file_name,
