From d7930bff95a63b519e5dacca5868119e5b5eb963 Mon Sep 17 00:00:00 2001 From: Kyle Booker Date: Sun, 25 Feb 2024 19:21:59 -0500 Subject: [PATCH 1/3] Unfinished skeleton for Aggregate --- .../src/execution/operators/aggregate.rs | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 eggstrain/src/execution/operators/aggregate.rs diff --git a/eggstrain/src/execution/operators/aggregate.rs b/eggstrain/src/execution/operators/aggregate.rs new file mode 100644 index 0000000..f2ce4cc --- /dev/null +++ b/eggstrain/src/execution/operators/aggregate.rs @@ -0,0 +1,31 @@ +use super::{Operator, UnaryOperator}; +use async_trait::async_trait; + +/// TODO docs +pub(crate) struct Aggregate { + children: Vec>, +} + +/// TODO docs +impl Aggregate { + pub(crate) fn new() -> Self { + todo!(); + } + + fn aggregate_in_mem(&self, rb: RecordBatch) -> Result { + todo!(); + } +} + +/// TODO docs +impl Operator for Aggregate { + fn children(&self) -> Vec> { + todo!(); + } +} + +/// TODO docs +#[async_trait] +impl UnaryOperator for Aggregte { + todo!(); +} From 1ffcf04912043e25c5ef9a5bebe13fba6ef078e6 Mon Sep 17 00:00:00 2001 From: Kyle Booker Date: Sun, 25 Feb 2024 19:38:35 -0500 Subject: [PATCH 2/3] More skeleton code --- .../src/execution/operators/aggregate.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/eggstrain/src/execution/operators/aggregate.rs b/eggstrain/src/execution/operators/aggregate.rs index f2ce4cc..53bffdc 100644 --- a/eggstrain/src/execution/operators/aggregate.rs +++ b/eggstrain/src/execution/operators/aggregate.rs @@ -27,5 +27,22 @@ impl Operator for Aggregate { /// TODO docs #[async_trait] impl UnaryOperator for Aggregte { - todo!(); + type In = RecordBatch; + type Out = RecordBatch; + + fn into_unary(self) -> Arc> { + todo!(); + } + + async fn execute( + &self, + rx: broadcast::Receiver, + tx: broadcast::Sender, + ) { + let mut batches = vec![]; + loop { + todo!(); + } + todo!(); + } } From a290334f3c910b99066a008fe3e139c7c7bf7c7b Mon Sep 17 00:00:00 2001 From: Kyle Booker Date: Sun, 25 Feb 2024 19:42:45 -0500 Subject: [PATCH 3/3] More skeleton code for aggregation as pipeline breaker --- .../src/execution/operators/aggregate.rs | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/eggstrain/src/execution/operators/aggregate.rs b/eggstrain/src/execution/operators/aggregate.rs index 53bffdc..55ab28a 100644 --- a/eggstrain/src/execution/operators/aggregate.rs +++ b/eggstrain/src/execution/operators/aggregate.rs @@ -41,7 +41,25 @@ impl UnaryOperator for Aggregte { ) { let mut batches = vec![]; loop { - todo!(); + match rx.recv().await { + Ok(batch) => { + batches.push(batch); + } + Err(e) => match e { + RecvError::Closed => break, + RecvError::Lagged(_) => todo!(), + }, + } + } + + let merged_batch = concat_batches(&self.schema, &batches); + match merged_batch { + Ok(merged_batch) => { + todo!("Implement"); + } + Error => { + todo!("Could not concat the batches"); + } } todo!(); }