Skip to content

Commit bf7b224

Browse files
Refactor partition groups and add tests
This change adds a new abstraction called a `Splitter` in `distribution_strategy.rs`. This is responsible for splitting partitions into partition groups. The new `distribution_strategy` module can be used to store other splitters and, in the future, distribution strategies like round robin etc. A "partition group" is now a range [starting_partition, ending_partition) instead of a Vec, so we improve space efficiency. In the proto layer, the `DDTask` proto still uses a Vec, so we simply expand the range into a Vec when creating them in `planning.rs::assign_to_workers`. Testing - unit tests in isolator.rs - unit tests for `build_replacement` in `planning.rs` - unit tests for the splitter in `distribution_strategy.rs`
1 parent 07e26de commit bf7b224

File tree

7 files changed

+307
-46
lines changed

7 files changed

+307
-46
lines changed

src/codec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ mod test {
305305
let schema = create_test_schema();
306306
let part = Partitioning::UnknownPartitioning(2);
307307
let reader_exec = Arc::new(DDStageReaderExec::try_new(part, schema, 1).unwrap());
308-
let exec = Arc::new(PartitionIsolatorExec::new(reader_exec, 4));
308+
let exec = Arc::new(PartitionIsolatorExec::new(reader_exec, 2));
309309

310310
verify_round_trip(exec);
311311
}

src/distribution_strategy.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// A Splitter is used to group partitions.
2+
pub trait Splitter {
3+
fn split(&self, num_partitions: usize) -> Vec<PartitionGroup>;
4+
}
5+
6+
// PartitionGroup is a struct that represents a range of partitions from [start, end). This is
7+
// more space efficient than a vector of u64s.
8+
#[derive(Debug, Clone, Copy, PartialEq)]
9+
pub struct PartitionGroup {
10+
start: usize,
11+
end: usize,
12+
}
13+
14+
impl PartitionGroup {
15+
pub fn new(start: usize, end: usize) -> Self {
16+
Self { start, end }
17+
}
18+
19+
// start is the first in the range
20+
pub fn start(&self) -> usize {
21+
self.start
22+
}
23+
24+
// end is the exclusive end partition in the range
25+
pub fn end(&self) -> usize {
26+
self.end
27+
}
28+
}
29+
30+
#[derive(Debug, Copy, Clone, PartialEq)]
31+
pub struct PartitionSplitter{
32+
partition_group_size: usize,
33+
}
34+
35+
impl PartitionSplitter {
36+
pub fn new(partition_group_size: usize) -> Self {
37+
assert!(partition_group_size > 0, "partition groups cannot be size 0");
38+
PartitionSplitter {
39+
partition_group_size,
40+
}
41+
}
42+
}
43+
44+
impl Splitter for PartitionSplitter {
45+
fn split(&self, num_partitions: usize) -> Vec<PartitionGroup> {
46+
(0..num_partitions)
47+
.step_by(self.partition_group_size)
48+
.map(|start| {
49+
let end = std::cmp::min(start + self.partition_group_size, num_partitions);
50+
PartitionGroup { start, end }
51+
})
52+
.collect()
53+
}
54+
}
55+
56+
57+
#[cfg(test)]
58+
mod tests {
59+
use super::*;
60+
61+
#[test]
62+
fn test_partition_splitter_basic() {
63+
let splitter = PartitionSplitter::new(4);
64+
let groups = splitter.split(10);
65+
66+
let expected = vec![
67+
PartitionGroup { start: 0, end: 4 },
68+
PartitionGroup { start: 4, end: 8 },
69+
PartitionGroup { start: 8, end: 10 },
70+
];
71+
72+
assert_eq!(groups, expected);
73+
}
74+
75+
#[test]
76+
fn test_partition_splitter_uneven() {
77+
let splitter = PartitionSplitter::new(2);
78+
let groups = splitter.split(5);
79+
80+
let expected = vec![
81+
PartitionGroup { start: 0, end: 2 },
82+
PartitionGroup { start: 2, end: 4 },
83+
PartitionGroup { start: 4, end: 5 },
84+
];
85+
86+
assert_eq!(groups, expected);
87+
}
88+
}

