Skip to content

Commit b900fec

Browse files
authored
ArrowFlightReadExec should always have a RepartitionExec as a child + uncomment working tests (#120)
1 parent 9459890 commit b900fec

File tree

9 files changed

+83
-125
lines changed

9 files changed

+83
-125
lines changed

src/physical_optimizer.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use datafusion::{
1313
config::ConfigOptions,
1414
error::Result,
1515
physical_optimizer::PhysicalOptimizerRule,
16-
physical_plan::{repartition::RepartitionExec, ExecutionPlan, ExecutionPlanProperties},
16+
physical_plan::{repartition::RepartitionExec, ExecutionPlan},
1717
};
1818
use uuid::Uuid;
1919

@@ -94,10 +94,7 @@ impl DistributedPhysicalOptimizerRule {
9494
};
9595

9696
return Ok(Transformed::yes(Arc::new(
97-
ArrowFlightReadExec::new_pending(
98-
Arc::clone(&maybe_isolated_plan),
99-
maybe_isolated_plan.output_partitioning().clone(),
100-
),
97+
ArrowFlightReadExec::new_pending(Arc::clone(&maybe_isolated_plan)),
10198
)));
10299
}
103100

src/plan/arrow_flight_read.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ use datafusion::execution::{SendableRecordBatchStream, TaskContext};
1616
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
1717
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
1818
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
19-
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
19+
use datafusion::physical_plan::{
20+
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
21+
};
2022
use futures::{StreamExt, TryFutureExt, TryStreamExt};
2123
use http::Extensions;
2224
use prost::Message;
@@ -57,11 +59,11 @@ pub struct ArrowFlightReadReadyExec {
5759
}
5860

