diff --git a/src/stage/stage.rs b/src/stage/stage.rs index b161f03..f035a4d 100644 --- a/src/stage/stage.rs +++ b/src/stage/stage.rs @@ -15,10 +15,64 @@ use crate::task::ExecutionTask; use crate::ChannelManager; /// A unit of isolation for a portion of a physical execution plan -/// that can be executed independently. +/// that can be executed independently and across a network boundary. +/// It implements [`ExecutionPlan`] and can be executed to produce a +/// stream of record batches. /// -/// see https://howqueryengineswork.com/13-distributed-query.html +/// An ExecutionTask is a finer grained unit of work compared to an ExecutionStage. +/// One ExecutionStage will create one or more ExecutionTasks. /// +/// When an [`ExecutionStage`] is execute()'d it will execute its plan and return a stream +/// of record batches. +/// +/// If the stage has input stages, then those input stages will be executed on remote resources +/// and will be provided the remainder of the stage tree. +/// +/// For example if our stage tree looks like this: +/// +/// ```text +/// ┌─────────┐ +/// │ Stage 1 │ +/// └───┬─────┘ +/// │ +/// ┌──────┴────────┐ +/// ┌────┴────┐ ┌────┴────┐ +/// │ Stage 2 │ │ Stage 3 │ +/// └────┬────┘ └─────────┘ +/// │ +/// ┌──────┴────────┐ +/// ┌────┴────┐ ┌────┴────┐ +/// │ Stage 4 │ │ Stage 5 │ +/// └─────────┘ └─────────┘ +/// +/// ``` +/// +/// Then executing Stage 1 will run its plan locally. Stage 1 has two inputs, Stage 2 and Stage 3. We +/// know these will execute on remote resources. As such the plan for Stage 1 must contain an +/// [`ArrowFlightReadExec`] node that will read the results of Stage 2 and Stage 3 and coalesce the +/// results. +/// +/// When Stage 1's [`ArrowFlightReadExec`] node is executed, it makes an ArrowFlightRequest to the +/// host assigned in the Stage. 
It provides the following Stage tree serialized in the body of the +/// Arrow Flight Ticket: +/// +/// ```text +/// ┌─────────┐ +/// │ Stage 2 │ +/// └────┬────┘ +/// │ +/// ┌──────┴────────┐ +/// ┌────┴────┐ ┌────┴────┐ +/// │ Stage 4 │ │ Stage 5 │ +/// └─────────┘ └─────────┘ +/// +/// ``` +/// +/// The receiving ArrowFlightEndpoint will then execute Stage 2 and will repeat this process. +/// +/// When Stage 4 is executed, it has no input tasks, so it is assumed that the plan included in that +/// Stage can complete on its own; it's likely holding a leaf node in the overall physical plan and +/// producing data from a [`DataSourceExec`]. #[derive(Debug, Clone)] pub struct ExecutionStage { /// Our stage number diff --git a/tests/distributed_aggregation.rs b/tests/distributed_aggregation.rs index 3776606..1258c2a 100644 --- a/tests/distributed_aggregation.rs +++ b/tests/distributed_aggregation.rs @@ -13,9 +13,8 @@ mod tests { use std::error::Error; #[tokio::test] - #[ignore] async fn distributed_aggregation() -> Result<(), Box> { - // FIXME these ports are in use on my machine, we should find unused ports + // FIXME: these ports are in use on my machine, we should find unused ports // Changed them for now let (ctx, _guard) = start_localhost_context([40050, 40051, 40052], NoopSessionBuilder).await;