Skip to content

Commit 3c7e489

Browse files
committed
Add DistributedPlanError::NonDistributable and do not distribute SHOW COLUMNS operations
1 parent f4b94f0 commit 3c7e489

File tree

2 files changed

+142
-5
lines changed

2 files changed

+142
-5
lines changed

src/distributed_physical_optimizer_rule.rs

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use datafusion::physical_plan::ExecutionPlanProperties;
1010
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
1111
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
1212
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
13+
use datafusion::physical_plan::streaming::StreamingTableExec;
1314
use datafusion::{
1415
common::tree_node::{Transformed, TreeNode},
1516
config::ConfigOptions,
@@ -196,7 +197,23 @@ impl DistributedPhysicalOptimizerRule {
196197
/// Attempts to turn `plan` into a distributed plan.
///
/// Planning is delegated to `Self::_distribute_plan_inner`. On success, the resulting stage's
/// plan is decoded and wrapped in a [DistributedExec].
///
/// If planning fails with [DistributedPlanError::NonDistributable], the error is not
/// propagated: the original `plan` is traversed top-down, every node that is a network
/// boundary is replaced by the result of its `rollback()`, and that plan is returned
/// as-is (not wrapped in a [DistributedExec]). Any other error is returned unchanged.
pub fn distribute_plan(
197198
plan: Arc<dyn ExecutionPlan>,
198199
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
199-
let stage = Self::_distribute_plan_inner(Uuid::new_v4(), plan, &mut 1, 0, 1)?;
200+
// `plan` is cloned so the original stays available for the fallback path below.
let stage = match Self::_distribute_plan_inner(Uuid::new_v4(), plan.clone(), &mut 1, 0, 1) {
201+
Ok(stage) => stage,
202+
Err(err) => {
203+
return match get_distribute_plan_err(&err) {
204+
Some(DistributedPlanError::NonDistributable(_)) => plan
205+
.transform_down(|plan| {
206+
// The plan cannot be distributed, so roll back every network boundary found
// in the original plan and leave all other nodes untouched.
// NOTE(review): `rollback()` presumably yields the node the boundary stood in
// for -- confirm against its implementation.
207+
if let Some(nb) = plan.as_network_boundary() {
208+
return Ok(Transformed::yes(nb.rollback()?));
209+
}
210+
Ok(Transformed::no(plan))
211+
})
212+
.map(|v| v.data),
213+
// Errors that are not a `NonDistributable` signal are surfaced to the caller.
_ => Err(err),
214+
};
215+
}
216+
};
200217
let plan = stage.plan.decoded()?;
201218
Ok(Arc::new(DistributedExec::new(Arc::clone(plan))))
202219
}
@@ -219,6 +236,11 @@ impl DistributedPhysicalOptimizerRule {
219236
}
220237
}
221238

239+
// We cannot distribute [StreamingTableExec] nodes, so abort distribution.
240+
if plan.as_any().is::<StreamingTableExec>() {
241+
return Err(non_distributable_err(StreamingTableExec::static_name()))
242+
}
243+
222244
if let Some(node) = plan.as_any().downcast_ref::<PartitionIsolatorExec>() {
223245
// If there's only 1 task, no need to perform any isolation.
224246
if n_tasks == 1 {
@@ -259,6 +281,11 @@ impl DistributedPhysicalOptimizerRule {
259281
}
260282
dnode = Referenced::Arced(dnode.as_ref().with_input_task_count(*limit)?);
261283
}
284+
Some(DistributedPlanError::NonDistributable(_)) => {
285+
// This full plan is non-distributable, so abort any task and stage
286+
// assignment.
287+
return Err(e);
288+
}
262289
},
263290
}
264291
};
@@ -376,14 +403,17 @@ enum DistributedPlanError {
376403
/// Prompts the planner to limit the amount of tasks used in the stage that is currently
377404
/// being planned.
378405
LimitTasks(usize),
406+
/// Signals the planner that this whole plan is non-distributable. This can happen if
407+
/// certain nodes are present, like [StreamingTableExec], which are typically used in
408+
/// queries that perform introspection rather than actual execution.
409+
NonDistributable(&'static str),
379410
}
380411

381412
// Textual rendering of the planner signals: each variant is written as a short tag followed
// by its payload (the task limit for `LimitTasks`, the carried name for `NonDistributable`).
impl Display for DistributedPlanError {
382413
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
383414
match self {
384-
DistributedPlanError::LimitTasks(n) => {
385-
write!(f, "LimitTasksErr: {n}")
386-
}
415+
DistributedPlanError::LimitTasks(n) => write!(f, "LimitTasksErr: {n}"),
416+
DistributedPlanError::NonDistributable(name) => write!(f, "NonDistributable: {name}"),
387417
}
388418
}
389419
}
@@ -396,6 +426,12 @@ pub fn limit_tasks_err(limit: usize) -> DataFusionError {
396426
DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit)))
397427
}
398428

