Skip to content

Commit 48910a6

Browse files
authored
Improve default task estimator (#275)
1 parent f7318aa commit 48910a6

File tree

3 files changed

+952
-909
lines changed

3 files changed

+952
-909
lines changed

src/distributed_planner/task_estimator.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use datafusion::datasource::physical_plan::FileScanConfig;
66
use datafusion::physical_plan::ExecutionPlan;
77
use datafusion::prelude::SessionConfig;
88
use delegate::delegate;
9-
use std::collections::HashSet;
109
use std::fmt::Debug;
1110
use std::sync::Arc;
1211

@@ -205,22 +204,15 @@ impl TaskEstimator for FileScanConfigTaskEstimator {
205204

206205
let d_cfg = cfg.extensions.get::<DistributedConfig>()?;
207206

208-
// Count how many distinct files we have in the FileScanConfig. Each file in each
209-
// file group is a PartitionedFile rather than a full file, so it's possible that
210-
// many entries refer to different chunks of the same physical file. By keeping a
211-
// HashSet of the different locations of the PartitionedFiles we count how many actual
212-
// different files we have.
213-
let mut distinct_files = HashSet::new();
207+
// Count how many partitioned files we have in the FileScanConfig.
208+
let mut partitioned_files = 0;
214209
for file_group in &file_scan.file_groups {
215-
for file in file_group.iter() {
216-
distinct_files.insert(file.object_meta.location.clone());
217-
}
210+
partitioned_files += file_group.len();
218211
}
219-
let distinct_files = distinct_files.len();
220212

221213
// Based on the user-provided files_per_task configuration, do the math to calculate
222214
// how many tasks should be used, without surpassing the number of available workers.
223-
let task_count = distinct_files.div_ceil(d_cfg.files_per_task);
215+
let task_count = partitioned_files.div_ceil(d_cfg.files_per_task);
224216

225217
Some(TaskEstimation {
226218
task_count: TaskCountAnnotation::Desired(task_count),
@@ -238,7 +230,7 @@ impl TaskEstimator for FileScanConfigTaskEstimator {
238230
}
239231
// Based on the task count, attempt to scale up the partitions in the DataSourceExec by
240232
// repartitioning it. This will result in a DataSourceExec with potentially a lot of
241-
// partitions, but as we are going wrap it with PartitionIsolatorExec that's fine.
233+
// partitions, but as we are going to wrap it with PartitionIsolatorExec, that's fine.
242234
let dse: &DataSourceExec = plan.as_any().downcast_ref()?;
243235
let file_scan: &FileScanConfig = dse.data_source().as_any().downcast_ref()?;
244236

0 commit comments

Comments
 (0)