diff --git a/.github/workflows/audit.yml b/.github/workflows/audit.yml index 491fa27c2a56a..ce41442f3c167 100644 --- a/.github/workflows/audit.yml +++ b/.github/workflows/audit.yml @@ -44,4 +44,4 @@ jobs: - name: Run audit check # Ignored until https://github.com/apache/datafusion/issues/15571 # ignored py03 warning until arrow 55 upgrade - run: cargo audit --ignore RUSTSEC-2024-0370 --ignore RUSTSEC-2025-0020 + run: cargo audit --ignore RUSTSEC-2024-0370 --ignore RUSTSEC-2025-0020 --ignore RUSTSEC-2025-0047 diff --git a/Cargo.lock b/Cargo.lock index 30065a8afbd80..9a6be2dc5b6af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1818,7 +1818,7 @@ dependencies = [ [[package]] name = "datafusion" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "arrow-ipc", @@ -1890,7 +1890,7 @@ dependencies = [ [[package]] name = "datafusion-benchmarks" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "datafusion", @@ -1914,7 +1914,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "async-trait", @@ -1938,7 +1938,7 @@ dependencies = [ [[package]] name = "datafusion-catalog-listing" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "async-trait", @@ -1959,7 +1959,7 @@ dependencies = [ [[package]] name = "datafusion-cli" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "assert_cmd", @@ -1991,7 +1991,7 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "49.0.0" +version = "49.0.2" dependencies = [ "ahash 0.8.12", "apache-avro", @@ -2019,7 +2019,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" -version = "49.0.0" +version = "49.0.2" dependencies = [ "futures", "log", @@ -2028,7 +2028,7 @@ dependencies = [ [[package]] name = "datafusion-datasource" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "async-compression", @@ -2063,7 +2063,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-avro" -version = "49.0.0" +version = "49.0.2" dependencies = [ "apache-avro", "arrow", @@ -2088,7 +2088,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-csv" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "async-trait", @@ -2111,7 +2111,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-json" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "async-trait", @@ -2134,7 +2134,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-parquet" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "async-trait", @@ -2166,11 +2166,11 @@ dependencies = [ [[package]] name = "datafusion-doc" -version = "49.0.0" +version = "49.0.2" [[package]] name = "datafusion-examples" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "arrow-flight", @@ -2201,7 +2201,7 @@ dependencies = [ [[package]] name = "datafusion-execution" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "chrono", @@ -2220,7 +2220,7 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "async-trait", @@ -2243,7 +2243,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "datafusion-common", @@ -2254,7 +2254,7 @@ dependencies = [ [[package]] name = "datafusion-ffi" -version = "49.0.0" +version = "49.0.2" dependencies = [ "abi_stable", "arrow", @@ -2275,7 +2275,7 @@ dependencies = [ [[package]] name = "datafusion-functions" 
-version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "arrow-buffer", @@ -2304,7 +2304,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" -version = "49.0.0" +version = "49.0.2" dependencies = [ "ahash 0.8.12", "arrow", @@ -2325,7 +2325,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" -version = "49.0.0" +version = "49.0.2" dependencies = [ "ahash 0.8.12", "arrow", @@ -2338,7 +2338,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "arrow-ord", @@ -2360,7 +2360,7 @@ dependencies = [ [[package]] name = "datafusion-functions-table" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "async-trait", @@ -2374,7 +2374,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "datafusion-common", @@ -2390,7 +2390,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" -version = "49.0.0" +version = "49.0.2" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -2398,7 +2398,7 @@ dependencies = [ [[package]] name = "datafusion-macros" -version = "49.0.0" +version = "49.0.2" dependencies = [ "datafusion-expr", "quote", @@ -2407,7 +2407,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "async-trait", @@ -2434,7 +2434,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "49.0.0" +version = "49.0.2" dependencies = [ "ahash 0.8.12", "arrow", @@ -2459,7 +2459,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" -version = "49.0.0" +version = "49.0.2" dependencies = [ "ahash 0.8.12", "arrow", @@ -2471,7 +2471,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "datafusion-common", @@ -2492,7 +2492,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" -version = "49.0.0" +version = "49.0.2" dependencies = [ "ahash 0.8.12", "arrow", @@ -2528,7 +2528,7 @@ dependencies = [ [[package]] name = "datafusion-proto" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "chrono", @@ -2550,7 +2550,7 @@ dependencies = [ [[package]] name = "datafusion-proto-common" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "datafusion-common", @@ -2563,7 +2563,7 @@ dependencies = [ [[package]] name = "datafusion-pruning" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "arrow-schema", @@ -2582,7 +2582,7 @@ dependencies = [ [[package]] name = "datafusion-session" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "async-trait", @@ -2604,7 +2604,7 @@ dependencies = [ [[package]] name = "datafusion-spark" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "criterion", @@ -2620,7 +2620,7 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "bigdecimal", @@ -2644,7 +2644,7 @@ dependencies = [ [[package]] name = "datafusion-sqllogictest" -version = "49.0.0" +version = "49.0.2" dependencies = [ "arrow", "async-trait", @@ -2677,7 +2677,7 @@ dependencies = [ [[package]] name = "datafusion-substrait" -version = "49.0.0" +version = "49.0.2" dependencies = [ "async-recursion", "async-trait", @@ -2697,7 +2697,7 @@ dependencies = [ [[package]] name = "datafusion-wasmtest" -version = 
"49.0.0" +version = "49.0.2" dependencies = [ "chrono", "console_error_panic_hook", @@ -4012,9 +4012,9 @@ checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" [[package]] name = "libmimalloc-sys" -version = "0.1.43" +version = "0.1.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf88cd67e9de251c1781dbe2f641a1a3ad66eaae831b8a2c38fbdc5ddae16d4d" +checksum = "ec9d6fac27761dabcd4ee73571cdb06b7022dc99089acbe5435691edffaac0f4" dependencies = [ "cc", "libc", @@ -4145,9 +4145,9 @@ dependencies = [ [[package]] name = "mimalloc" -version = "0.1.47" +version = "0.1.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1791cbe101e95af5764f06f20f6760521f7158f69dbf9d6baf941ee1bf6bc40" +checksum = "995942f432bbb4822a7e9c3faa87a695185b0d09273ba85f097b54f4e458f2af" dependencies = [ "libmimalloc-sys", ] @@ -4248,12 +4248,11 @@ dependencies = [ [[package]] name = "nu-ansi-term" -version = "0.46.0" +version = "0.50.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399" dependencies = [ - "overload", - "winapi", + "windows-sys 0.52.0", ] [[package]] @@ -4453,12 +4452,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" -[[package]] -name = "overload" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" - [[package]] name = "owo-colors" version = "4.2.1" @@ -6737,9 +6730,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.19" +version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" dependencies = [ "nu-ansi-term", "sharded-slab", diff --git a/Cargo.toml b/Cargo.toml index 11cd3c637a971..601d11f12dd81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,7 @@ repository = "https://github.com/apache/datafusion" # Define Minimum Supported Rust Version (MSRV) rust-version = "1.85.1" # Define DataFusion version -version = "49.0.0" +version = "49.0.2" [workspace.dependencies] # We turn off default-features for some dependencies here so the workspaces which inherit them can diff --git a/datafusion-examples/examples/planner_api.rs b/datafusion-examples/examples/planner_api.rs index 3e718d71f1fbb..55aec7b0108a4 100644 --- a/datafusion-examples/examples/planner_api.rs +++ b/datafusion-examples/examples/planner_api.rs @@ -80,35 +80,9 @@ async fn to_physical_plan_in_one_api_demo( displayable(physical_plan.as_ref()).indent(false) ); - let traversal = extract_node_ids_from_execution_plan_tree(physical_plan.as_ref()); - let expected_traversal = vec![ - Some(0), - Some(1), - Some(2), - Some(3), - Some(4), - Some(5), - Some(6), - Some(7), - Some(8), - Some(9), - ]; - assert_eq!(expected_traversal, traversal); Ok(()) } -fn extract_node_ids_from_execution_plan_tree( - physical_plan: &dyn ExecutionPlan, -) -> Vec> { - let mut traversed_nodes: Vec> = vec![]; - for child in physical_plan.children() { - let node_ids = extract_node_ids_from_execution_plan_tree(child.as_ref()); - traversed_nodes.extend(node_ids); - } - 
traversed_nodes.push(physical_plan.properties().node_id()); - traversed_nodes -} - /// Converts a logical plan into a physical plan by utilizing the analyzer, /// optimizer, and query planner APIs separately. This flavor gives more /// control over the planning process. diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs index 31568a3d61756..27becaa96f62f 100644 --- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs @@ -224,7 +224,7 @@ impl SortTest { /// Sort the input using SortExec and ensure the results are /// correct according to `Vec::sort` both with and without spilling async fn run(&self) -> (Vec>, Vec) { - let input = Arc::clone(self.input()); + let input = self.input.clone(); let first_batch = input .iter() .flat_map(|p| p.iter()) diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 5b37c55c09e41..399033de7d05a 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -165,7 +165,9 @@ async fn page_index_filter_one_col() { // 5.create filter date_string_col == "01/01/09"`; // Note this test doesn't apply type coercion so the literal must match the actual view type - let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8view("01/01/09"))); + // xudong: use new_utf8, because schema_force_view_types was changed to false now. + // qi: when schema_force_view_types setting to true, we should change back to utf8view + let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8("01/01/09"))); let batches = get_filter_results(&state, filter.clone(), false).await; assert_eq!(batches[0].num_rows(), 14); diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index fd847763124ab..561f417e59403 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -3612,10 +3612,11 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> { ); // Apply the function - let result = replace_order_preserving_variants(dist_context)?; + let result = replace_order_preserving_variants(dist_context, false)?; // Verify the plan was transformed to CoalescePartitionsExec result + .0 .plan .as_any() .downcast_ref::() @@ -3623,7 +3624,7 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> { // Verify fetch was preserved assert_eq!( - result.plan.fetch(), + result.0.plan.fetch(), Some(5), "Fetch value was not preserved after transformation" ); diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 2251c5c20fe51..aeeece1e47f35 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -469,12 +469,6 @@ impl FileSource for ParquetSource { .file_column_projection_indices() .unwrap_or_else(|| (0..base_config.file_schema.fields().len()).collect()); - if self.schema_adapter_factory.is_some() { - log::warn!("The SchemaAdapter API will be removed from ParquetSource in a future release. \ - Use PhysicalExprAdapterFactory API instead. 
\ - See https://github.com/apache/datafusion/issues/16800 for discussion and https://datafusion.apache.org/library-user-guide/upgrading.html#datafusion-49-0-0 for upgrade instructions."); - } - let (expr_adapter_factory, schema_adapter_factory) = match ( base_config.expr_adapter_factory.as_ref(), self.schema_adapter_factory.as_ref(), diff --git a/datafusion/datasource/src/sink.rs b/datafusion/datasource/src/sink.rs index 86900d3681f48..422915857a792 100644 --- a/datafusion/datasource/src/sink.rs +++ b/datafusion/datasource/src/sink.rs @@ -252,14 +252,14 @@ impl ExecutionPlan for DataSinkExec { fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = DataSinkExec::new( Arc::clone(self.input()), Arc::clone(&self.sink), self.sort_order.clone(), ); - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 5fe3e333722fa..d127fd4d51d85 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -22,6 +22,7 @@ use std::fmt; use std::fmt::{Debug, Formatter}; use std::sync::Arc; +use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_plan::execution_plan::{ Boundedness, EmissionType, SchedulingType, }; @@ -324,15 +325,47 @@ impl ExecutionPlan for DataSourceExec { &self, projection: &ProjectionExec, ) -> Result>> { - self.data_source.try_swapping_with_projection(projection) + match self.data_source.try_swapping_with_projection(projection)? { + Some(new_plan) => { + if let Some(new_data_source_exec) = + new_plan.as_any().downcast_ref::() + { + let projection_mapping = ProjectionMapping::try_new( + projection.expr().iter().cloned(), + &self.schema(), + )?; + + // Project the equivalence properties to the new schema + let projected_eq_properties = self + .cache + .eq_properties + .project(&projection_mapping, new_data_source_exec.schema()); + + let preserved_exec = DataSourceExec { + data_source: Arc::clone(&new_data_source_exec.data_source), + cache: PlanProperties::new( + projected_eq_properties, + new_data_source_exec.cache.partitioning.clone(), + new_data_source_exec.cache.emission_type, + new_data_source_exec.cache.boundedness, + ) + .with_scheduling_type(new_data_source_exec.cache.scheduling_type), + }; + Ok(Some(Arc::new(preserved_exec))) + } else { + Ok(Some(new_plan)) + } + } + None => Ok(None), + } } fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { - let mut new_plan = DataSourceExec::new(self.data_source.clone()); - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let mut new_plan = DataSourceExec::new(Arc::clone(&self.data_source)); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/ffi/src/record_batch_stream.rs b/datafusion/ffi/src/record_batch_stream.rs index 78d65a816fcc2..6c2282df88dd0 100644 --- a/datafusion/ffi/src/record_batch_stream.rs +++ b/datafusion/ffi/src/record_batch_stream.rs @@ -57,6 +57,9 @@ pub struct FFI_RecordBatchStream { /// Return the schema of the record batch pub schema: unsafe extern "C" fn(stream: &Self) -> WrappedSchema, + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + /// Internal data. 
This is only to be accessed by the provider of the plan. /// The foreign library should never attempt to access this data. pub private_data: *mut c_void, @@ -82,6 +85,7 @@ impl FFI_RecordBatchStream { FFI_RecordBatchStream { poll_next: poll_next_fn_wrapper, schema: schema_fn_wrapper, + release: release_fn_wrapper, private_data, } } @@ -96,6 +100,12 @@ unsafe extern "C" fn schema_fn_wrapper(stream: &FFI_RecordBatchStream) -> Wrappe (*stream).schema().into() } +unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_RecordBatchStream) { + let private_data = + Box::from_raw(provider.private_data as *mut RecordBatchStreamPrivateData); + drop(private_data); +} + fn record_batch_to_wrapped_array( record_batch: RecordBatch, ) -> RResult { @@ -197,6 +207,12 @@ impl Stream for FFI_RecordBatchStream { } } +impl Drop for FFI_RecordBatchStream { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 56c5ee1aaa676..5bf9020cd16ad 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -178,6 +178,10 @@ impl AggregateUDFImpl for StringAgg { ))) } + fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF { + datafusion_expr::ReversedUDAF::Reversed(string_agg_udaf()) + } + fn documentation(&self) -> Option<&Documentation> { self.doc() } diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index ea14cf114030a..280010e3d92c0 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -61,7 +61,6 @@ pub mod push_down_limit; pub mod replace_distinct_aggregate; pub mod scalar_subquery_to_join; pub mod simplify_expressions; -mod simplify_predicates; pub mod single_distinct_to_groupby; pub mod utils; diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 63d6d7f5082c2..acb08e77914e9 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -41,7 +41,7 @@ use datafusion_expr::{ use crate::optimizer::ApplyOrder; use crate::utils::{has_all_column_refs, is_restrict_null_predicate}; -use crate::{simplify_predicates::simplify_predicates, OptimizerConfig, OptimizerRule}; +use crate::{simplify_expressions::simplify_predicates, OptimizerConfig, OptimizerRule}; /// Optimizer rule for pushing (moving) filter expressions down in a plan so /// they are applied as early as possible. 
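For context, the predicate-simplification helper that this import now pulls from `simplify_expressions` (rather than the removed private top-level `simplify_predicates` module) collapses redundant conjunctive range predicates on the same column down to the most restrictive one, as the `find_most_restrictive_predicate` hunk further below suggests. A rough illustration, assuming a column `x` (these exact rewrites are an assumption, not taken from this diff):

    // x > 5 AND x > 10   becomes   x > 10   (keep the greater lower bound)
    // x < 7 AND x < 3    becomes   x < 3    (keep the smaller upper bound)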
@@ -2311,7 +2311,7 @@ mod tests { plan, @r" Projection: test.a, test1.d - Cross Join: + Cross Join: Projection: test.a, test.b, test.c TableScan: test, full_filters=[test.a = Int32(1)] Projection: test1.d, test1.e, test1.f @@ -2341,7 +2341,7 @@ mod tests { plan, @r" Projection: test.a, test1.a - Cross Join: + Cross Join: Projection: test.a, test.b, test.c TableScan: test, full_filters=[test.a = Int32(1)] Projection: test1.a, test1.b, test1.c diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_predicates.rs b/datafusion/optimizer/src/simplify_expressions/simplify_predicates.rs index 9d9e840636b8a..32b2315e15d58 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_predicates.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_predicates.rs @@ -204,16 +204,17 @@ fn find_most_restrictive_predicate( if let Some(scalar) = scalar_value { if let Some(current_best) = best_value { - let comparison = scalar.try_cmp(current_best)?; - let is_better = if find_greater { - comparison == std::cmp::Ordering::Greater - } else { - comparison == std::cmp::Ordering::Less - }; - - if is_better { - best_value = Some(scalar); - most_restrictive_idx = idx; + if let Some(comparison) = scalar.partial_cmp(current_best) { + let is_better = if find_greater { + comparison == std::cmp::Ordering::Greater + } else { + comparison == std::cmp::Ordering::Less + }; + + if is_better { + best_value = Some(scalar); + most_restrictive_idx = idx; + } } } else { best_value = Some(scalar); diff --git a/datafusion/physical-expr-common/src/datum.rs b/datafusion/physical-expr-common/src/datum.rs index 233deff758c7b..7084bc440e86b 100644 --- a/datafusion/physical-expr-common/src/datum.rs +++ b/datafusion/physical-expr-common/src/datum.rs @@ -154,9 +154,26 @@ pub fn compare_op_for_nested( if matches!(op, Operator::IsDistinctFrom | Operator::IsNotDistinctFrom) { Ok(BooleanArray::new(values, None)) } else { - // If one of the side is NULL, we returns NULL + // If one of the side is NULL, we return NULL // i.e. NULL eq NULL -> NULL - let nulls = NullBuffer::union(l.nulls(), r.nulls()); + // For nested comparisons, we need to ensure the null buffer matches the result length + let nulls = match (is_l_scalar, is_r_scalar) { + (false, false) | (true, true) => NullBuffer::union(l.nulls(), r.nulls()), + (true, false) => { + // When left is null-scalar and right is array, expand left nulls to match result length + match l.nulls().filter(|nulls| !nulls.is_valid(0)) { + Some(_) => Some(NullBuffer::new_null(len)), // Left scalar is null + None => r.nulls().cloned(), // Left scalar is non-null + } + } + (false, true) => { + // When right is null-scalar and left is array, expand right nulls to match result length + match r.nulls().filter(|nulls| !nulls.is_valid(0)) { + Some(_) => Some(NullBuffer::new_null(len)), // Right scalar is null + None => l.nulls().cloned(), // Right scalar is non-null + } + } + }; Ok(BooleanArray::new(values, nulls)) } } diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index ba30b916b9f87..ea10b1197b1d7 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -32,6 +32,10 @@ use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash}; /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it. 
+/// +/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also +/// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where +/// the same `ExecutionPlan` is reused with different data. #[derive(Debug)] pub struct DynamicFilterPhysicalExpr { /// The original children of this PhysicalExpr, if any. @@ -121,6 +125,10 @@ impl DynamicFilterPhysicalExpr { /// do not change* since those will be used to determine what columns need to read or projected /// when evaluating the expression. /// + /// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also + /// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where + /// the same `ExecutionPlan` is reused with different data. + /// /// [`collect_columns`]: crate::utils::collect_columns #[allow(dead_code)] // Only used in tests for now pub fn new( diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 163a76956ef5c..42c08ce8f437e 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -940,20 +940,21 @@ fn add_merge_on_top( input: DistributionContext, fetch: &mut Option, ) -> DistributionContext { - // Add SortPreservingMerge only when partition count is larger than 1. + // Apply only when the partition count is larger than one. if input.plan.output_partitioning().partition_count() > 1 { // When there is an existing ordering, we preserve ordering // when decreasing partitions. This will be un-done in the future // if any of the following conditions is true // - Preserving ordering is not helpful in terms of satisfying ordering requirements // - Usage of order preserving variants is not desirable - // (determined by flag `config.optimizer.bounded_order_preserving_variants`) - let new_plan = if let Some(ordering) = input.plan.output_ordering() { + // (determined by flag `config.optimizer.prefer_existing_sort`) + let new_plan = if let Some(req) = input.plan.output_ordering() { Arc::new( - SortPreservingMergeExec::new(ordering.clone(), Arc::clone(&input.plan)) + SortPreservingMergeExec::new(req.clone(), Arc::clone(&input.plan)) .with_fetch(fetch.take()), ) as _ } else { + // If there is no input order, we can simply coalesce partitions: Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _ }; @@ -980,6 +981,7 @@ fn add_merge_on_top( /// ```text /// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet", /// ``` +#[allow(clippy::type_complexity)] fn remove_dist_changing_operators( mut distribution_context: DistributionContext, ) -> Result<( @@ -1030,7 +1032,8 @@ fn remove_dist_changing_operators( /// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", /// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet", /// ``` -fn replace_order_preserving_variants( +#[allow(clippy::type_complexity)] +pub fn replace_order_preserving_variants( mut context: DistributionContext, ordering_satisfied: bool, ) -> Result<(DistributionContext, Option)> { diff --git a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs 
b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs index 6d327d321c4cd..b536e7960208e 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs @@ -136,11 +136,6 @@ pub fn plan_with_order_preserving_variants( return Ok(sort_input); } else if is_coalesce_partitions(&sort_input.plan) && is_spm_better { let child = &sort_input.children[0].plan; - let mut fetch = fetch; - if let Some(coalesce_fetch) = sort_input.plan.fetch() { - // Get the min fetch between the `fetch` and the coalesce's fetch: - fetch = Some(coalesce_fetch.min(fetch.unwrap_or(usize::MAX))) - }; if let Some(ordering) = child.output_ordering() { let mut fetch = fetch; if let Some(coalesce_fetch) = sort_input.plan.fetch() { diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index f4127bab29417..c64be0de1e83f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -28,7 +28,7 @@ use datafusion_common::Result; use datafusion_expr::EmitTo; -pub(crate) mod multi_group_by; +pub mod multi_group_by; mod row; mod single_group_by; diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index 2ac0389454dec..9b547a45e8b08 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -91,6 +91,11 @@ pub trait GroupColumn: Send + Sync { /// Returns the number of rows stored in this builder fn len(&self) -> usize; + /// true if len == 0 + fn is_empty(&self) -> bool { + self.len() == 0 + } + /// Returns the number of bytes used by this [`GroupColumn`] fn size(&self) -> usize; diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 692d66d9f27b1..11812135719ca 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1019,7 +1019,7 @@ impl ExecutionPlan for AggregateExec { } fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = AggregateExec { mode: self.mode, @@ -1036,7 +1036,7 @@ impl ExecutionPlan for AggregateExec { metrics: self.metrics.clone(), }; - let new_props: PlanProperties = new_plan.cache.clone().with_node_id(_node_id); + let new_props: PlanProperties = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 791a5f8cb2d9c..c67430b467b69 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -213,7 +213,7 @@ impl ExecutionPlan for AnalyzeExec { fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = AnalyzeExec::new( self.verbose, @@ -221,7 +221,7 @@ impl ExecutionPlan for AnalyzeExec { Arc::clone(self.input()), Arc::clone(&self.schema), ); - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git 
a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 5ebb9ddab2661..2f598a8ecea88 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -229,12 +229,12 @@ impl ExecutionPlan for CoalesceBatchesExec { } fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = CoalesceBatchesExec::new(Arc::clone(self.input()), self.target_batch_size) .with_fetch(self.fetch()); - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 0ef4e18764188..685e751832eb4 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -226,10 +226,11 @@ impl ExecutionPlan for CoalescePartitionsExec { } fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = CoalescePartitionsExec::new(Arc::clone(self.input())); - let new_props = new_plan.cache.clone().with_node_id(_node_id); + new_plan.fetch = self.fetch; + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index be0afa07eac2c..89d5ba6f4da9b 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -254,7 +254,7 @@ impl ExecutionPlan for CooperativeExec { } fn maintains_input_order(&self) -> Vec { - self.input.maintains_input_order() + vec![true; self.children().len()] } fn children(&self) -> Vec<&Arc> { diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 40b4ec61dc102..acd00ac3fa255 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -175,6 +175,16 @@ impl ExecutionPlan for EmptyExec { None, )) } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = EmptyExec::new(Arc::clone(&self.schema)); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 1f06d94bd6ed0..2bcc88ffa3de9 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -48,7 +48,7 @@ use crate::stream::RecordBatchStreamAdapter; use arrow::array::{Array, RecordBatch}; use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; -use datafusion_common::{exec_err, Constraints, Result}; +use datafusion_common::{exec_err, Constraints, DataFusionError, Result}; use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; @@ -118,10 +118,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// Returns an error if this individual node does not conform to its invariants. /// These invariants are typically only checked in debug mode. /// - /// A default set of invariants is provided in the default implementation. + /// A default set of invariants is provided in the [check_default_invariants] function. 
+ /// The default implementation of `check_invariants` calls this function. /// Extension nodes can provide their own invariants. - fn check_invariants(&self, _check: InvariantLevel) -> Result<()> { - Ok(()) + fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + check_default_invariants(self, check) } /// Specifies the data distribution requirements for all the @@ -195,6 +196,31 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { children: Vec>, ) -> Result>; + /// Reset any internal state within this [`ExecutionPlan`]. + /// + /// This method is called when an [`ExecutionPlan`] needs to be re-executed, + /// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method + /// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`]) + /// are reset to their initial state. + /// + /// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children, + /// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without + /// necessarily resetting any internal state. Implementations that require resetting of some + /// internal state should override this method to provide the necessary logic. + /// + /// This method should *not* reset state recursively for children, as it is expected that + /// it will be called from within a walk of the execution plan tree so that it will be called on each child later + /// or was already called on each child. + /// + /// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument, + /// thus it is expected that any cached plan properties will remain valid after the reset. + /// + /// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr + fn reset_state(self: Arc) -> Result> { + let children = self.children().into_iter().cloned().collect(); + self.with_new_children(children) + } + /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to /// produce `target_partitions` partitions. /// @@ -1079,6 +1105,37 @@ impl PlanProperties { } } +macro_rules! check_len { + ($target:expr, $func_name:ident, $expected_len:expr) => { + let actual_len = $target.$func_name().len(); + if actual_len != $expected_len { + return internal_err!( + "{}::{} returned Vec with incorrect size: {} != {}", + $target.name(), + stringify!($func_name), + actual_len, + $expected_len + ); + } + }; +} + +/// Checks a set of invariants that apply to all ExecutionPlan implementations. +/// Returns an error if the given node does not conform. +pub fn check_default_invariants( + plan: &P, + _check: InvariantLevel, +) -> Result<(), DataFusionError> { + let children_len = plan.children().len(); + + check_len!(plan, maintains_input_order, children_len); + check_len!(plan, required_input_ordering, children_len); + check_len!(plan, required_input_distribution, children_len); + check_len!(plan, benefits_from_input_partitioning, children_len); + + Ok(()) +} + /// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful /// especially for the distributed engine to judge whether need to deal with shuffling. 
/// Currently, there are 3 kinds of execution plan which needs data exchange diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index b6215a3c2efc4..5f8df698a7ad1 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -421,12 +421,12 @@ impl ExecutionPlan for FilterExec { fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = FilterExec::try_new(Arc::clone(&self.predicate), Arc::clone(self.input()))? .with_projection(self.projection.clone())?; - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 7431f7509a6f3..7c841ebfde628 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -270,6 +270,18 @@ impl ExecutionPlan for CrossJoinExec { ))) } + fn reset_state(self: Arc) -> Result> { + let new_exec = CrossJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + schema: Arc::clone(&self.schema), + left_fut: Default::default(), // reset the build side! + metrics: ExecutionPlanMetricsSet::default(), + cache: self.cache.clone(), + }; + Ok(Arc::new(new_exec)) + } + fn required_input_distribution(&self) -> Vec { vec![ Distribution::SinglePartition, @@ -350,11 +362,11 @@ impl ExecutionPlan for CrossJoinExec { fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = CrossJoinExec::new(Arc::clone(&self.left), Arc::clone(&self.right)); - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index ac1493d09ce7e..2e088055c723b 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -769,6 +769,26 @@ impl ExecutionPlan for HashJoinExec { )?)) } + fn reset_state(self: Arc) -> Result> { + // Reset the left_fut to allow re-execution + Ok(Arc::new(HashJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + on: self.on.clone(), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + left_fut: OnceAsync::default(), + random_state: self.random_state.clone(), + mode: self.mode, + metrics: ExecutionPlanMetricsSet::new(), + projection: self.projection.clone(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + cache: self.cache.clone(), + })) + } + fn execute( &self, partition: usize, @@ -906,7 +926,7 @@ impl ExecutionPlan for HashJoinExec { fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = HashJoinExec::try_new( Arc::clone(&self.left), @@ -918,7 +938,7 @@ impl ExecutionPlan for HashJoinExec { *self.partition_mode(), self.null_equality, )?; - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 
e9e6904a035af..ab1cf9cd83c8f 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -545,7 +545,7 @@ impl ExecutionPlan for SortMergeJoinExec { fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = SortMergeJoinExec::try_new( Arc::clone(&self.left), @@ -556,7 +556,7 @@ impl ExecutionPlan for SortMergeJoinExec { self.sort_options.clone(), self.null_equality, )?; - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 29c22e116f22e..de721a1d3fc83 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -476,7 +476,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = SymmetricHashJoinExec::try_new( Arc::clone(&self.left), @@ -489,7 +489,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { self.right_sort_exprs.clone(), self.mode, )?; - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 34aca9a74a951..c1410677330a5 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -212,11 +212,11 @@ impl ExecutionPlan for GlobalLimitExec { fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = GlobalLimitExec::new(Arc::clone(self.input()), self.skip, self.fetch); - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/node_id.rs b/datafusion/physical-plan/src/node_id.rs index 2a246db0a77b5..7b8d0281eb73b 100644 --- a/datafusion/physical-plan/src/node_id.rs +++ b/datafusion/physical-plan/src/node_id.rs @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. 
+ use std::sync::Arc; use crate::ExecutionPlan; diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index de242c98399fb..e2b66c7e90d51 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -186,10 +186,10 @@ impl ExecutionPlan for PlaceholderRowExec { fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = PlaceholderRowExec::new(Arc::clone(&self.schema)); - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index dee9812215386..c7bc92bac7953 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -249,7 +249,7 @@ impl ExecutionPlan for ProjectionExec { Ok(stats_projection( input_stats, self.expr.iter().map(|(e, _)| Arc::clone(e)), - Arc::clone(&self.schema), + Arc::clone(&self.input.schema()), )) } @@ -262,11 +262,11 @@ impl ExecutionPlan for ProjectionExec { } fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = ProjectionExec::try_new(self.expr.clone(), Arc::clone(self.input()))?; - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } @@ -1040,8 +1040,10 @@ mod tests { use crate::common::collect; use crate::test; + use crate::test::exec::StatisticsExec; - use arrow::datatypes::DataType; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::stats::{ColumnStatistics, Precision, Statistics}; use datafusion_common::ScalarValue; use datafusion_expr::Operator; @@ -1240,4 +1242,86 @@ mod tests { assert_eq!(result, expected); } + + #[test] + fn test_projection_statistics_uses_input_schema() { + let input_schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + Field::new("d", DataType::Int32, false), + Field::new("e", DataType::Int32, false), + Field::new("f", DataType::Int32, false), + ]); + + let input_statistics = Statistics { + num_rows: Precision::Exact(10), + column_statistics: vec![ + ColumnStatistics { + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + max_value: Precision::Exact(ScalarValue::Int32(Some(100))), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Exact(ScalarValue::Int32(Some(5))), + max_value: Precision::Exact(ScalarValue::Int32(Some(50))), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Exact(ScalarValue::Int32(Some(10))), + max_value: Precision::Exact(ScalarValue::Int32(Some(40))), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Exact(ScalarValue::Int32(Some(20))), + max_value: Precision::Exact(ScalarValue::Int32(Some(30))), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Exact(ScalarValue::Int32(Some(21))), + max_value: Precision::Exact(ScalarValue::Int32(Some(29))), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Exact(ScalarValue::Int32(Some(24))), + max_value: Precision::Exact(ScalarValue::Int32(Some(26))), + ..Default::default() + }, + ], + ..Default::default() + }; + + let input = 
Arc::new(StatisticsExec::new(input_statistics, input_schema)); + + // Create projection expressions that reference columns from the input schema and the length + // of output schema columns < input schema columns and hence if we use the last few columns + // from the input schema in the expressions here, bounds_check would fail on them if output + // schema is supplied to the partitions_statistics method. + let exprs: Vec<(Arc, String)> = vec![ + ( + Arc::new(Column::new("c", 2)) as Arc, + "c_renamed".to_string(), + ), + ( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("e", 4)), + Operator::Plus, + Arc::new(Column::new("f", 5)), + )) as Arc, + "e_plus_f".to_string(), + ), + ]; + + let projection = ProjectionExec::try_new(exprs, input).unwrap(); + + let stats = projection.partition_statistics(None).unwrap(); + + assert_eq!(stats.num_rows, Precision::Exact(10)); + assert_eq!( + stats.column_statistics.len(), + 2, + "Expected 2 columns in projection statistics" + ); + assert!(stats.total_byte_size.is_exact().unwrap_or(false)); + } } diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 8bba8c08dea39..bbbdb9e89ccdb 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -209,7 +209,7 @@ impl ExecutionPlan for RecursiveQueryExec { fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = RecursiveQueryExec::try_new( self.name.clone(), @@ -217,7 +217,7 @@ impl ExecutionPlan for RecursiveQueryExec { Arc::clone(&self.recursive_term), self.is_distinct, )?; - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } @@ -387,7 +387,7 @@ fn assign_work_table( } /// Some plans will change their internal states after execution, making them unable to be executed again. -/// This function uses `ExecutionPlan::with_new_children` to fork a new plan with initial states. +/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan. /// /// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan. 
/// However, if the data of the left table is derived from the work table, it will become outdated @@ -398,8 +398,7 @@ fn reset_plan_states(plan: Arc) -> Result() { Ok(Transformed::no(plan)) } else { - let new_plan = Arc::clone(&plan) - .with_new_children(plan.children().into_iter().cloned().collect())?; + let new_plan = Arc::clone(&plan).reset_state()?; Ok(Transformed::yes(new_plan)) } }) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 0dd9d935d95ca..a49f8eb293ec2 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -767,7 +767,7 @@ impl ExecutionPlan for RepartitionExec { } fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = RepartitionExec { input: Arc::clone(&self.input), @@ -776,7 +776,7 @@ impl ExecutionPlan for RepartitionExec { preserve_order: self.preserve_order, cache: self.cache.clone(), }; - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index c2ecd514086c9..077a5141d36d8 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -326,7 +326,7 @@ impl ExecutionPlan for PartialSortExec { fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = PartialSortExec { expr: self.expr.clone(), @@ -337,7 +337,7 @@ impl ExecutionPlan for PartialSortExec { fetch: self.fetch, cache: self.cache.clone(), }; - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 5186293e59975..dbf0cad74178e 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -899,6 +899,29 @@ impl SortExec { self } + /// Add or reset `self.filter` to a new `DynamicFilterPhysicalExpr`. + fn create_filter(&self) -> Arc { + let children = self + .expr + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true))) + } + + fn cloned(&self) -> Self { + SortExec { + input: Arc::clone(&self.input), + expr: self.expr.clone(), + metrics_set: self.metrics_set.clone(), + preserve_partitioning: self.preserve_partitioning, + common_sort_prefix: self.common_sort_prefix.clone(), + fetch: self.fetch, + cache: self.cache.clone(), + filter: self.filter.clone(), + } + } + /// Modify how many rows to include in the result /// /// If None, then all rows will be returned, in sorted order. @@ -920,25 +943,13 @@ impl SortExec { } let filter = fetch.is_some().then(|| { // If we already have a filter, keep it. Otherwise, create a new one. 
- self.filter.clone().unwrap_or_else(|| { - let children = self - .expr - .iter() - .map(|sort_expr| Arc::clone(&sort_expr.expr)) - .collect::>(); - Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true))) - }) + self.filter.clone().unwrap_or_else(|| self.create_filter()) }); - SortExec { - input: Arc::clone(&self.input), - expr: self.expr.clone(), - metrics_set: self.metrics_set.clone(), - preserve_partitioning: self.preserve_partitioning, - common_sort_prefix: self.common_sort_prefix.clone(), - fetch, - cache, - filter, - } + let mut new_sort = self.cloned(); + new_sort.fetch = fetch; + new_sort.cache = cache; + new_sort.filter = filter; + new_sort } /// Input schema @@ -1110,10 +1121,35 @@ impl ExecutionPlan for SortExec { self: Arc, children: Vec>, ) -> Result> { - let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0])) - .with_fetch(self.fetch) - .with_preserve_partitioning(self.preserve_partitioning); - new_sort.filter = self.filter.clone(); + let mut new_sort = self.cloned(); + assert!( + children.len() == 1, + "SortExec should have exactly one child" + ); + new_sort.input = Arc::clone(&children[0]); + // Recompute the properties based on the new input since they may have changed + let (cache, sort_prefix) = Self::compute_properties( + &new_sort.input, + new_sort.expr.clone(), + new_sort.preserve_partitioning, + )?; + new_sort.cache = cache; + new_sort.common_sort_prefix = sort_prefix; + + Ok(Arc::new(new_sort)) + } + + fn reset_state(self: Arc) -> Result> { + let children = self.children().into_iter().cloned().collect(); + let new_sort = self.with_new_children(children)?; + let mut new_sort = new_sort + .as_any() + .downcast_ref::() + .expect("cloned 1 lines above this line, we know the type") + .clone(); + // Our dynamic filter and execution metrics are the state we need to reset. 
+ new_sort.filter = Some(new_sort.create_filter()); + new_sort.metrics_set = ExecutionPlanMetricsSet::new(); Ok(Arc::new(new_sort)) } @@ -1240,7 +1276,7 @@ impl ExecutionPlan for SortExec { } fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let new_plan = SortExec { input: Arc::clone(self.input()), @@ -1248,7 +1284,7 @@ impl ExecutionPlan for SortExec { fetch: self.fetch, metrics_set: self.metrics_set.clone(), preserve_partitioning: self.preserve_partitioning, - cache: self.cache.clone().with_node_id(_node_id), + cache: self.cache.clone().with_node_id(node_id), common_sort_prefix: self.common_sort_prefix.clone(), filter: self.filter.clone(), }; diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 5420b53bad08a..6a6fafb9ff078 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -364,12 +364,12 @@ impl ExecutionPlan for SortPreservingMergeExec { fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = SortPreservingMergeExec::new(self.expr.clone(), Arc::clone(self.input())) .with_fetch(self.fetch()); - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 6d273c787b363..db7d6ae0ea5bb 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -342,7 +342,7 @@ impl ExecutionPlan for StreamingTableExec { fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = StreamingTableExec { partitions: self.partitions.clone(), @@ -354,7 +354,7 @@ impl ExecutionPlan for StreamingTableExec { cache: self.cache.clone(), metrics: self.metrics.clone(), }; - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index aed0c702ad411..96b64d98615ec 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -33,7 +33,8 @@ use super::{ SendableRecordBatchStream, Statistics, }; use crate::execution_plan::{ - boundedness_from_children, emission_type_from_children, InvariantLevel, + boundedness_from_children, check_default_invariants, emission_type_from_children, + InvariantLevel, }; use crate::metrics::BaselineMetrics; use crate::projection::{make_with_child, ProjectionExec}; @@ -176,7 +177,9 @@ impl ExecutionPlan for UnionExec { &self.cache } - fn check_invariants(&self, _check: InvariantLevel) -> Result<()> { + fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + check_default_invariants(self, check)?; + (self.inputs().len() >= 2) .then_some(()) .ok_or(DataFusionError::Internal( @@ -300,10 +303,10 @@ impl ExecutionPlan for UnionExec { fn with_node_id( self: Arc, - _node_id: usize, + node_id: usize, ) -> Result>> { let mut new_plan = UnionExec::new(self.inputs.clone()); - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git 
a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index df2b6535c656f..3e183e9417fbb 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -204,7 +204,7 @@ impl ExecutionPlan for UnnestExec { fn with_node_id( self: Arc<Self>, - _node_id: usize, + node_id: usize, ) -> Result<Option<Arc<dyn ExecutionPlan>>> { let mut new_plan = UnnestExec::new( Arc::clone(self.input()), @@ -213,7 +213,7 @@ impl ExecutionPlan for UnnestExec { Arc::clone(&self.schema), self.options.clone(), ); - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 388b6110e4d8c..e5dc3b1162338 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -231,13 +231,13 @@ impl ExecutionPlan for ValuesExec { fn with_node_id( self: Arc<Self>, - _node_id: usize, + node_id: usize, ) -> Result<Option<Arc<dyn ExecutionPlan>>> { let mut new_plan = ValuesExec::try_new_from_batches( Arc::clone(&self.schema), self.data.clone(), )?; - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index e6e83d7eff99a..511d8e69fa167 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -304,14 +304,14 @@ impl ExecutionPlan for WindowAggExec { fn with_node_id( self: Arc<Self>, - _node_id: usize, + node_id: usize, ) -> Result<Option<Arc<dyn ExecutionPlan>>> { let mut new_plan = WindowAggExec::try_new( self.window_expr.clone(), Arc::clone(self.input()), self.can_repartition, )?; - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index b7b7c136948f5..dfdbc37daca27 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -174,14 +174,6 @@ impl ExecutionPlan for WorkTableExec { &self.cache } - fn maintains_input_order(&self) -> Vec<bool> { - vec![false] - } - - fn benefits_from_input_partitioning(&self) -> Vec<bool> { - vec![false] - } - fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { vec![] } @@ -223,11 +215,11 @@ impl ExecutionPlan for WorkTableExec { fn with_node_id( self: Arc<Self>, - _node_id: usize, + node_id: usize, ) -> Result<Option<Arc<dyn ExecutionPlan>>> { let mut new_plan = WorkTableExec::new(self.name.clone(), Arc::clone(&self.schema)); - let new_props = new_plan.cache.clone().with_node_id(_node_id); + let new_props = new_plan.cache.clone().with_node_id(node_id); new_plan.cache = new_props; Ok(Some(Arc::new(new_plan))) } diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index e9a8b83dc4f91..a1eeabdf87f4a 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -46,7 +46,7 @@ avro = ["datafusion/avro", "datafusion-common/avro"] [dependencies] arrow = { workspace = true } chrono = { workspace = true } -datafusion = { workspace = true, default-features = false } +datafusion = { workspace = true, default-features = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = {
workspace = true } datafusion-proto-common = { workspace = true } @@ -56,7 +56,6 @@ prost = { workspace = true } serde = { version = "1.0", optional = true } serde_json = { workspace = true, optional = true } [dev-dependencies] -datafusion = { workspace = true, default-features = true } datafusion-functions = { workspace = true, default-features = true } datafusion-functions-aggregate = { workspace = true } datafusion-functions-window-common = { workspace = true } diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index 2df162f21e3a3..f7da0f1f2f741 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -97,29 +97,6 @@ //! assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); //! # Ok(()) //! # } -//! ``` -//! # Example: Serializing [`ExecutionPlan`]s -//! -//! ``` -//! # use datafusion::prelude::*; -//! # use datafusion_common::Result; -//! # use datafusion_proto::bytes::{physical_plan_from_bytes,physical_plan_to_bytes}; -//! # #[tokio::main] -//! # async fn main() -> Result<()>{ -//! // Create a plan that scans table 't' -//! let ctx = SessionContext::new(); -//! ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default()).await?; -//! let physical_plan = ctx.table("t1").await?.create_physical_plan().await?; -//! -//! // Convert the plan into bytes (for sending over the network, etc.) -//! let bytes = physical_plan_to_bytes(physical_plan.clone())?; -//! -//! // Decode bytes from somewhere (over network, etc.) back to ExecutionPlan -//! let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx)?; -//! assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip)); -//! # Ok(()) -//! # } -//! ``` pub mod bytes; pub mod common; pub mod generated; diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 52e0b20db2c2e..6e7546737d72c 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -611,29 +611,6 @@ impl protobuf::PhysicalPlanNode { ) -> Result<Arc<dyn ExecutionPlan>> { let input: Arc<dyn ExecutionPlan> = into_physical_plan(&filter.input, registry, runtime, extension_codec)?; - let projection = if !filter.projection.is_empty() { - Some( - filter - .projection - .iter() - .map(|i| *i as usize) - .collect::<Vec<_>>(), - ) - } else { - None - }; - - // Use the projected schema if projection is present, otherwise use the full schema - let predicate_schema = if let Some(ref proj_indices) = projection { - // Create projected schema for parsing the predicate - let projected_fields: Vec<_> = proj_indices - .iter() - .map(|&i| input.schema().field(i).clone()) - .collect(); - Arc::new(Schema::new(projected_fields)) - } else { - input.schema() - }; let predicate = filter .expr @@ -642,7 +619,7 @@ impl protobuf::PhysicalPlanNode { parse_physical_expr( expr, registry, - predicate_schema.as_ref(), + input.schema().as_ref(), extension_codec, ) }) @@ -653,6 +630,17 @@ impl protobuf::PhysicalPlanNode { ) })?; let filter_selectivity = filter.default_filter_selectivity.try_into(); + let projection = if !filter.projection.is_empty() { + Some( + filter + .projection + .iter() + .map(|i| *i as usize) + .collect::<Vec<_>>(), + ) + } else { + None + }; let filter = FilterExec::try_new(predicate, input)?.with_projection(projection)?; match filter_selectivity { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 2d27a21447b22..ae6dea83182e9 ---
a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -32,6 +32,9 @@ use arrow::csv::WriterBuilder; use arrow::datatypes::{Fields, TimeUnit}; use datafusion::physical_expr::aggregate::AggregateExprBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; +use datafusion::physical_plan::node_id::{ + annotate_node_id_for_execution_plan, NodeIdAnnotator, +}; use datafusion_expr::dml::InsertOp; use datafusion_functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf; use datafusion_functions_aggregate::array_agg::array_agg_udaf; @@ -134,13 +137,22 @@ fn roundtrip_test_and_return( ctx: &SessionContext, codec: &dyn PhysicalExtensionCodec, ) -> Result<Arc<dyn ExecutionPlan>> { + let mut annotator = NodeIdAnnotator::new(); + let exec_plan = annotate_node_id_for_execution_plan(&exec_plan, &mut annotator)?; let proto: protobuf::PhysicalPlanNode = protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), codec) .expect("to proto"); let runtime = ctx.runtime_env(); - let result_exec_plan: Arc<dyn ExecutionPlan> = proto + let mut result_exec_plan: Arc<dyn ExecutionPlan> = proto .try_into_physical_plan(ctx, runtime.deref(), codec) .expect("from proto"); + + // Re-annotate the deserialized plan with node IDs to match the original plan structure + // This ensures that the roundtrip preserves the node_id values for comparison + let mut annotator = NodeIdAnnotator::new(); + result_exec_plan = + annotate_node_id_for_execution_plan(&result_exec_plan, &mut annotator)?; + assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}")); Ok(result_exec_plan) } @@ -1839,3 +1851,36 @@ async fn test_round_trip_tpch_queries() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_tpch_part_in_list_query_with_real_parquet_data() -> Result<()> { + use datafusion_common::test_util::datafusion_test_data; + + let ctx = SessionContext::new(); + + // Register the TPC-H part table using the local test data + let test_data = datafusion_test_data(); + let table_sql = format!( + "CREATE EXTERNAL TABLE part STORED AS PARQUET LOCATION '{test_data}/tpch_part_small.parquet'" +); + ctx.sql(&table_sql).await.map_err(|e| { + DataFusionError::External(format!("Failed to create part table: {e}").into()) + })?; + + // Test the exact problematic query + let sql = + "SELECT p_size FROM part WHERE p_size IN (14, 6, 5, 31) and p_partkey > 1000"; + + let logical_plan = ctx.sql(sql).await?.into_unoptimized_plan(); + let optimized_plan = ctx.state().optimize(&logical_plan)?; + let physical_plan = ctx.state().create_physical_plan(&optimized_plan).await?; + + // Serialize the physical plan - bug may happen here already but not necessarily manifests + let codec = DefaultPhysicalExtensionCodec {}; + let proto = PhysicalPlanNode::try_from_physical_plan(physical_plan.clone(), &codec)?; + + // This will fail with the bug, but should succeed when fixed + let _deserialized_plan = + proto.try_into_physical_plan(&ctx, ctx.runtime_env().as_ref(), &codec)?; + Ok(()) +} diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index bdf327c98248a..753820b6b6193 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -6028,6 +6028,106 @@ GROUP BY dummy ---- text1 + +# Test string_agg with ORDER BY clauses (issue #17011) +statement ok +create table t (k varchar, v int); + +statement ok +insert into t values ('a', 2), ('b', 3), ('c', 1), ('d', null); + +query T +select
string_agg(k, ',' order by k) from t; +---- +a,b,c,d + +query T +select string_agg(k, ',' order by k desc) from t; +---- +d,c,b,a + +query T +select string_agg(k, ',' order by v) from t; +---- +c,a,b,d + +query T +select string_agg(k, ',' order by v nulls first) from t; +---- +d,c,a,b + +query T +select string_agg(k, ',' order by v desc) from t; +---- +d,b,a,c + +query T +select string_agg(k, ',' order by v desc nulls last) from t; +---- +b,a,c,d + +query T +-- odd indexes should appear first, ties solved by v +select string_agg(k, ',' order by v % 2 == 0, v) from t; +---- +c,b,a,d + +query T +-- odd indexes should appear first, ties solved by v desc +select string_agg(k, ',' order by v % 2 == 0, v desc) from t; +---- +b,c,a,d + +query T +select string_agg(k, ',' order by + case + when k = 'a' then 3 + when k = 'b' then 0 + when k = 'c' then 2 + when k = 'd' then 1 + end) +from t; +---- +b,d,c,a + +query T +select string_agg(k, ',' order by + case + when k = 'a' then 3 + when k = 'b' then 0 + when k = 'c' then 2 + when k = 'd' then 1 + end desc) +from t; +---- +a,c,d,b + +query TT +explain select string_agg(k, ',' order by v) from t; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[string_agg(t.k, Utf8(",")) ORDER BY [t.v ASC NULLS LAST]]] +02)--TableScan: t projection=[k, v] +physical_plan +01)AggregateExec: mode=Single, gby=[], aggr=[string_agg(t.k,Utf8(",")) ORDER BY [t.v ASC NULLS LAST]] +02)--SortExec: expr=[v@1 ASC NULLS LAST], preserve_partitioning=[false] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + +query TT +explain select string_agg(k, ',' order by v desc) from t; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[string_agg(t.k, Utf8(",")) ORDER BY [t.v DESC NULLS FIRST]]] +02)--TableScan: t projection=[k, v] +physical_plan +01)AggregateExec: mode=Single, gby=[], aggr=[string_agg(t.k,Utf8(",")) ORDER BY [t.v DESC NULLS FIRST]] +02)--SortExec: expr=[v@1 DESC], preserve_partitioning=[false] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +drop table t; + + # Tests for aggregating with NaN values statement ok CREATE TABLE float_table ( diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index f67ea5ed80f12..0b72b16b7a588 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -6206,11 +6206,7 @@ physical_plan 04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))] 05)--------ProjectionExec: expr=[] 06)----------CoalesceBatchesExec: target_batch_size=8192 -<<<<<<< HEAD -07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8)), 1, 32) IN ([Literal { value: Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), field: Field { name: "7f4b18de3cfeb9b4ac78c381ee2ad278", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("a"), field: Field { name: "a", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("b"), field: Field { name: "b", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("c"), field: Field { name: "c", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }]) -======= 07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([Literal { value: Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), field: Field { name: "lit", data_type: Utf8View, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("a"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("b"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("c"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }]) ->>>>>>> upstream/branch-49 08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 09)----------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192] @@ -6239,11 +6235,7 @@ physical_plan 04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))] 05)--------ProjectionExec: expr=[] 06)----------CoalesceBatchesExec: target_batch_size=8192 -<<<<<<< HEAD -07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8)), 1, 32) IN ([Literal { value: Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), field: Field { name: "7f4b18de3cfeb9b4ac78c381ee2ad278", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("a"), field: Field { name: "a", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("b"), field: Field { name: "b", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("c"), field: Field { name: "c", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }]) -======= 07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([Literal { value: Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("a"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("b"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("c"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }]) ->>>>>>> upstream/branch-49 08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 09)----------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192] @@ -6272,11 +6264,7 @@ physical_plan 04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))] 05)--------ProjectionExec: expr=[] 06)----------CoalesceBatchesExec: target_batch_size=8192 -<<<<<<< HEAD -07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8)), 1, 32) IN ([Literal { value: Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), field: Field { name: "7f4b18de3cfeb9b4ac78c381ee2ad278", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("a"), field: Field { name: "a", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("b"), field: Field { name: "b", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("c"), field: Field { name: "c", data_type: Utf8View, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {} } }]) -======= 07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([Literal { value: Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("a"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("b"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("c"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }]) ->>>>>>> upstream/branch-49 08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 09)----------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192] @@ -8077,6 +8065,18 @@ FixedSizeList(Field { name: "item", data_type: Int32, nullable: true, dict_id: 0 statement error create table varying_fixed_size_col_table (a int[3]) as values ([1,2,3]), ([4,5]); +statement ok +COPY (select [[true, false], [false, true]] a, [false, true] b union select [[null, null]], null) to 'test_files/scratch/array/array_has/single_file.parquet' stored as parquet; + +statement ok +CREATE EXTERNAL TABLE array_has STORED AS PARQUET location 'test_files/scratch/array/array_has/single_file.parquet'; + +query B +select array_contains(a, b) from array_has order by 1 nulls last; +---- +true +NULL + ### Delete tables statement ok @@ -8255,3 +8255,6 @@ drop table values_all_empty; statement ok drop table fixed_size_col_table; + +statement ok +drop table array_has; diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index 32320a06f4fb0..5f8fd1a0b5efd 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -996,6 +996,61 @@ physical_plan 08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 09)------------WorkTableExec: name=numbers +# Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions +query II +with recursive r as ( + select 0 as k, 0 as v + union all + ( + select * + from r + order by v + limit 1 + ) +) +select * +from r +limit 5; +---- +0 0 +0 0 +0 0 +0 0 +0 0 + +query TT +explain +with recursive r as ( + select 0 as k, 0 as v + union all + ( + select * + from r + order by v + limit 1 + ) +) +select * +from r +limit 5; +---- +logical_plan +01)SubqueryAlias: r +02)--Limit: skip=0, fetch=5 +03)----RecursiveQuery: is_distinct=false +04)------Projection: Int64(0) AS k, Int64(0) AS v +05)--------EmptyRelation +06)------Sort: r.v ASC NULLS LAST, fetch=1 +07)--------Projection: r.k, r.v +08)----------TableScan: r +physical_plan +01)GlobalLimitExec: skip=0, fetch=5 +02)--RecursiveQueryExec: name=r, is_distinct=false +03)----ProjectionExec: expr=[0 as k, 0 as v] +04)------PlaceholderRowExec +05)----SortExec: TopK(fetch=1), expr=[v@1 ASC NULLS LAST], preserve_partitioning=[false] +06)------WorkTableExec: name=r + statement count 0 set datafusion.execution.enable_recursive_ctes = false; diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 1d1534b99b55e..ca7cfabdd007d 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ 
b/datafusion/sqllogictest/test_files/explain.slt @@ -300,8 +300,8 @@ initial_physical_plan 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] initial_physical_plan_with_schema -01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] +01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] physical_plan after OutputRequirements 01)OutputRequirementExec: order_by=[], dist_by=Unspecified, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] @@ -327,7 +327,7 @@ physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 
-physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] +physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] statement ok @@ -344,8 +344,8 @@ initial_physical_plan_with_stats 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] initial_physical_plan_with_schema -01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] +01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] physical_plan after 
OutputRequirements 01)OutputRequirementExec: order_by=[], dist_by=Unspecified 02)--GlobalLimitExec: skip=0, fetch=10 @@ -372,7 +372,7 @@ physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] -physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] +physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] statement ok diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 08b3428ebb2aa..25c78e658d413 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -360,11 +360,7 @@ physical_plan 21)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ 22)│ DataSourceExec ││ ProjectionExec │ 23)│ -------------------- ││ -------------------- │ -<<<<<<< HEAD -24)│ bytes: 1560 ││ date_col: date_col │ -======= 24)│ bytes: 520 ││ date_col: date_col │ ->>>>>>> upstream/branch-49 25)│ format: memory ││ int_col: int_col │ 26)│ rows: 1 ││ │ 27)│ ││ string_col: │ @@ -1024,10 +1020,6 @@ physical_plan 16)│ files: 1 │ 17)│ format: parquet │ 18)└───────────────────────────┘ -<<<<<<< HEAD - -======= ->>>>>>> upstream/branch-49 # Query with projection on memory query TT @@ -1294,10 +1286,6 @@ physical_plan 07)│ files: 1 ││ files: 1 │ 08)│ format: csv ││ format: parquet │ 09)└───────────────────────────┘└───────────────────────────┘ -<<<<<<< HEAD - -======= ->>>>>>> upstream/branch-49 # Query with sort merge join. 
statement ok diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 6c40a71fa6ef5..71251c8b1625b 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4240,23 +4240,23 @@ set datafusion.execution.target_partitions = 1; # Note we use csv as MemoryExec does not support limit push down (so doesn't manifest # bugs if limits are improperly pushed down) query I -COPY (values (1), (2), (3), (4), (5)) TO 'test_files/scratch/limit/t1.csv' +COPY (values (1), (2), (3), (4), (5)) TO 'test_files/scratch/joins/t1.csv' STORED AS CSV ---- 5 # store t2 in different order so the top N rows are not the same as the top N rows of t1 query I -COPY (values (5), (4), (3), (2), (1)) TO 'test_files/scratch/limit/t2.csv' +COPY (values (5), (4), (3), (2), (1)) TO 'test_files/scratch/joins/t2.csv' STORED AS CSV ---- 5 statement ok -create external table t1(a int) stored as CSV location 'test_files/scratch/limit/t1.csv'; +create external table t1(a int) stored as CSV location 'test_files/scratch/joins/t1.csv'; statement ok -create external table t2(b int) stored as CSV location 'test_files/scratch/limit/t2.csv'; +create external table t2(b int) stored as CSV location 'test_files/scratch/joins/t2.csv'; ###### ## LEFT JOIN w/ LIMIT @@ -4288,8 +4288,8 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=3, fetch=2 02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, b@0)] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], limit=2, file_type=csv, has_header=true -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], limit=2, file_type=csv, has_header=true +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], file_type=csv, has_header=true ###### ## RIGHT JOIN w/ LIMIT @@ -4322,8 +4322,8 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=3, fetch=2 02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, b@0)] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], file_type=csv, has_header=true -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], limit=2, file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], file_type=csv, has_header=true +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], limit=2, file_type=csv, has_header=true ###### ## FULL JOIN w/ LIMIT @@ -4359,8 +4359,8 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=3, fetch=2 02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, b@0)] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], file_type=csv, has_header=true -04)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], file_type=csv, has_header=true +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], file_type=csv, has_header=true statement ok drop table t1; diff --git a/datafusion/sqllogictest/test_files/listing_table_statistics.slt b/datafusion/sqllogictest/test_files/listing_table_statistics.slt index 890d1f2e9250e..d3af6b321c7e6 100644 --- a/datafusion/sqllogictest/test_files/listing_table_statistics.slt +++ b/datafusion/sqllogictest/test_files/listing_table_statistics.slt @@ -35,7 +35,7 @@ query TT explain format indent select * from t; ---- logical_plan TableScan: t projection=[int_col, str_col] -physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/table/2.parquet]]}, projection=[int_col, str_col], file_type=parquet, statistics=[Rows=Exact(4), Bytes=Exact(288), [(Col[0]: Min=Exact(Int64(-1)) Max=Exact(Int64(3)) Null=Exact(0)),(Col[1]: Min=Exact(Utf8View("a")) Max=Exact(Utf8View("d")) Null=Exact(0))]] +physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/table/2.parquet]]}, projection=[int_col, str_col], file_type=parquet, statistics=[Rows=Exact(4), Bytes=Exact(288), [(Col[0]: Min=Exact(Int64(-1)) Max=Exact(Int64(3)) Null=Exact(0)),(Col[1]: Min=Exact(Utf8("a")) Max=Exact(Utf8("d")) Null=Exact(0))]] statement ok drop table t; diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 26165ca2d735c..038db74c90c2c 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -430,15 +430,15 @@ select arrow_typeof(binaryview_col), binaryview_col FROM binary_as_string_default; ---- -Binary 616161 Binary 616161 Binary 616161 -Binary 626262 Binary 626262 Binary 626262 -Binary 636363 Binary 636363 Binary 636363 -Binary 646464 Binary 646464 Binary 646464 -Binary 656565 Binary 656565 Binary 656565 -Binary 666666 Binary 666666 Binary 666666 -Binary 676767 Binary 676767 Binary 676767 -Binary 686868 Binary 686868 Binary 686868 -Binary 696969 Binary 696969 Binary 696969 +Binary 616161 LargeBinary 616161 BinaryView 616161 +Binary 626262 LargeBinary 626262 BinaryView 626262 +Binary 636363 LargeBinary 636363 BinaryView 636363 +Binary 646464 LargeBinary 646464 BinaryView 646464 +Binary 656565 LargeBinary 656565 BinaryView 656565 +Binary 666666 LargeBinary 666666 BinaryView 666666 +Binary 676767 LargeBinary 676767 BinaryView 676767 +Binary 686868 LargeBinary 686868 BinaryView 686868 +Binary 696969 LargeBinary 696969 BinaryView 696969 # Run an explain plan to show the cast happens in the plan (a CAST is needed for the predicates) query TT @@ -451,13 +451,13 @@ EXPLAIN binaryview_col LIKE '%a%'; ---- logical_plan -01)Filter: CAST(binary_as_string_default.binary_col AS Utf8) LIKE Utf8("%a%") AND CAST(binary_as_string_default.largebinary_col AS Utf8) LIKE Utf8("%a%") AND CAST(binary_as_string_default.binaryview_col AS Utf8) LIKE Utf8("%a%") -02)--TableScan: binary_as_string_default 
projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[CAST(binary_as_string_default.binary_col AS Utf8) LIKE Utf8("%a%"), CAST(binary_as_string_default.largebinary_col AS Utf8) LIKE Utf8("%a%"), CAST(binary_as_string_default.binaryview_col AS Utf8) LIKE Utf8("%a%")] +01)Filter: CAST(binary_as_string_default.binary_col AS Utf8) LIKE Utf8("%a%") AND CAST(binary_as_string_default.largebinary_col AS LargeUtf8) LIKE LargeUtf8("%a%") AND CAST(binary_as_string_default.binaryview_col AS Utf8View) LIKE Utf8View("%a%") +02)--TableScan: binary_as_string_default projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[CAST(binary_as_string_default.binary_col AS Utf8) LIKE Utf8("%a%"), CAST(binary_as_string_default.largebinary_col AS LargeUtf8) LIKE LargeUtf8("%a%"), CAST(binary_as_string_default.binaryview_col AS Utf8View) LIKE Utf8View("%a%")] physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 -02)--FilterExec: CAST(binary_col@0 AS Utf8) LIKE %a% AND CAST(largebinary_col@1 AS Utf8) LIKE %a% AND CAST(binaryview_col@2 AS Utf8) LIKE %a% +02)--FilterExec: CAST(binary_col@0 AS Utf8) LIKE %a% AND CAST(largebinary_col@1 AS LargeUtf8) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a% 03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=CAST(binary_col@0 AS Utf8View) LIKE %a% AND CAST(largebinary_col@1 AS Utf8View) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a% +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=CAST(binary_col@0 AS Utf8) LIKE %a% AND CAST(largebinary_col@1 AS LargeUtf8) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a% statement ok @@ -478,15 +478,15 @@ select arrow_typeof(binaryview_col), binaryview_col FROM binary_as_string_option; ---- -Utf8 aaa Utf8 aaa Utf8 aaa -Utf8 bbb Utf8 bbb Utf8 bbb -Utf8 ccc Utf8 ccc Utf8 ccc -Utf8 ddd Utf8 ddd Utf8 ddd -Utf8 eee Utf8 eee Utf8 eee -Utf8 fff Utf8 fff Utf8 fff -Utf8 ggg Utf8 ggg Utf8 ggg -Utf8 hhh Utf8 hhh Utf8 hhh -Utf8 iii Utf8 iii Utf8 iii +Utf8 aaa LargeUtf8 aaa Utf8View aaa +Utf8 bbb LargeUtf8 bbb Utf8View bbb +Utf8 ccc LargeUtf8 ccc Utf8View ccc +Utf8 ddd LargeUtf8 ddd Utf8View ddd +Utf8 eee LargeUtf8 eee Utf8View eee +Utf8 fff LargeUtf8 fff Utf8View fff +Utf8 ggg LargeUtf8 ggg Utf8View ggg +Utf8 hhh LargeUtf8 hhh Utf8View hhh +Utf8 iii LargeUtf8 iii Utf8View iii # Run an explain plan to show the cast happens in the plan (there should be no casts) query TT @@ -499,8 +499,8 @@ EXPLAIN binaryview_col LIKE '%a%'; ---- logical_plan -01)Filter: binary_as_string_option.binary_col LIKE Utf8("%a%") AND binary_as_string_option.largebinary_col LIKE Utf8("%a%") AND binary_as_string_option.binaryview_col LIKE Utf8("%a%") -02)--TableScan: binary_as_string_option projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[binary_as_string_option.binary_col LIKE Utf8("%a%"), binary_as_string_option.largebinary_col LIKE Utf8("%a%"), binary_as_string_option.binaryview_col LIKE Utf8("%a%")] +01)Filter: binary_as_string_option.binary_col LIKE Utf8("%a%") AND binary_as_string_option.largebinary_col LIKE LargeUtf8("%a%") AND 
binary_as_string_option.binaryview_col LIKE Utf8View("%a%") +02)--TableScan: binary_as_string_option projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[binary_as_string_option.binary_col LIKE Utf8("%a%"), binary_as_string_option.largebinary_col LIKE LargeUtf8("%a%"), binary_as_string_option.binaryview_col LIKE Utf8View("%a%")] physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a% @@ -665,8 +665,8 @@ query TT explain select * from foo where starts_with(column1, 'f'); ---- logical_plan -01)Filter: foo.column1 LIKE Utf8View("f%") -02)--TableScan: foo projection=[column1], partial_filters=[foo.column1 LIKE Utf8View("f%")] +01)Filter: foo.column1 LIKE Utf8("f%") +02)--TableScan: foo projection=[column1], partial_filters=[foo.column1 LIKE Utf8("f%")] physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 LIKE f% diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 24e76a570c009..9f6f81789e894 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -528,3 +528,35 @@ query TT select val, part from t_pushdown where part = val AND part = 'a'; ---- a a + +statement ok +COPY ( + SELECT + '00000000000000000000000000000001' AS trace_id, + '2023-10-01 00:00:00'::timestamptz AS start_timestamp, + 'prod' as deployment_environment +) +TO 'test_files/scratch/parquet_filter_pushdown/data/1.parquet'; + +statement ok +COPY ( + SELECT + '00000000000000000000000000000002' AS trace_id, + '2024-10-01 00:00:00'::timestamptz AS start_timestamp, + 'staging' as deployment_environment +) +TO 'test_files/scratch/parquet_filter_pushdown/data/2.parquet'; + +statement ok +CREATE EXTERNAL TABLE t1 STORED AS PARQUET LOCATION 'test_files/scratch/parquet_filter_pushdown/data/'; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = true; + +query T +SELECT deployment_environment +FROM t1 +WHERE trace_id = '00000000000000000000000000000002' +ORDER BY start_timestamp, trace_id; +---- +staging diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part index 4cfbdc18ca508..8a1c4592641ca 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part @@ -59,7 +59,7 @@ logical_plan 03)----Projection: lineitem.l_extendedprice, lineitem.l_discount 04)------Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand = Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK")]) AND lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), Utf8View("LG BOX"), Utf8View("LG PACK"), Utf8View("LG PKG")]) AND lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND 
part.p_size <= Int32(15) 05)--------Projection: lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount -06)----------Filter: (lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)) AND (lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode = Utf8View("AIR REG")) AND lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON") +06)----------Filter: lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON") AND (lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)) AND (lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode = Utf8View("AIR REG")) 07)------------TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode], partial_filters=[lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode = Utf8View("AIR REG"), lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON"), lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)] 08)--------Filter: (part.p_brand = Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), Utf8View("LG BOX"), Utf8View("LG PACK"), Utf8View("LG PKG")]) AND part.p_size <= Int32(15)) AND part.p_size >= Int32(1) 09)----------TableScan: part projection=[p_partkey, p_brand, p_size, p_container], partial_filters=[part.p_size >= Int32(1), part.p_brand = Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), Utf8View("LG BOX"), Utf8View("LG PACK"), Utf8View("LG PKG")]) AND part.p_size <= Int32(15)] @@ -73,7 +73,7 @@ physical_plan 07)------------CoalesceBatchesExec: target_batch_size=8192 08)--------------RepartitionExec: partitioning=Hash([l_partkey@0], 4), input_partitions=4 09)----------------CoalesceBatchesExec: target_batch_size=8192 -10)------------------FilterExec: (l_quantity@1 >= Some(100),15,2 AND l_quantity@1 <= Some(1100),15,2 OR l_quantity@1 >= Some(1000),15,2 AND l_quantity@1 <= Some(2000),15,2 OR l_quantity@1 >= Some(2000),15,2 AND l_quantity@1 <= Some(3000),15,2) AND (l_shipmode@5 = AIR OR l_shipmode@5 = AIR REG) AND l_shipinstruct@4 = DELIVER IN PERSON, 
projection=[l_partkey@0, l_quantity@1, l_extendedprice@2, l_discount@3] +10)------------------FilterExec: l_shipinstruct@4 = DELIVER IN PERSON AND (l_quantity@1 >= Some(100),15,2 AND l_quantity@1 <= Some(1100),15,2 OR l_quantity@1 >= Some(1000),15,2 AND l_quantity@1 <= Some(2000),15,2 OR l_quantity@1 >= Some(2000),15,2 AND l_quantity@1 <= Some(3000),15,2) AND (l_shipmode@5 = AIR OR l_shipmode@5 = AIR REG), projection=[l_partkey@0, l_quantity@1, l_extendedprice@2, l_discount@3] 11)--------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_partkey, l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode], file_type=csv, has_header=false 12)------------CoalesceBatchesExec: target_batch_size=8192 13)--------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), input_partitions=4 diff --git a/dev/changelog/49.0.1.md b/dev/changelog/49.0.1.md new file mode 100644 index 0000000000000..06d7c1e2c77a6 --- /dev/null +++ b/dev/changelog/49.0.1.md @@ -0,0 +1,48 @@ + + +# Apache DataFusion 49.0.1 Changelog + +This release consists of 5 commits from 5 contributors. See credits at the end of this changelog for more information. + +See the [upgrade guide](https://datafusion.apache.org/library-user-guide/upgrading.html) for information on how to upgrade from previous versions. + +**Other:** + +- [branch-49] Final Changelog Tweaks [#16852](https://github.com/apache/datafusion/pull/16852) (alamb) +- [branch-49] remove warning from every file open [#17059](https://github.com/apache/datafusion/pull/17059) (mbutrovich) +- [branch-49] Backport PR #16995 to branch-49 [#17068](https://github.com/apache/datafusion/pull/17068) (pepijnve) +- [branch-49] Backport "Add ExecutionPlan::reset_state (apache#17028)" to v49 [#17096](https://github.com/apache/datafusion/pull/17096) (adriangb) +- [branch-49] Backport #17129 to branch 49 [#17143](https://github.com/apache/datafusion/pull/17143) (AdamGS) +- [branch-49] Backport Pass the input schema to stats_projection for ProjectionExpr (#17123) [#17174](https://github.com/apache/datafusion/pull/17174) (alamb) +- [branch-49] fix: string_agg not respecting ORDER BY [#17058](https://github.com/apache/datafusion/pull/17058) (nuno-faria) + +## Credits + +Thank you to everyone who contributed to this release. Here is a breakdown of commits (PRs merged) per contributor. + +``` + 1 Adam Gutglick + 1 Adrian Garcia Badaracco + 1 Andrew Lamb + 1 Matt Butrovich + 1 Pepijn Van Eeckhoudt +``` + +Thank you also to everyone who contributed in other ways such as filing issues, reviewing PRs, and providing feedback on this release. diff --git a/dev/changelog/49.0.2.md b/dev/changelog/49.0.2.md new file mode 100644 index 0000000000000..7e6fc3e7eb487 --- /dev/null +++ b/dev/changelog/49.0.2.md @@ -0,0 +1,45 @@ + + +# Apache DataFusion 49.0.2 Changelog + +This release consists of 3 commits from 3 contributors. See credits at the end of this changelog for more information. + +See the [upgrade guide](https://datafusion.apache.org/library-user-guide/upgrading.html) for information on how to upgrade from previous versions. 
+ +**Fixed bugs:** + +- fix: align `array_has` null buffer for scalar (#17272) [#17274](https://github.com/apache/datafusion/pull/17274) (comphead) + +**Other:** + +- [branch-49] Backport fix: deserialization error for FilterExec (predicates with inlist) [#17254](https://github.com/apache/datafusion/pull/17254) (haohuaijin) +- [branch-49] FFI_RecordBatchStream was causing a memory leak (#17190) [#17270](https://github.com/apache/datafusion/pull/17270) (timsaucer) + +## Credits + +Thank you to everyone who contributed to this release. Here is a breakdown of commits (PRs merged) per contributor. + +``` + 1 Huaijin + 1 Oleks V + 1 Tim Saucer +``` + +Thank you also to everyone who contributed in other ways such as filing issues, reviewing PRs, and providing feedback on this release. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 9ac1c59caa800..ce983bfe96b9f 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -56,7 +56,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.metadata_size_hint | NULL | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | | datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | -| datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | +| datafusion.execution.parquet.schema_force_view_types | false | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | @@ -70,7 +70,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.statistics_enabled | page | (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. 
If NULL, uses default parquet writer setting | | datafusion.execution.parquet.max_statistics_size | 4096 | (writing) Sets max statistics size for any column. If NULL, uses default parquet writer setting max_statistics_size is deprecated, currently it is not being used | | datafusion.execution.parquet.max_row_group_size | 1048576 | (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. | -| datafusion.execution.parquet.created_by | datafusion version 49.0.0 | (writing) Sets "created by" property | +| datafusion.execution.parquet.created_by | datafusion version 49.0.2 | (writing) Sets "created by" property | | datafusion.execution.parquet.column_index_truncate_length | 64 | (writing) Sets column index truncate length | | datafusion.execution.parquet.statistics_truncate_length | NULL | (writing) Sets statictics truncate length. If NULL, uses default parquet writer setting | | datafusion.execution.parquet.data_page_row_count_limit | 20000 | (writing) Sets best effort maximum number of rows in data page |
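The two parquet reader options touched above interact: `schema_force_view_types` (whose documented default becomes `false` in this diff) controls whether string/binary columns come back as the Arrow view types, while `binary_as_string` controls whether binary columns are decoded as UTF-8 strings. A minimal sketch of overriding them per session, reusing the `SET` statement form that already appears in the sqllogictest files earlier in this diff (option names taken from the table above):

```sql
-- Opt back in to Utf8View/BinaryView reading for parquet string/binary columns
SET datafusion.execution.parquet.schema_force_view_types = true;

-- Read Binary/LargeBinary parquet columns as UTF-8 strings (for legacy writers)
SET datafusion.execution.parquet.binary_as_string = true;
```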