1 change: 1 addition & 0 deletions Cargo.lock

(Generated lockfile; diff not rendered.)
1 change: 1 addition & 0 deletions Cargo.toml

@@ -30,6 +30,7 @@ dashmap = "6.1.0"
 prost = "0.13.5"
 rand = "0.8.5"
 object_store = "0.12.3"
+bytes = "1.10.1"

 # integration_tests deps
 insta = { version = "1.43.1", features = ["filters"], optional = true }
1 change: 0 additions & 1 deletion src/distributed_physical_optimizer_rule.rs

@@ -282,7 +282,6 @@ impl DistributedPhysicalOptimizerRule {
             distributed.data = Arc::new(CoalescePartitionsExec::new(distributed.data));
         }

-        let inputs = inputs.into_iter().map(Arc::new).collect();
         let mut stage = StageExec::new(query_id, *num, distributed.data, inputs, n_tasks);
         *num += 1;

1 change: 1 addition & 0 deletions src/execution_plans/mod.rs

@@ -8,5 +8,6 @@ mod stage;
 pub use network_coalesce::{NetworkCoalesceExec, NetworkCoalesceReady};
 pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
 pub use partition_isolator::PartitionIsolatorExec;
+pub(crate) use stage::InputStage;
 pub use stage::display_plan_graphviz;
 pub use stage::{DistributedTaskContext, ExecutionTask, StageExec};
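
The `InputStage` type itself is defined in `stage.rs`, which this diff does not show; the hunks below only reveal its accessors (`num()`, `tasks()`) and the new `proto_from_input_stage` helper. As a speculative sketch only — assuming, from the new `bytes` dependency and the disappearance of `.clone()` on the coalesce-side proto below, that an input stage can be carried either decoded or as already-encoded protobuf bytes — it could look something like this (all names beyond `InputStage`, `num`, and `tasks` are illustrative, not the crate's):

use bytes::Bytes;
use std::sync::Arc;

// Stand-ins so the sketch compiles on its own; the real types are this crate's.
pub struct StageExec {
    pub num: usize,
    pub tasks: Vec<ExecutionTask>,
}
#[derive(Clone)]
pub struct ExecutionTask;

// Hypothetical: an input stage that is either fully decoded, or kept as the
// protobuf bytes it arrived in, so embedding it into a `DoGet` ticket is a
// cheap `Bytes` clone instead of a fresh serialization of the whole subplan.
pub enum InputStage {
    Decoded(Arc<StageExec>),
    Encoded {
        num: usize,
        tasks: Vec<ExecutionTask>,
        proto: Bytes,
    },
}

impl InputStage {
    pub fn num(&self) -> usize {
        match self {
            InputStage::Decoded(stage) => stage.num,
            InputStage::Encoded { num, .. } => *num,
        }
    }

    pub fn tasks(&self) -> &[ExecutionTask] {
        match self {
            InputStage::Decoded(stage) => &stage.tasks,
            InputStage::Encoded { tasks, .. } => tasks,
        }
    }
}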
17 changes: 8 additions & 9 deletions src/execution_plans/network_coalesce.rs

@@ -6,7 +6,7 @@ use crate::distributed_physical_optimizer_rule::{NetworkBoundary, limit_tasks_er
 use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::DoGet;
 use crate::metrics::proto::MetricsSetProto;
-use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
+use crate::protobuf::{DistributedCodec, StageKey, proto_from_input_stage};
 use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
 use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
@@ -235,12 +235,11 @@ impl ExecutionPlan for NetworkCoalesceExec {
         // the `NetworkCoalesceExec` node can only be executed in the context of a `StageExec`
         let stage = StageExec::from_ctx(&context)?;

-        // of our child stages find the one that matches the one we are supposed to be
-        // reading from
-        let child_stage = stage.child_stage(self_ready.stage_num)?;
+        // of our input stages find the one that we are supposed to be reading from
+        let input_stage = stage.input_stage(self_ready.stage_num)?;

         let codec = DistributedCodec::new_combined_with_user(context.session_config());
-        let child_stage_proto = proto_from_stage(child_stage, &codec).map_err(|e| {
+        let input_stage_proto = proto_from_input_stage(input_stage, &codec).map_err(|e| {
             internal_datafusion_err!("NetworkCoalesceExec: failed to convert stage to proto: {e}")
         })?;

@@ -251,7 +250,7 @@
         }

         let partitions_per_task =
-            self.properties().partitioning.partition_count() / child_stage.tasks.len();
+            self.properties().partitioning.partition_count() / input_stage.tasks().len();

         let target_task = partition / partitions_per_task;
         let target_partition = partition % partitions_per_task;
@@ -261,11 +260,11 @@
             Extensions::default(),
             Ticket {
                 ticket: DoGet {
-                    stage_proto: Some(child_stage_proto.clone()),
+                    stage_proto: input_stage_proto,
                     target_partition: target_partition as u64,
                     stage_key: Some(StageKey {
                         query_id: stage.query_id.to_string(),
-                        stage_id: child_stage.num as u64,
+                        stage_id: input_stage.num() as u64,
                         task_number: target_task as u64,
                     }),
                     target_task_index: target_task as u64,
@@ -275,7 +274,7 @@
             },
         );

-        let Some(task) = child_stage.tasks.get(target_task) else {
+        let Some(task) = input_stage.tasks().get(target_task) else {
             return internal_err!("ProgrammingError: Task {target_task} not found");
         };

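For intuition, the `partitions_per_task` arithmetic in the hunks above fans a flat partition index out over the input stage's tasks: with 8 output partitions reading from 2 tasks, partitions 0–3 map to task 0 and 4–7 to task 1. A standalone sketch of the same routing (function name is mine, not the crate's):

/// Mirrors the routing in `NetworkCoalesceExec::execute`: a flat partition
/// index becomes (input task, partition within that task).
fn route(partition: usize, partition_count: usize, n_tasks: usize) -> (usize, usize) {
    let partitions_per_task = partition_count / n_tasks;
    (partition / partitions_per_task, partition % partitions_per_task)
}

fn main() {
    assert_eq!(route(5, 8, 2), (1, 1)); // partition 5 -> task 1, local partition 1
    assert_eq!(route(3, 8, 2), (0, 3)); // partition 3 -> task 0, local partition 3
}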
16 changes: 8 additions & 8 deletions src/execution_plans/network_shuffle.rs

@@ -6,7 +6,7 @@ use crate::distributed_physical_optimizer_rule::NetworkBoundary;
 use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::DoGet;
 use crate::metrics::proto::MetricsSetProto;
-use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
+use crate::protobuf::{DistributedCodec, StageKey, proto_from_input_stage};
 use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
 use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
@@ -293,34 +293,34 @@ impl ExecutionPlan for NetworkShuffleExec {

         // of our child stages find the one that matches the one we are supposed to be
         // reading from
-        let child_stage = stage.child_stage(self_ready.stage_num)?;
+        let input_stage = stage.input_stage(self_ready.stage_num)?;

         let codec = DistributedCodec::new_combined_with_user(context.session_config());
-        let child_stage_proto = proto_from_stage(child_stage, &codec).map_err(|e| {
+        let input_stage_proto = proto_from_input_stage(input_stage, &codec).map_err(|e| {
             internal_datafusion_err!("NetworkShuffleExec: failed to convert stage to proto: {e}")
         })?;

-        let child_stage_tasks = child_stage.tasks.clone();
-        let child_stage_num = child_stage.num as u64;
+        let input_stage_tasks = input_stage.tasks().to_vec();
+        let input_stage_num = input_stage.num() as u64;
         let query_id = stage.query_id.to_string();

         let context_headers = ContextGrpcMetadata::headers_from_ctx(&context);
         let task_context = DistributedTaskContext::from_ctx(&context);
         let off = self_ready.properties.partitioning.partition_count() * task_context.task_index;

-        let stream = child_stage_tasks.into_iter().enumerate().map(|(i, task)| {
+        let stream = input_stage_tasks.into_iter().enumerate().map(|(i, task)| {
             let channel_resolver = Arc::clone(&channel_resolver);

             let ticket = Request::from_parts(
                 MetadataMap::from_headers(context_headers.clone()),
                 Extensions::default(),
                 Ticket {
                     ticket: DoGet {
-                        stage_proto: Some(child_stage_proto.clone()),
+                        stage_proto: input_stage_proto.clone(),
                         target_partition: (off + partition) as u64,
                         stage_key: Some(StageKey {
                             query_id: query_id.clone(),
-                            stage_id: child_stage_num,
+                            stage_id: input_stage_num,
                             task_number: i as u64,
                         }),
                         target_task_index: i as u64,
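
The `off` computation in this hunk is the shuffle-side counterpart of the coalesce routing above: each task of the reading stage asks every input task for a partition offset by `partition_count * task_index`, so the reading tasks consume disjoint bands of each input task's output. A standalone sketch of that indexing (names are mine):

/// Mirrors `NetworkShuffleExec`: reading task `task_index`, which owns
/// `partition_count` local partitions, fetches partition `off + partition`
/// from every input task.
fn shuffle_partition(task_index: usize, partition_count: usize, partition: usize) -> usize {
    partition_count * task_index + partition
}

fn main() {
    assert_eq!(shuffle_partition(0, 4, 2), 2); // task 0 reads input partitions 0..4
    assert_eq!(shuffle_partition(1, 4, 2), 6); // task 1 reads input partitions 4..8
}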