Commit a1f5d8b

Remove unnecessary StageExec proto serde overhead

1 parent 2df3467

10 files changed (+235, -133 lines)

Cargo.lock

Lines changed: 1 addition & 0 deletions
Generated file; diff not rendered.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ dashmap = "6.1.0"
 prost = "0.13.5"
 rand = "0.8.5"
 object_store = "0.12.3"
+bytes = "1.10.1"
 
 # integration_tests deps
 insta = { version = "1.43.1", features = ["filters"], optional = true }
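The new direct `bytes` dependency hints at the mechanism behind the commit title: a pre-encoded stage plan carried as `Bytes` can be handed to many tickets without re-running protobuf serialization, because cloning a `Bytes` only bumps a reference count. A minimal sketch of that property (the "encoded plan" payload is a stand-in):

```rust
// Minimal sketch: `Bytes::clone` shares the underlying buffer instead of
// copying or re-encoding it, which is what makes handing the same encoded
// stage plan to many tickets cheap.
use bytes::Bytes;

fn main() {
    let encoded = Bytes::from(vec![7u8; 1024]); // stand-in for an encoded plan
    let handle = encoded.clone(); // refcount bump, no byte copy
    assert_eq!(encoded.as_ptr(), handle.as_ptr()); // same backing memory
}
```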

src/distributed_physical_optimizer_rule.rs

Lines changed: 0 additions & 1 deletion
@@ -268,7 +268,6 @@ impl DistributedPhysicalOptimizerRule {
             Ok(Transformed::new(node, true, TreeNodeRecursion::Jump))
         })?;
 
-        let inputs = inputs.into_iter().map(Arc::new).collect();
         let mut stage = StageExec::new(query_id, *num, distributed.data, inputs, n_tasks);
         *num += 1;
 
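The deleted line suggests `StageExec::new` now accepts the inputs as the builder produced them and performs any wrapping (into `Arc`s, or into the new `InputStage`) internally instead of at the call site. A hypothetical before/after sketch of that constructor move; `Plan` and `Stage` are stand-ins, not this crate's real signatures:

```rust
// Hypothetical sketch of moving the Arc-wrapping out of call sites and into
// the constructor; `Plan`/`Stage` are stand-ins, not this crate's types.
use std::sync::Arc;

struct Plan;
struct Stage {
    inputs: Vec<Arc<Plan>>,
}

impl Stage {
    // After: the constructor owns the conversion, so callers pass plain values.
    fn new(inputs: Vec<Plan>) -> Self {
        Self {
            inputs: inputs.into_iter().map(Arc::new).collect(),
        }
    }
}

fn main() {
    // Before this change, each caller had to do the `map(Arc::new)` itself.
    let stage = Stage::new(vec![Plan, Plan]);
    assert_eq!(stage.inputs.len(), 2);
}
```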

src/execution_plans/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -8,5 +8,6 @@ mod stage;
 pub use network_coalesce::{NetworkCoalesceExec, NetworkCoalesceReady};
 pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
 pub use partition_isolator::PartitionIsolatorExec;
+pub(crate) use stage::InputStage;
 pub use stage::display_plan_graphviz;
 pub use stage::{DistributedTaskContext, ExecutionTask, StageExec};
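The new crate-private `InputStage` is the centerpiece of the commit. Its definition lives in `stage.rs` and is not shown in this diff, but the accessors used in the hunks below (`num()`, `tasks()`) and the new `bytes` dependency suggest a shape roughly like the following. This is an inferred sketch, not the actual type:

```rust
// Inferred sketch of a crate-private `InputStage`; the real definition is not
// shown in this diff. `ExecutionTask` here is a stand-in for the crate's task
// descriptor.
use bytes::Bytes;

#[derive(Clone, Debug)]
struct ExecutionTask;

enum InputStage {
    // A decoded child stage, still available as a plan tree.
    Decoded { num: usize, tasks: Vec<ExecutionTask> },
    // An already-encoded stage: keep the proto bytes, skip re-serialization.
    Encoded { num: usize, tasks: Vec<ExecutionTask>, proto: Bytes },
}

impl InputStage {
    fn num(&self) -> usize {
        match self {
            Self::Decoded { num, .. } | Self::Encoded { num, .. } => *num,
        }
    }

    fn tasks(&self) -> &[ExecutionTask] {
        match self {
            Self::Decoded { tasks, .. } | Self::Encoded { tasks, .. } => tasks,
        }
    }
}

fn main() {
    let stage = InputStage::Encoded {
        num: 1,
        tasks: vec![ExecutionTask, ExecutionTask],
        proto: Bytes::from_static(b"encoded plan"),
    };
    assert_eq!(stage.num(), 1);
    assert_eq!(stage.tasks().len(), 2);
}
```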

src/execution_plans/network_coalesce.rs

Lines changed: 8 additions & 9 deletions
@@ -6,7 +6,7 @@ use crate::distributed_physical_optimizer_rule::{NetworkBoundary, limit_tasks_er
 use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::DoGet;
 use crate::metrics::proto::MetricsSetProto;
-use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
+use crate::protobuf::{DistributedCodec, StageKey, proto_from_input_stage};
 use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
 use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
@@ -249,12 +249,11 @@ impl ExecutionPlan for NetworkCoalesceExec {
         // the `NetworkCoalesceExec` node can only be executed in the context of a `StageExec`
         let stage = StageExec::from_ctx(&context)?;
 
-        // of our child stages find the one that matches the one we are supposed to be
-        // reading from
-        let child_stage = stage.child_stage(self_ready.stage_num)?;
+        // of our input stages find the one that we are supposed to be reading from
+        let input_stage = stage.input_stage(self_ready.stage_num)?;
 
         let codec = DistributedCodec::new_combined_with_user(context.session_config());
-        let child_stage_proto = proto_from_stage(child_stage, &codec).map_err(|e| {
+        let input_stage_proto = proto_from_input_stage(input_stage, &codec).map_err(|e| {
             internal_datafusion_err!("NetworkCoalesceExec: failed to convert stage to proto: {e}")
         })?;
 
@@ -265,7 +264,7 @@
         }
 
         let partitions_per_task =
-            self.properties().partitioning.partition_count() / child_stage.tasks.len();
+            self.properties().partitioning.partition_count() / input_stage.tasks().len();
 
         let target_task = partition / partitions_per_task;
         let target_partition = partition % partitions_per_task;
@@ -275,11 +274,11 @@
             Extensions::default(),
             Ticket {
                 ticket: DoGet {
-                    stage_proto: Some(child_stage_proto.clone()),
+                    stage_proto: input_stage_proto,
                     target_partition: target_partition as u64,
                     stage_key: Some(StageKey {
                         query_id: stage.query_id.to_string(),
-                        stage_id: child_stage.num as u64,
+                        stage_id: input_stage.num() as u64,
                         task_number: target_task as u64,
                     }),
                     target_task_index: target_task as u64,
@@ -289,7 +288,7 @@
             },
         );
 
-        let Some(task) = child_stage.tasks.get(target_task) else {
+        let Some(task) = input_stage.tasks().get(target_task) else {
             return internal_err!("ProgrammingError: Task {target_task} not found");
         };
 
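Worth noting in this file: the encoded stage now comes from `proto_from_input_stage` and is moved into the single outgoing ticket rather than cloned, so the serialization cost is paid at most once per `execute` call. Below is a sketch of the encode-once idea using `std::sync::OnceLock`; it illustrates the pattern only, and `StagePlan`/`proto` are invented names, not the crate's actual caching code:

```rust
// Sketch of "encode at most once, hand out cheap handles afterwards", the
// pattern that removes per-ticket protobuf serde work. Names are illustrative.
use bytes::Bytes;
use std::sync::OnceLock;

struct StagePlan {
    encoded: OnceLock<Bytes>,
}

impl StagePlan {
    fn new() -> Self {
        Self { encoded: OnceLock::new() }
    }

    // Pretend this is the expensive protobuf encoding step.
    fn encode(&self) -> Bytes {
        Bytes::from(vec![42u8; 256])
    }

    // First call pays the encoding cost; later calls return a shared handle.
    fn proto(&self) -> Bytes {
        self.encoded.get_or_init(|| self.encode()).clone()
    }
}

fn main() {
    let plan = StagePlan::new();
    let a = plan.proto();
    let b = plan.proto(); // no re-encode, just a refcount bump
    assert_eq!(a, b);
}
```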

src/execution_plans/network_shuffle.rs

Lines changed: 8 additions & 8 deletions
@@ -6,7 +6,7 @@ use crate::distributed_physical_optimizer_rule::NetworkBoundary;
 use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::DoGet;
 use crate::metrics::proto::MetricsSetProto;
-use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
+use crate::protobuf::{DistributedCodec, StageKey, proto_from_input_stage};
 use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
 use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
@@ -293,34 +293,34 @@ impl ExecutionPlan for NetworkShuffleExec {
 
         // of our child stages find the one that matches the one we are supposed to be
         // reading from
-        let child_stage = stage.child_stage(self_ready.stage_num)?;
+        let input_stage = stage.input_stage(self_ready.stage_num)?;
 
         let codec = DistributedCodec::new_combined_with_user(context.session_config());
-        let child_stage_proto = proto_from_stage(child_stage, &codec).map_err(|e| {
+        let input_stage_proto = proto_from_input_stage(input_stage, &codec).map_err(|e| {
             internal_datafusion_err!("NetworkShuffleExec: failed to convert stage to proto: {e}")
         })?;
 
-        let child_stage_tasks = child_stage.tasks.clone();
-        let child_stage_num = child_stage.num as u64;
+        let input_stage_tasks = input_stage.tasks().to_vec();
+        let input_stage_num = input_stage.num() as u64;
         let query_id = stage.query_id.to_string();
 
         let context_headers = ContextGrpcMetadata::headers_from_ctx(&context);
         let task_context = DistributedTaskContext::from_ctx(&context);
         let off = self_ready.properties.partitioning.partition_count() * task_context.task_index;
 
-        let stream = child_stage_tasks.into_iter().enumerate().map(|(i, task)| {
+        let stream = input_stage_tasks.into_iter().enumerate().map(|(i, task)| {
             let channel_resolver = Arc::clone(&channel_resolver);
 
             let ticket = Request::from_parts(
                 MetadataMap::from_headers(context_headers.clone()),
                 Extensions::default(),
                 Ticket {
                     ticket: DoGet {
-                        stage_proto: Some(child_stage_proto.clone()),
+                        stage_proto: input_stage_proto.clone(),
                         target_partition: (off + partition) as u64,
                         stage_key: Some(StageKey {
                             query_id: query_id.clone(),
-                            stage_id: child_stage_num,
+                            stage_id: input_stage_num,
                             task_number: i as u64,
                         }),
                         target_task_index: i as u64,