Skip to content

Commit 540f29e

Browse files
authored
Fix empty unnest columns handling when pushdown_projection_inexact (#88)
1 parent d8364fb commit 540f29e

File tree

1 file changed

+101
-3
lines changed

1 file changed

+101
-3
lines changed

src/materialized/dependencies.rs

Lines changed: 101 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -603,6 +603,21 @@ fn pushdown_projection_inexact(plan: LogicalPlan, indices: &HashSet<usize>) -> R
603603
.map(Expr::Column)
604604
.collect_vec();
605605

606+
// GUARD: if after pushdown the set of relevant unnest columns is empty,
607+
// avoid constructing an Unnest node with zero exec columns (which will
608+
// later error in Unnest::try_new). Instead, simply project the
609+
// desired output columns from the child plan (after pushing down the child projection).
610+
// Related PR: https://github.com/apache/datafusion/pull/16632; after that change
611+
// lands, we must also check for empty exec columns here.
612+
if columns_to_unnest.is_empty() {
613+
return LogicalPlanBuilder::from(pushdown_projection_inexact(
614+
Arc::unwrap_or_clone(unnest.input),
615+
&child_indices,
616+
)?)
617+
.project(columns_to_project)?
618+
.build();
619+
}
620+
606621
LogicalPlanBuilder::from(pushdown_projection_inexact(
607622
Arc::unwrap_or_clone(unnest.input),
608623
&child_indices,
@@ -922,16 +937,17 @@ mod test {
922937
use std::{any::Any, collections::HashSet, sync::Arc};
923938

924939
use arrow::util::pretty::pretty_format_batches;
925-
use arrow_schema::SchemaRef;
940+
use arrow_schema::{DataType, Field, FieldRef, Fields, SchemaRef};
926941
use datafusion::{
927942
assert_batches_eq, assert_batches_sorted_eq,
928943
catalog::{Session, TableProvider},
929944
datasource::listing::ListingTableUrl,
930945
execution::session_state::SessionStateBuilder,
931946
prelude::{DataFrame, SessionConfig, SessionContext},
932947
};
933-
use datafusion_common::{Column, Result, ScalarValue};
934-
use datafusion_expr::{Expr, JoinType, LogicalPlan, TableType};
948+
use datafusion_common::{Column, DFSchema, Result, ScalarValue};
949+
use datafusion_expr::builder::unnest;
950+
use datafusion_expr::{EmptyRelation, Expr, JoinType, LogicalPlan, TableType};
935951
use datafusion_physical_plan::ExecutionPlan;
936952
use itertools::Itertools;
937953

@@ -1837,4 +1853,86 @@ mod test {
18371853

18381854
Ok(())
18391855
}
1856+
1857+
#[test]
1858+
fn test_pushdown_unnest_guard_partition_date_only() -> Result<()> {
1859+
// This test simulates a simplified MV scenario:
1860+
//
1861+
// WITH events_structs AS (
1862+
// SELECT id, date, unnest(events) AS evs
1863+
// FROM base_table
1864+
// ),
1865+
// flattened_events AS (
1866+
// SELECT id, date, evs.event_type, evs.event_time
1867+
// FROM events_structs
1868+
// ),
1869+
// SELECT id, date, MAX(...) ...
1870+
// GROUP BY id, date
1871+
//
1872+
// The partition column is "date". During dependency plan
1873+
// building we only request "date" from this subtree,
1874+
// so pushdown_projection_inexact receives indices for
1875+
// the `date` column only. The guard must kick in:
1876+
// unnest(events) becomes unused, and the plan should
1877+
// collapse to just projecting `date` from the child.
1878+
1879+
// 1. Build schema for base table
1880+
let id = Field::new("id", DataType::Utf8, true);
1881+
let date = Field::new("date", DataType::Utf8, true);
1882+
1883+
// events: list<struct<event_type, event_time>>
1884+
let event_type = Field::new("event_type", DataType::Utf8, true);
1885+
let event_time = Field::new("event_time", DataType::Utf8, true);
1886+
let events_struct = Field::new(
1887+
"item",
1888+
DataType::Struct(Fields::from(vec![event_type, event_time])),
1889+
true,
1890+
);
1891+
let events = Field::new(
1892+
"events",
1893+
DataType::List(FieldRef::from(Box::new(events_struct))),
1894+
true,
1895+
);
1896+
1897+
// Build DFSchema: (id, date, events)
1898+
let qualified_fields = vec![
1899+
(None, Arc::new(id.clone())),
1900+
(None, Arc::new(date.clone())),
1901+
(None, Arc::new(events.clone())),
1902+
];
1903+
let df_schema =
1904+
DFSchema::new_with_metadata(qualified_fields, std::collections::HashMap::new())?;
1905+
1906+
// 2. Build a dummy child plan (EmptyRelation with the schema)
1907+
let empty = LogicalPlan::EmptyRelation(EmptyRelation {
1908+
produce_one_row: false,
1909+
schema: Arc::new(df_schema),
1910+
});
1911+
1912+
// 3. Wrap it with an Unnest node on the "events" column
1913+
let events_col = Column::from_name("events");
1914+
let unnest_plan = unnest(empty.clone(), vec![events_col.clone()])?;
1915+
1916+
// 4. Partition column is "date". Look up its actual index dynamically.
1917+
let date_idx = unnest_plan
1918+
.schema()
1919+
.index_of_column(&Column::from_name("date"))?;
1920+
let mut indices: HashSet<usize> = HashSet::new();
1921+
indices.insert(date_idx);
1922+
1923+
// 5. Call pushdown_projection_inexact with {date}
1924+
let res = pushdown_projection_inexact(unnest_plan, &indices)?;
1925+
1926+
// 6. Assert the result schema only contains `date`
1927+
let cols: Vec<String> = res
1928+
.schema()
1929+
.fields()
1930+
.iter()
1931+
.map(|f| f.name().to_string())
1932+
.collect();
1933+
1934+
assert_eq!(cols, vec!["date"]);
1935+
1936+
Ok(())
1937+
}
18401938
}

0 commit comments

Comments (0)