Commit a350566

address lints and add back stream_partitioner_registry
1 parent 5ad36d8 commit a350566

6 files changed (+173 additions, -305 deletions)

src/flight_service/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 mod do_get;
 mod service;
 mod session_builder;
+mod stream_partitioner_registry;
 
 pub(crate) use do_get::DoGet;
 
src/flight_service/stream_partitioner_registry.rs

Lines changed: 166 additions & 0 deletions
@@ -0,0 +1,166 @@
+use dashmap::{DashMap, Entry};
+use datafusion::error::DataFusionError;
+use datafusion::physical_plan::repartition::RepartitionExec;
+use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use std::sync::Arc;
+use uuid::Uuid;
+// TODO: find some way of cleaning up abandoned partitioners
+
+/// Keeps track of all the [StreamPartitioner] currently running in the program, identifying them
+/// by stage id.
+#[derive(Default)]
+pub struct StreamPartitionerRegistry {
+    map: DashMap<(Uuid, usize), Arc<RepartitionExec>>,
+}
+
+impl StreamPartitionerRegistry {
+    /// Builds a new [StreamPartitioner] if there was not one for this specific stage id.
+    /// If there was already one, return a reference to it.
+    pub fn get_or_create_stream_partitioner(
+        &self,
+        id: Uuid,
+        actor_idx: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        partitioning: Partitioning,
+    ) -> Result<Arc<RepartitionExec>, DataFusionError> {
+        match self.map.entry((id, actor_idx)) {
+            Entry::Occupied(entry) => Ok(Arc::clone(entry.get())),
+            Entry::Vacant(entry) => Ok(Arc::clone(
+                &entry.insert(Arc::new(RepartitionExec::try_new(plan, partitioning)?)),
+            )),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test_utils::mock_exec::MockExec;
+    use datafusion::arrow::array::{RecordBatch, UInt32Array};
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::execution::TaskContext;
+    use datafusion::physical_expr::expressions::col;
+    use futures::StreamExt;
+
+    #[tokio::test]
+    async fn round_robin_1() -> Result<(), Box<dyn std::error::Error>> {
+        const PARTITIONS: usize = 10;
+
+        let registry = StreamPartitionerRegistry::default();
+        let partitioner = registry.get_or_create_stream_partitioner(
+            Uuid::new_v4(),
+            0,
+            mock_exec(15, 10),
+            Partitioning::RoundRobinBatch(PARTITIONS),
+        )?;
+
+        let rows_per_partition = gather_rows_per_partition(&partitioner).await;
+
+        assert_eq!(
+            rows_per_partition,
+            vec![20, 20, 20, 20, 20, 10, 10, 10, 10, 10]
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn round_robin_2() -> Result<(), Box<dyn std::error::Error>> {
+        const PARTITIONS: usize = 10;
+
+        let registry = StreamPartitionerRegistry::default();
+        let partitioner = registry.get_or_create_stream_partitioner(
+            Uuid::new_v4(),
+            0,
+            mock_exec(5, 10),
+            Partitioning::RoundRobinBatch(PARTITIONS),
+        )?;
+
+        let rows_per_partition = gather_rows_per_partition(&partitioner).await;
+
+        assert_eq!(rows_per_partition, vec![10, 10, 10, 10, 10, 0, 0, 0, 0, 0]);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn hash_1() -> Result<(), Box<dyn std::error::Error>> {
+        const PARTITIONS: usize = 10;
+
+        let registry = StreamPartitionerRegistry::default();
+        let partitioner = registry.get_or_create_stream_partitioner(
+            Uuid::new_v4(),
+            0,
+            mock_exec(15, 10),
+            Partitioning::Hash(vec![col("c0", &test_schema())?], PARTITIONS),
+        )?;
+
+        let rows_per_partition = gather_rows_per_partition(&partitioner).await;
+
+        assert_eq!(
+            rows_per_partition,
+            vec![30, 15, 0, 45, 0, 15, 0, 15, 15, 15]
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn hash_2() -> Result<(), Box<dyn std::error::Error>> {
+        const PARTITIONS: usize = 10;
+
+        let registry = StreamPartitionerRegistry::default();
+        let partitioner = registry.get_or_create_stream_partitioner(
+            Uuid::new_v4(),
+            0,
+            mock_exec(5, 10),
+            Partitioning::Hash(vec![col("c0", &test_schema())?], PARTITIONS),
+        )?;
+
+        let rows_per_partition = gather_rows_per_partition(&partitioner).await;
+
+        assert_eq!(rows_per_partition, vec![10, 5, 0, 15, 0, 5, 0, 5, 5, 5]);
+        Ok(())
+    }
+
+    async fn gather_rows_per_partition(partitioner: &RepartitionExec) -> Vec<usize> {
+        let mut data = vec![];
+        let n_partitions = partitioner.partitioning().partition_count();
+        let ctx = Arc::new(TaskContext::default());
+        for i in 0..n_partitions {
+            let mut stream = partitioner.execute(i, ctx.clone()).unwrap();
+            data.push(0);
+            while let Some(msg) = stream.next().await {
+                data[i] += msg.unwrap().num_rows();
+            }
+        }
+        data
+    }
+
+    fn test_schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))
+    }
+
+    fn mock_exec(n_batches: usize, n_rows: usize) -> Arc<dyn ExecutionPlan> {
+        Arc::new(MockExec::new(
+            create_vec_batches(n_batches, n_rows),
+            test_schema(),
+        ))
+    }
+
+    /// Create vector batches
+    fn create_vec_batches(
+        n_batches: usize,
+        n_rows: usize,
+    ) -> Vec<Result<RecordBatch, DataFusionError>> {
+        let batch = create_batch(n_rows);
+        (0..n_batches).map(|_| Ok(batch.clone())).collect()
+    }
+
+    /// Create batch
+    fn create_batch(n_rows: usize) -> RecordBatch {
+        let schema = test_schema();
+        let mut data = vec![];
+        for i in 0..n_rows {
+            data.push(i as u32)
+        }
+        RecordBatch::try_new(schema, vec![Arc::new(UInt32Array::from(data))]).unwrap()
+    }
+}
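
To make the intended use of the registry concrete, here is a minimal sketch of a call site, assuming `StreamPartitionerRegistry` from the file above is in scope; the `demo` function, its arguments, and the chosen partitioning are illustrative placeholders rather than code from this commit. The property it exercises is that repeated calls with the same (stage id, actor index) key hand back the same shared `RepartitionExec`.

use datafusion::error::DataFusionError;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use std::sync::Arc;
use uuid::Uuid;

// Hypothetical call site, not part of the diff above.
fn demo(
    registry: &StreamPartitionerRegistry,
    plan: Arc<dyn ExecutionPlan>,
) -> Result<(), DataFusionError> {
    let stage_id = Uuid::new_v4();

    // The first call for (stage_id, actor 0) builds the RepartitionExec...
    let first = registry.get_or_create_stream_partitioner(
        stage_id,
        0,
        Arc::clone(&plan),
        Partitioning::RoundRobinBatch(4),
    )?;

    // ...and any later call with the same key returns the same instance, so
    // every output partition of the stage reads from one shared repartitioner.
    let second = registry.get_or_create_stream_partitioner(
        stage_id,
        0,
        plan,
        Partitioning::RoundRobinBatch(4),
    )?;
    assert!(Arc::ptr_eq(&first, &second));
    Ok(())
}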

src/stage/display.rs

Lines changed: 1 addition & 7 deletions
@@ -26,19 +26,13 @@ use super::ExecutionStage
 
 // Unicode box-drawing characters for creating borders and connections.
 const LTCORNER: &str = "┌"; // Left top corner
-const RTCORNER: &str = "┐"; // Right top corner
 const LDCORNER: &str = "└"; // Left bottom corner
-const RDCORNER: &str = "┘"; // Right bottom corner
-
-const TMIDDLE: &str = "┬"; // Top T-junction (connects down)
-const LMIDDLE: &str = "├"; // Left T-junction (connects right)
-const DMIDDLE: &str = "┴"; // Bottom T-junction (connects up)
-
 const VERTICAL: &str = "│"; // Vertical line
 const HORIZONTAL: &str = "─"; // Horizontal line
 
 impl DisplayAs for ExecutionStage {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        #[allow(clippy::format_in_format_args)]
         match t {
             DisplayFormatType::Default => {
                 write!(f, "{}", self.name)
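
For context on the new `#[allow(clippy::format_in_format_args)]` attribute: that lint fires when a `format!` call is passed as a plain `{}` argument to another formatting macro, because the inner string could usually be produced in a single pass. The allow presumably covers nested calls deeper in the formatting code not shown in this hunk. A small, hypothetical illustration of the pattern (not taken from display.rs):

fn main() {
    let name = "Stage 1";

    // clippy::format_in_format_args fires here: the inner format! allocates a
    // String only to interpolate it into the outer format!.
    let nested = format!("│ {} │", format!("{:^20}", name));

    // The shape Clippy prefers: fold the inner format spec into the outer call.
    let inlined = format!("│ {:^20} │", name);

    assert_eq!(nested, inlined);
}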

src/stage/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -1,7 +1,7 @@
 mod display;
+mod execution_stage;
 mod proto;
-mod stage;
 
 pub use display::display_stage_graphviz;
+pub use execution_stage::ExecutionStage;
 pub use proto::{proto_from_stage, stage_from_proto, ExecutionStageProto};
-pub use stage::ExecutionStage;

src/stage/proto.rs

Lines changed: 3 additions & 3 deletions
@@ -31,7 +31,7 @@ pub struct ExecutionStageProto {
     pub plan: Option<Box<PhysicalPlanNode>>,
     /// The input stages to this stage
    #[prost(repeated, message, tag = "5")]
-    pub inputs: Vec<Box<ExecutionStageProto>>,
+    pub inputs: Vec<ExecutionStageProto>,
     /// Our tasks which tell us how finely grained to execute the partitions in
     /// the plan
     #[prost(message, repeated, tag = "6")]

@@ -45,7 +45,7 @@ pub fn proto_from_stage(
     let proto_plan = PhysicalPlanNode::try_from_physical_plan(stage.plan.clone(), codec)?;
     let inputs = stage
         .child_stages_iter()
-        .map(|s| Ok(Box::new(proto_from_stage(s, codec)?)))
+        .map(|s| proto_from_stage(s, codec))
         .collect::<Result<Vec<_>>>()?;
 
     Ok(ExecutionStageProto {

@@ -74,7 +74,7 @@ pub fn stage_from_proto(
         .inputs
         .into_iter()
         .map(|s| {
-            stage_from_proto(*s, registry, runtime, codec)
+            stage_from_proto(s, registry, runtime, codec)
                 .map(|s| Arc::new(s) as Arc<dyn ExecutionPlan>)
         })
         .collect::<Result<Vec<_>>>()?;
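
On the `inputs` change: prost only needs `Box` to break recursion in singular (`optional`) message fields, whereas a `repeated` field already gets heap indirection from its `Vec`, so the extra boxing (and the `*s` deref when decoding) can be dropped. A self-contained sketch of that distinction, using a hypothetical `TreeNodeProto` rather than a type from this repository:

// Hypothetical recursive prost message, for illustration only.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TreeNodeProto {
    #[prost(string, tag = "1")]
    pub name: String,
    // A singular self-reference must be boxed so the struct has a finite size.
    #[prost(message, optional, boxed, tag = "2")]
    pub parent: Option<Box<TreeNodeProto>>,
    // A repeated self-reference needs no Box: the Vec already owns its
    // elements on the heap, exactly like `inputs` above.
    #[prost(message, repeated, tag = "3")]
    pub children: Vec<TreeNodeProto>,
}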
