Skip to content

Commit 1cc4daf

Browse files
authored
Resolve ListingScan projection against table schema including partition columns (#17911)
* resolve projection against `ListingTable` table_schema incl. partition columns * add new test alongside old one * name / clarity
1 parent 58ddf0d commit 1cc4daf

File tree

2 files changed

+52
-13
lines changed

2 files changed

+52
-13
lines changed

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -377,16 +377,6 @@ impl AsLogicalPlan for LogicalPlanNode {
377377
LogicalPlanType::ListingScan(scan) => {
378378
let schema: Schema = convert_required!(scan.schema)?;
379379

380-
let mut projection = None;
381-
if let Some(columns) = &scan.projection {
382-
let column_indices = columns
383-
.columns
384-
.iter()
385-
.map(|name| schema.index_of(name))
386-
.collect::<Result<Vec<usize>, _>>()?;
387-
projection = Some(column_indices);
388-
}
389-
390380
let filters =
391381
from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
392382

@@ -494,6 +484,16 @@ impl AsLogicalPlan for LogicalPlanNode {
494484
let table_name =
495485
from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
496486

487+
let mut projection = None;
488+
if let Some(columns) = &scan.projection {
489+
let column_indices = columns
490+
.columns
491+
.iter()
492+
.map(|name| provider.schema().index_of(name))
493+
.collect::<Result<Vec<usize>, _>>()?;
494+
projection = Some(column_indices);
495+
}
496+
497497
LogicalPlanBuilder::scan_with_filters(
498498
table_name,
499499
provider_as_source(Arc::new(provider)),

datafusion/proto/tests/cases/roundtrip_logical_plan.rs

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use datafusion::datasource::file_format::arrow::ArrowFormatFactory;
4646
use datafusion::datasource::file_format::csv::CsvFormatFactory;
4747
use datafusion::datasource::file_format::parquet::ParquetFormatFactory;
4848
use datafusion::datasource::file_format::{format_as_file_type, DefaultFileType};
49+
use datafusion::datasource::DefaultTableSource;
4950
use datafusion::execution::session_state::SessionStateBuilder;
5051
use datafusion::execution::FunctionRegistry;
5152
use datafusion::functions_aggregate::count::count_udaf;
@@ -77,9 +78,9 @@ use datafusion_expr::expr::{
7778
use datafusion_expr::logical_plan::{Extension, UserDefinedLogicalNodeCore};
7879
use datafusion_expr::{
7980
Accumulator, AggregateUDF, ColumnarValue, ExprFunctionExt, ExprSchemable, Literal,
80-
LogicalPlan, Operator, PartitionEvaluator, ScalarUDF, Signature, TryCast, Volatility,
81-
WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, WindowUDF,
82-
WindowUDFImpl,
81+
LogicalPlan, LogicalPlanBuilder, Operator, PartitionEvaluator, ScalarUDF, Signature,
82+
TryCast, Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits,
83+
WindowFunctionDefinition, WindowUDF, WindowUDFImpl,
8384
};
8485
use datafusion_functions_aggregate::average::avg_udaf;
8586
use datafusion_functions_aggregate::expr_fn::{
@@ -2669,6 +2670,44 @@ async fn roundtrip_custom_listing_tables_schema() -> Result<()> {
26692670
Ok(())
26702671
}
26712672

2673+
/// Round-trips a `ListingTable` scan whose projection includes the partition column.
///
/// The projection indices are resolved against `listing_table.schema()`, so the
/// partition column "part" (declared via `with_table_partition_cols`) is looked up
/// together with "value" (presumably a column of the JSON files; verify against the
/// test data). The plan is encoded with `logical_plan_to_bytes`, decoded with
/// `logical_plan_from_bytes`, and must compare equal to the original plan.
#[tokio::test]
2674+
async fn roundtrip_custom_listing_tables_schema_table_scan_projection() -> Result<()> {
2675+
let ctx = SessionContext::new();
2676+
// Make sure a projection that names a partition column is preserved during round-trip
2677+
let file_format = JsonFormat::default();
2678+
let table_partition_cols = vec![("part".to_owned(), DataType::Int64)];
2679+
let data = "../core/tests/data/partitioned_table_json";
2680+
let listing_table_url = ListingTableUrl::parse(data)?;
2681+
let listing_options = ListingOptions::new(Arc::new(file_format))
2682+
.with_table_partition_cols(table_partition_cols);
2683+
2684+
let config = ListingTableConfig::new(listing_table_url)
2685+
.with_listing_options(listing_options)
2686+
.infer_schema(&ctx.state())
2687+
.await?;
2688+
2689+
let listing_table: Arc<dyn TableProvider> = Arc::new(ListingTable::try_new(config)?);
2690+
2691+
// Resolve the projected names to indices in the table provider's schema.
let projection = ["part", "value"]
2692+
.iter()
2693+
.map(|field_name| listing_table.schema().index_of(field_name))
2694+
.collect::<Result<Vec<_>, _>>()?;
2695+
2696+
let plan = LogicalPlanBuilder::scan(
2697+
"hive_style",
2698+
Arc::new(DefaultTableSource::new(listing_table)),
2699+
Some(projection),
2700+
)?
2701+
.limit(0, Some(1))?
2702+
.build()?;
2703+
2704+
// Encode the plan to bytes and decode it again; the two plans must be equal.
let bytes = logical_plan_to_bytes(&plan)?;
2705+
let new_plan = logical_plan_from_bytes(&bytes, &ctx)?;
2706+
2707+
assert_eq!(plan, new_plan);
2708+
Ok(())
2709+
}
2710+
26722711
#[tokio::test]
26732712
async fn roundtrip_arrow_scan() -> Result<()> {
26742713
let ctx = SessionContext::new();

0 commit comments

Comments
 (0)