Skip to content

Commit 8baa2e8

Browse files
committed
Add DistributedPlanError::NonDistributable and do not distribute SHOW COLUMNS operations
1 parent f4b94f0 commit 8baa2e8

File tree

2 files changed

+134
-5
lines changed

2 files changed

+134
-5
lines changed

src/distributed_physical_optimizer_rule.rs

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use datafusion::physical_plan::ExecutionPlanProperties;
1010
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
1111
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
1212
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
13+
use datafusion::physical_plan::streaming::StreamingTableExec;
1314
use datafusion::{
1415
common::tree_node::{Transformed, TreeNode},
1516
config::ConfigOptions,
@@ -196,7 +197,15 @@ impl DistributedPhysicalOptimizerRule {
196197
pub fn distribute_plan(
197198
plan: Arc<dyn ExecutionPlan>,
198199
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
199-
let stage = Self::_distribute_plan_inner(Uuid::new_v4(), plan, &mut 1, 0, 1)?;
200+
let stage = match Self::_distribute_plan_inner(Uuid::new_v4(), plan.clone(), &mut 1, 0, 1) {
201+
Ok(stage) => stage,
202+
Err(err) => {
203+
return match get_distribute_plan_err(&err) {
204+
Some(DistributedPlanError::NonDistributable) => Ok(plan),
205+
_ => Err(err),
206+
};
207+
}
208+
};
200209
let plan = stage.plan.decoded()?;
201210
Ok(Arc::new(DistributedExec::new(Arc::clone(plan))))
202211
}
@@ -219,6 +228,11 @@ impl DistributedPhysicalOptimizerRule {
219228
}
220229
}
221230

231+
// We cannot distribute [StreamingTableExec] nodes, so abort distribution.
232+
if plan.as_any().is::<StreamingTableExec>() {
233+
return Err(non_distributable_err())
234+
}
235+
222236
if let Some(node) = plan.as_any().downcast_ref::<PartitionIsolatorExec>() {
223237
// If there's only 1 task, no need to perform any isolation.
224238
if n_tasks == 1 {
@@ -259,6 +273,11 @@ impl DistributedPhysicalOptimizerRule {
259273
}
260274
dnode = Referenced::Arced(dnode.as_ref().with_input_task_count(*limit)?);
261275
}
276+
Some(DistributedPlanError::NonDistributable) => {
277+
// This full plan is non-distributable, so abort any task and stage
278+
// assignment.
279+
return Err(non_distributable_err());
280+
}
262281
},
263282
}
264283
};
@@ -376,14 +395,17 @@ enum DistributedPlanError {
376395
/// Prompts the planner to limit the amount of tasks used in the stage that is currently
377396
/// being planned.
378397
LimitTasks(usize),
398+
/// Signals the planner that this whole plan is non-distributable. This can happen if
399+
/// certain nodes are present, like [StreamingTableExec], which are typically used in
400+
/// queries that, rather than performing some execution, perform some introspection.
401+
NonDistributable,
379402
}
380403

381404
impl Display for DistributedPlanError {
382405
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
383406
match self {
384-
DistributedPlanError::LimitTasks(n) => {
385-
write!(f, "LimitTasksErr: {n}")
386-
}
407+
DistributedPlanError::LimitTasks(n) => write!(f, "LimitTasksErr: {n}"),
408+
DistributedPlanError::NonDistributable => write!(f, "NonDistributable"),
387409
}
388410
}
389411
}
@@ -396,6 +418,12 @@ pub fn limit_tasks_err(limit: usize) -> DataFusionError {
396418
DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit)))
397419
}
398420