5961
impl ArrowFlightReadExec {
60-
pub fn new_pending(child: Arc<dyn ExecutionPlan>, partitioning: Partitioning) -> Self {
62+
pub fn new_pending(child: Arc<dyn ExecutionPlan>) -> Self {
6163
Self::Pending(ArrowFlightReadPendingExec {
6264
properties: PlanProperties::new(
6365
EquivalenceProperties::new(child.schema()),
64-
partitioning,
66+
child.output_partitioning().clone(),
6567
EmissionType::Incremental,
6668
Boundedness::Bounded,
6769
),

src/test_utils/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,4 @@ pub mod insta;
22
pub mod localhost;
33
pub mod mock_exec;
44
pub mod parquet;
5-
pub mod plan;
65
pub mod tpch;

src/test_utils/plan.rs

Lines changed: 0 additions & 65 deletions
This file was deleted.

tests/custom_config_extension.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ mod tests {
99
};
1010
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
1111
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
12+
use datafusion::physical_plan::repartition::RepartitionExec;
1213
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
1314
use datafusion::physical_plan::{
1415
execute_stream, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
@@ -46,10 +47,9 @@ mod tests {
4647
let mut plan: Arc<dyn ExecutionPlan> = Arc::new(CustomConfigExtensionRequiredExec::new());
4748

4849
for size in [1, 2, 3] {
49-
plan = Arc::new(ArrowFlightReadExec::new_pending(
50-
plan,
51-
Partitioning::RoundRobinBatch(size),
52-
));
50+
plan = Arc::new(ArrowFlightReadExec::new_pending(Arc::new(
51+
RepartitionExec::try_new(plan, Partitioning::RoundRobinBatch(size))?,
52+
)));
5353
}
5454

5555
let plan = DistributedPhysicalOptimizerRule::default().distribute_plan(plan)?;

tests/custom_extension_codec.rs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ mod tests {
3636
use std::sync::Arc;
3737

3838
#[tokio::test]
39-
#[ignore]
4039
async fn custom_extension_codec() -> Result<(), Box<dyn std::error::Error>> {
4140
async fn build_state(
4241
ctx: DistributedSessionBuilderContext,
@@ -66,17 +65,16 @@ mod tests {
6665
│partitions [out:1 <-- in:1 ] SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
6766
│partitions [out:1 <-- in:10 ] RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=10
6867
│partitions [out:10 ] ArrowFlightReadExec: Stage 2
69-
7068
└──────────────────────────────────────────────────
71-
┌───── Stage 2 Task: partitions: 0,unassigned]
72-
│partitions [out:1 <-- in:1 ] SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
73-
│partitions [out:1 ] ArrowFlightReadExec: Stage 1
74-
69+
┌───── Stage 2 Task: partitions: 0..9,unassigned]
70+
│partitions [out:10 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
71+
│partitions [out:1 <-- in:1 ] SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
72+
│partitions [out:1 ] ArrowFlightReadExec: Stage 1
7573
└──────────────────────────────────────────────────
7674
┌───── Stage 1 Task: partitions: 0,unassigned]
77-
│partitions [out:1 <-- in:1 ] FilterExec: numbers@0 > 1
78-
│partitions [out:1 ] Int64ListExec: length=6
79-
75+
│partitions [out:1 <-- in:1 ] RepartitionExec: partitioning=Hash([numbers@0], 1), input_partitions=1
76+
│partitions [out:1 <-- in:1 ] FilterExec: numbers@0 > 1
77+
│partitions [out:1 ] Int64ListExec: length=6
8078
└──────────────────────────────────────────────────
8179
");
8280

@@ -125,10 +123,12 @@ mod tests {
125123
)?);
126124

127125
if distributed {
128-
plan = Arc::new(ArrowFlightReadExec::new_pending(
129-
plan.clone(),
130-
Partitioning::Hash(vec![col("numbers", &plan.schema())?], 1),
131-
));
126+
plan = Arc::new(ArrowFlightReadExec::new_pending(Arc::new(
127+
RepartitionExec::try_new(
128+
plan.clone(),
129+
Partitioning::Hash(vec![col("numbers", &plan.schema())?], 1),
130+
)?,
131+
)));
132132
}
133133

134134
plan = Arc::new(SortExec::new(
@@ -141,10 +141,9 @@ mod tests {
141141
));
142142

143143
if distributed {
144-
plan = Arc::new(ArrowFlightReadExec::new_pending(
145-
plan.clone(),
146-
Partitioning::RoundRobinBatch(10),
147-
));
144+
plan = Arc::new(ArrowFlightReadExec::new_pending(Arc::new(
145+
RepartitionExec::try_new(plan.clone(), Partitioning::RoundRobinBatch(10))?,
146+
)));
148147

149148
plan = Arc::new(RepartitionExec::try_new(
150149
plan,

tests/distributed_aggregation.rs

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
#[cfg(all(feature = "integration", test))]
22
mod tests {
33
use datafusion::arrow::util::pretty::pretty_format_batches;
4+
use datafusion::physical_optimizer::PhysicalOptimizerRule;
45
use datafusion::physical_plan::{displayable, execute_stream};
56
use datafusion_distributed::test_utils::localhost::start_localhost_context;
67
use datafusion_distributed::test_utils::parquet::register_parquet_tables;
7-
use datafusion_distributed::test_utils::plan::distribute_aggregate;
8-
use datafusion_distributed::{assert_snapshot, DefaultSessionBuilder};
8+
use datafusion_distributed::{
9+
assert_snapshot, DefaultSessionBuilder, DistributedPhysicalOptimizerRule,
10+
};
911
use futures::TryStreamExt;
1012
use std::error::Error;
1113

@@ -21,7 +23,9 @@ mod tests {
2123

2224
let physical_str = displayable(physical.as_ref()).indent(true).to_string();
2325

24-
let physical_distributed = distribute_aggregate(physical.clone())?;
26+
let physical_distributed = DistributedPhysicalOptimizerRule::default()
27+
.with_maximum_partitions_per_task(1)
28+
.optimize(physical.clone(), &Default::default())?;
2529

2630
let physical_distributed_str = displayable(physical_distributed.as_ref())
2731
.indent(true)
@@ -48,21 +52,23 @@ mod tests {
4852
@r"
4953
┌───── Stage 3 Task: partitions: 0,unassigned]
5054
│partitions [out:1 <-- in:1 ] ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
51-
│partitions [out:1 <-- in:8 ] SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
52-
│partitions [out:8 <-- in:8 ] SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
53-
│partitions [out:8 <-- in:8 ] ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
54-
│partitions [out:8 <-- in:8 ] AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
55-
│partitions [out:8 ] ArrowFlightReadExec: Stage 2
55+
│partitions [out:1 <-- in:3 ] SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
56+
│partitions [out:3 <-- in:3 ] SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
57+
│partitions [out:3 <-- in:3 ] ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
58+
│partitions [out:3 <-- in:3 ] AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
59+
│partitions [out:3 <-- in:3 ] CoalesceBatchesExec: target_batch_size=8192
60+
│partitions [out:3 ] ArrowFlightReadExec: Stage 2
5661
└──────────────────────────────────────────────────
57-
┌───── Stage 2 Task: partitions: 0..2,unassigned]
58-
│partitions [out:3 <-- in:3 ] CoalesceBatchesExec: target_batch_size=8192
59-
│partitions [out:3 <-- in:3 ] RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=3
60-
│partitions [out:3 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
61-
│partitions [out:1 <-- in:1 ] AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
62-
│partitions [out:1 ] ArrowFlightReadExec: Stage 1
62+
┌───── Stage 2 Task: partitions: 0,unassigned],Task: partitions: 1,unassigned],Task: partitions: 2,unassigned]
63+
│partitions [out:3 <-- in:1 ] RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=1
64+
│partitions [out:1 <-- in:3 ] PartitionIsolatorExec [providing upto 1 partitions]
65+
│partitions [out:3 ] ArrowFlightReadExec: Stage 1
6366
└──────────────────────────────────────────────────
64-
┌───── Stage 1 Task: partitions: 0,unassigned]
65-
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet
67+
┌───── Stage 1 Task: partitions: 0,unassigned],Task: partitions: 1,unassigned],Task: partitions: 2,unassigned]
68+
│partitions [out:3 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
69+
│partitions [out:1 <-- in:1 ] PartitionIsolatorExec [providing upto 1 partitions]
70+
│partitions [out:1 <-- in:1 ] AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
71+
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet
6672
└──────────────────────────────────────────────────
6773
",
6874
);

tests/error_propagation.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod tests {
77
};
88
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
99
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
10+
use datafusion::physical_plan::repartition::RepartitionExec;
1011
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
1112
use datafusion::physical_plan::{
1213
execute_stream, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
@@ -42,10 +43,9 @@ mod tests {
4243
let mut plan: Arc<dyn ExecutionPlan> = Arc::new(ErrorExec::new("something failed"));
4344

4445
for size in [1, 2, 3] {
45-
plan = Arc::new(ArrowFlightReadExec::new_pending(
46-
plan,
47-
Partitioning::RoundRobinBatch(size),
48-
));
46+
plan = Arc::new(ArrowFlightReadExec::new_pending(Arc::new(
47+
RepartitionExec::try_new(plan, Partitioning::RoundRobinBatch(size))?,
48+
)));
4949
}
5050
let plan = DistributedPhysicalOptimizerRule::default().distribute_plan(plan)?;
5151
let stream = execute_stream(Arc::new(plan), ctx.task_ctx())?;

tests/highly_distributed_query.rs

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
#[cfg(all(feature = "integration", test))]
22
mod tests {
33
use datafusion::physical_expr::Partitioning;
4+
use datafusion::physical_plan::repartition::RepartitionExec;
45
use datafusion::physical_plan::{displayable, execute_stream};
56
use datafusion_distributed::test_utils::localhost::start_localhost_context;
67
use datafusion_distributed::test_utils::parquet::register_parquet_tables;
7-
use datafusion_distributed::{assert_snapshot, ArrowFlightReadExec, DefaultSessionBuilder};
8+
use datafusion_distributed::{
9+
assert_snapshot, ArrowFlightReadExec, DefaultSessionBuilder,
10+
DistributedPhysicalOptimizerRule,
11+
};
812
use futures::TryStreamExt;
913
use std::error::Error;
1014
use std::sync::Arc;
1115

1216
#[tokio::test]
13-
#[ignore]
1417
async fn highly_distributed_query() -> Result<(), Box<dyn Error>> {
1518
let (ctx, _guard) = start_localhost_context(9, DefaultSessionBuilder).await;
1619
register_parquet_tables(&ctx).await?;
@@ -21,11 +24,17 @@ mod tests {
2124

2225
let mut physical_distributed = physical.clone();
2326
for size in [1, 10, 5] {
24-
physical_distributed = Arc::new(ArrowFlightReadExec::new_pending(
25-
physical_distributed,
26-
Partitioning::RoundRobinBatch(size),
27-
));
27+
physical_distributed = Arc::new(ArrowFlightReadExec::new_pending(Arc::new(
28+
RepartitionExec::try_new(
29+
physical_distributed,
30+
Partitioning::RoundRobinBatch(size),
31+
)?,
32+
)));
2833
}
34+
35+
let physical_distributed =
36+
DistributedPhysicalOptimizerRule::default().distribute_plan(physical_distributed)?;
37+
let physical_distributed = Arc::new(physical_distributed);
2938
let physical_distributed_str = displayable(physical_distributed.as_ref())
3039
.indent(true)
3140
.to_string();
@@ -36,10 +45,21 @@ mod tests {
3645

3746
assert_snapshot!(physical_distributed_str,
3847
@r"
39-
ArrowFlightReadExec: input_tasks=5 hash_expr=[] stage_id=UUID input_stage_id=UUID input_hosts=[http://localhost:50050/, http://localhost:50051/, http://localhost:50053/, http://localhost:50054/, http://localhost:50055/]
40-
ArrowFlightReadExec: input_tasks=10 hash_expr=[] stage_id=UUID input_stage_id=UUID input_hosts=[http://localhost:50056/, http://localhost:50057/, http://localhost:50058/, http://localhost:50059/, http://localhost:50050/, http://localhost:50051/, http://localhost:50053/, http://localhost:50054/, http://localhost:50055/, http://localhost:50056/]
41-
ArrowFlightReadExec: input_tasks=1 hash_expr=[] stage_id=UUID input_stage_id=UUID input_hosts=[http://localhost:50057/]
42-
DataSourceExec: file_groups={1 group: [[/testdata/flights-1m.parquet]]}, projection=[FL_DATE, DEP_DELAY, ARR_DELAY, AIR_TIME, DISTANCE, DEP_TIME, ARR_TIME], file_type=parquet
48+
┌───── Stage 4 Task: partitions: 0..4,unassigned]
49+
│partitions [out:5 ] ArrowFlightReadExec: Stage 3
50+
└──────────────────────────────────────────────────
51+
┌───── Stage 3 Task: partitions: 0..4,unassigned]
52+
│partitions [out:5 <-- in:10 ] RepartitionExec: partitioning=RoundRobinBatch(5), input_partitions=10
53+
│partitions [out:10 ] ArrowFlightReadExec: Stage 2
54+
└──────────────────────────────────────────────────
55+
┌───── Stage 2 Task: partitions: 0..9,unassigned]
56+
│partitions [out:10 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
57+
│partitions [out:1 ] ArrowFlightReadExec: Stage 1
58+
└──────────────────────────────────────────────────
59+
┌───── Stage 1 Task: partitions: 0,unassigned]
60+
│partitions [out:1 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=1
61+
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/flights-1m.parquet]]}, projection=[FL_DATE, DEP_DELAY, ARR_DELAY, AIR_TIME, DISTANCE, DEP_TIME, ARR_TIME], file_type=parquet
62+
└──────────────────────────────────────────────────
4363
",
4464
);
4565

0 commit comments

Comments
 (0)