src/isolator.rs

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ impl ExecutionPlan for PartitionIsolatorExec {
7373
}
7474

7575
fn with_new_children(
76-
self: std::sync::Arc<Self>,
77-
children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
78-
) -> Result<std::sync::Arc<dyn ExecutionPlan>> {
76+
self: Arc<Self>,
77+
children: Vec<Arc<dyn ExecutionPlan>>,
78+
) -> Result<Arc<dyn ExecutionPlan>> {
7979
// TODO: generalize this
8080
assert_eq!(children.len(), 1);
8181
Ok(Arc::new(Self::new(
@@ -87,7 +87,7 @@ impl ExecutionPlan for PartitionIsolatorExec {
8787
fn execute(
8888
&self,
8989
partition: usize,
90-
context: std::sync::Arc<datafusion::execution::TaskContext>,
90+
context: Arc<datafusion::execution::TaskContext>,
9191
) -> Result<SendableRecordBatchStream> {
9292
let config = context.session_config();
9393
let partition_group = &config
@@ -143,3 +143,73 @@ impl ExecutionPlan for PartitionIsolatorExec {
143143
output_stream
144144
}
145145
}
146+
147+
#[cfg(test)]
148+
mod tests {
149+
use super::*;
150+
use datafusion::{
151+
arrow::datatypes::{DataType, Field, Schema},
152+
physical_plan::Partitioning,
153+
};
154+
use crate::record_batch_exec::RecordBatchExec;
155+
use arrow::array::{Int32Array, RecordBatch};
156+
use std::sync::Arc;
157+
158+
fn create_test_record_batch_exec() -> Arc<dyn ExecutionPlan> {
159+
let schema = Arc::new(Schema::new(vec![
160+
Field::new("col1", DataType::Int32, false),
161+
]));
162+
let batch = RecordBatch::try_new(
163+
schema.clone(),
164+
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
165+
).unwrap();
166+
Arc::new(RecordBatchExec::new(batch))
167+
}
168+
169+
#[test]
170+
fn test_partition_isolator_exec_new() {
171+
let input = create_test_record_batch_exec();
172+
let partition_count = 4;
173+
let isolator = PartitionIsolatorExec::new(input.clone(), partition_count);
174+
175+
assert_eq!(isolator.partition_count, partition_count);
176+
assert!(Arc::ptr_eq(&isolator.input, &input));
177+
assert_eq!(
178+
isolator.properties().output_partitioning().partition_count(),
179+
partition_count
180+
);
181+
}
182+
183+
#[test]
184+
fn test_partition_isolator_exec_properties() {
185+
let input = create_test_record_batch_exec();
186+
let partition_count = 3;
187+
let isolator = PartitionIsolatorExec::new(input, partition_count);
188+
189+
let properties = isolator.properties();
190+
match properties.output_partitioning() {
191+
Partitioning::UnknownPartitioning(count) => {
192+
assert_eq!(*count, partition_count);
193+
}
194+
_ => panic!("Expected UnknownPartitioning"),
195+
}
196+
}
197+
198+
#[test]
199+
fn test_partition_isolator_exec_with_new_children() {
200+
let input1 = create_test_record_batch_exec();
201+
let input2 = create_test_record_batch_exec();
202+
let partition_count = 2;
203+
204+
let isolator = Arc::new(PartitionIsolatorExec::new(input1, partition_count));
205+
let new_isolator = isolator.with_new_children(vec![input2.clone()]).unwrap();
206+
207+
let new_isolator_typed = new_isolator
208+
.as_any()
209+
.downcast_ref::<PartitionIsolatorExec>()
210+
.unwrap();
211+
212+
assert_eq!(new_isolator_typed.partition_count, partition_count);
213+
assert!(Arc::ptr_eq(&new_isolator_typed.input, &input2));
214+
}
215+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub use proto::generated::protobuf;
2525
pub mod analyze;
2626
pub mod codec;
2727
pub mod customizer;
28+
pub mod distribution_strategy;
2829
pub mod explain;
2930
pub mod flight;
3031
pub mod friendly;

0 commit comments

Comments
 (0)