Skip to content

Commit 4bf0666

Browse files
committed
Use files_per_task and cardinality_effect_factor for choosing the right tasks for the different stages
1 parent 0a5ba3c commit 4bf0666

File tree

4 files changed

+131
-18
lines changed

4 files changed

+131
-18
lines changed

benchmarks/src/tpch/run.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ use datafusion::prelude::*;
4545
use datafusion_distributed::test_utils::localhost::{
4646
LocalHostChannelResolver, spawn_flight_service,
4747
};
48-
use datafusion_distributed::test_utils::test_task_estimator::FixedDataSourceExecTaskEstimator;
4948
use datafusion_distributed::{
5049
DistributedExt, DistributedSessionBuilder, DistributedSessionBuilderContext, NetworkBoundaryExt,
5150
};
@@ -101,10 +100,6 @@ pub struct RunOpt {
101100
#[structopt(short = "t", long = "sorted")]
102101
sorted: bool,
103102

104-
/// Sets how many tasks are used per stage.
105-
#[structopt(long)]
106-
stage_tasks: Option<usize>,
107-
108103
/// Spawns a worker in the specified port.
109104
#[structopt(long)]
110105
spawn: Option<u16>,
@@ -116,6 +111,14 @@ pub struct RunOpt {
116111
/// Number of physical threads per worker.
117112
#[structopt(long)]
118113
threads: Option<usize>,
114+
115+
/// Number of files per distributed task.
116+
#[structopt(long)]
117+
files_per_task: Option<usize>,
118+
119+
/// Task count scale factor for when nodes in stages change the cardinality of the data
120+
#[structopt(long)]
121+
cardinality_task_sf: Option<f64>,
119122
}
120123

121124
#[async_trait]
@@ -136,16 +139,17 @@ impl DistributedSessionBuilder for RunOpt {
136139
.with_config(config)
137140
.with_distributed_user_codec(InMemoryCacheExecCodec)
138141
.with_distributed_execution(LocalHostChannelResolver::new(self.workers.clone()))
139-
.with_distributed_option_extension_from_headers::<WarmingUpMarker>(&ctx.headers)?;
142+
.with_distributed_option_extension_from_headers::<WarmingUpMarker>(&ctx.headers)?
143+
.with_distributed_files_per_task(
144+
self.files_per_task.unwrap_or(get_available_parallelism()),
145+
)?
146+
.with_distributed_cardinality_effect_task_scale_factor(
147+
self.cardinality_task_sf.unwrap_or(1.0),
148+
)?;
140149

141150
if self.mem_table {
142151
builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule));
143152
}
144-
if !self.workers.is_empty() {
145-
builder = builder.with_distributed_task_estimator(FixedDataSourceExecTaskEstimator(
146-
self.stage_tasks.unwrap_or(self.workers.len()),
147-
));
148-
}
149153

150154
Ok(builder.build())
151155
}

src/distributed_ext.rs

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::config_extension_ext::{
44
};
55
use crate::distributed_planner::set_distributed_task_estimator;
66
use crate::protobuf::{set_distributed_user_codec, set_distributed_user_codec_arc};
7-
use crate::{ChannelResolver, DistributedPhysicalOptimizerRule, TaskEstimator};
7+
use crate::{ChannelResolver, DistributedConfig, DistributedPhysicalOptimizerRule, TaskEstimator};
88
use datafusion::common::DataFusionError;
99
use datafusion::config::ConfigExtension;
1010
use datafusion::execution::SessionStateBuilder;
@@ -250,6 +250,45 @@ pub trait DistributedExt: Sized {
250250
&mut self,
251251
estimator: T,
252252
);
253+
254+
/// Sets the maximum number of files each task in a stage with a FileScanConfig node will
255+
/// handle. Reducing this number will increase the number of tasks. By default, this
256+
/// is close to the number of cores in the machine.
257+
fn with_distributed_files_per_task(
258+
self,
259+
files_per_task: usize,
260+
) -> Result<Self, DataFusionError>;
261+
262+
/// Same as [DistributedExt::with_distributed_files_per_task] but with an in-place mutation.
263+
fn set_distributed_files_per_task(
264+
&mut self,
265+
files_per_task: usize,
266+
) -> Result<(), DataFusionError>;
267+
268+
/// The number of tasks in each stage is calculated in a bottom-to-top fashion.
269+
///
270+
/// Bottom stages containing leaf nodes will provide an estimation of the amount of tasks
271+
/// for those stages, but upper stages might see a reduction (or increase) in the number
272+
/// of tasks based on the cardinality effect bottom stages have in the data.
273+
///
274+
/// For example: If there are two stages, and the leaf stage is estimated to use 10 tasks,
275+
/// the upper stage might use less (e.g. 5) if it sees that the leaf stage is returning
276+
/// less data because of filters or aggregations.
277+
///
278+
/// This function sets the scale factor for when encountering these nodes that change the
279+
/// cardinality of the data. For example, if a stage with 10 tasks contains an AggregateExec
280+
/// node, and the scale factor is 2.0, the following stage will use 10 / 2.0 = 5 tasks.
281+
fn with_distributed_cardinality_effect_task_scale_factor(
282+
self,
283+
factor: f64,
284+
) -> Result<Self, DataFusionError>;
285+
286+
/// Same as [DistributedExt::with_distributed_cardinality_effect_task_scale_factor] but with
287+
/// an in-place mutation.
288+
fn set_distributed_cardinality_effect_task_scale_factor(
289+
&mut self,
290+
factor: f64,
291+
) -> Result<(), DataFusionError>;
253292
}
254293

255294
impl DistributedExt for SessionStateBuilder {
@@ -295,6 +334,26 @@ impl DistributedExt for SessionStateBuilder {
295334
set_distributed_task_estimator(self.config().get_or_insert_default(), estimator)
296335
}
297336

337+
fn set_distributed_files_per_task(
338+
&mut self,
339+
files_per_task: usize,
340+
) -> Result<(), DataFusionError> {
341+
let cfg = self.config().get_or_insert_default();
342+
let d_cfg = DistributedConfig::from_config_options_mut(cfg.options_mut())?;
343+
d_cfg.files_per_task = files_per_task;
344+
Ok(())
345+
}
346+
347+
fn set_distributed_cardinality_effect_task_scale_factor(
348+
&mut self,
349+
factor: f64,
350+
) -> Result<(), DataFusionError> {
351+
let cfg = self.config().get_or_insert_default();
352+
let d_cfg = DistributedConfig::from_config_options_mut(cfg.options_mut())?;
353+
d_cfg.cardinality_task_count_factor = factor;
354+
Ok(())
355+
}
356+
298357
delegate! {
299358
to self {
300359
#[call(set_distributed_option_extension)]
@@ -320,6 +379,14 @@ impl DistributedExt for SessionStateBuilder {
320379
#[call(set_distributed_task_estimator)]
321380
#[expr($;self)]
322381
fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
382+
383+
#[call(set_distributed_files_per_task)]
384+
#[expr($?;Ok(self))]
385+
fn with_distributed_files_per_task(mut self, files_per_task: usize) -> Result<Self, DataFusionError>;
386+
387+
#[call(set_distributed_cardinality_effect_task_scale_factor)]
388+
#[expr($?;Ok(self))]
389+
fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
323390
}
324391
}
325392
}

src/distributed_planner/distributed_config.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::distributed_planner::task_estimator::CombinedTaskEstimator;
33
use crate::{BoxCloneSyncChannel, ChannelResolver, TaskEstimator};
44
use arrow_flight::flight_service_client::FlightServiceClient;
55
use async_trait::async_trait;
6+
use datafusion::common::utils::get_available_parallelism;
67
use datafusion::common::{DataFusionError, extensions_options, not_impl_err, plan_err};
78
use datafusion::config::{ConfigExtension, ConfigField, ConfigOptions, Visit};
89
use std::fmt::{Debug, Formatter};
@@ -12,6 +13,9 @@ use url::Url;
1213
extensions_options! {
1314
/// Configuration for the distributed planner.
1415
pub struct DistributedConfig {
16+
/// Sets the maximum number of files that will be assigned to each task. Reducing this
17+
/// number will spawn more tasks for the same number of files.
18+
pub files_per_task: usize, default = get_available_parallelism()
1519
/// Task multiplying factor for when a node declares that it changes the cardinality
1620
/// of the data:
1721
/// - If a node is increasing the cardinality of the data, this factor will increase.
@@ -48,6 +52,14 @@ impl DistributedConfig {
4852
};
4953
Ok(distributed_cfg)
5054
}
55+
56+
/// Gets a mutable reference to the [DistributedConfig] stored in the [ConfigOptions]'s extensions.
57+
pub fn from_config_options_mut(cfg: &mut ConfigOptions) -> Result<&mut Self, DataFusionError> {
58+
let Some(distributed_cfg) = cfg.extensions.get_mut::<DistributedConfig>() else {
59+
return plan_err!("DistributedConfig is not in ConfigOptions.extensions");
60+
};
61+
Ok(distributed_cfg)
62+
}
5163
}
5264

5365
impl ConfigExtension for DistributedConfig {

src/distributed_planner/task_estimator.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
use crate::{DistributedConfig, PartitionIsolatorExec};
1+
use crate::{ChannelResolver, DistributedConfig, PartitionIsolatorExec};
22
use datafusion::catalog::memory::DataSourceExec;
33
use datafusion::config::ConfigOptions;
44
use datafusion::datasource::physical_plan::FileScanConfig;
5-
use datafusion::physical_plan::ExecutionPlan;
5+
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
66
use datafusion::prelude::SessionConfig;
7+
use std::collections::HashSet;
78
use std::fmt::Debug;
89
use std::sync::Arc;
910

@@ -96,14 +97,43 @@ impl TaskEstimator for FileScanConfigTaskEstimator {
9697
fn estimate_tasks(
9798
&self,
9899
plan: &Arc<dyn ExecutionPlan>,
99-
_: &ConfigOptions,
100+
cfg: &ConfigOptions,
100101
) -> Option<TaskEstimation> {
102+
let d_cfg = cfg.extensions.get::<DistributedConfig>()?;
101103
let dse: &DataSourceExec = plan.as_any().downcast_ref()?;
102-
let _config: &FileScanConfig = dse.data_source().as_any().downcast_ref()?;
104+
let file_scan: &FileScanConfig = dse.data_source().as_any().downcast_ref()?;
105+
106+
// Count how many distinct files we have in the FileScanConfig.
107+
let mut distinct_files = HashSet::new();
108+
for file_group in &file_scan.file_groups {
109+
for file in file_group.iter() {
110+
distinct_files.insert(file.object_meta.location.clone());
111+
}
112+
}
113+
let distinct_files = distinct_files.len();
114+
115+
// Based on the user-provided files_per_task configuration, do the math to calculate
116+
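// how many tasks should be used, without surpassing the number of available workers. NOTE(review): the code below uses `.max(workers)`, which raises the count to at least the worker count instead of capping it; confirm whether `.min(workers)` was intended.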
117+
let mut task_count = distinct_files.div_ceil(d_cfg.files_per_task);
118+
let workers = match d_cfg.__private_channel_resolver.0.get_urls() {
119+
Ok(urls) => urls.len(),
120+
Err(_) => 1,
121+
};
122+
task_count = task_count.max(workers);
123+
124+
// Based on the task count, attempt to scale up the partitions in the DataSourceExec by
125+
// repartitioning it. This will result in a DataSourceExec with potentially a lot of
126+
// partitions, but as we are going to wrap it with PartitionIsolatorExec that's fine.
127+
let scaled_partitions = task_count * plan.output_partitioning().partition_count();
128+
let mut plan = Arc::clone(plan);
129+
if let Ok(Some(repartitioned)) = plan.repartitioned(scaled_partitions, cfg) {
130+
plan = repartitioned;
131+
}
132+
plan = Arc::new(PartitionIsolatorExec::new(plan));
103133

104134
Some(TaskEstimation {
105-
task_count: 1, // TODO
106-
new_plan: Some(Arc::new(PartitionIsolatorExec::new(Arc::clone(plan)))),
135+
task_count,
136+
new_plan: Some(plan),
107137
})
108138
}
109139
}

0 commit comments

Comments
 (0)