Commit 0d912ba

Remove unnecessary StageExec proto serde overhead (#163)
* Remove unnecessary StageExec proto serde overhead
* Add doc comments for protobuf parsing code
1 parent d3b46ab commit 0d912ba
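
The change in a nutshell: a parent StageExec used to re-serialize each child stage's plan tree into protobuf on every partition execution; after this commit, input stages carry their encoding with them, so network boundaries just hand out cached bytes. Below is a minimal sketch of that idea, assuming a cached-Bytes layout. `InputStage`, `tasks()`, `num()`, and `ExecutionTask` are names taken from the diffs below; the `encoded` field and `proto()` method are illustrative stand-ins for `proto_from_input_stage`, not the crate's actual definitions.

use bytes::Bytes;

// Placeholder for the real task metadata; only the name comes from the crate.
#[derive(Clone)]
struct ExecutionTask;

// Sketch of an input stage that caches its protobuf encoding. The real
// `InputStage` is crate-private; this layout is an assumption.
struct InputStage {
    num: usize,
    tasks: Vec<ExecutionTask>,
    encoded: Bytes, // encoded once, when the stage was planned
}

impl InputStage {
    fn num(&self) -> usize {
        self.num
    }
    fn tasks(&self) -> &[ExecutionTask] {
        &self.tasks
    }
    // Stand-in for `proto_from_input_stage`: hand out the cached bytes
    // instead of re-encoding the whole plan on every `execute()` call.
    fn proto(&self) -> Bytes {
        self.encoded.clone() // O(1): `Bytes` clones share one buffer
    }
}

fn main() {
    let stage = InputStage {
        num: 2,
        tasks: vec![ExecutionTask; 3],
        encoded: Bytes::from_static(b"\x0a\x04demo"),
    };
    // Every partition that dials this stage reuses the same encoding.
    assert_eq!(stage.proto(), stage.proto());
    println!("stage {} has {} tasks", stage.num(), stage.tasks().len());
}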

File tree

10 files changed: 251 additions, 134 deletions


Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ dashmap = "6.1.0"
 prost = "0.13.5"
 rand = "0.8.5"
 object_store = "0.12.3"
+bytes = "1.10.1"
 
 # integration_tests deps
 insta = { version = "1.43.1", features = ["filters"], optional = true }
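
The lone new dependency appears to exist for exactly this optimization: bytes::Bytes is a reference-counted buffer whose clones are O(1) and copy nothing, which is what makes passing the same cached stage proto into many flight tickets cheap. A quick standalone illustration (standard behavior of the bytes crate, not code from this repo):

use bytes::Bytes;

fn main() {
    let encoded = Bytes::from(vec![1u8, 2, 3, 4]);
    let handle = encoded.clone(); // bumps a refcount, copies no bytes
    // Both handles view the same backing allocation.
    assert_eq!(encoded.as_ptr(), handle.as_ptr());
}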

src/distributed_physical_optimizer_rule.rs

Lines changed: 0 additions & 1 deletion
@@ -282,7 +282,6 @@ impl DistributedPhysicalOptimizerRule {
             distributed.data = Arc::new(CoalescePartitionsExec::new(distributed.data));
         }
 
-        let inputs = inputs.into_iter().map(Arc::new).collect();
         let mut stage = StageExec::new(query_id, *num, distributed.data, inputs, n_tasks);
         *num += 1;
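
With that, one line of plumbing disappears: the call site no longer Arc-wraps its inputs before handing them to StageExec::new. A hypothetical sketch of why, under the assumption (suggested by the InputStage re-export in the next hunk) that inputs are now stored as plain values rather than Arc<StageExec> children; neither signature below is taken from the crate:

struct InputStage; // stand-in for the crate-private input-stage type

struct StageExec {
    inputs: Vec<InputStage>,
}

impl StageExec {
    // Hypothetical signature: storing inputs by value means callers can
    // drop the `.map(Arc::new)` pass deleted in the hunk above.
    fn new(inputs: Vec<InputStage>) -> Self {
        Self { inputs }
    }
}

fn main() {
    // No `Arc` wrapping before constructing the stage.
    let stage = StageExec::new(vec![InputStage, InputStage]);
    println!("{} inputs", stage.inputs.len());
}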

src/execution_plans/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -8,5 +8,6 @@ mod stage;
 pub use network_coalesce::{NetworkCoalesceExec, NetworkCoalesceReady};
 pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
 pub use partition_isolator::PartitionIsolatorExec;
+pub(crate) use stage::InputStage;
 pub use stage::display_plan_graphviz;
 pub use stage::{DistributedTaskContext, ExecutionTask, StageExec};

src/execution_plans/network_coalesce.rs

Lines changed: 8 additions & 9 deletions
@@ -6,7 +6,7 @@ use crate::distributed_physical_optimizer_rule::{NetworkBoundary, limit_tasks_er
 use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::DoGet;
 use crate::metrics::proto::MetricsSetProto;
-use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
+use crate::protobuf::{DistributedCodec, StageKey, proto_from_input_stage};
 use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
 use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
@@ -235,12 +235,11 @@ impl ExecutionPlan for NetworkCoalesceExec {
         // the `NetworkCoalesceExec` node can only be executed in the context of a `StageExec`
         let stage = StageExec::from_ctx(&context)?;
 
-        // of our child stages find the one that matches the one we are supposed to be
-        // reading from
-        let child_stage = stage.child_stage(self_ready.stage_num)?;
+        // of our input stages find the one that we are supposed to be reading from
+        let input_stage = stage.input_stage(self_ready.stage_num)?;
 
         let codec = DistributedCodec::new_combined_with_user(context.session_config());
-        let child_stage_proto = proto_from_stage(child_stage, &codec).map_err(|e| {
+        let input_stage_proto = proto_from_input_stage(input_stage, &codec).map_err(|e| {
             internal_datafusion_err!("NetworkCoalesceExec: failed to convert stage to proto: {e}")
         })?;
 
@@ -251,7 +250,7 @@ impl ExecutionPlan for NetworkCoalesceExec {
         }
 
         let partitions_per_task =
-            self.properties().partitioning.partition_count() / child_stage.tasks.len();
+            self.properties().partitioning.partition_count() / input_stage.tasks().len();
 
         let target_task = partition / partitions_per_task;
         let target_partition = partition % partitions_per_task;
@@ -261,11 +260,11 @@ impl ExecutionPlan for NetworkCoalesceExec {
             Extensions::default(),
             Ticket {
                 ticket: DoGet {
-                    stage_proto: Some(child_stage_proto.clone()),
+                    stage_proto: input_stage_proto,
                     target_partition: target_partition as u64,
                     stage_key: Some(StageKey {
                         query_id: stage.query_id.to_string(),
-                        stage_id: child_stage.num as u64,
+                        stage_id: input_stage.num() as u64,
                         task_number: target_task as u64,
                     }),
                     target_task_index: target_task as u64,
@@ -275,7 +274,7 @@ impl ExecutionPlan for NetworkCoalesceExec {
             },
         );
 
-        let Some(task) = child_stage.tasks.get(target_task) else {
+        let Some(task) = input_stage.tasks().get(target_task) else {
             return internal_err!("ProgrammingError: Task {target_task} not found");
         };
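
The routing arithmetic above deserves a worked example. Suppose (numbers invented) this NetworkCoalesceExec exposes 8 partitions over an input stage that ran as 2 tasks; each output partition then maps to exactly one upstream task and one partition within it:

fn main() {
    let partition_count = 8; // partitions this node exposes (invented)
    let n_tasks = 2; // tasks the input stage ran as (invented)
    let partitions_per_task = partition_count / n_tasks; // 4

    for partition in 0..partition_count {
        let target_task = partition / partitions_per_task; // which task to dial
        let target_partition = partition % partitions_per_task; // partition within it
        println!("partition {partition} -> task {target_task}, partition {target_partition}");
    }
    // Partitions 0..4 read from task 0 and 4..8 from task 1: upstream
    // partitions are exposed 1:1, simply concatenated across tasks.
}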

src/execution_plans/network_shuffle.rs

Lines changed: 8 additions & 8 deletions
@@ -6,7 +6,7 @@ use crate::distributed_physical_optimizer_rule::NetworkBoundary;
 use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::DoGet;
 use crate::metrics::proto::MetricsSetProto;
-use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
+use crate::protobuf::{DistributedCodec, StageKey, proto_from_input_stage};
 use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
 use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
@@ -293,34 +293,34 @@ impl ExecutionPlan for NetworkShuffleExec {
 
         // of our child stages find the one that matches the one we are supposed to be
         // reading from
-        let child_stage = stage.child_stage(self_ready.stage_num)?;
+        let input_stage = stage.input_stage(self_ready.stage_num)?;
 
         let codec = DistributedCodec::new_combined_with_user(context.session_config());
-        let child_stage_proto = proto_from_stage(child_stage, &codec).map_err(|e| {
+        let input_stage_proto = proto_from_input_stage(input_stage, &codec).map_err(|e| {
             internal_datafusion_err!("NetworkShuffleExec: failed to convert stage to proto: {e}")
         })?;
 
-        let child_stage_tasks = child_stage.tasks.clone();
-        let child_stage_num = child_stage.num as u64;
+        let input_stage_tasks = input_stage.tasks().to_vec();
+        let input_stage_num = input_stage.num() as u64;
         let query_id = stage.query_id.to_string();
 
         let context_headers = ContextGrpcMetadata::headers_from_ctx(&context);
         let task_context = DistributedTaskContext::from_ctx(&context);
         let off = self_ready.properties.partitioning.partition_count() * task_context.task_index;
 
-        let stream = child_stage_tasks.into_iter().enumerate().map(|(i, task)| {
+        let stream = input_stage_tasks.into_iter().enumerate().map(|(i, task)| {
             let channel_resolver = Arc::clone(&channel_resolver);
 
             let ticket = Request::from_parts(
                 MetadataMap::from_headers(context_headers.clone()),
                 Extensions::default(),
                 Ticket {
                     ticket: DoGet {
-                        stage_proto: Some(child_stage_proto.clone()),
+                        stage_proto: input_stage_proto.clone(),
                         target_partition: (off + partition) as u64,
                         stage_key: Some(StageKey {
                             query_id: query_id.clone(),
-                            stage_id: child_stage_num,
+                            stage_id: input_stage_num,
                             task_number: i as u64,
                         }),
                         target_task_index: i as u64,