From 9734a95dde3886cd6ebc35a2cbfcce1296c5291a Mon Sep 17 00:00:00 2001 From: Rob Tandy Date: Wed, 6 Aug 2025 14:15:22 -0400 Subject: [PATCH 1/2] add comment for execution stage struct --- src/stage/stage.rs | 58 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/src/stage/stage.rs b/src/stage/stage.rs index b161f03..fa5bf81 100644 --- a/src/stage/stage.rs +++ b/src/stage/stage.rs @@ -15,10 +15,64 @@ use crate::task::ExecutionTask; use crate::ChannelManager; /// A unit of isolation for a portion of a physical execution plan -/// that can be executed independently. +/// that can be executed independently and across a network boundary. +/// It implements [`ExecutionPlan`] and can be executed to produce a +/// stream of record batches. /// -/// see https://howqueryengineswork.com/13-distributed-query.html +/// An ExecutionTask is a finer grained unit of work compared to an ExecutionStage. +/// One ExecutionStage will create one or more ExecutionTasks /// +/// When an [`ExecutionStage`] is execute()'d if will execute its plan and return a stream +/// of record batches. +/// +/// If the stage has input stages, then it those input stages will be executed on remote resources +/// and will be provided the remainder of the stage tree. +/// +/// For example if our stage tree looks like this: +/// +/// ```text +/// ┌─────────┐ +/// │ stage 1 │ +/// └───┬─────┘ +/// │ +/// ┌──────┴────────┐ +/// ┌────┴────┐ ┌────┴────┐ +/// │ stage 2 │ │ stage 3 │ +/// └────┬────┘ └─────────┘ +/// │ +/// ┌──────┴────────┐ +/// ┌────┴────┐ ┌────┴────┐ +/// │ stage 4 │ │ Stage 5 │ +/// └─────────┘ └─────────┘ +/// +/// ``` +/// +/// Then executing Stage 1 will run its plan locally. Stage 1 has two inputs, Stage 2 and Stage 3. We +/// know these will execute on remote resources. As such the plan for Stage 1 must contain an +/// [`ArrowFlightReadExec`] node that will read the results of Stage 2 and Stage 3 and coalesce the +/// results. 
+/// +/// When Stage 1's [`ArrowFlightReadExec`] node is executed, it makes an ArrowFlightRequest to the +/// host assigned in the Stage. It provides the following Stage tree serialized in the body of the +/// Arrow Flight Ticket: +/// +/// ```text +/// ┌─────────┐ +/// │ Stage 2 │ +/// └────┬────┘ +/// │ +/// ┌──────┴────────┐ +/// ┌────┴────┐ ┌────┴────┐ +/// │ Stage 4 │ │ Stage 5 │ +/// └─────────┘ └─────────┘ +/// +/// ``` +/// +/// The receiving ArrowFlightEndpoint will then execute Stage 2 and will repeat this process. +/// +/// When Stage 4 4 is executed, it has no input tasks, so it is assumed that the plan included in that +/// Stage can complete on its own; its likely holding a leaf node in the overall phyysical plan and +/// producing data from a [`DataSourceExec`]. #[derive(Debug, Clone)] pub struct ExecutionStage { /// Our stage number From 6fdd65fb01786b4ff2f528fa792abe87bacb6047 Mon Sep 17 00:00:00 2001 From: Rob Tandy Date: Thu, 7 Aug 2025 10:42:11 -0400 Subject: [PATCH 2/2] address comments on doc clarity --- src/stage/stage.rs | 4 ++-- tests/distributed_aggregation.rs | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/stage/stage.rs b/src/stage/stage.rs index fa5bf81..f035a4d 100644 --- a/src/stage/stage.rs +++ b/src/stage/stage.rs @@ -25,7 +25,7 @@ use crate::ChannelManager; /// When an [`ExecutionStage`] is execute()'d if will execute its plan and return a stream /// of record batches. /// -/// If the stage has input stages, then it those input stages will be executed on remote resources +/// If the stage has input stages, then those input stages will be executed on remote resources /// and will be provided the remainder of the stage tree. /// /// For example if our stage tree looks like this: @@ -70,7 +70,7 @@ use crate::ChannelManager; /// /// The receiving ArrowFlightEndpoint will then execute Stage 2 and will repeat this process. 
/// -/// When Stage 4 4 is executed, it has no input tasks, so it is assumed that the plan included in that +/// When Stage 4 is executed, it has no input tasks, so it is assumed that the plan included in that /// Stage can complete on its own; its likely holding a leaf node in the overall phyysical plan and /// producing data from a [`DataSourceExec`]. #[derive(Debug, Clone)] diff --git a/tests/distributed_aggregation.rs b/tests/distributed_aggregation.rs index 3776606..1258c2a 100644 --- a/tests/distributed_aggregation.rs +++ b/tests/distributed_aggregation.rs @@ -13,9 +13,8 @@ mod tests { use std::error::Error; #[tokio::test] - #[ignore] async fn distributed_aggregation() -> Result<(), Box<dyn Error>> { - // FIXME these ports are in use on my machine, we should find unused ports + // FIXME: these ports are in use on my machine, we should find unused ports // Changed them for now let (ctx, _guard) = start_localhost_context([40050, 40051, 40052], NoopSessionBuilder).await;