429+
/// Builds a [DistributedPlanError::NonDistributable] error. This error prompts the distributed
430+
/// planner to not distribute the query at all.
///
/// `name` is stored in the variant and shows up in the error's `Display` output. The error
/// is boxed into [DataFusionError::External], so it is returned as a regular
/// [DataFusionError].
431+
pub fn non_distributable_err(name: &'static str) -> DataFusionError {
432+
DataFusionError::External(Box::new(DistributedPlanError::NonDistributable(name)))
433+
}
434+
399435
fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> {
400436
let DataFusionError::External(err) = err else {
401437
return None;
@@ -630,6 +666,20 @@ mod tests {
630666
");
631667
}
632668

669+
// `SHOW COLUMNS` produces a plan containing a `StreamingTableExec`, so the expected snapshot
// is the plain plan: no distributed stages and no network boundaries.
#[tokio::test]
670+
async fn test_show_columns() {
671+
let query = r#"SHOW COLUMNS from weather"#;
672+
let plan = sql_to_explain(query, 2).await.unwrap();
673+
assert_snapshot!(plan, @r"
674+
CoalescePartitionsExec
675+
ProjectionExec: expr=[table_catalog@0 as table_catalog, table_schema@1 as table_schema, table_name@2 as table_name, column_name@3 as column_name, data_type@5 as data_type, is_nullable@4 as is_nullable]
676+
CoalesceBatchesExec: target_batch_size=8192
677+
FilterExec: table_name@2 = weather
678+
RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
679+
StreamingTableExec: partition_sizes=1, projection=[table_catalog, table_schema, table_name, column_name, is_nullable, data_type]
680+
");
681+
}
682+
633683
async fn sql_to_explain(query: &str, tasks: usize) -> Result<String, DataFusionError> {
634684
sql_to_explain_with_rule(
635685
query,
@@ -644,7 +694,9 @@ mod tests {
644694
query: &str,
645695
rule: DistributedPhysicalOptimizerRule,
646696
) -> Result<String, DataFusionError> {
647-
let config = SessionConfig::new().with_target_partitions(4);
697+
let config = SessionConfig::new()
698+
.with_target_partitions(4)
699+
.with_information_schema(true);
648700

649701
let state = SessionStateBuilder::new()
650702
.with_default_features()

tests/introspection.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Integration test module: compiled only for test builds with the `integration` feature.
#[cfg(all(feature = "integration", test))]
2+
mod tests {
3+
use datafusion::arrow::util::pretty::pretty_format_batches;
4+
use datafusion::execution::SessionStateBuilder;
5+
use datafusion::physical_optimizer::PhysicalOptimizerRule;
6+
use datafusion::physical_plan::execute_stream;
7+
use datafusion::prelude::SessionConfig;
8+
use datafusion_distributed::test_utils::localhost::start_localhost_context;
9+
use datafusion_distributed::test_utils::parquet::register_parquet_tables;
10+
use datafusion_distributed::{
11+
DefaultSessionBuilder, DistributedPhysicalOptimizerRule,
12+
MappedDistributedSessionBuilderExt, assert_snapshot, display_plan_ascii,
13+
};
14+
use futures::TryStreamExt;
15+
use std::error::Error;
16+
17+
// Checks that `SHOW COLUMNS` keeps working when the distributed optimizer rule is applied:
// the optimized plan snapshot contains no distributed nodes, and executing that plan
// returns the column listing of the `weather` table.
#[tokio::test]
18+
async fn distributed_show_columns() -> Result<(), Box<dyn Error>> {
19+
// NOTE(review): `3` is presumably the number of localhost workers to start -- confirm
// against `start_localhost_context`.
let (ctx, _guard) = start_localhost_context(
20+
3,
21+
// The session builder enables `information_schema` in the session config.
// NOTE(review): presumably required for `SHOW COLUMNS` to plan -- confirm.
DefaultSessionBuilder.map(|mut v: SessionStateBuilder| {
22+
v = v.with_config(SessionConfig::default().with_information_schema(true));
23+
Ok(v.build())
24+
}),
25+
)
26+
.await;
27+
register_parquet_tables(&ctx).await?;
28+
29+
let df = ctx.sql(r#"SHOW COLUMNS from weather"#).await?;
30+
let physical = df.create_physical_plan().await?;
31+
// Apply the distributed optimizer rule directly to the already-built physical plan.
let physical_distributed = DistributedPhysicalOptimizerRule::default()
32+
.with_network_shuffle_tasks(2)
33+
.with_network_coalesce_tasks(2)
34+
.optimize(physical.clone(), &Default::default())?;
35+
36+
let physical_distributed_str = display_plan_ascii(physical_distributed.as_ref());
37+
38+
assert_snapshot!(physical_distributed_str,
39+
@r"
40+
CoalescePartitionsExec
41+
ProjectionExec: expr=[table_catalog@0 as table_catalog, table_schema@1 as table_schema, table_name@2 as table_name, column_name@3 as column_name, data_type@5 as data_type, is_nullable@4 as is_nullable]
42+
CoalesceBatchesExec: target_batch_size=8192
43+
FilterExec: table_name@2 = weather
44+
RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
45+
StreamingTableExec: partition_sizes=1, projection=[table_catalog, table_schema, table_name, column_name, is_nullable, data_type]
46+
",
47+
);
48+
49+
// Execute the optimized plan and snapshot the pretty-printed result batches.
let batches_distributed = pretty_format_batches(
50+
&execute_stream(physical_distributed, ctx.task_ctx())?
51+
.try_collect::<Vec<_>>()
52+
.await?,
53+
)?;
54+
assert_snapshot!(batches_distributed, @r"
55+
+---------------+--------------+------------+---------------+-----------+-------------+
56+
| table_catalog | table_schema | table_name | column_name | data_type | is_nullable |
57+
+---------------+--------------+------------+---------------+-----------+-------------+
58+
| datafusion | public | weather | MinTemp | Float64 | YES |
59+
| datafusion | public | weather | MaxTemp | Float64 | YES |
60+
| datafusion | public | weather | Rainfall | Float64 | YES |
61+
| datafusion | public | weather | Evaporation | Float64 | YES |
62+
| datafusion | public | weather | Sunshine | Utf8View | YES |
63+
| datafusion | public | weather | WindGustDir | Utf8View | YES |
64+
| datafusion | public | weather | WindGustSpeed | Utf8View | YES |
65+
| datafusion | public | weather | WindDir9am | Utf8View | YES |
66+
| datafusion | public | weather | WindDir3pm | Utf8View | YES |
67+
| datafusion | public | weather | WindSpeed9am | Utf8View | YES |
68+
| datafusion | public | weather | WindSpeed3pm | Int64 | YES |
69+
| datafusion | public | weather | Humidity9am | Int64 | YES |
70+
| datafusion | public | weather | Humidity3pm | Int64 | YES |
71+
| datafusion | public | weather | Pressure9am | Float64 | YES |
72+
| datafusion | public | weather | Pressure3pm | Float64 | YES |
73+
| datafusion | public | weather | Cloud9am | Int64 | YES |
74+
| datafusion | public | weather | Cloud3pm | Int64 | YES |
75+
| datafusion | public | weather | Temp9am | Float64 | YES |
76+
| datafusion | public | weather | Temp3pm | Float64 | YES |
77+
| datafusion | public | weather | RainToday | Utf8View | YES |
78+
| datafusion | public | weather | RISK_MM | Float64 | YES |
79+
| datafusion | public | weather | RainTomorrow | Utf8View | YES |
80+
+---------------+--------------+------------+---------------+-----------+-------------+
81+
");
82+
83+
Ok(())
84+
}
85+
}

0 commit comments

Comments
 (0)