diff --git a/eggstrain/src/execution/operators/aggregate.rs b/eggstrain/src/execution/operators/aggregate.rs new file mode 100644 index 0000000..55ab28a --- /dev/null +++ b/eggstrain/src/execution/operators/aggregate.rs @@ -0,0 +1,66 @@ +use super::{Operator, UnaryOperator}; +use async_trait::async_trait; + +/// TODO docs +pub(crate) struct Aggregate { + children: Vec>, +} + +/// TODO docs +impl Aggregate { + pub(crate) fn new() -> Self { + todo!(); + } + + fn aggregate_in_mem(&self, rb: RecordBatch) -> Result { + todo!(); + } +} + +/// TODO docs +impl Operator for Aggregate { + fn children(&self) -> Vec> { + todo!(); + } +} + +/// TODO docs +#[async_trait] +impl UnaryOperator for Aggregte { + type In = RecordBatch; + type Out = RecordBatch; + + fn into_unary(self) -> Arc> { + todo!(); + } + + async fn execute( + &self, + rx: broadcast::Receiver, + tx: broadcast::Sender, + ) { + let mut batches = vec![]; + loop { + match rx.recv().await { + Ok(batch) => { + batches.push(batch); + } + Err(e) => match e { + RecvError::Closed => break, + RecvError::Lagged(_) => todo!(), + }, + } + } + + let merged_batch = concat_batches(&self.schema, &batches); + match merged_batch { + Ok(merged_batch) => { + todo!("Implement"); + } + Error => { + todo!("Could not concat the batches"); + } + } + todo!(); + } +}