diff --git a/arrow-datafusion b/arrow-datafusion
index c93bf47..b971600 160000
--- a/arrow-datafusion
+++ b/arrow-datafusion
@@ -1 +1 @@
-Subproject commit c93bf47812c6c7ad8b1abf3921754a3e4d85bcf8
+Subproject commit b9716007e187b5c2108f4800484d88c170f57cb8
diff --git a/vayu-common/src/lib.rs b/vayu-common/src/lib.rs
index d7370c8..d20f708 100644
--- a/vayu-common/src/lib.rs
+++ b/vayu-common/src/lib.rs
@@ -1,8 +1,10 @@
 use arrow::record_batch::RecordBatch;
 use datafusion::common::Result;
+use datafusion::physical_plan::aggregates::AggregateMode;
 use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
 use std::sync::Arc;
 pub mod store;
+// use vayu::operators::aggregate::AggregateOperator;
 
 pub trait PhysicalOperator {
     fn name(&self) -> String;
@@ -24,21 +26,30 @@ pub enum SchedulerSourceType {
     RecordBatchStore(i32),
 }
 
+#[derive(Clone)]
 pub enum SchedulerSinkType {
-    // StoreRecordBatch(i32),
-    BuildAndStoreHashMap(i32, Arc<dyn ExecutionPlan>),
+    StoreRecordBatch(i32),
+    // FinalAggregation(i32, AggregateOperator),
     PrintOutput,
 }
+#[derive(Clone)]
+pub enum FinalizeSinkType {
+    PrintFromStore(i32),
+    FinalAggregate(Arc<dyn ExecutionPlan>, i32),
+    BuildAndStoreHashMap(i32, Arc<dyn ExecutionPlan>),
+}
+#[derive(Clone)]
 pub struct DatafusionPipeline {
     pub plan: Arc<dyn ExecutionPlan>,
     pub sink: Option<SchedulerSinkType>,
     pub id: i32,
 }
-pub struct DatafusionPipelineWithSource {
-    pub source: Arc<dyn ExecutionPlan>,
-    pub plan: Arc<dyn ExecutionPlan>,
-    pub sink: Option<SchedulerSinkType>,
+#[derive(Clone)]
+pub struct SchedulerPipeline {
+    pub source: Option<Arc<dyn ExecutionPlan>>,
+    pub pipeline: DatafusionPipeline,
+    pub finalize: FinalizeSinkType,
 }
 
 pub struct DatafusionPipelineWithData {
@@ -55,13 +66,18 @@ pub struct VayuPipelineWithData {
     pub data: RecordBatch,
 }
 pub struct Task {
-    pub pipelines: Vec<DatafusionPipelineWithSource>,
+    pub pipelines: Vec<SchedulerPipeline>,
+}
+
+pub enum VayuMessage {
+    Normal(DatafusionPipelineWithData),
+    Finalize((FinalizeSinkType, i32)),
 }
 impl Task {
     pub fn new() -> Self {
         Task { pipelines: vec![] }
     }
-    pub fn add_pipeline(&mut self, pipeline: DatafusionPipelineWithSource) {
+    pub fn add_pipeline(&mut self, pipeline: SchedulerPipeline) {
         self.pipelines.push(pipeline);
     }
 }
diff --git a/vayu-common/src/store.rs b/vayu-common/src/store.rs
index ff3b7a0..14cc9a5 100644
--- a/vayu-common/src/store.rs
+++ b/vayu-common/src/store.rs
@@ -1,40 +1,44 @@
+use arrow::record_batch::RecordBatch;
 use core::panic;
 use datafusion::physical_plan::joins::hash_join::JoinLeftData;
 use std::collections::HashMap;
+#[derive(Clone)]
 pub enum Blob {
-    // RecordBatchBlob(Vec<RecordBatch>),
+    RecordBatchBlob(RecordBatch),
     HashMapBlob(JoinLeftData),
 }
-impl Blob {
-    pub fn get_map(self) -> JoinLeftData {
-        match self {
-            Blob::HashMapBlob(m) => m,
-            _ => panic!("error"),
-        }
-    }
-    // pub fn get_records(self) -> Vec<RecordBatch> {
-    //     match self {
-    //         Blob::RecordBatchBlob(records) => records,
-    //         _ => panic!("error"),
-    //     }
-    // }
-    // pub fn append_records(&mut self, batches: Vec<RecordBatch>) {
-    //     match self {
-    //         Blob::RecordBatchBlob(records) => {
-    //             // TODO: check if schema is same
-    //             records.extend(batches)
-    //         }
-    //         _ => panic!("error"),
-    //     }
-    // }
-}
+// impl Blob {
+//     // pub fn get_map(self) -> JoinLeftData {
+//     //     match self {
+//     //         Blob::HashMapBlob(m) => m,
+//     //         _ => panic!("error"),
+//     //     }
+//     // }
+//     pub fn get_records(self) -> Vec<RecordBatch> {
+//         match self {
+//             Blob::RecordBatchBlob(records) => records,
+//             _ => panic!("error"),
+//         }
+//     }
+//     pub fn append_records(&mut self, batches: Vec<RecordBatch>) {
+//         match self {
+//             Blob::RecordBatchBlob(records) => {
+//                 // TODO: check if schema is same
+//                 records.extend(batches)
+//             }
+//             _ => panic!("error"),
+//         }
+//     }
+// }
 
-// right now this is typedef of HashMap,
-// but we may need something else in near future
+// The store keeps a vector of blobs per key;
+// each blob is the output of one worker thread.
+// The finalize step removes the vec of blobs, combines them, and stores the result again.
+#[derive(Clone)]
 pub struct Store {
-    store: HashMap<i32, Blob>,
+    pub store: HashMap<i32, Vec<Blob>>,
 }
 impl Store {
     pub fn new() -> Store {
@@ -42,21 +46,30 @@ impl Store {
             store: HashMap::new(),
         }
     }
-    pub fn insert(&mut self, key: i32, value: Blob) {
-        self.store.insert(key, value);
+    pub fn insert(&mut self, key: i32, mut value: Blob) {
+        let blob = self.store.get_mut(&key);
+        let mut blob = match blob {
+            Some(r) => r,
+            None => {
+                self.store.insert(key, vec![]);
+                self.store.get_mut(&key).unwrap()
+            }
+        };
+        blob.push(value);
     }
-    // pub fn append(&mut self, key: i32, value: Vec<RecordBatch>) {
-    //     let blob = self.remove(key);
-    //     let mut blob = match blob {
-    //         Some(r) => r,
-    //         None => Blob::RecordBatchBlob(Vec::new()),
-    //     };
-    //     blob.append_records(value);
-    //     self.store.insert(key, blob);
-    // }
-    pub fn remove(&mut self, key: i32) -> Option<Blob> {
+    pub fn append(&mut self, key: i32, mut value: Vec<Blob>) {
+        let blob = self.remove(key);
+        let mut blob = match blob {
+            Some(r) => r,
+            None => vec![],
+        };
+        blob.append(&mut value);
+        self.store.insert(key, blob);
+    }
+    pub fn remove(&mut self, key: i32) -> Option<Vec<Blob>> {
         self.store.remove(&key)
-        // let x = self.store.remove(&key).unwrap().value();
-        // Some(x)
+    }
+    pub fn get(&mut self, key: i32) -> Option<&Vec<Blob>> {
+        self.store.get(&key)
     }
 }
diff --git a/vayu/src/df2vayu.rs b/vayu/src/df2vayu.rs
index f80613a..db90fd3 100644
--- a/vayu/src/df2vayu.rs
+++ b/vayu/src/df2vayu.rs
@@ -1,3 +1,5 @@
+use crate::dummy;
+use crate::operators::aggregate::AggregateOperator;
 use crate::operators::filter::FilterOperator;
 use crate::operators::join::HashProbeOperator;
 use crate::operators::projection::ProjectionOperator;
@@ -5,8 +7,14 @@ use crate::Store;
 use ahash::random_state::RandomSource;
 use ahash::RandomState;
 use arrow::array::BooleanBufferBuilder;
+use arrow::compute::kernels::concat_elements;
 use datafusion::datasource::physical_plan::CsvExec;
+use datafusion::datasource::physical_plan::ParquetExec;
+use datafusion::physical_plan::aggregates::AggregateExec;
+use datafusion::physical_plan::aggregates::AggregateMode;
+use datafusion::physical_plan::aggregates::StreamType;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::joins::hash_join::BuildSide;
 use datafusion::physical_plan::joins::hash_join::BuildSideReadyState;
@@ -20,7 +28,9 @@ use std::sync::Arc;
 use vayu_common::VayuPipeline;
 
 pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store, pipeline_id: i32) -> VayuPipeline {
+    let plan2 = plan.clone();
     let p = plan.as_any();
+
     let batch_size = 1024;
     let config = SessionConfig::new().with_batch_size(batch_size);
     let ctx = Arc::new(SessionContext::new_with_config(config));
@@ -34,6 +44,24 @@
             sink: None,
         };
     }
+    if let Some(_) = p.downcast_ref::<ParquetExec>() {
+        return VayuPipeline {
+            operators: vec![],
+            sink: None,
+        };
+    }
+    if let Some(exec) = p.downcast_ref::<AggregateExec>() {
+        let mut pipeline = df2vayu(exec.input().clone(), store, pipeline_id);
+        // check that no group by is present
+        if !exec.group_by().expr().is_empty() {
+            panic!("group by present - not handled");
+        }
+
+        let tt = Box::new(AggregateOperator::new(exec));
println!("adding aggregate"); + pipeline.operators.push(tt); + return pipeline; + } if let Some(exec) = p.downcast_ref::() { let mut pipeline = df2vayu(exec.input().clone(), store, pipeline_id); let tt = Box::new(FilterOperator::new(exec.predicate().clone())); @@ -51,25 +79,53 @@ pub fn df2vayu(plan: Arc, store: &mut Store, pipeline_id: i32 return pipeline; } if let Some(exec) = p.downcast_ref::() { + let mut exec2 = exec.clone(); // this function will only be called for probe side // build side wont have hashjoinexec in make_pipeline call // let dummy = exec.left().execute(0, context.clone()); let mut pipeline = df2vayu(exec.right().clone(), store, pipeline_id); println!("adding hashprobe"); - - let mut hashjoinstream = exec.get_hash_join_stream(0, context).unwrap(); - println!("got joinstream"); - + let tt = dummy::DummyExec::new( + exec.properties().clone(), + exec.statistics().unwrap(), + exec.left().schema(), + ); + let tt2 = dummy::DummyExec::new( + exec.properties().clone(), + exec.statistics().unwrap(), + exec.right().schema(), + ); + let x = plan2 + .with_new_children(vec![Arc::new(tt), Arc::new(tt2)]) + .unwrap(); + let x1 = x.as_any(); + let exec = if let Some(exec) = x1.downcast_ref::() { + exec + } else { + panic!("wrongg"); + }; // using uuid but this value would be present in HashProbeExec itself // TODO: remove from the correct key - let build_map = store.remove(42).unwrap(); - let left_data = Arc::new(build_map.get_map()); + println!("{:?}", store.store.keys()); + let mut build_map = store.remove(42).unwrap(); + let mut cmap = build_map.clone(); + store.append(42, cmap); + let map = build_map.remove(0); + let build_map = match map { + vayu_common::store::Blob::HashMapBlob(map) => map, + _ => panic!("what nooo"), + }; + let c = build_map.clone(); + let left_data = Arc::new(build_map); let visited_left_side = BooleanBufferBuilder::new(0); + + let mut hashjoinstream = exec.get_hash_join_stream(0, context).unwrap(); hashjoinstream.build_side = BuildSide::Ready(BuildSideReadyState { left_data, visited_left_side, }); + println!("got joinstream"); let tt = Box::new(HashProbeOperator::new(hashjoinstream)); pipeline.operators.push(tt); return pipeline; @@ -80,6 +136,9 @@ pub fn df2vayu(plan: Arc, store: &mut Store, pipeline_id: i32 if let Some(exec) = p.downcast_ref::() { return df2vayu(exec.input().clone(), store, pipeline_id); } + if let Some(exec) = p.downcast_ref::() { + return df2vayu(exec.input().clone(), store, pipeline_id); + } panic!("should never reach the end"); } @@ -128,6 +187,15 @@ pub fn get_source_node(plan: Arc) -> Arc { if let Some(_) = p.downcast_ref::() { return plan; } + if let Some(_) = p.downcast_ref::() { + return plan; + } + if let Some(exec) = p.downcast_ref::() { + return get_source_node(exec.input().clone()); + } + if let Some(exec) = p.downcast_ref::() { + return get_source_node(exec.input().clone()); + } if let Some(exec) = p.downcast_ref::() { return get_source_node(exec.input().clone()); } @@ -140,5 +208,19 @@ pub fn get_source_node(plan: Arc) -> Arc { if let Some(exec) = p.downcast_ref::() { return get_source_node(exec.input().clone()); } - panic!("No join node found"); + panic!("No source node found"); +} + +pub fn aggregate(exec: Arc) -> AggregateOperator { + let p = exec.as_any(); + let final_aggregate = if let Some(exec) = p.downcast_ref::() { + if !exec.group_by().expr().is_empty() { + panic!("group by present- not handled"); + } + let tt = AggregateOperator::new(exec); + tt + } else { + panic!("not an aggregate"); + }; + final_aggregate } diff 
--git a/vayu/src/dummy.rs b/vayu/src/dummy.rs new file mode 100644 index 0000000..7b686bf --- /dev/null +++ b/vayu/src/dummy.rs @@ -0,0 +1,109 @@ +use datafusion::physical_plan::ExecutionPlan; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::{any::Any, vec}; + +use arrow::array::{ArrayRef, UInt64Builder}; +use arrow::datatypes::Schema; +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion::common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result}; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::metrics::MetricsSet; +use datafusion::physical_plan::DisplayAs; +use datafusion::physical_plan::DisplayFormatType; +use datafusion::physical_plan::PlanProperties; +use datafusion::physical_plan::RecordBatchStream; +use datafusion::physical_plan::SendableRecordBatchStream; +use datafusion::physical_plan::Statistics; +use futures::stream::Stream; + +use futures::{FutureExt, StreamExt}; +#[derive(Debug)] +pub struct DummyExec { + cache: PlanProperties, + statistics: Statistics, + schema: Arc, +} + +impl DummyExec { + pub fn new(cache: PlanProperties, statistics: Statistics, schema: Arc) -> Self { + DummyExec { + cache, + statistics, + schema, + } + } +} +impl ExecutionPlan for DummyExec { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + Ok(self) + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![] + } + + fn maintains_input_order(&self) -> Vec { + vec![] + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + let stream = DummyStream { + schema: self.schema.clone(), + }; + Ok(Box::pin(stream)) + } + + fn metrics(&self) -> Option { + None + } + + fn statistics(&self) -> Result { + Ok(self.statistics.clone()) + } +} + +impl DisplayAs for DummyExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "dummmyyy")?; + Ok(()) + } +} +struct DummyStream { + schema: Arc, +} +impl Stream for DummyStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + return Poll::Pending; + } +} + +impl RecordBatchStream for DummyStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} diff --git a/vayu/src/lib.rs b/vayu/src/lib.rs index 0fa494d..6195555 100644 --- a/vayu/src/lib.rs +++ b/vayu/src/lib.rs @@ -1,28 +1,103 @@ use arrow::array::RecordBatch; use arrow::util::pretty; use vayu_common::DatafusionPipelineWithData; +use vayu_common::IntermediateOperator; use vayu_common::VayuPipeline; pub mod operators; +use crate::operators::aggregate::AggregateOperator; +use datafusion::physical_plan::coalesce_batches::concat_batches; use std::sync::{Arc, Mutex}; - pub mod sinks; use vayu_common::store::Store; pub mod df2vayu; +mod dummy; pub struct VayuExecutionEngine { // this is per node store - pub store: Store, + // pub store: Store, // this is global store pub global_store: Arc>, // Note: only one of them will survive lets see which } +fn get_batches_from_record_batch_blob(vblob: Vec) -> Vec { + let mut batches = vec![]; + for val in vblob { + match val { + vayu_common::store::Blob::RecordBatchBlob(batch) => { + batches.push(batch); + } + vayu_common::store::Blob::HashMapBlob(_) => { + panic!("not done") + } + } + } + 
batches +} +fn get_record_batch_blob_from_batches(batch: Vec) -> Vec { + let mut vblob = vec![]; + for val in batch { + vblob.push(vayu_common::store::Blob::RecordBatchBlob(val)); + } + vblob +} impl VayuExecutionEngine { pub fn new(global_store: Arc>) -> VayuExecutionEngine { VayuExecutionEngine { - store: Store::new(), + // store: Store::new(), global_store, } } + pub fn finalize(&mut self, sink: vayu_common::FinalizeSinkType) { + println!("running finalize"); + + match sink { + vayu_common::FinalizeSinkType::PrintFromStore(uuid) => { + println!("running print from store {uuid}"); + let mut store = self.global_store.lock().unwrap(); + println!("{:?}", store.store.keys()); + + let blob = store.remove(uuid); + + drop(store); + let result = blob.unwrap(); + let batches = get_batches_from_record_batch_blob(result); + // pretty::print_batches(&batches).unwrap(); + } + vayu_common::FinalizeSinkType::FinalAggregate(plan, uuid) => { + println!("running FinalAggregate from store {uuid}"); + let mut store = self.global_store.lock().unwrap(); + println!("{:?}", store.store.keys()); + + let blob = store.remove(uuid); + + drop(store); + let result = blob.unwrap(); + let batches = get_batches_from_record_batch_blob(result); + + let mut operator = df2vayu::aggregate(plan); + let batch = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap(); + + let result = operator.execute(&batch).unwrap(); + pretty::print_batches(&[result.clone()]).unwrap(); + } + vayu_common::FinalizeSinkType::BuildAndStoreHashMap(uuid, join_node) => { + let mut sink = sinks::HashMapSink::new(uuid, join_node); + + let mut store = self.global_store.lock().unwrap(); + println!("{:?}", store.store.keys()); + + let blob = store.remove(uuid).unwrap(); + drop(store); + let batches = get_batches_from_record_batch_blob(blob); + + let hashmap = sink.build_map(batches); + let mut store = self.global_store.lock().unwrap(); + store.insert(uuid, hashmap.unwrap()); + drop(store); + println!("storing the map {uuid}"); + } + } + } pub fn sink(&mut self, sink: vayu_common::SchedulerSinkType, result: Vec) { println!( "runningsink size {}x{}", @@ -33,35 +108,33 @@ impl VayuExecutionEngine { vayu_common::SchedulerSinkType::PrintOutput => { pretty::print_batches(&result).unwrap(); } - // vayu_common::SchedulerSinkType::StoreRecordBatch(uuid) => { - // self.store.append(uuid, result); - // } - vayu_common::SchedulerSinkType::BuildAndStoreHashMap(uuid, join_node) => { - let mut sink = sinks::HashMapSink::new(uuid, join_node); - let hashmap = sink.build_map(result); - println!("BuildAndStoreHashMap storing in uuid {uuid}"); - - // old store - // self.store.insert(uuid, hashmap.unwrap()); - // new store - let mut map = self.global_store.lock().unwrap(); - map.insert(uuid, hashmap.unwrap()); + vayu_common::SchedulerSinkType::StoreRecordBatch(uuid) => { + println!("storing at store {uuid}"); + let mut store = self.global_store.lock().unwrap(); + let t = get_record_batch_blob_from_batches(result); + store.append(uuid, t); + + println!("{:?}", store.store.keys()); + drop(store); } }; } pub fn execute(&mut self, pipeline: DatafusionPipelineWithData) { let data = pipeline.data; - let sink = pipeline.pipeline.sink; + + let pipeline = pipeline.pipeline; + + let sink = pipeline.sink; let mut store = self.global_store.lock().unwrap(); - let mut pipeline: VayuPipeline = - df2vayu::df2vayu(pipeline.pipeline.plan, &mut store, pipeline.pipeline.id); + let mut pipeline: VayuPipeline = df2vayu::df2vayu(pipeline.plan, &mut store, pipeline.id); drop(store); 
pipeline.sink = sink; self.execute_internal(pipeline, data); } + pub fn execute_internal(&mut self, mut pipeline: VayuPipeline, mut data: RecordBatch) { println!("operators size {}", pipeline.operators.len()); for x in &mut pipeline.operators { diff --git a/vayu/src/operators/aggregate.rs b/vayu/src/operators/aggregate.rs new file mode 100644 index 0000000..3c27e25 --- /dev/null +++ b/vayu/src/operators/aggregate.rs @@ -0,0 +1,79 @@ +use arrow::datatypes::SchemaRef; +use datafusion::arrow::array::RecordBatch; +use datafusion::error::Result; +use datafusion::physical_plan::aggregates::aggregate_expressions; +use datafusion::physical_plan::aggregates::create_accumulators; +use datafusion::physical_plan::aggregates::finalize_aggregation; +use datafusion::physical_plan::aggregates::no_grouping::aggregate_batch; +use datafusion::physical_plan::aggregates::no_grouping::AggregateStream; +use datafusion::physical_plan::aggregates::AccumulatorItem; +use datafusion::physical_plan::aggregates::AggregateExec; +use datafusion::physical_plan::aggregates::AggregateMode; +use datafusion::physical_plan::aggregates::StreamType; +use datafusion::physical_plan::metrics::BaselineMetrics; +use datafusion::physical_plan::projection::batch_project; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::PhysicalExpr; +use std::sync::Arc; +use vayu_common::{IntermediateOperator, PhysicalOperator}; +pub struct AggregateOperator { + schema: SchemaRef, + mode: AggregateMode, + aggregate_expressions: Vec>>, + filter_expressions: Vec>>, + accumulators: Vec, +} +impl AggregateOperator { + pub fn new(agg: &AggregateExec) -> Self { + let agg_filter_expr = agg.filter_expr.clone(); + let new_mode = *agg.mode(); + println!("mode:{:?}", new_mode); + + let aggregate_expressions = + aggregate_expressions(&agg.aggr_expr(), &agg.mode(), 0).unwrap(); + let filter_expressions = match new_mode { + AggregateMode::Partial | AggregateMode::Single | AggregateMode::SinglePartitioned => { + agg_filter_expr + } + AggregateMode::Final | AggregateMode::FinalPartitioned => { + vec![None; agg.aggr_expr().len()] + } + }; + let accumulators = create_accumulators(&agg.aggr_expr).unwrap(); + AggregateOperator { + schema: Arc::clone(&agg.schema()), + mode: new_mode, + aggregate_expressions, + filter_expressions, + accumulators, + } + } +} + +impl IntermediateOperator for AggregateOperator { + fn execute(&mut self, input: &RecordBatch) -> Result { + let result = aggregate_batch( + &self.mode, + input.clone(), + &mut self.accumulators, + &self.aggregate_expressions, + &self.filter_expressions, + ); + + // only finally + let result = finalize_aggregation(&mut self.accumulators, &self.mode); + // println!("{:?}", result); + // println!("{:?}", self.schema); + println!("mode {:?}", self.mode); + let result = result.and_then(|columns| { + RecordBatch::try_new(self.schema.clone(), columns).map_err(Into::into) + }); + result + } +} + +impl PhysicalOperator for AggregateOperator { + fn name(&self) -> String { + String::from("aggregate") + } +} diff --git a/vayu/src/operators/mod.rs b/vayu/src/operators/mod.rs index 1155748..3c37eb4 100644 --- a/vayu/src/operators/mod.rs +++ b/vayu/src/operators/mod.rs @@ -1,3 +1,4 @@ +pub mod aggregate; pub mod filter; pub mod join; pub mod projection; diff --git a/vayu/src/sinks.rs b/vayu/src/sinks.rs index 0965460..a651241 100644 --- a/vayu/src/sinks.rs +++ b/vayu/src/sinks.rs @@ -34,12 +34,12 @@ impl HashMapSink { } } pub fn build_map(&mut self, result: Vec) -> Option { - let random_state = 
RandomState::with_seeds(0, 0, 0, 0); + let random_state: RandomState = RandomState::with_seeds(0, 0, 0, 0); let ctx: SessionContext = SessionContext::new(); let reservation = MemoryConsumer::new("HashJoinInput").register(ctx.task_ctx().memory_pool()); - let hash_map = hash_join::create_hash_build_map( + let hash_map: hash_join::JoinLeftData = hash_join::create_hash_build_map( result, random_state, self.on_left.clone(), diff --git a/vayuDB/Cargo.toml b/vayuDB/Cargo.toml index e09f041..4ba413e 100644 --- a/vayuDB/Cargo.toml +++ b/vayuDB/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] vayu = { version = "0.1.0", path = "../vayu"} datafusion = { version = "36.0.0", path = "../arrow-datafusion/datafusion/core"} +datafusion-benchmarks = { version = "36.0.0", path = "../arrow-datafusion/benchmarks"} vayu-common = { version = "0.1.0", path = "../vayu-common"} futures = { version = "0.3.28" } heapless = "0.8.0" @@ -16,3 +17,4 @@ crossbeam-skiplist = "0.1.3" crossbeam-utils = "0.8.19" leapfrog = {version = "0.3.0", features = ["stable_alloc"]} lockfree = "0.5.1" + diff --git a/vayuDB/README.md b/vayuDB/README.md index dab2c4f..4657287 100644 --- a/vayuDB/README.md +++ b/vayuDB/README.md @@ -48,7 +48,7 @@ Scheduler will keep on sending the tasks. ## Common Vayu Structures ``` -pub struct DatafusionPipelineWithSource { +pub struct SchedulerPipeline { pub source: Arc, pub plan: Arc, pub sink: SchedulerSinkType, @@ -60,7 +60,7 @@ pub struct DatafusionPipelineWithSource { 1. Scheduler ``` pub fn new() -> Self -pub fn get_pipeline(&mut self) -> Poll +pub fn get_pipeline(&mut self) -> Poll pub fn ack_pipeline(&mut self, pipeline_id: i32); ``` diff --git a/vayuDB/src/dummy_tasks.rs b/vayuDB/src/dummy_tasks.rs index cb51013..286ad8b 100644 --- a/vayuDB/src/dummy_tasks.rs +++ b/vayuDB/src/dummy_tasks.rs @@ -5,92 +5,103 @@ use datafusion::physical_plan::joins::HashJoinExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::CsvReadOptions; use datafusion::prelude::SessionContext; +use futures::io::Sink; +use std::path::Path; use std::sync::Arc; use vayu::df2vayu; use vayu::operators::join; -use vayu_common::DatafusionPipelineWithSource; +use vayu_common::DatafusionPipeline; +use vayu_common::SchedulerPipeline; use vayu_common::Task; -pub async fn test_filter_project() -> Result { - // create local execution context - let ctx: SessionContext = SessionContext::new(); - // register csv file with the execution context +// pub async fn test_filter_project_aggregate() -> Result { +// // create local execution context +// let ctx: SessionContext = SessionContext::new(); +// // register csv file with the execution context - ctx.register_csv( - "aggregate_test_100", - &format!("./testing/data/csv/aggregate_test_100.csv"), - CsvReadOptions::new(), - ) - .await?; - let sql = "SELECT c1,c3 as neg,c4 as pos,c13 FROM aggregate_test_100 WHERE (c3 < 0 AND c1='a') OR ( c4 > 0 AND c1='b' ) "; - let plan = get_execution_plan_from_sql(&ctx, sql).await?; - let source = df2vayu::get_source_node(plan.clone()); - let mut task = Task::new(); +// ctx.register_csv( +// "aggregate_test_100", +// &format!("./testing/data/csv/aggregate_test_100.csv"), +// CsvReadOptions::new(), +// ) +// .await?; +// let sql = "SELECT count(c1),sum(c3),sum(c4),count(c13) FROM aggregate_test_100 WHERE (c3 < 0 AND c1='a') OR ( c4 > 0 AND c1='b' ) "; +// let plan = get_execution_plan_from_sql(&ctx, sql).await?; +// let source = df2vayu::get_source_node(plan.clone()); +// let mut task = Task::new(); - let pipeline = 
DatafusionPipelineWithSource { - source, - plan, - sink: Some(vayu_common::SchedulerSinkType::PrintOutput), - }; - task.add_pipeline(pipeline); +// let pipeline = DatafusionPipeline { +// plan, +// sink: Some(vayu_common::SchedulerSinkType::StoreRecordBatch(1)), +// id: 1, +// }; +// let finalize = Sink{ - return Ok(task); -} +// } +// let pipeline = SchedulerPipeline { source, pipeline }; +// task.add_pipeline(pipeline); -pub async fn test_hash_join() -> Result { - let ctx: SessionContext = SessionContext::new(); - // register csv file with the execution context - ctx.register_csv( - "a", - &format!("./testing/data/csv/join_test_A.csv"), - CsvReadOptions::new(), - ) - .await?; - ctx.register_csv( - "b", - &format!("./testing/data/csv/join_test_B.csv"), - CsvReadOptions::new(), - ) - .await?; +// return Ok(task); +// } - // get execution plan from th sql query - let sql = "SELECT * FROM a,b WHERE a.a1 = b.b1 "; - let plan = get_execution_plan_from_sql(&ctx, sql).await?; - let mut task = Task::new(); +// pub async fn test_hash_join() -> Result { +// let ctx: SessionContext = SessionContext::new(); +// // register csv file with the execution context +// ctx.register_csv( +// "a", +// &format!("./testing/data/csv/join_test_A.csv"), +// CsvReadOptions::new(), +// ) +// .await?; +// ctx.register_csv( +// "b", +// &format!("./testing/data/csv/join_test_B.csv"), +// CsvReadOptions::new(), +// ) +// .await?; - let uuid = 42; - let (join_node, build_plan) = df2vayu::get_hash_build_pipeline(plan.clone(), uuid); +// // get execution plan from th sql query +// let sql = "SELECT * FROM a,b WHERE a.a1 = b.b1 "; +// let plan = get_execution_plan_from_sql(&ctx, sql).await?; +// let mut task = Task::new(); - let build_source_pipeline = df2vayu::get_source_node(build_plan.clone()); - let sink = vayu_common::SchedulerSinkType::BuildAndStoreHashMap(uuid, join_node); - let build_pipeline = DatafusionPipelineWithSource { - source: build_source_pipeline, - plan: build_plan, - sink: Some(sink), - }; - task.add_pipeline(build_pipeline); - // TODO: set this uuid in probe also - let probe_plan = plan.clone(); - let probe_source_node = df2vayu::get_source_node(probe_plan.clone()); +// let uuid = 42; +// let (join_node, build_plan) = df2vayu::get_hash_build_pipeline(plan.clone(), uuid); - let probe_pipeline = DatafusionPipelineWithSource { - source: probe_source_node, - plan: probe_plan, - sink: Some(vayu_common::SchedulerSinkType::PrintOutput), - }; - task.add_pipeline(probe_pipeline); +// let build_source_pipeline = df2vayu::get_source_node(build_plan.clone()); +// let sink = vayu_common::SchedulerSinkType::BuildAndStoreHashMap(uuid, join_node); +// let build_pipeline = SchedulerPipeline { +// source: build_source_pipeline, +// plan: build_plan, +// sink: Some(sink), +// }; +// task.add_pipeline(build_pipeline); +// // TODO: set this uuid in probe also +// let probe_plan = plan.clone(); +// let probe_source_node = df2vayu::get_source_node(probe_plan.clone()); - Ok(task) -} +// let probe_pipeline = SchedulerPipeline { +// source: probe_source_node, +// plan: probe_plan, +// sink: Some(vayu_common::SchedulerSinkType::PrintOutput), +// }; +// task.add_pipeline(probe_pipeline); + +// Ok(task) +// } pub async fn get_execution_plan_from_sql( ctx: &SessionContext, sql: &str, ) -> Result> { // create datafusion logical plan - let logical_plan = SessionState::create_logical_plan(&ctx.state(), sql).await?; + let logical_plan = SessionState::create_logical_plan(&ctx.state(), sql) + .await + .unwrap(); // create datafusion 
physical plan - let plan = SessionState::create_physical_plan(&ctx.state(), &logical_plan).await?; + let plan = SessionState::create_physical_plan(&ctx.state(), &logical_plan) + .await + .unwrap(); // print datafusion physical plan // println!( // "Detailed physical plan:\n{}", diff --git a/vayuDB/src/io_service.rs b/vayuDB/src/io_service.rs index e03db11..92c7e1e 100644 --- a/vayuDB/src/io_service.rs +++ b/vayuDB/src/io_service.rs @@ -17,21 +17,35 @@ impl IOService { } } pub fn submit_request(&mut self, source: Arc) -> i32 { - let context = SessionContext::new().task_ctx(); + println!("submit_request"); + let context: Arc = SessionContext::new().task_ctx(); let stream = source.execute(0, context).unwrap(); self.stream = Some(stream); self.uuid = 1; self.uuid } - pub fn poll_response(&mut self) -> Poll<(i32, RecordBatch)> { + pub fn poll_response(&mut self) -> Poll<(i32, Option)> { + // println!("poll_response"); + if self.stream.is_none() { return Poll::Pending; } - let stream = self.stream.take(); + println!("poll_response2"); + // let stream = self.stream.take(); + + let data = futures::executor::block_on(self.stream.as_mut().unwrap().next()); - let data = futures::executor::block_on(stream.unwrap().next()) - .unwrap() - .unwrap(); - Poll::Ready((self.uuid, data)) + match data { + Some(data) => { + let data = data.unwrap(); + println!("DATA in poll response size {}", data.num_rows()); + Poll::Ready((self.uuid, Some(data))) + } + None => { + println!("setting stream to null because got no data from this stream"); + self.stream = None; + Poll::Ready((self.uuid, None)) + } + } } } diff --git a/vayuDB/src/main.rs b/vayuDB/src/main.rs index 6dac326..1e4f053 100644 --- a/vayuDB/src/main.rs +++ b/vayuDB/src/main.rs @@ -1,30 +1,47 @@ use crossbeam_channel::{bounded, Receiver, Sender}; +use datafusion::arrow::array::ArrowNativeTypeOp; use std::collections::HashMap; use std::task::Poll; use std::thread; +use vayu_common::SchedulerPipeline; use vayu_common::{DatafusionPipeline, DatafusionPipelineWithData}; mod dummy_tasks; mod io_service; mod scheduler; +mod tpch_tasks; use std::collections::LinkedList; use std::sync::Arc; use std::sync::Mutex; use vayu_common; +use vayu_common::VayuMessage; use vayu_common::store::Store; + +use crate::scheduler::Scheduler; fn start_worker( - receiver: Receiver, + receiver: Receiver, sender: Sender<(usize, i32)>, global_store: Arc>, thread_id: usize, ) { let mut executor = vayu::VayuExecutionEngine::new(global_store); // Receive structs sent over the channel - while let Ok(pipeline) = receiver.recv() { - let pipeline_id = pipeline.pipeline.id; - println!("{thread_id}:got a pipeline for the thread, executing ..."); - executor.execute(pipeline); - sender.send((thread_id, pipeline_id)).unwrap(); + while let Ok(message) = receiver.recv() { + match message { + VayuMessage::Normal(pipeline) => { + let pipeline_id = pipeline.pipeline.id; + println!("{thread_id}:got a pipeline for the thread, executing ..."); + executor.execute(pipeline); + println!("{thread_id}:done executing ..."); + sender.send((thread_id, pipeline_id)).unwrap(); + } + VayuMessage::Finalize((sink, pipeline_id)) => { + println!("{thread_id}:got a finalize pipeline for the thread, executing ..."); + executor.finalize(sink); + println!("{thread_id}:done executing ..."); + sender.send((thread_id, pipeline_id)).unwrap(); + } + } } } @@ -47,7 +64,7 @@ fn main() { bounded(0); // vector to store main_thread->worker channels - let mut senders: Vec> = Vec::new(); + let mut senders: Vec> = Vec::new(); let mut 
free_threads: LinkedList = LinkedList::new(); for thread_num in 0..num_threads { @@ -80,63 +97,125 @@ fn main() { let mut io_service = io_service::IOService::new(); // TODO: create task_queue - buffer tasks - let mut request_pipeline_map: HashMap = HashMap::new(); + let mut request_pipeline_map: HashMap = HashMap::new(); + let mut completed_pipeline_list: LinkedList = LinkedList::new(); // right now a pipeline would be assigned to a worker only when it is free // but we will poll some extra pipelines from the scheduler and send it to the io service // so that we can start working on it once any worker is free let mut next_id = 0; - loop { - // poll scheduler for a new task - let pipeline = scheduler.get_pipeline(next_id); - if let Poll::Ready(pipeline) = pipeline { - // TODO: add support for multiple dependent pipeline - println!("got a pipeline from scheduler"); - - // submit the source request to io service - let request_num = io_service.submit_request(pipeline.source); - println!("sent the request to the io_service"); - - // insert the pipeline into the local map - request_pipeline_map.insert( - request_num, - DatafusionPipeline { - plan: pipeline.plan, - sink: pipeline.sink, - id: next_id, - }, - ); - next_id += 1; - } - if let Ok((thread_id, finished_pipeline_id)) = informer_receiver.try_recv() { - if finished_pipeline_id != -1 { - scheduler.ack_pipeline(finished_pipeline_id); + // poll scheduler for a new task + let pipeline = scheduler.get_pipeline(next_id); + if let Poll::Ready(mut pipeline) = pipeline { + // TODO: add support for multiple dependent pipeline + println!("got a pipeline from scheduler"); + + let source = pipeline.source.take().unwrap(); + // submit the source request to io service + let request_num = io_service.submit_request(source); + println!("sent the request to the io_service"); + + // insert the pipeline into the local map + request_pipeline_map.insert(request_num, (pipeline, 0)); + next_id += 1; + } + + loop { + if let Ok((thread_id, request_num)) = informer_receiver.try_recv() { + println!("got ack from thread {}", thread_id); + if request_num != -1 { + let pipeline = request_pipeline_map.remove(&request_num); + match pipeline { + Some((pipeline, processing_count)) => { + let processing_count = processing_count - 1; + println!("current processing count is {processing_count}"); + if processing_count == 0 { + completed_pipeline_list.push_back(pipeline); + } else { + request_pipeline_map.insert(request_num, (pipeline, processing_count)); + } + } + None => { + println!("inform scheduler we are done"); + // let pipeline = scheduler.get_pipeline(next_id); + // if let Poll::Ready(mut pipeline) = pipeline { + // // TODO: add support for multiple dependent pipeline + // println!("got a pipeline from scheduler"); + + // let source = pipeline.source.take().unwrap(); + // // submit the source request to io service + // let request_num = io_service.submit_request(source); + // println!("sent the request to the io_service"); + + // // insert the pipeline into the local map + // request_pipeline_map.insert(request_num, (pipeline, 0)); + // next_id += 1; + // } + + // scheduler.ack_pipeline(request_num); + } + } } + // add in the queue free_threads.push_back(thread_id); } - if let Some(&thread_id) = free_threads.front() { - // poll io_service for a response - let response = io_service.poll_response(); - if let Poll::Ready((request_num, data)) = response { - free_threads.pop_front(); + if free_threads.len() == 0 { + continue; + } + // check from finalize queue + if 
!completed_pipeline_list.is_empty() { + println!("removing item from completed list"); + + let thread_id = free_threads.pop_front().unwrap(); + // pipeline. + //data is finished + let pipeline = completed_pipeline_list.pop_front().unwrap(); + let msg = VayuMessage::Finalize((pipeline.finalize, pipeline.pipeline.id)); + senders[thread_id].send(msg).expect("Failed to send struct"); + println!("finalize:sent the pipeline and the data to the worker"); + continue; + } + // println!("free thread available"); + // poll io_service for a response + let response = io_service.poll_response(); + if let Poll::Ready((request_num, data)) = response { + if data.is_none() { + let mv = request_pipeline_map.get(&request_num); + + assert!(mv.is_some()); + let (_, processing_count) = mv.unwrap(); + println!("no more data left. processing count is {processing_count}"); + if processing_count.is_zero() { + let pipeline = request_pipeline_map.remove(&request_num); + assert!(pipeline.is_some()); + let (pipeline, _) = pipeline.unwrap(); + completed_pipeline_list.push_back(pipeline); + } + } else { + let data = data.unwrap(); + let thread_id = free_threads.pop_front().unwrap(); println!("got a response from the io_service"); - // TODO: handle when a source gives multiple record batches // get the pipeline from the local map let pipeline = request_pipeline_map.remove(&request_num); + assert!(pipeline.is_some()); - let pipeline = pipeline.unwrap(); + let (pipeline, processing_count) = pipeline.unwrap(); + + request_pipeline_map.insert(request_num, (pipeline.clone(), processing_count + 1)); // send over channel - let msg = DatafusionPipelineWithData { pipeline, data }; + let msg = VayuMessage::Normal(DatafusionPipelineWithData { + pipeline: pipeline.pipeline, + data, + }); senders[thread_id].send(msg).expect("Failed to send struct"); println!("sent the pipeline and the data to the worker"); - - // assign the next pipeline to some other worker - // worker_id = round_robin(worker_id, num_threads); } + // assign the next pipeline to some other worker + // worker_id = round_robin(worker_id, num_threads); } } } diff --git a/vayuDB/src/scheduler.rs b/vayuDB/src/scheduler.rs index 8343eb9..2fb2b9a 100644 --- a/vayuDB/src/scheduler.rs +++ b/vayuDB/src/scheduler.rs @@ -1,6 +1,8 @@ -use crate::dummy_tasks::{test_filter_project, test_hash_join}; +// use crate::dummy_tasks::test_hash_join; +use crate::tpch_tasks::{test_tpchq1, test_tpchq2}; +use datafusion_benchmarks::tpch; use std::{hash::Hash, task::Poll}; -use vayu_common::DatafusionPipelineWithSource; +use vayu_common::SchedulerPipeline; #[derive(PartialEq)] enum HashJoinState { CanSendBuild, @@ -12,7 +14,7 @@ pub struct Scheduler { turn: usize, // stored_id: i32, state: HashJoinState, - probe_pipeline: Option, + probe_pipeline: Option, } impl Scheduler { @@ -23,26 +25,46 @@ impl Scheduler { probe_pipeline: None, } } - pub fn get_pipeline(&mut self, id: i32) -> Poll { + + pub fn get_pipeline(&mut self, id: i32) -> Poll { + let mut task = futures::executor::block_on(test_tpchq1()).unwrap(); + let pipeline = task.pipelines.remove(0); + return Poll::Ready(pipeline); + + // if self.probe_pipeline.is_none() { + // let mut task = futures::executor::block_on(test_tpchq2()).unwrap(); + // let pipeline = task.pipelines.remove(0); + // self.probe_pipeline = Some(task.pipelines.remove(0)); + // self.state = HashJoinState::BuildSent(id); + // return Poll::Ready(pipeline); + // } else { + // let pipeline = self.probe_pipeline.take(); + // return Poll::Ready(pipeline.unwrap()); + // } + + // 
let mut task = futures::executor::block_on(test_filter_project_aggregate()).unwrap(); + // let pipeline = task.pipelines.remove(0); + // return Poll::Ready(pipeline); + self.turn = 1 - self.turn; - if self.turn == 0 && self.state == HashJoinState::CanSendBuild { - let mut task = futures::executor::block_on(test_hash_join()).unwrap(); - self.probe_pipeline = Some(task.pipelines.remove(1)); - let build_pipeline = task.pipelines.remove(0); + // if self.turn == 0 && self.state == HashJoinState::CanSendBuild { + // let mut task = futures::executor::block_on(test_hash_join()).unwrap(); + // self.probe_pipeline = Some(task.pipelines.remove(1)); + // let build_pipeline = task.pipelines.remove(0); - self.state = HashJoinState::BuildSent(id); - return Poll::Ready(build_pipeline); - } else if self.turn == 0 && self.state == HashJoinState::CanSendProbe { - self.state = HashJoinState::ProbeSent(id); - assert!(self.probe_pipeline.is_some()); - let probe_pipeline = self.probe_pipeline.take().unwrap(); - return Poll::Ready(probe_pipeline); - } else { - let mut task = futures::executor::block_on(test_filter_project()).unwrap(); - let pipeline = task.pipelines.remove(0); - return Poll::Ready(pipeline); - // return Poll::Pending; - } + // self.state = HashJoinState::BuildSent(id); + // return Poll::Ready(build_pipeline); + // } else if self.turn == 0 && self.state == HashJoinState::CanSendProbe { + // self.state = HashJoinState::ProbeSent(id); + // assert!(self.probe_pipeline.is_some()); + // let probe_pipeline = self.probe_pipeline.take().unwrap(); + // return Poll::Ready(probe_pipeline); + // } else { + // let mut task = futures::executor::block_on(test_filter_project_aggregate()).unwrap(); + // let pipeline = task.pipelines.remove(0); + // return Poll::Ready(pipeline); + // // return Poll::Pending; + // } } pub fn ack_pipeline(&mut self, ack_id: i32) { match self.state { diff --git a/vayuDB/src/tpch_tasks.rs b/vayuDB/src/tpch_tasks.rs new file mode 100644 index 0000000..7c3164a --- /dev/null +++ b/vayuDB/src/tpch_tasks.rs @@ -0,0 +1,167 @@ +use crate::dummy_tasks::get_execution_plan_from_sql; +use datafusion::common::exec_err; +use datafusion::common::plan_err; +use datafusion::common::Result; +use datafusion::physical_plan::displayable; +use datafusion::prelude::SessionContext; +use datafusion_benchmarks::tpch::RunOpt; +use datafusion_benchmarks::CommonOpt; +use std::fs; +use std::path::Path; +use std::path::PathBuf; +use std::process::exit; +use vayu::df2vayu; +use vayu::operators::aggregate::AggregateOperator; +use vayu_common::DatafusionPipeline; +use vayu_common::SchedulerPipeline; +use vayu_common::SchedulerSinkType; +use vayu_common::Task; +fn get_tpch_data_path() -> Result { + let path = std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string()); + if !Path::new(&path).exists() { + return exec_err!( + "Benchmark data not found (set TPCH_DATA env var to override): {}", + path + ); + } + Ok(path) +} +// export TPCH_DATA=/Users/kothari/Desktop/course/15721/15721-s24-ee2/arrow-datafusion/benchmarks/data/tpch_sf1 + +/// Get the SQL statements from the specified query file +pub fn get_query_sql(query: usize) -> Result> { + if query > 0 && query < 23 { + let possibilities = vec![ + format!("queries/q{query}.sql"), + format!("/Users/kothari/Desktop/course/15721/15721-s24-ee2/arrow-datafusion/benchmarks/queries/q{query}.sql"), + ]; + let mut errors = vec![]; + for filename in possibilities { + match fs::read_to_string(&filename) { + Ok(contents) => { + return Ok(contents + .split(';') + 
.map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .map(|s| s.to_string()) + .collect()); + } + Err(e) => errors.push(format!("{filename}: {e}")), + }; + } + plan_err!("invalid query. Could not find query: {:?}", errors) + } else { + plan_err!("invalid query. Expected value between 1 and 22") + } +} + +pub async fn tpch_common() -> Result { + let ctx = SessionContext::default(); + let path = get_tpch_data_path()?; + let common = CommonOpt { + iterations: 1, + partitions: Some(2), + batch_size: 81920, + debug: false, + }; + let opt = RunOpt { + query: Some(1), + common, + path: PathBuf::from(path.to_string()), + file_format: "parquet".to_string(), + mem_table: false, + output_path: None, + disable_statistics: false, + }; + opt.register_tables(&ctx).await.unwrap(); + return Ok(ctx); +} +pub async fn test_tpchq1() -> Result { + // this is aggregate + let ctx = tpch_common().await.unwrap(); + let queries = get_query_sql(1).unwrap(); + // println!("{:?}", queries); + let sql = queries.get(0).unwrap(); + + let plan = get_execution_plan_from_sql(&ctx, sql).await.unwrap(); + let final_aggregate = plan.clone(); + + let plan = plan.children().get(0).unwrap().clone(); + println!( + "=== Physical plan ===\n{}\n", + displayable(plan.as_ref()).indent(true) + ); + let source = Some(df2vayu::get_source_node(plan.clone())); + + let mut task = Task::new(); + + let uuid = 55; + let pipeline = DatafusionPipeline { + plan, + sink: Some(SchedulerSinkType::StoreRecordBatch(uuid)), + id: 1, + }; + + let finalize = vayu_common::FinalizeSinkType::FinalAggregate(final_aggregate, uuid); + let pipeline = SchedulerPipeline { + source, + pipeline, + finalize, + }; + + task.add_pipeline(pipeline); + + return Ok(task); +} + +pub async fn test_tpchq2() -> Result { + // this is join + let ctx = tpch_common().await.unwrap(); + let queries = get_query_sql(2).unwrap(); + // println!("{:?}", queries); + let sql = queries.get(0).unwrap(); + + let plan = get_execution_plan_from_sql(&ctx, sql).await.unwrap(); + + let plan = plan.children().get(0).unwrap().clone(); + println!( + "=== Physical plan ===\n{}\n", + displayable(plan.as_ref()).indent(true) + ); + + let uuid = 42; + let mut task = Task::new(); + + let (join_node, build_plan) = df2vayu::get_hash_build_pipeline(plan.clone(), uuid); + + let build_source_pipeline = Some(df2vayu::get_source_node(build_plan.clone())); + let build_pipeline = DatafusionPipeline { + plan: build_plan, + sink: Some(SchedulerSinkType::StoreRecordBatch(uuid)), + id: 1, + }; + let build_pipeline = SchedulerPipeline { + source: build_source_pipeline, + pipeline: build_pipeline, + finalize: vayu_common::FinalizeSinkType::BuildAndStoreHashMap(uuid, join_node), + }; + task.add_pipeline(build_pipeline); + let uuid2 = 98; + // TODO: set this uuid in probe also + let probe_plan = plan.clone(); + let probe_source_node = Some(df2vayu::get_source_node(probe_plan.clone())); + let probe_pipeline = DatafusionPipeline { + plan: probe_plan, + sink: Some(SchedulerSinkType::StoreRecordBatch(uuid2)), + id: 1, + }; + + let probe_pipeline = SchedulerPipeline { + source: probe_source_node, + pipeline: probe_pipeline, + finalize: vayu_common::FinalizeSinkType::PrintFromStore(uuid2), + }; + task.add_pipeline(probe_pipeline); + + return Ok(task); +}
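
For reference, a minimal usage sketch of the revised `Store` API introduced in this patch: partial pipeline runs append `Blob::RecordBatchBlob` values under a shared key, and the finalize step drains and concatenates them, mirroring the `StoreRecordBatch` / `FinalAggregate` flow above. It assumes the `vayu_common` and `arrow` APIs as they appear in this diff; the `main` harness, column name, and key value are illustrative only.

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use vayu_common::store::{Blob, Store};

fn main() -> arrow::error::Result<()> {
    // Build a tiny batch to stand in for one worker's pipeline output.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    let mut store = Store::new();
    let uuid = 55; // illustrative key, analogous to the uuid used by StoreRecordBatch

    // Each partial pipeline run appends its output under the pipeline's key.
    store.append(uuid, vec![Blob::RecordBatchBlob(batch.clone())]);
    store.append(uuid, vec![Blob::RecordBatchBlob(batch.clone())]);

    // The finalize step removes the accumulated blobs and combines them.
    let blobs = store.remove(uuid).unwrap();
    let batches: Vec<RecordBatch> = blobs
        .into_iter()
        .map(|b| match b {
            Blob::RecordBatchBlob(rb) => rb,
            Blob::HashMapBlob(_) => panic!("expected record batches"),
        })
        .collect();
    let combined = arrow::compute::concat_batches(&schema, &batches)?;
    assert_eq!(combined.num_rows(), 6);
    Ok(())
}
```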