diff --git a/Cargo.toml b/Cargo.toml
index 564be59..aa7982a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -53,3 +53,4 @@ tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d82343252
 tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
 parquet = "55.2.0"
 arrow = "55.2.0"
+tokio-stream = "0.1.17"
diff --git a/src/errors/mod.rs b/src/errors/mod.rs
index e2de185..e310756 100644
--- a/src/errors/mod.rs
+++ b/src/errors/mod.rs
@@ -1,3 +1,5 @@
+#![allow(clippy::upper_case_acronyms, clippy::vec_box)]
+
 use crate::errors::datafusion_error::DataFusionErrorProto;
 use datafusion::common::internal_datafusion_err;
 use datafusion::error::DataFusionError;
diff --git a/src/flight_service/mod.rs b/src/flight_service/mod.rs
index 4ae7b59..e3b53c8 100644
--- a/src/flight_service/mod.rs
+++ b/src/flight_service/mod.rs
@@ -1,7 +1,6 @@
 mod do_get;
 mod service;
 mod session_builder;
-mod stream_partitioner_registry;
 
 pub(crate) use do_get::DoGet;
 
diff --git a/src/flight_service/stream_partitioner_registry.rs b/src/flight_service/stream_partitioner_registry.rs
deleted file mode 100644
index 784d81a..0000000
--- a/src/flight_service/stream_partitioner_registry.rs
+++ /dev/null
@@ -1,166 +0,0 @@
-use dashmap::{DashMap, Entry};
-use datafusion::error::DataFusionError;
-use datafusion::physical_plan::repartition::RepartitionExec;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning};
-use std::sync::Arc;
-use uuid::Uuid;
-// TODO: find some way of cleaning up abandoned partitioners
-
-/// Keeps track of all the [StreamPartitioner] currently running in the program, identifying them
-/// by stage id.
-#[derive(Default)]
-pub struct StreamPartitionerRegistry {
-    map: DashMap<(Uuid, usize), Arc<RepartitionExec>>,
-}
-
-impl StreamPartitionerRegistry {
-    /// Builds a new [StreamPartitioner] if there was not one for this specific stage id.
-    /// If there was already one, return a reference to it.
-    pub fn get_or_create_stream_partitioner(
-        &self,
-        id: Uuid,
-        actor_idx: usize,
-        plan: Arc<dyn ExecutionPlan>,
-        partitioning: Partitioning,
-    ) -> Result<Arc<RepartitionExec>, DataFusionError> {
-        match self.map.entry((id, actor_idx)) {
-            Entry::Occupied(entry) => Ok(Arc::clone(entry.get())),
-            Entry::Vacant(entry) => Ok(Arc::clone(
-                &entry.insert(Arc::new(RepartitionExec::try_new(plan, partitioning)?)),
-            )),
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::test_utils::mock_exec::MockExec;
-    use datafusion::arrow::array::{RecordBatch, UInt32Array};
-    use datafusion::arrow::datatypes::{DataType, Field, Schema};
-    use datafusion::execution::TaskContext;
-    use datafusion::physical_expr::expressions::col;
-    use futures::StreamExt;
-
-    #[tokio::test]
-    async fn round_robin_1() -> Result<(), Box<dyn std::error::Error>> {
-        const PARTITIONS: usize = 10;
-
-        let registry = StreamPartitionerRegistry::default();
-        let partitioner = registry.get_or_create_stream_partitioner(
-            Uuid::new_v4(),
-            0,
-            mock_exec(15, 10),
-            Partitioning::RoundRobinBatch(PARTITIONS),
-        )?;
-
-        let rows_per_partition = gather_rows_per_partition(&partitioner).await;
-
-        assert_eq!(
-            rows_per_partition,
-            vec![20, 20, 20, 20, 20, 10, 10, 10, 10, 10]
-        );
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn round_robin_2() -> Result<(), Box<dyn std::error::Error>> {
-        const PARTITIONS: usize = 10;
-
-        let registry = StreamPartitionerRegistry::default();
-        let partitioner = registry.get_or_create_stream_partitioner(
-            Uuid::new_v4(),
-            0,
-            mock_exec(5, 10),
-            Partitioning::RoundRobinBatch(PARTITIONS),
-        )?;
-
-        let rows_per_partition = gather_rows_per_partition(&partitioner).await;
-
-        assert_eq!(rows_per_partition, vec![10, 10, 10, 10, 10, 0, 0, 0, 0, 0]);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn hash_1() -> Result<(), Box<dyn std::error::Error>> {
-        const PARTITIONS: usize = 10;
-
-        let registry = StreamPartitionerRegistry::default();
-        let partitioner = registry.get_or_create_stream_partitioner(
-            Uuid::new_v4(),
-            0,
-            mock_exec(15, 10),
-            Partitioning::Hash(vec![col("c0", &test_schema())?], PARTITIONS),
-        )?;
-
-        let rows_per_partition = gather_rows_per_partition(&partitioner).await;
-
-        assert_eq!(
-            rows_per_partition,
-            vec![30, 15, 0, 45, 0, 15, 0, 15, 15, 15]
-        );
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn hash_2() -> Result<(), Box<dyn std::error::Error>> {
-        const PARTITIONS: usize = 10;
-
-        let registry = StreamPartitionerRegistry::default();
-        let partitioner = registry.get_or_create_stream_partitioner(
-            Uuid::new_v4(),
-            0,
-            mock_exec(5, 10),
-            Partitioning::Hash(vec![col("c0", &test_schema())?], PARTITIONS),
-        )?;
-
-        let rows_per_partition = gather_rows_per_partition(&partitioner).await;
-
-        assert_eq!(rows_per_partition, vec![10, 5, 0, 15, 0, 5, 0, 5, 5, 5]);
-        Ok(())
-    }
-
-    async fn gather_rows_per_partition(partitioner: &RepartitionExec) -> Vec<usize> {
-        let mut data = vec![];
-        let n_partitions = partitioner.partitioning().partition_count();
-        let ctx = Arc::new(TaskContext::default());
-        for i in 0..n_partitions {
-            let mut stream = partitioner.execute(i, ctx.clone()).unwrap();
-            data.push(0);
-            while let Some(msg) = stream.next().await {
-                data[i] += msg.unwrap().num_rows();
-            }
-        }
-        data
-    }
-
-    fn test_schema() -> Arc<Schema> {
-        Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))
-    }
-
-    fn mock_exec(n_batches: usize, n_rows: usize) -> Arc<dyn ExecutionPlan> {
-        Arc::new(MockExec::new(
-            create_vec_batches(n_batches, n_rows),
-            test_schema(),
-        ))
-    }
-
-    /// Create vector batches
-    fn create_vec_batches(
-        n_batches: usize,
-        n_rows: usize,
-    ) -> Vec<Result<RecordBatch, DataFusionError>> {
-        let batch = create_batch(n_rows);
-        (0..n_batches).map(|_| Ok(batch.clone())).collect()
-    }
-
-    /// Create batch
-    fn create_batch(n_rows: usize) -> RecordBatch {
-        let schema = test_schema();
-        let mut data = vec![];
-        for i in 0..n_rows {
-            data.push(i as u32)
-        }
-        RecordBatch::try_new(schema, vec![Arc::new(UInt32Array::from(data))]).unwrap()
-    }
-}
diff --git a/src/lib.rs b/src/lib.rs
index ee36dcb..826bdf7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,5 @@
+#![deny(clippy::all)]
+
 mod channel_manager;
 mod common;
 mod composed_extension_codec;
diff --git a/src/test_utils/plan.rs b/src/test_utils/plan.rs
index 69feeef..b545451 100644
--- a/src/test_utils/plan.rs
+++ b/src/test_utils/plan.rs
@@ -3,9 +3,7 @@
 use datafusion::common::plan_err;
 use datafusion::common::tree_node::{Transformed, TreeNode};
 use datafusion::error::DataFusionError;
 use datafusion::physical_expr::Partitioning;
-use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
-use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::ExecutionPlan;
 use std::sync::Arc;