Skip to content

Commit f653b10

Browse files
committed
Merge branch 'main' into gabrielmusat/collect-metrics-optionally
# Conflicts: # benchmarks/cdk/bin/datafusion-bench.ts # src/distributed_planner/distributed_config.rs
2 parents bce118b + f6dfaa6 commit f653b10

File tree

10 files changed

+1036
-993
lines changed

10 files changed

+1036
-993
lines changed

benchmarks/cdk/bin/datafusion-bench.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ async function main () {
1515
.option('-i, --iterations <number>', 'Number of iterations', '3')
1616
.option('--files-per-task <number>', 'Files per task', '4')
1717
.option('--cardinality-task-sf <number>', 'Cardinality task scale factor', '2')
18+
.option('--shuffle-batch-size <number>', 'Shuffle batch coalescing size (number of rows)', '8192')
1819
.option('--collect-metrics <boolean>', 'Propagates metric collection', 'true')
1920
.option('--query <number>', 'A specific query to run', undefined)
2021
.parse(process.argv);
@@ -25,6 +26,7 @@ async function main () {
2526
const iterations = parseInt(options.iterations);
2627
const filesPerTask = parseInt(options.filesPerTask);
2728
const cardinalityTaskSf = parseInt(options.cardinalityTaskSf);
29+
const shuffleBatchSize = parseInt(options.shuffleBatchSize);
2830
const collectMetrics = options.collectMetrics === 'true' || options.collectMetrics === 1
2931

3032
// Compare with previous results first
@@ -36,6 +38,7 @@ async function main () {
3638
await query(`
3739
SET distributed.files_per_task=${filesPerTask};
3840
SET distributed.cardinality_task_count_factor=${cardinalityTaskSf};
41+
SET distributed.shuffle_batch_size=${shuffleBatchSize};
3942
SET distributed.collect_metrics=${collectMetrics}
4043
`)
4144

benchmarks/cdk/lib/cdk-stack.ts

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -92,16 +92,11 @@ export class CdkStack extends Stack {
9292
for (let i = 0; i < config.instanceCount; i++) {
9393
const userData = ec2.UserData.forLinux();
9494

95-
// Download worker binary from S3 asset
96-
userData.addS3DownloadCommand({
97-
bucket: workerBinary.bucket,
98-
bucketKey: workerBinary.s3ObjectKey,
99-
localFile: '/usr/local/bin/worker',
100-
});
101-
10295
userData.addCommands(
103-
// Make binary executable
104-
'chmod +x /usr/local/bin/worker',
96+
// Install Rust tooling.
97+
'yum install gcc',
98+
"curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh",
99+
'cargo install --locked tokio-console',
105100

106101
// Create systemd service
107102
`cat > /etc/systemd/system/worker.service << 'EOF'
@@ -160,7 +155,9 @@ sudo journalctl -u worker.service -f -o cat
160155
description: 'Session Manager commands to connect to instances',
161156
});
162157

163-
// Custom resource to restart worker service on every deploy
158+
// Downloads the latest version of the worker binary and restarts the systemd service.
159+
// This is done instead of the userData.addS3Download() so that the instance does not need
160+
// to restart every time a new worker binary is available.
164161
const restartWorker = new cr.AwsCustomResource(this, 'RestartWorkerService', {
165162
onUpdate: {
166163
service: 'SSM',

src/distributed_planner/distributed_config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ extensions_options! {
2424
/// - If a node reduces the cardinality of the data, this factor will decrease.
2525
/// - In any other situation, this factor is left intact.
2626
pub cardinality_task_count_factor: f64, default = cardinality_task_count_factor_default()
27+
/// Upon shuffling over the network, data streams need to be disassembled in a lot of output
28+
/// partitions, which means the resulting streams might contain a lot of tiny record batches
29+
/// to be sent over the wire. This parameter controls the batch size in number of rows for
30+
/// the CoalesceBatchExec operator that is placed at the top of the stage for sending bigger
31+
/// batches over the wire.
32+
/// If set to 0, batch coalescing is disabled on network shuffle operations.
33+
pub shuffle_batch_size: usize, default = 8192
2734
/// Propagate collected metrics from all nodes in the plan across network boundaries
2835
/// so that they can be reconstructed on the head node of the plan.
2936
pub collect_metrics: bool, default = false

src/distributed_planner/distributed_physical_optimizer_rule.rs

Lines changed: 72 additions & 53 deletions
Large diffs are not rendered by default.

src/execution_plans/network_shuffle.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use arrow_flight::decode::FlightRecordBatchStream;
1717
use arrow_flight::error::FlightError;
1818
use bytes::Bytes;
1919
use dashmap::DashMap;
20+
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
2021
use datafusion::common::{exec_err, internal_datafusion_err, plan_err};
2122
use datafusion::error::DataFusionError;
2223
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -176,18 +177,29 @@ impl NetworkBoundary for NetworkShuffleExec {
176177
return plan_err!("cannot only return wrapped child if on Pending state");
177178
};
178179

179-
// TODO: Avoid downcasting once https://github.com/apache/datafusion/pull/17990 is shipped.
180-
let Some(r_exe) = pending.input.as_any().downcast_ref::<RepartitionExec>() else {
181-
return plan_err!("NetworkShuffleExec.input must always be RepartitionExec");
182-
};
183-
184-
let next_stage_plan = Arc::new(RepartitionExec::try_new(
185-
require_one_child(r_exe.children())?,
186-
scale_partitioning(r_exe.partitioning(), |p| p * n_tasks),
187-
)?);
180+
let transformed = Arc::clone(&pending.input).transform_down(|plan| {
181+
if let Some(r_exe) = plan.as_any().downcast_ref::<RepartitionExec>() {
182+
// Scale the input RepartitionExec to account for all the tasks to which it will
183+
// need to fan data out.
184+
let scaled = Arc::new(RepartitionExec::try_new(
185+
require_one_child(r_exe.children())?,
186+
scale_partitioning(r_exe.partitioning(), |p| p * n_tasks),
187+
)?);
188+
Ok(Transformed::new(scaled, true, TreeNodeRecursion::Stop))
189+
} else if matches!(plan.output_partitioning(), Partitioning::Hash(_, _)) {
190+
// This might be a passthrough node, like a CoalesceBatchesExec or something like that.
191+
// This is fine, we can let the node be here.
192+
Ok(Transformed::no(plan))
193+
} else {
194+
return plan_err!(
195+
"NetworkShuffleExec input must be hash partitioned, but {} is not",
196+
plan.name()
197+
);
198+
}
199+
})?;
188200

189201
Ok(InputStageInfo {
190-
plan: next_stage_plan,
202+
plan: transformed.data,
191203
task_count: pending.input_tasks,
192204
})
193205
}

src/test_utils/insta.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ pub fn settings() -> insta::Settings {
164164
settings.add_filter(r"input_batches=\d+", "input_batches=<metric>");
165165
settings.add_filter(r"input_rows=\d+", "input_rows=<metric>");
166166
settings.add_filter(r"output_batches=\d+", "output_batches=<metric>");
167+
settings.add_filter(
168+
r"output_bytes=\d+.\d [(B)|(KB)|(MB)]",
169+
"output_bytes=<metric>",
170+
);
167171
settings.add_filter(r"build_mem_used=\d+", "build_mem_used=<metric>");
168172
settings.add_filter(r"build_time=[\d.]+[a-zA-Zµnms]+", "build_time=<metric>");
169173
settings.add_filter(r"join_time=[\d.]+[a-zA-Zµnms]+", "join_time=<metric>");

tests/distributed_aggregation.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,14 @@ mod tests {
5959
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
6060
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
6161
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
62-
│ CoalesceBatchesExec: target_batch_size=8192
63-
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
62+
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
6463
└──────────────────────────────────────────────────
6564
┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
66-
│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
67-
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
68-
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
69-
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
65+
│ CoalesceBatchesExec: target_batch_size=8192
66+
│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
67+
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
68+
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
69+
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
7070
└──────────────────────────────────────────────────
7171
",
7272
);
@@ -141,14 +141,14 @@ mod tests {
141141
┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
142142
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday]
143143
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
144-
│ CoalesceBatchesExec: target_batch_size=8192
145-
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
144+
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
146145
└──────────────────────────────────────────────────
147146
┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
148-
│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
149-
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
150-
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
151-
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
147+
│ CoalesceBatchesExec: target_batch_size=8192
148+
│ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1
149+
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
150+
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
151+
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
152152
└──────────────────────────────────────────────────
153153
",
154154
);

0 commit comments

Comments
 (0)