Commit c8f0856

Merge branch 'main' into gabrielmusat/add-trino-to-cdk
# Conflicts:
#	benchmarks/cdk/bin/datafusion-bench.ts
#	benchmarks/cdk/lib/cdk-stack.ts
2 parents: 1744711 + f6dfaa6

File tree

10 files changed: +1039 -994 lines changed


benchmarks/cdk/bin/datafusion-bench.ts

Lines changed: 6 additions & 1 deletion
@@ -14,6 +14,7 @@ async function main() {
     .option('-i, --iterations <number>', 'Number of iterations', '3')
     .option('--files-per-task <number>', 'Files per task', '4')
     .option('--cardinality-task-sf <number>', 'Cardinality task scale factor', '2')
+    .option('--shuffle-batch-size <number>', 'Shuffle batch coalescing size (number of rows)', '8192')
     .option('--query <number>', 'A specific query to run', undefined)
     .parse(process.argv);

@@ -23,11 +24,13 @@ async function main() {
   const iterations = parseInt(options.iterations);
   const filesPerTask = parseInt(options.filesPerTask);
   const cardinalityTaskSf = parseInt(options.cardinalityTaskSf);
+  const shuffleBatchSize = parseInt(options.shuffleBatchSize);
   const specificQuery = options.query ? parseInt(options.query) : undefined;

   const runner = new DataFusionRunner({
     filesPerTask,
     cardinalityTaskSf,
+    shuffleBatchSize,
   });

   const outputPath = path.join(ROOT, "benchmarks", "data", `tpch_sf${sf}`, "remote-results.json");
@@ -52,6 +55,7 @@ class DataFusionRunner implements BenchmarkRunner {
   constructor(private readonly options: {
     filesPerTask: number;
     cardinalityTaskSf: number;
+    shuffleBatchSize: number;
   }) {
   }

@@ -106,7 +110,8 @@ class DataFusionRunner implements BenchmarkRunner {
     await this.query(stmt);
     await this.query(`
       SET distributed.files_per_task=${this.options.filesPerTask};
-      SET distributed.cardinality_task_count_factor=${this.options.cardinalityTaskSf}
+      SET distributed.cardinality_task_count_factor=${this.options.cardinalityTaskSf};
+      SET distributed.shuffle_batch_size=${this.options.shuffleBatchSize}
     `);
   }

benchmarks/cdk/lib/cdk-stack.ts

Lines changed: 7 additions & 10 deletions
@@ -112,16 +112,11 @@ export class CdkStack extends Stack {
     for (let i = 0; i < config.instanceCount; i++) {
       const userData = ec2.UserData.forLinux();

-      // Download worker binary from S3 asset
-      userData.addS3DownloadCommand({
-        bucket: workerBinary.bucket,
-        bucketKey: workerBinary.s3ObjectKey,
-        localFile: '/usr/local/bin/worker',
-      });
-
       userData.addCommands(
-        // Make binary executable
-        'chmod +x /usr/local/bin/worker',
+        // Install Rust tooling.
+        'yum install gcc',
+        "curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh",
+        'cargo install --locked tokio-console',

         // Create systemd service
         `cat > /etc/systemd/system/worker.service << 'EOF'
@@ -181,7 +176,9 @@ sudo journalctl -u worker.service -f -o cat
       description: 'Session Manager commands to connect to instances',
     });

-    // Custom resource to restart worker service on every deploy
+    // Downloads the latest version of the worker binary and restarts the systemd service.
+    // This is done instead of the userData.addS3Download() so that the instance does not need
+    // to restart every time a new worker binary is available.
     sendCommandsUnconditionally(this, 'RestartWorkerService', instances, [
       `aws s3 cp s3://${workerBinary.s3BucketName}/${workerBinary.s3ObjectKey} /usr/local/bin/worker`,
       'chmod +x /usr/local/bin/worker',

src/distributed_planner/distributed_config.rs

Lines changed: 7 additions & 0 deletions
@@ -24,6 +24,13 @@ extensions_options! {
     /// - If a node reduces the cardinality of the data, this factor will decrease.
     /// - In any other situation, this factor is left intact.
     pub cardinality_task_count_factor: f64, default = cardinality_task_count_factor_default()
+    /// Upon shuffling over the network, data streams need to be disassembled in a lot of output
+    /// partitions, which means the resulting streams might contain a lot of tiny record batches
+    /// to be sent over the wire. This parameter controls the batch size in number of rows for
+    /// the CoalesceBatchExec operator that is placed at the top of the stage for sending bigger
+    /// batches over the wire.
+    /// If set to 0, batch coalescing is disabled on network shuffle operations.
+    pub shuffle_batch_size: usize, default = 8192
     /// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
     /// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
     pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()
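Note that the doc comment above writes `CoalesceBatchExec`; the DataFusion operator it describes is `CoalesceBatchesExec`. As a rough illustration of what the option controls, a hypothetical helper (not part of this commit; `maybe_coalesce_for_shuffle` is an invented name) could cap a stage's head plan with that operator and skip it when the option is 0:

```rust
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;

/// Hypothetical sketch: merge the tiny record batches produced by hash
/// repartitioning into batches of roughly `shuffle_batch_size` rows before
/// they are sent over the wire. A value of 0 disables coalescing, matching
/// the documented behavior of `distributed.shuffle_batch_size`.
fn maybe_coalesce_for_shuffle(
    plan: Arc<dyn ExecutionPlan>,
    shuffle_batch_size: usize,
) -> Arc<dyn ExecutionPlan> {
    if shuffle_batch_size == 0 {
        // Coalescing disabled: stream batches exactly as the stage produces them.
        return plan;
    }
    // CoalesceBatchesExec buffers incoming batches until the target row count
    // is reached, then emits one larger batch.
    Arc::new(CoalesceBatchesExec::new(plan, shuffle_batch_size))
}
```

In the commit itself, the placement at the top of each stage is presumably handled by the distributed planner (`distributed_physical_optimizer_rule.rs` below, whose diff is not rendered).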

src/distributed_planner/distributed_physical_optimizer_rule.rs

Lines changed: 72 additions & 53 deletions
Large diffs are not rendered by default.

src/execution_plans/network_shuffle.rs

Lines changed: 22 additions & 10 deletions
@@ -13,6 +13,7 @@ use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_flight::error::FlightError;
 use bytes::Bytes;
 use dashmap::DashMap;
+use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
 use datafusion::common::{exec_err, internal_datafusion_err, plan_err};
 use datafusion::error::DataFusionError;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -172,18 +173,29 @@ impl NetworkBoundary for NetworkShuffleExec {
             return plan_err!("cannot only return wrapped child if on Pending state");
         };

-        // TODO: Avoid downcasting once https://github.com/apache/datafusion/pull/17990 is shipped.
-        let Some(r_exe) = pending.input.as_any().downcast_ref::<RepartitionExec>() else {
-            return plan_err!("NetworkShuffleExec.input must always be RepartitionExec");
-        };
-
-        let next_stage_plan = Arc::new(RepartitionExec::try_new(
-            require_one_child(r_exe.children())?,
-            scale_partitioning(r_exe.partitioning(), |p| p * n_tasks),
-        )?);
+        let transformed = Arc::clone(&pending.input).transform_down(|plan| {
+            if let Some(r_exe) = plan.as_any().downcast_ref::<RepartitionExec>() {
+                // Scale the input RepartitionExec to account for all the tasks to which it will
+                // need to fan data out.
+                let scaled = Arc::new(RepartitionExec::try_new(
+                    require_one_child(r_exe.children())?,
+                    scale_partitioning(r_exe.partitioning(), |p| p * n_tasks),
+                )?);
+                Ok(Transformed::new(scaled, true, TreeNodeRecursion::Stop))
+            } else if matches!(plan.output_partitioning(), Partitioning::Hash(_, _)) {
+                // This might be a passthrough node, like a CoalesceBatchesExec or something like that.
+                // This is fine, we can let the node be here.
+                Ok(Transformed::no(plan))
+            } else {
+                return plan_err!(
+                    "NetworkShuffleExec input must be hash partitioned, but {} is not",
+                    plan.name()
+                );
+            }
+        })?;

         Ok(InputStageInfo {
-            plan: next_stage_plan,
+            plan: transformed.data,
             task_count: pending.input_tasks,
         })
     }
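For context, `scale_partitioning(r_exe.partitioning(), |p| p * n_tasks)` is the crate's own helper and its implementation is not part of this diff. The idea it expresses is to multiply the hash partition count by the number of tasks that will consume the shuffle, so each consumer task can read a disjoint slice of output partitions; plans that are not hash partitioned are rejected by the surrounding transform. A hypothetical sketch of that idea (`scale_hash_partitioning` is an invented name, not the crate's API):

```rust
use datafusion::physical_plan::Partitioning;

/// Hypothetical sketch: scale a hash partitioning by the number of consumer
/// tasks, leaving the hash expressions untouched. For example, 6 hash
/// partitions read by 3 tasks become 18 output partitions.
fn scale_hash_partitioning(partitioning: &Partitioning, n_tasks: usize) -> Partitioning {
    match partitioning {
        Partitioning::Hash(exprs, p) => Partitioning::Hash(exprs.clone(), p * n_tasks),
        // Non-hash partitionings are passed through unchanged in this sketch.
        other => other.clone(),
    }
}
```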

src/test_utils/insta.rs

Lines changed: 4 additions & 0 deletions
@@ -164,6 +164,10 @@ pub fn settings() -> insta::Settings {
     settings.add_filter(r"input_batches=\d+", "input_batches=<metric>");
     settings.add_filter(r"input_rows=\d+", "input_rows=<metric>");
     settings.add_filter(r"output_batches=\d+", "output_batches=<metric>");
+    settings.add_filter(
+        r"output_bytes=\d+.\d [(B)|(KB)|(MB)]",
+        "output_bytes=<metric>",
+    );
     settings.add_filter(r"build_mem_used=\d+", "build_mem_used=<metric>");
     settings.add_filter(r"build_time=[\d.]+[a-zA-Zµnms]+", "build_time=<metric>");
     settings.add_filter(r"join_time=[\d.]+[a-zA-Zµnms]+", "join_time=<metric>");

tests/distributed_aggregation.rs

Lines changed: 12 additions & 12 deletions
@@ -59,14 +59,14 @@ mod tests {
 │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
 │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
 │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
-│ CoalesceBatchesExec: target_batch_size=8192
-│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
+│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
 └──────────────────────────────────────────────────
 ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
-│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
-│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
-│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
-│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
+│ CoalesceBatchesExec: target_batch_size=8192
+│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
+│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
+│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
 └──────────────────────────────────────────────────
 ",
 );
@@ -141,14 +141,14 @@ mod tests {
 ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
 │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday]
 │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
-│ CoalesceBatchesExec: target_batch_size=8192
-│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
+│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
 └──────────────────────────────────────────────────
 ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
-│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
-│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
-│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
-│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
+│ CoalesceBatchesExec: target_batch_size=8192
+│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
+│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
+│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
 └──────────────────────────────────────────────────
 ",
 );
