Skip to content

Commit e932754

Browse files
committed
Update all tests to use the new task estimation mechanism
1 parent 4bf0666 commit e932754

File tree

11 files changed

+431
-498
lines changed

11 files changed

+431
-498
lines changed

examples/in_memory_cluster.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use datafusion::common::DataFusionError;
55
use datafusion::execution::SessionStateBuilder;
66
use datafusion::physical_plan::displayable;
77
use datafusion::prelude::{ParquetReadOptions, SessionContext};
8-
use datafusion_distributed::test_utils::test_task_estimator::FixedDataSourceExecTaskEstimator;
98
use datafusion_distributed::{
109
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
1110
DistributedSessionBuilderContext, create_flight_client,
@@ -27,9 +26,6 @@ struct Args {
2726

2827
#[structopt(long)]
2928
explain: bool,
30-
31-
#[structopt(long, default_value = "3")]
32-
stage_tasks: usize,
3329
}
3430

3531
#[tokio::main]
@@ -39,7 +35,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
3935
let state = SessionStateBuilder::new()
4036
.with_default_features()
4137
.with_distributed_execution(InMemoryChannelResolver::new())
42-
.with_distributed_task_estimator(FixedDataSourceExecTaskEstimator(args.stage_tasks))
4338
.build();
4439

4540
let ctx = SessionContext::from(state);

examples/localhost_run.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use datafusion::common::DataFusionError;
66
use datafusion::execution::SessionStateBuilder;
77
use datafusion::physical_plan::displayable;
88
use datafusion::prelude::{ParquetReadOptions, SessionContext};
9-
use datafusion_distributed::test_utils::test_task_estimator::FixedDataSourceExecTaskEstimator;
109
use datafusion_distributed::{BoxCloneSyncChannel, ChannelResolver, DistributedExt};
1110
use futures::TryStreamExt;
1211
use std::error::Error;
@@ -26,9 +25,6 @@ struct Args {
2625

2726
#[structopt(long)]
2827
explain: bool,
29-
30-
#[structopt(long, default_value = "3")]
31-
stage_tasks: usize,
3228
}
3329

3430
#[tokio::main]
@@ -43,7 +39,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
4339
let state = SessionStateBuilder::new()
4440
.with_default_features()
4541
.with_distributed_execution(localhost_resolver)
46-
.with_distributed_task_estimator(FixedDataSourceExecTaskEstimator(args.stage_tasks))
4742
.build();
4843

4944
let ctx = SessionContext::from(state);

src/distributed_planner/distributed_config.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@ extensions_options! {
1515
pub struct DistributedConfig {
1616
/// Sets the maximum amount of files that will be assigned to each task. Reducing this
1717
/// number will spawn more tasks for the same number of files.
18-
pub files_per_task: usize, default = get_available_parallelism()
18+
pub files_per_task: usize, default = files_per_task_default()
1919
/// Task multiplying factor for when a node declares that it changes the cardinality
2020
/// of the data:
2121
/// - If a node is increasing the cardinality of the data, this factor will increase.
2222
/// - If a node reduces the cardinality of the data, this factor will decrease.
2323
/// - In any other situation, this factor is left intact.
24-
pub cardinality_task_count_factor: f64, default = 1.0
24+
pub cardinality_task_count_factor: f64, default = cardinality_task_count_factor_default()
2525
/// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
2626
/// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
2727
pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()
@@ -31,6 +31,22 @@ extensions_options! {
3131
}
3232
}
3333

34+
fn files_per_task_default() -> usize {
35+
if cfg!(test) || cfg!(feature = "integration") {
36+
1
37+
} else {
38+
get_available_parallelism()
39+
}
40+
}
41+
42+
fn cardinality_task_count_factor_default() -> f64 {
43+
if cfg!(test) || cfg!(feature = "integration") {
44+
1.5
45+
} else {
46+
1.0
47+
}
48+
}
49+
3450
impl DistributedConfig {
3551
/// Appends a [TaskEstimator] to the list. [TaskEstimator] will be executed sequentially in
3652
/// order on leaf nodes, and the first one to provide a value is the one that gets to decide

src/distributed_planner/distributed_physical_optimizer_rule.rs

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,6 @@ mod tests {
401401
use crate::DistributedExt;
402402
use crate::test_utils::in_memory_channel_resolver::InMemoryChannelResolver;
403403
use crate::test_utils::parquet::register_parquet_tables;
404-
use crate::test_utils::test_task_estimator::FixedDataSourceExecTaskEstimator;
405404
use crate::{assert_snapshot, display_plan_ascii};
406405
use datafusion::error::DataFusionError;
407406
use datafusion::execution::SessionStateBuilder;
@@ -458,13 +457,13 @@ mod tests {
458457
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
459458
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
460459
│ CoalesceBatchesExec: target_batch_size=8192
461-
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
460+
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
462461
└──────────────────────────────────────────────────
463-
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
462+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
464463
│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
465-
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
464+
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
466465
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
467-
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
466+
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
468467
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
469468
└──────────────────────────────────────────────────
470469
");
@@ -487,13 +486,13 @@ mod tests {
487486
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
488487
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
489488
│ CoalesceBatchesExec: target_batch_size=8192
490-
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
489+
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
491490
└──────────────────────────────────────────────────
492-
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
491+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
493492
│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
494-
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
493+
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
495494
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
496-
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
495+
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
497496
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
498497
└──────────────────────────────────────────────────
499498
");
@@ -550,30 +549,30 @@ mod tests {
550549
│ ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow]
551550
│ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
552551
│ CoalesceBatchesExec: target_batch_size=8192
553-
│ [Stage 3] => NetworkShuffleExec: output_partitions=4, input_tasks=2
552+
│ [Stage 3] => NetworkShuffleExec: output_partitions=4, input_tasks=3
554553
└──────────────────────────────────────────────────
555554
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
556555
│ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
557556
│ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)]
558557
│ CoalesceBatchesExec: target_batch_size=8192
559-
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
558+
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
560559
└──────────────────────────────────────────────────
561-
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
560+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
562561
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 8), input_partitions=4
563562
│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
564563
│ CoalesceBatchesExec: target_batch_size=8192
565564
│ FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2]
566-
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
567-
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
565+
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
566+
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
568567
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
569568
└──────────────────────────────────────────────────
570-
┌───── Stage 3 ── Tasks: t0:[p0..p3] t1:[p0..p3]
569+
┌───── Stage 3 ── Tasks: t0:[p0..p3] t1:[p0..p3] t2:[p0..p3]
571570
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
572571
│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
573572
│ CoalesceBatchesExec: target_batch_size=8192
574573
│ FilterExec: RainToday@1 = no, projection=[MaxTemp@0, RainTomorrow@2]
575-
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
576-
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
574+
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
575+
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
577576
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = no, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= no AND no <= RainToday_max@1, required_guarantees=[RainToday in (no)]
578577
└──────────────────────────────────────────────────
579578
");
@@ -588,11 +587,11 @@ mod tests {
588587
assert_snapshot!(plan, @r"
589588
┌───── DistributedExec ── Tasks: t0:[p0]
590589
│ SortPreservingMergeExec: [MinTemp@0 DESC]
591-
│ [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2
590+
│ [Stage 1] => NetworkCoalesceExec: output_partitions=3, input_tasks=3
592591
└──────────────────────────────────────────────────
593-
┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3]
592+
┌───── Stage 1 ── Tasks: t0:[p0] t1:[p1] t2:[p2]
594593
│ SortExec: expr=[MinTemp@0 DESC], preserve_partitioning=[true]
595-
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
594+
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
596595
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
597596
└──────────────────────────────────────────────────
598597
");
@@ -612,13 +611,13 @@ mod tests {
612611
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
613612
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
614613
│ CoalesceBatchesExec: target_batch_size=8192
615-
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
614+
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
616615
└──────────────────────────────────────────────────
617-
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
616+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
618617
│ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 8), input_partitions=4
619-
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
618+
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
620619
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
621-
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
620+
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
622621
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet
623622
└──────────────────────────────────────────────────
624623
");
@@ -649,7 +648,6 @@ mod tests {
649648
.with_default_features()
650649
.with_config(config)
651650
.with_distributed_execution(InMemoryChannelResolver::new())
652-
.with_distributed_task_estimator(FixedDataSourceExecTaskEstimator(2))
653651
.build();
654652

655653
let ctx = SessionContext::new_with_state(state);

src/distributed_planner/task_estimator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ impl TaskEstimator for FileScanConfigTaskEstimator {
119119
Ok(urls) => urls.len(),
120120
Err(_) => 1,
121121
};
122-
task_count = task_count.max(workers);
122+
task_count = task_count.min(workers);
123123

124124
// Based on the task count, attempt to scale up the partitions in the DataSourceExec by
125125
// repartitioning it. This will result in a DataSourceExec with potentially a lot of

src/metrics/task_metrics_collector.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@ mod tests {
133133
use crate::test_utils::in_memory_channel_resolver::InMemoryChannelResolver;
134134
use crate::test_utils::plans::{count_plan_nodes, get_stages_and_stage_keys};
135135
use crate::test_utils::session_context::register_temp_parquet_table;
136-
use crate::test_utils::test_task_estimator::FixedDataSourceExecTaskEstimator;
137136
use datafusion::execution::{SessionStateBuilder, context::SessionContext};
138137
use datafusion::prelude::SessionConfig;
139138
use datafusion::{
@@ -153,7 +152,7 @@ mod tests {
153152
.with_default_features()
154153
.with_config(config)
155154
.with_distributed_execution(InMemoryChannelResolver::new())
156-
.with_distributed_task_estimator(FixedDataSourceExecTaskEstimator(2))
155+
.with_distributed_task_estimator(2)
157156
.build();
158157

159158
let ctx = SessionContext::from(state);

src/metrics/task_metrics_rewriter.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,6 @@ mod tests {
219219

220220
use crate::DistributedExt;
221221
use crate::metrics::task_metrics_rewriter::MetricsWrapperExec;
222-
use crate::test_utils::test_task_estimator::FixedDataSourceExecTaskEstimator;
223222
use datafusion::physical_plan::empty::EmptyExec;
224223
use datafusion::physical_plan::metrics::MetricsSet;
225224
use datafusion::prelude::SessionConfig;
@@ -246,7 +245,7 @@ mod tests {
246245
if distributed {
247246
builder = builder
248247
.with_distributed_execution(InMemoryChannelResolver::new())
249-
.with_distributed_task_estimator(FixedDataSourceExecTaskEstimator(2))
248+
.with_distributed_task_estimator(2)
250249
}
251250

252251
let state = builder.build();

src/test_utils/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,4 @@ pub mod mock_exec;
66
pub mod parquet;
77
pub mod plans;
88
pub mod session_context;
9-
pub mod test_task_estimator;
109
pub mod tpch;

src/test_utils/test_task_estimator.rs

Lines changed: 0 additions & 26 deletions
This file was deleted.

0 commit comments

Comments (0)