Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions src/distributed_physical_optimizer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use datafusion::physical_plan::ExecutionPlanProperties;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::streaming::StreamingTableExec;
use datafusion::{
common::tree_node::{Transformed, TreeNode},
config::ConfigOptions,
Expand Down Expand Up @@ -196,7 +197,23 @@ impl DistributedPhysicalOptimizerRule {
pub fn distribute_plan(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let stage = Self::_distribute_plan_inner(Uuid::new_v4(), plan, &mut 1, 0, 1)?;
let stage = match Self::_distribute_plan_inner(Uuid::new_v4(), plan.clone(), &mut 1, 0, 1) {
Ok(stage) => stage,
Err(err) => {
return match get_distribute_plan_err(&err) {
Some(DistributedPlanError::NonDistributable(_)) => plan
.transform_down(|plan| {
// If the node cannot be distributed, rollback all the network boundaries.
if let Some(nb) = plan.as_network_boundary() {
return Ok(Transformed::yes(nb.rollback()?));
}
Ok(Transformed::no(plan))
})
.map(|v| v.data),
_ => Err(err),
};
}
};
let plan = stage.plan.decoded()?;
Ok(Arc::new(DistributedExec::new(Arc::clone(plan))))
}
Expand All @@ -219,6 +236,11 @@ impl DistributedPhysicalOptimizerRule {
}
}

// We cannot distribute [StreamingTableExec] nodes, so abort distribution.
if plan.as_any().is::<StreamingTableExec>() {
return Err(non_distributable_err(StreamingTableExec::static_name()))
}

if let Some(node) = plan.as_any().downcast_ref::<PartitionIsolatorExec>() {
// If there's only 1 task, no need to perform any isolation.
if n_tasks == 1 {
Expand Down Expand Up @@ -259,6 +281,11 @@ impl DistributedPhysicalOptimizerRule {
}
dnode = Referenced::Arced(dnode.as_ref().with_input_task_count(*limit)?);
}
Some(DistributedPlanError::NonDistributable(_)) => {
// This full plan is non-distributable, so abort any task and stage
// assignation.
return Err(e);
}
},
}
};
Expand Down Expand Up @@ -376,14 +403,17 @@ enum DistributedPlanError {
/// Prompts the planner to limit the amount of tasks used in the stage that is currently
/// being planned.
LimitTasks(usize),
/// Signals the planner that this whole plan is non-distributable. This can happen if
/// certain nodes are present, like [StreamingTableExec], which are typically used in
/// queries that rather performing some execution, they perform some introspection.
NonDistributable(&'static str),
}

impl Display for DistributedPlanError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
DistributedPlanError::LimitTasks(n) => {
write!(f, "LimitTasksErr: {n}")
}
DistributedPlanError::LimitTasks(n) => write!(f, "LimitTasksErr: {n}"),
DistributedPlanError::NonDistributable(name) => write!(f, "NonDistributable: {name}"),
}
}
}
Expand All @@ -396,6 +426,12 @@ pub fn limit_tasks_err(limit: usize) -> DataFusionError {
DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit)))
}

/// Builds a [DistributedPlanError::NonDistributable] error. This error prompts the distributed
/// planner to not distribute the query at all.
pub fn non_distributable_err(name: &'static str) -> DataFusionError {
DataFusionError::External(Box::new(DistributedPlanError::NonDistributable(name)))
}

fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> {
let DataFusionError::External(err) = err else {
return None;
Expand Down Expand Up @@ -630,6 +666,20 @@ mod tests {
");
}

#[tokio::test]
async fn test_show_columns() {
let query = r#"SHOW COLUMNS from weather"#;
let plan = sql_to_explain(query, 2).await.unwrap();
assert_snapshot!(plan, @r"
CoalescePartitionsExec
ProjectionExec: expr=[table_catalog@0 as table_catalog, table_schema@1 as table_schema, table_name@2 as table_name, column_name@3 as column_name, data_type@5 as data_type, is_nullable@4 as is_nullable]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: table_name@2 = weather
RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
StreamingTableExec: partition_sizes=1, projection=[table_catalog, table_schema, table_name, column_name, is_nullable, data_type]
");
}

async fn sql_to_explain(query: &str, tasks: usize) -> Result<String, DataFusionError> {
sql_to_explain_with_rule(
query,
Expand All @@ -644,7 +694,9 @@ mod tests {
query: &str,
rule: DistributedPhysicalOptimizerRule,
) -> Result<String, DataFusionError> {
let config = SessionConfig::new().with_target_partitions(4);
let config = SessionConfig::new()
.with_target_partitions(4)
.with_information_schema(true);

let state = SessionStateBuilder::new()
.with_default_features()
Expand Down
85 changes: 85 additions & 0 deletions tests/introspection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#[cfg(all(feature = "integration", test))]
mod tests {
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::execute_stream;
use datafusion::prelude::SessionConfig;
use datafusion_distributed::test_utils::localhost::start_localhost_context;
use datafusion_distributed::test_utils::parquet::register_parquet_tables;
use datafusion_distributed::{
DefaultSessionBuilder, DistributedPhysicalOptimizerRule,
MappedDistributedSessionBuilderExt, assert_snapshot, display_plan_ascii,
};
use futures::TryStreamExt;
use std::error::Error;

#[tokio::test]
async fn distributed_show_columns() -> Result<(), Box<dyn Error>> {
let (ctx, _guard) = start_localhost_context(
3,
DefaultSessionBuilder.map(|mut v: SessionStateBuilder| {
v = v.with_config(SessionConfig::default().with_information_schema(true));
Ok(v.build())
}),
)
.await;
register_parquet_tables(&ctx).await?;

let df = ctx.sql(r#"SHOW COLUMNS from weather"#).await?;
let physical = df.create_physical_plan().await?;
let physical_distributed = DistributedPhysicalOptimizerRule::default()
.with_network_shuffle_tasks(2)
.with_network_coalesce_tasks(2)
.optimize(physical.clone(), &Default::default())?;

let physical_distributed_str = display_plan_ascii(physical_distributed.as_ref());

assert_snapshot!(physical_distributed_str,
@r"
CoalescePartitionsExec
ProjectionExec: expr=[table_catalog@0 as table_catalog, table_schema@1 as table_schema, table_name@2 as table_name, column_name@3 as column_name, data_type@5 as data_type, is_nullable@4 as is_nullable]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: table_name@2 = weather
RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
StreamingTableExec: partition_sizes=1, projection=[table_catalog, table_schema, table_name, column_name, is_nullable, data_type]
",
);

let batches_distributed = pretty_format_batches(
&execute_stream(physical_distributed, ctx.task_ctx())?
.try_collect::<Vec<_>>()
.await?,
)?;
assert_snapshot!(batches_distributed, @r"
+---------------+--------------+------------+---------------+-----------+-------------+
| table_catalog | table_schema | table_name | column_name | data_type | is_nullable |
+---------------+--------------+------------+---------------+-----------+-------------+
| datafusion | public | weather | MinTemp | Float64 | YES |
| datafusion | public | weather | MaxTemp | Float64 | YES |
| datafusion | public | weather | Rainfall | Float64 | YES |
| datafusion | public | weather | Evaporation | Float64 | YES |
| datafusion | public | weather | Sunshine | Utf8View | YES |
| datafusion | public | weather | WindGustDir | Utf8View | YES |
| datafusion | public | weather | WindGustSpeed | Utf8View | YES |
| datafusion | public | weather | WindDir9am | Utf8View | YES |
| datafusion | public | weather | WindDir3pm | Utf8View | YES |
| datafusion | public | weather | WindSpeed9am | Utf8View | YES |
| datafusion | public | weather | WindSpeed3pm | Int64 | YES |
| datafusion | public | weather | Humidity9am | Int64 | YES |
| datafusion | public | weather | Humidity3pm | Int64 | YES |
| datafusion | public | weather | Pressure9am | Float64 | YES |
| datafusion | public | weather | Pressure3pm | Float64 | YES |
| datafusion | public | weather | Cloud9am | Int64 | YES |
| datafusion | public | weather | Cloud3pm | Int64 | YES |
| datafusion | public | weather | Temp9am | Float64 | YES |
| datafusion | public | weather | Temp3pm | Float64 | YES |
| datafusion | public | weather | RainToday | Utf8View | YES |
| datafusion | public | weather | RISK_MM | Float64 | YES |
| datafusion | public | weather | RainTomorrow | Utf8View | YES |
+---------------+--------------+------------+---------------+-----------+-------------+
");

Ok(())
}
}