Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions src/distributed_planner/task_estimator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionConfig;
use delegate::delegate;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;

Expand Down Expand Up @@ -205,22 +204,15 @@ impl TaskEstimator for FileScanConfigTaskEstimator {

let d_cfg = cfg.extensions.get::<DistributedConfig>()?;

// Count how many distinct files we have in the FileScanConfig. Each file in each
// file group is a PartitionedFile rather than a full file, so it's possible that
// many entries refer to different chunks of the same physical file. By keeping a
// HashSet of the different locations of the PartitionedFiles we count how many actual
// different files we have.
let mut distinct_files = HashSet::new();
// Count how many partitioned files we have in the FileScanConfig.
let mut partitioned_files = 0;
for file_group in &file_scan.file_groups {
for file in file_group.iter() {
distinct_files.insert(file.object_meta.location.clone());
}
partitioned_files += file_group.len();
}
let distinct_files = distinct_files.len();

// Based on the user-provided files_per_task configuration, do the math to calculate
// how many tasks should be used, without surpassing the number of available workers.
let task_count = distinct_files.div_ceil(d_cfg.files_per_task);
let task_count = partitioned_files.div_ceil(d_cfg.files_per_task);

Some(TaskEstimation {
task_count: TaskCountAnnotation::Desired(task_count),
Expand All @@ -238,7 +230,7 @@ impl TaskEstimator for FileScanConfigTaskEstimator {
}
// Based on the task count, attempt to scale up the partitions in the DataSourceExec by
// repartitioning it. This will result in a DataSourceExec with potentially a lot of
// partitions, but as we are going wrap it with PartitionIsolatorExec that's fine.
// partitions, but as we are going to wrap it with PartitionIsolatorExec, that's fine.
let dse: &DataSourceExec = plan.as_any().downcast_ref()?;
let file_scan: &FileScanConfig = dse.data_source().as_any().downcast_ref()?;

Expand Down
Loading