421+
/// Builds a [DistributedPlanError::NonDistributable] error. This error prompts the distributed
422+
/// planner to not distribute the query at all.
423+
pub fn non_distributable_err() -> DataFusionError {
424+
DataFusionError::External(Box::new(DistributedPlanError::NonDistributable))
425+
}
426+
399427
fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> {
400428
let DataFusionError::External(err) = err else {
401429
return None;
@@ -630,6 +658,21 @@ mod tests {
630658
");
631659
}
632660

661+
#[tokio::test]
662+
async fn test_show_columns() {
663+
let query = r#"SHOW COLUMNS from weather"#;
664+
let plan = sql_to_explain(query, 2).await.unwrap();
665+
assert_snapshot!(plan, @r"
666+
CoalescePartitionsExec
667+
NetworkCoalesceExec
668+
ProjectionExec: expr=[table_catalog@0 as table_catalog, table_schema@1 as table_schema, table_name@2 as table_name, column_name@3 as column_name, data_type@5 as data_type, is_nullable@4 as is_nullable]
669+
CoalesceBatchesExec: target_batch_size=8192
670+
FilterExec: table_name@2 = weather
671+
RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
672+
StreamingTableExec: partition_sizes=1, projection=[table_catalog, table_schema, table_name, column_name, is_nullable, data_type]
673+
");
674+
}
675+
633676
async fn sql_to_explain(query: &str, tasks: usize) -> Result<String, DataFusionError> {
634677
sql_to_explain_with_rule(
635678
query,
@@ -644,7 +687,9 @@ mod tests {
644687
query: &str,
645688
rule: DistributedPhysicalOptimizerRule,
646689
) -> Result<String, DataFusionError> {
647-
let config = SessionConfig::new().with_target_partitions(4);
690+
let config = SessionConfig::new()
691+
.with_target_partitions(4)
692+
.with_information_schema(true);
648693

649694
let state = SessionStateBuilder::new()
650695
.with_default_features()

tests/introspection.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#[cfg(all(feature = "integration", test))]
2+
mod tests {
3+
use datafusion::arrow::util::pretty::pretty_format_batches;
4+
use datafusion::execution::SessionStateBuilder;
5+
use datafusion::physical_optimizer::PhysicalOptimizerRule;
6+
use datafusion::physical_plan::execute_stream;
7+
use datafusion::prelude::SessionConfig;
8+
use datafusion_distributed::test_utils::localhost::start_localhost_context;
9+
use datafusion_distributed::test_utils::parquet::register_parquet_tables;
10+
use datafusion_distributed::{
11+
DefaultSessionBuilder, DistributedPhysicalOptimizerRule,
12+
MappedDistributedSessionBuilderExt, assert_snapshot, display_plan_ascii,
13+
};
14+
use futures::TryStreamExt;
15+
use std::error::Error;
16+
17+
#[tokio::test]
18+
async fn distributed_show_columns() -> Result<(), Box<dyn Error>> {
19+
let (ctx, _guard) = start_localhost_context(
20+
3,
21+
DefaultSessionBuilder.map(|mut v: SessionStateBuilder| {
22+
v = v.with_config(SessionConfig::default().with_information_schema(true));
23+
Ok(v.build())
24+
}),
25+
)
26+
.await;
27+
register_parquet_tables(&ctx).await?;
28+
29+
let df = ctx.sql(r#"SHOW COLUMNS from weather"#).await?;
30+
let physical = df.create_physical_plan().await?;
31+
let physical_distributed = DistributedPhysicalOptimizerRule::default()
32+
.with_network_shuffle_tasks(2)
33+
.optimize(physical.clone(), &Default::default())?;
34+
35+
let physical_distributed_str = display_plan_ascii(physical_distributed.as_ref());
36+
37+
assert_snapshot!(physical_distributed_str,
38+
@r"
39+
CoalescePartitionsExec
40+
ProjectionExec: expr=[table_catalog@0 as table_catalog, table_schema@1 as table_schema, table_name@2 as table_name, column_name@3 as column_name, data_type@5 as data_type, is_nullable@4 as is_nullable]
41+
CoalesceBatchesExec: target_batch_size=8192
42+
FilterExec: table_name@2 = weather
43+
RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
44+
StreamingTableExec: partition_sizes=1, projection=[table_catalog, table_schema, table_name, column_name, is_nullable, data_type]
45+
",
46+
);
47+
48+
let batches_distributed = pretty_format_batches(
49+
&execute_stream(physical_distributed, ctx.task_ctx())?
50+
.try_collect::<Vec<_>>()
51+
.await?,
52+
)?;
53+
assert_snapshot!(batches_distributed, @r"
54+
+---------------+--------------+------------+---------------+-----------+-------------+
55+
| table_catalog | table_schema | table_name | column_name | data_type | is_nullable |
56+
+---------------+--------------+------------+---------------+-----------+-------------+
57+
| datafusion | public | weather | MinTemp | Float64 | YES |
58+
| datafusion | public | weather | MaxTemp | Float64 | YES |
59+
| datafusion | public | weather | Rainfall | Float64 | YES |
60+
| datafusion | public | weather | Evaporation | Float64 | YES |
61+
| datafusion | public | weather | Sunshine | Utf8View | YES |
62+
| datafusion | public | weather | WindGustDir | Utf8View | YES |
63+
| datafusion | public | weather | WindGustSpeed | Utf8View | YES |
64+
| datafusion | public | weather | WindDir9am | Utf8View | YES |
65+
| datafusion | public | weather | WindDir3pm | Utf8View | YES |
66+
| datafusion | public | weather | WindSpeed9am | Utf8View | YES |
67+
| datafusion | public | weather | WindSpeed3pm | Int64 | YES |
68+
| datafusion | public | weather | Humidity9am | Int64 | YES |
69+
| datafusion | public | weather | Humidity3pm | Int64 | YES |
70+
| datafusion | public | weather | Pressure9am | Float64 | YES |
71+
| datafusion | public | weather | Pressure3pm | Float64 | YES |
72+
| datafusion | public | weather | Cloud9am | Int64 | YES |
73+
| datafusion | public | weather | Cloud3pm | Int64 | YES |
74+
| datafusion | public | weather | Temp9am | Float64 | YES |
75+
| datafusion | public | weather | Temp3pm | Float64 | YES |
76+
| datafusion | public | weather | RainToday | Utf8View | YES |
77+
| datafusion | public | weather | RISK_MM | Float64 | YES |
78+
| datafusion | public | weather | RainTomorrow | Utf8View | YES |
79+
+---------------+--------------+------------+---------------+-----------+-------------+
80+
");
81+
82+
Ok(())
83+
}
84+
}

0 commit comments

Comments
 (0)