diff --git a/src/distributed_physical_optimizer_rule.rs b/src/distributed_physical_optimizer_rule.rs
index 90cfd0f..2cc946a 100644
--- a/src/distributed_physical_optimizer_rule.rs
+++ b/src/distributed_physical_optimizer_rule.rs
@@ -10,6 +10,7 @@ use datafusion::physical_plan::ExecutionPlanProperties;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
 use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use datafusion::physical_plan::streaming::StreamingTableExec;
 use datafusion::{
     common::tree_node::{Transformed, TreeNode},
     config::ConfigOptions,
@@ -196,7 +197,23 @@ impl DistributedPhysicalOptimizerRule {
     pub fn distribute_plan(
         plan: Arc<dyn ExecutionPlan>,
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
-        let stage = Self::_distribute_plan_inner(Uuid::new_v4(), plan, &mut 1, 0, 1)?;
+        let stage = match Self::_distribute_plan_inner(Uuid::new_v4(), plan.clone(), &mut 1, 0, 1) {
+            Ok(stage) => stage,
+            Err(err) => {
+                return match get_distribute_plan_err(&err) {
+                    Some(DistributedPlanError::NonDistributable(_)) => plan
+                        .transform_down(|plan| {
+                            // If the node cannot be distributed, roll back all the network boundaries.
+                            if let Some(nb) = plan.as_network_boundary() {
+                                return Ok(Transformed::yes(nb.rollback()?));
+                            }
+                            Ok(Transformed::no(plan))
+                        })
+                        .map(|v| v.data),
+                    _ => Err(err),
+                };
+            }
+        };
         let plan = stage.plan.decoded()?;
         Ok(Arc::new(DistributedExec::new(Arc::clone(plan))))
     }
@@ -219,6 +236,11 @@ impl DistributedPhysicalOptimizerRule {
             }
         }
 
+        // We cannot distribute [StreamingTableExec] nodes, so abort distribution.
+        if plan.as_any().is::<StreamingTableExec>() {
+            return Err(non_distributable_err(StreamingTableExec::static_name()))
+        }
+
         if let Some(node) = plan.as_any().downcast_ref::<PartitionIsolatorExec>() {
             // If there's only 1 task, no need to perform any isolation.
             if n_tasks == 1 {
@@ -259,6 +281,11 @@ impl DistributedPhysicalOptimizerRule {
                         }
                         dnode = Referenced::Arced(dnode.as_ref().with_input_task_count(*limit)?);
                     }
+                    Some(DistributedPlanError::NonDistributable(_)) => {
+                        // This whole plan is non-distributable, so abort any task and stage
+                        // assignment.
+                        return Err(e);
+                    }
                 },
             }
         };
@@ -376,14 +403,17 @@ enum DistributedPlanError {
     /// Prompts the planner to limit the amount of tasks used in the stage that is currently
     /// being planned.
     LimitTasks(usize),
+    /// Signals the planner that this whole plan is non-distributable. This can happen if
+    /// certain nodes are present, like [StreamingTableExec], which are typically used in
+    /// queries that, rather than performing some execution, perform some introspection.
+    NonDistributable(&'static str),
 }
 
 impl Display for DistributedPlanError {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
-            DistributedPlanError::LimitTasks(n) => {
-                write!(f, "LimitTasksErr: {n}")
-            }
+            DistributedPlanError::LimitTasks(n) => write!(f, "LimitTasksErr: {n}"),
+            DistributedPlanError::NonDistributable(name) => write!(f, "NonDistributable: {name}"),
         }
     }
 }
@@ -396,6 +426,12 @@ pub fn limit_tasks_err(limit: usize) -> DataFusionError {
     DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit)))
 }
 
+/// Builds a [DistributedPlanError::NonDistributable] error. This error prompts the distributed
+/// planner to not distribute the query at all.
+pub fn non_distributable_err(name: &'static str) -> DataFusionError {
+    DataFusionError::External(Box::new(DistributedPlanError::NonDistributable(name)))
+}
+
 fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> {
     let DataFusionError::External(err) = err else {
         return None;
@@ -630,6 +666,20 @@ mod tests {
         ");
     }
 
+    #[tokio::test]
+    async fn test_show_columns() {
+        let query = r#"SHOW COLUMNS from weather"#;
+        let plan = sql_to_explain(query, 2).await.unwrap();
+        assert_snapshot!(plan, @r"
+        CoalescePartitionsExec
+          ProjectionExec: expr=[table_catalog@0 as table_catalog, table_schema@1 as table_schema, table_name@2 as table_name, column_name@3 as column_name, data_type@5 as data_type, is_nullable@4 as is_nullable]
+            CoalesceBatchesExec: target_batch_size=8192
+              FilterExec: table_name@2 = weather
+                RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+                  StreamingTableExec: partition_sizes=1, projection=[table_catalog, table_schema, table_name, column_name, is_nullable, data_type]
+        ");
+    }
+
     async fn sql_to_explain(query: &str, tasks: usize) -> Result<String, DataFusionError> {
         sql_to_explain_with_rule(
             query,
@@ -644,7 +694,9 @@
         query: &str,
         rule: DistributedPhysicalOptimizerRule,
     ) -> Result<String, DataFusionError> {
-        let config = SessionConfig::new().with_target_partitions(4);
+        let config = SessionConfig::new()
+            .with_target_partitions(4)
+            .with_information_schema(true);
 
         let state = SessionStateBuilder::new()
             .with_default_features()
diff --git a/tests/introspection.rs b/tests/introspection.rs
new file mode 100644
index 0000000..793506c
--- /dev/null
+++ b/tests/introspection.rs
@@ -0,0 +1,85 @@
+#[cfg(all(feature = "integration", test))]
+mod tests {
+    use datafusion::arrow::util::pretty::pretty_format_batches;
+    use datafusion::execution::SessionStateBuilder;
+    use datafusion::physical_optimizer::PhysicalOptimizerRule;
+    use datafusion::physical_plan::execute_stream;
+    use datafusion::prelude::SessionConfig;
+    use datafusion_distributed::test_utils::localhost::start_localhost_context;
+    use datafusion_distributed::test_utils::parquet::register_parquet_tables;
+    use datafusion_distributed::{
+        DefaultSessionBuilder, DistributedPhysicalOptimizerRule,
+        MappedDistributedSessionBuilderExt, assert_snapshot, display_plan_ascii,
+    };
+    use futures::TryStreamExt;
+    use std::error::Error;
+
+    #[tokio::test]
+    async fn distributed_show_columns() -> Result<(), Box<dyn Error>> {
+        let (ctx, _guard) = start_localhost_context(
+            3,
+            DefaultSessionBuilder.map(|mut v: SessionStateBuilder| {
+                v = v.with_config(SessionConfig::default().with_information_schema(true));
+                Ok(v.build())
+            }),
+        )
+        .await;
+        register_parquet_tables(&ctx).await?;
+
+        let df = ctx.sql(r#"SHOW COLUMNS from weather"#).await?;
+        let physical = df.create_physical_plan().await?;
+        let physical_distributed = DistributedPhysicalOptimizerRule::default()
+            .with_network_shuffle_tasks(2)
+            .with_network_coalesce_tasks(2)
+            .optimize(physical.clone(), &Default::default())?;
+
+        let physical_distributed_str = display_plan_ascii(physical_distributed.as_ref());
+
+        assert_snapshot!(physical_distributed_str,
+            @r"
+        CoalescePartitionsExec
+          ProjectionExec: expr=[table_catalog@0 as table_catalog, table_schema@1 as table_schema, table_name@2 as table_name, column_name@3 as column_name, data_type@5 as data_type, is_nullable@4 as is_nullable]
+            CoalesceBatchesExec: target_batch_size=8192
+              FilterExec: table_name@2 = weather
+                RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
+                  StreamingTableExec: partition_sizes=1, projection=[table_catalog, table_schema, table_name, column_name, is_nullable, data_type]
+        ",
+        );
+
+        let batches_distributed = pretty_format_batches(
+            &execute_stream(physical_distributed, ctx.task_ctx())?
+                .try_collect::<Vec<_>>()
+                .await?,
+        )?;
+        assert_snapshot!(batches_distributed, @r"
+        +---------------+--------------+------------+---------------+-----------+-------------+
+        | table_catalog | table_schema | table_name | column_name   | data_type | is_nullable |
+        +---------------+--------------+------------+---------------+-----------+-------------+
+        | datafusion    | public       | weather    | MinTemp       | Float64   | YES         |
+        | datafusion    | public       | weather    | MaxTemp       | Float64   | YES         |
+        | datafusion    | public       | weather    | Rainfall      | Float64   | YES         |
+        | datafusion    | public       | weather    | Evaporation   | Float64   | YES         |
+        | datafusion    | public       | weather    | Sunshine      | Utf8View  | YES         |
+        | datafusion    | public       | weather    | WindGustDir   | Utf8View  | YES         |
+        | datafusion    | public       | weather    | WindGustSpeed | Utf8View  | YES         |
+        | datafusion    | public       | weather    | WindDir9am    | Utf8View  | YES         |
+        | datafusion    | public       | weather    | WindDir3pm    | Utf8View  | YES         |
+        | datafusion    | public       | weather    | WindSpeed9am  | Utf8View  | YES         |
+        | datafusion    | public       | weather    | WindSpeed3pm  | Int64     | YES         |
+        | datafusion    | public       | weather    | Humidity9am   | Int64     | YES         |
+        | datafusion    | public       | weather    | Humidity3pm   | Int64     | YES         |
+        | datafusion    | public       | weather    | Pressure9am   | Float64   | YES         |
+        | datafusion    | public       | weather    | Pressure3pm   | Float64   | YES         |
+        | datafusion    | public       | weather    | Cloud9am      | Int64     | YES         |
+        | datafusion    | public       | weather    | Cloud3pm      | Int64     | YES         |
+        | datafusion    | public       | weather    | Temp9am       | Float64   | YES         |
+        | datafusion    | public       | weather    | Temp3pm       | Float64   | YES         |
+        | datafusion    | public       | weather    | RainToday     | Utf8View  | YES         |
+        | datafusion    | public       | weather    | RISK_MM       | Float64   | YES         |
+        | datafusion    | public       | weather    | RainTomorrow  | Utf8View  | YES         |
+        +---------------+--------------+------------+---------------+-----------+-------------+
+        ");
+
+        Ok(())
+    }
+}
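
Side note on the mechanism, separate from the patch itself: `distribute_plan` can only fall back to the undistributed plan because the `NonDistributable` signal travels out of `_distribute_plan_inner` wrapped in `DataFusionError::External` and is recovered later by downcasting in `get_distribute_plan_err`. The sketch below reproduces that round trip in isolation. It is a minimal standalone example, not the crate's code: the enum and the two helpers mirror the ones added in the diff but are re-declared locally, and `main` is purely illustrative, so the snippet compiles on its own against stock DataFusion.

```rust
use datafusion::error::DataFusionError;
use std::error::Error;
use std::fmt::{Display, Formatter};

/// Local stand-in for the planner-internal error added in the patch.
#[derive(Debug)]
#[allow(dead_code)]
enum DistributedPlanError {
    LimitTasks(usize),
    NonDistributable(&'static str),
}

impl Display for DistributedPlanError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            DistributedPlanError::LimitTasks(n) => write!(f, "LimitTasksErr: {n}"),
            DistributedPlanError::NonDistributable(name) => write!(f, "NonDistributable: {name}"),
        }
    }
}

impl Error for DistributedPlanError {}

/// Wrap the signal in a DataFusionError so it can bubble up through the
/// planner's existing Result-based control flow.
fn non_distributable_err(name: &'static str) -> DataFusionError {
    DataFusionError::External(Box::new(DistributedPlanError::NonDistributable(name)))
}

/// Recover the signal on the way out by downcasting the boxed error.
fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> {
    let DataFusionError::External(err) = err else {
        return None;
    };
    err.downcast_ref::<DistributedPlanError>()
}

fn main() {
    // Deep inside planning, a non-distributable node raises the signal...
    let err = non_distributable_err("StreamingTableExec");

    // ...and the top-level caller (distribute_plan in the patch) inspects it,
    // falling back to the undistributed plan instead of failing the query.
    match get_distribute_plan_err(&err) {
        Some(DistributedPlanError::NonDistributable(name)) => {
            println!("not distributing: plan contains {name}");
        }
        _ => println!("unexpected error: {err}"),
    }
}
```

Carrying the signal as an error keeps the recursive signature of `_distribute_plan_inner` unchanged: any node deep in the plan (here, a `StreamingTableExec`) can abort distribution without threading an extra return channel through every caller.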