Skip to content

Commit a017de9

Browse files
authored
fix: Correct DataFusion repartitioning (#1554)
While the numbers are worse, I believe this is actually correct. Also added caching for file statistics in benchmarks, which I think should make the benchmarks faster overall.
1 parent 6624f31 commit a017de9

File tree

5 files changed

+83
-23
lines changed

5 files changed

+83
-23
lines changed

.github/workflows/bench-pr.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ jobs:
3030
name: DataFusion
3131
- id: random_access
3232
name: Random Access
33-
- id: compress_noci
33+
- id: compress
3434
name: Vortex Compression
35+
3536
runs-on: self-hosted
3637
if: ${{ contains(github.event.head_commit.message, '[benchmark]') || github.event.label.name == 'benchmark' && github.event_name == 'pull_request' }}
3738
steps:

bench-vortex/benches/clickbench.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@ use std::path::PathBuf;
44
use std::process::Command;
55

66
use bench_vortex::clickbench::{clickbench_queries, HITS_SCHEMA};
7-
use bench_vortex::{clickbench, execute_query, idempotent, IdempotentPath};
7+
use bench_vortex::{clickbench, execute_query, get_session_with_cache, idempotent, IdempotentPath};
88
use criterion::{criterion_group, criterion_main, Criterion};
9-
use datafusion::prelude::SessionContext;
109
use tokio::runtime::Builder;
1110

1211
fn benchmark(c: &mut Criterion) {
@@ -42,8 +41,9 @@ fn benchmark(c: &mut Criterion) {
4241
.unwrap();
4342
}
4443

45-
let session_context = SessionContext::new();
44+
let session_context = get_session_with_cache();
4645
let context = session_context.clone();
46+
4747
runtime.block_on(async move {
4848
clickbench::register_vortex_files(&context, "hits", basepath.as_path(), &HITS_SCHEMA)
4949
.await

bench-vortex/src/bin/clickbench.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ use std::time::{Duration, Instant};
88
use bench_vortex::clickbench::{self, clickbench_queries, HITS_SCHEMA};
99
use bench_vortex::display::{print_measurements_json, render_table, DisplayFormat};
1010
use bench_vortex::{
11-
execute_query, idempotent, physical_plan, setup_logger, Format, IdempotentPath as _,
12-
Measurement,
11+
execute_query, get_session_with_cache, idempotent, physical_plan, setup_logger, Format,
12+
IdempotentPath as _, Measurement,
1313
};
1414
use clap::Parser;
15-
use datafusion::prelude::SessionContext;
1615
use indicatif::ProgressBar;
1716
use itertools::Itertools;
1817
use log::LevelFilter;
@@ -120,7 +119,7 @@ fn main() {
120119
let mut all_measurements = Vec::default();
121120

122121
for format in &formats {
123-
let session_context = SessionContext::new();
122+
let session_context = get_session_with_cache();
124123
let context = session_context.clone();
125124
match format {
126125
Format::Parquet => runtime.block_on(async {

bench-vortex/src/lib.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use std::sync::{Arc, LazyLock};
88
use std::time::Duration;
99

1010
use arrow_array::{RecordBatch, RecordBatchReader};
11-
use datafusion::prelude::SessionContext;
11+
use datafusion::execution::cache::cache_manager::CacheManagerConfig;
12+
use datafusion::execution::cache::cache_unit::{DefaultFileStatisticsCache, DefaultListFilesCache};
13+
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
14+
use datafusion::prelude::{SessionConfig, SessionContext};
1215
use datafusion_physical_plan::{collect, ExecutionPlan};
1316
use itertools::Itertools;
1417
use log::LevelFilter;
@@ -297,6 +300,23 @@ impl Measurement {
297300
}
298301
}
299302

303+
pub fn get_session_with_cache() -> SessionContext {
304+
let cache_config = CacheManagerConfig::default();
305+
let file_static_cache = Arc::new(DefaultFileStatisticsCache::default());
306+
let list_file_cache = Arc::new(DefaultListFilesCache::default());
307+
308+
let cache_config = cache_config
309+
.with_files_statistics_cache(Some(file_static_cache))
310+
.with_list_files_cache(Some(list_file_cache));
311+
312+
let rt = RuntimeEnvBuilder::new()
313+
.with_cache_manager(cache_config)
314+
.build_arc()
315+
.expect("could not build runtime environment");
316+
317+
SessionContext::new_with_config_rt(SessionConfig::default(), rt)
318+
}
319+
300320
#[cfg(test)]
301321
mod test {
302322
use std::fs::File;

vortex-datafusion/src/persistent/execution.rs

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@ use std::fmt;
22
use std::sync::Arc;
33

44
use datafusion::config::ConfigOptions;
5-
use datafusion::datasource::physical_plan::{FileGroupPartitioner, FileScanConfig, FileStream};
5+
use datafusion::datasource::listing::PartitionedFile;
6+
use datafusion::datasource::physical_plan::{FileScanConfig, FileStream};
67
use datafusion_common::{project_schema, Result as DFResult, Statistics};
78
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
89
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
910
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
1011
use datafusion_physical_plan::{
1112
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
1213
};
14+
use itertools::Itertools;
1315
use vortex_array::Context;
1416

1517
use crate::persistent::opener::VortexFileOpener;
@@ -47,7 +49,7 @@ impl VortexExec {
4749

4850
let plan_properties = PlanProperties::new(
4951
EquivalenceProperties::new_with_orderings(projected_schema, &orderings),
50-
Partitioning::UnknownPartitioning(1),
52+
Partitioning::UnknownPartitioning(file_scan_config.file_groups.len()),
5153
ExecutionMode::Bounded,
5254
);
5355

@@ -60,6 +62,7 @@ impl VortexExec {
6062
ctx,
6163
})
6264
}
65+
6366
pub(crate) fn into_arc(self) -> Arc<dyn ExecutionPlan> {
6467
Arc::new(self) as _
6568
}
@@ -103,6 +106,7 @@ impl ExecutionPlan for VortexExec {
103106
partition: usize,
104107
context: Arc<TaskContext>,
105108
) -> DFResult<SendableRecordBatchStream> {
109+
log::debug!("Executing partition {partition}");
106110
let object_store = context
107111
.runtime_env()
108112
.object_store(&self.file_scan_config.object_store_url)?;
@@ -128,21 +132,57 @@ impl ExecutionPlan for VortexExec {
128132
fn repartitioned(
129133
&self,
130134
target_partitions: usize,
131-
config: &ConfigOptions,
135+
_config: &ConfigOptions,
132136
) -> DFResult<Option<Arc<dyn ExecutionPlan>>> {
133-
let repartition_file_min_size = config.optimizer.repartition_file_min_size;
134-
let repartitioned_file_groups_option = FileGroupPartitioner::new()
135-
.with_target_partitions(target_partitions)
136-
.with_repartition_file_min_size(repartition_file_min_size)
137-
.with_preserve_order_within_groups(self.properties().output_ordering().is_some())
138-
.repartition_file_groups(&self.file_scan_config.file_groups);
137+
let file_groups = self.file_scan_config.file_groups.clone();
138+
139+
let repartitioned_file_groups = repartition_by_count(file_groups, target_partitions);
139140

140141
let mut new_plan = self.clone();
141-
if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
142-
let mut config = new_plan.file_scan_config;
143-
config = config.with_file_groups(repartitioned_file_groups);
144-
new_plan.file_scan_config = config;
145-
}
142+
let mut config = new_plan.file_scan_config;
143+
let num_partitions = repartitioned_file_groups.len();
144+
145+
log::debug!("VortexExec repartitioned to {num_partitions} partitions");
146+
config = config.with_file_groups(repartitioned_file_groups);
147+
new_plan.file_scan_config = config;
148+
new_plan.plan_properties.partitioning = Partitioning::UnknownPartitioning(num_partitions);
149+
146150
Ok(Some(Arc::new(new_plan)))
147151
}
148152
}
153+
154+
fn repartition_by_count(
155+
file_groups: Vec<Vec<PartitionedFile>>,
156+
desired_partitions: usize,
157+
) -> Vec<Vec<PartitionedFile>> {
158+
let all_files = file_groups.into_iter().concat();
159+
160+
let approx_files_per_partition = all_files.len().div_ceil(desired_partitions);
161+
let mut repartitioned_file_groups = Vec::default();
162+
163+
for chunk in &all_files.into_iter().chunks(approx_files_per_partition) {
164+
repartitioned_file_groups.push(chunk.collect::<Vec<_>>());
165+
}
166+
167+
repartitioned_file_groups
168+
}
169+
170+
#[cfg(test)]
171+
mod tests {
172+
use super::*;
173+
174+
#[test]
175+
fn basic_repartition_test() {
176+
let input_file_groups = vec![vec![
177+
PartitionedFile::new("a", 0),
178+
PartitionedFile::new("b", 0),
179+
PartitionedFile::new("c", 0),
180+
PartitionedFile::new("d", 0),
181+
PartitionedFile::new("e", 0),
182+
]];
183+
184+
let file_groups = repartition_by_count(input_file_groups, 2);
185+
186+
assert_eq!(file_groups.len(), 2);
187+
}
188+
}

0 commit comments

Comments
 (0)