Skip to content

Commit ce3f62a

Browse files
XiangpengHao and alamb authored
fix in list round trip in df proto (#16744)
Co-authored-by: Andrew Lamb <[email protected]>
1 parent 95e583f commit ce3f62a

File tree

2 files changed

+49
-16
lines changed

2 files changed

+49
-16
lines changed

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 48 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ use crate::protobuf::{
3939
use crate::{convert_required, into_required};
4040

4141
use datafusion::arrow::compute::SortOptions;
42-
use datafusion::arrow::datatypes::SchemaRef;
42+
use datafusion::arrow::datatypes::{Schema, SchemaRef};
4343
use datafusion::datasource::file_format::csv::CsvSink;
4444
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
4545
use datafusion::datasource::file_format::json::JsonSink;
@@ -611,14 +611,38 @@ impl protobuf::PhysicalPlanNode {
611611
) -> Result<Arc<dyn ExecutionPlan>> {
612612
let input: Arc<dyn ExecutionPlan> =
613613
into_physical_plan(&filter.input, registry, runtime, extension_codec)?;
614+
let projection = if !filter.projection.is_empty() {
615+
Some(
616+
filter
617+
.projection
618+
.iter()
619+
.map(|i| *i as usize)
620+
.collect::<Vec<_>>(),
621+
)
622+
} else {
623+
None
624+
};
625+
626+
// Use the projected schema if projection is present, otherwise use the full schema
627+
let predicate_schema = if let Some(ref proj_indices) = projection {
628+
// Create projected schema for parsing the predicate
629+
let projected_fields: Vec<_> = proj_indices
630+
.iter()
631+
.map(|&i| input.schema().field(i).clone())
632+
.collect();
633+
Arc::new(Schema::new(projected_fields))
634+
} else {
635+
input.schema()
636+
};
637+
614638
let predicate = filter
615639
.expr
616640
.as_ref()
617641
.map(|expr| {
618642
parse_physical_expr(
619643
expr,
620644
registry,
621-
input.schema().as_ref(),
645+
predicate_schema.as_ref(),
622646
extension_codec,
623647
)
624648
})
@@ -629,17 +653,6 @@ impl protobuf::PhysicalPlanNode {
629653
)
630654
})?;
631655
let filter_selectivity = filter.default_filter_selectivity.try_into();
632-
let projection = if !filter.projection.is_empty() {
633-
Some(
634-
filter
635-
.projection
636-
.iter()
637-
.map(|i| *i as usize)
638-
.collect::<Vec<_>>(),
639-
)
640-
} else {
641-
None
642-
};
643656
let filter =
644657
FilterExec::try_new(predicate, input)?.with_projection(projection)?;
645658
match filter_selectivity {
@@ -727,11 +740,31 @@ impl protobuf::PhysicalPlanNode {
727740
{
728741
let schema =
729742
parse_protobuf_file_scan_schema(scan.base_conf.as_ref().unwrap())?;
743+
744+
// Check if there's a projection and use projected schema for predicate parsing
745+
let base_conf = scan.base_conf.as_ref().unwrap();
746+
let predicate_schema = if !base_conf.projection.is_empty() {
747+
// Create projected schema for parsing the predicate
748+
let projected_fields: Vec<_> = base_conf
749+
.projection
750+
.iter()
751+
.map(|&i| schema.field(i as usize).clone())
752+
.collect();
753+
Arc::new(Schema::new(projected_fields))
754+
} else {
755+
schema
756+
};
757+
730758
let predicate = scan
731759
.predicate
732760
.as_ref()
733761
.map(|expr| {
734-
parse_physical_expr(expr, registry, schema.as_ref(), extension_codec)
762+
parse_physical_expr(
763+
expr,
764+
registry,
765+
predicate_schema.as_ref(),
766+
extension_codec,
767+
)
735768
})
736769
.transpose()?;
737770
let mut options = TableParquetOptions::default();
@@ -745,7 +778,7 @@ impl protobuf::PhysicalPlanNode {
745778
source = source.with_predicate(predicate);
746779
}
747780
let base_config = parse_protobuf_file_scan_config(
748-
scan.base_conf.as_ref().unwrap(),
781+
base_conf,
749782
registry,
750783
extension_codec,
751784
Arc::new(source),

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1739,7 +1739,7 @@ async fn roundtrip_physical_plan_node() {
17391739
}
17401740

17411741
// Failing due to https://github.com/apache/datafusion/pull/16662
1742-
#[ignore]
1742+
// Fixed: Column index mismatch during protobuf deserialization
17431743
#[tokio::test]
17441744
async fn test_tpch_part_in_list_query_with_real_parquet_data() -> Result<()> {
17451745
// Test the specific query: SELECT p_size FROM part WHERE p_size IN (14, 6, 5, 31)

0 commit comments

Comments
 (0)