Skip to content

Commit b208ab2

Browse files
Refactor partition groups and add tests
This change adds a new abstraction called a `Grouper` in `distribution_strategy.rs`, which is responsible for grouping partitions into partition groups. The new `distribution_strategy` module can also be used to store other splitters and, in the future, distribution strategies such as round robin. A "partition group" is now a half-open range [starting_partition, ending_partition) instead of a Vec, which improves space efficiency. In the proto layer, the `DDTask` proto still uses a Vec, so we simply expand the range into a Vec when creating the tasks in `planning.rs::assign_to_workers`. Testing: unit tests in `isolator.rs`; unit tests for `build_replacement` in `planning.rs`; unit tests for the grouper in `distribution_strategy.rs`.
1 parent 07e26de commit b208ab2

File tree

6 files changed

+325
-52
lines changed

6 files changed

+325
-52
lines changed

src/codec.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::sync::Arc;
22

3-
use arrow::datatypes::Schema;
3+
use arrow::datatypes::{Field, Schema};
44
use datafusion::{
55
common::{internal_datafusion_err, internal_err, Result},
66
execution::FunctionRegistry,
@@ -243,10 +243,10 @@ mod test {
243243
isolator::PartitionIsolatorExec, max_rows::MaxRowsExec, stage_reader::DDStageReaderExec,
244244
};
245245

246-
fn create_test_schema() -> Arc<arrow::datatypes::Schema> {
247-
Arc::new(arrow::datatypes::Schema::new(vec![
248-
arrow::datatypes::Field::new("a", DataType::Int32, false),
249-
arrow::datatypes::Field::new("b", DataType::Int32, false),
246+
fn create_test_schema() -> Arc<Schema> {
247+
Arc::new(Schema::new(vec![
248+
Field::new("a", DataType::Int32, false),
249+
Field::new("b", DataType::Int32, false),
250250
]))
251251
}
252252

src/distribution_strategy.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/// A strategy for splitting a number of partitions into [`PartitionGroup`]s.
pub trait Grouper {
    /// Splits `num_partitions` partitions into an ordered list of groups.
    fn group(&self, num_partitions: usize) -> Vec<PartitionGroup>;
}

/// A half-open range `[start, end)` of partition indices.
///
/// Storing only the two bounds is more space efficient than a vector of `u64`s.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PartitionGroup {
    start: usize,
    end: usize,
}

impl PartitionGroup {
    /// Creates a new `PartitionGroup` containing the partitions in `[start, end)`.
    ///
    /// The bounds are stored as given; no validation is performed.
    pub fn new(start: usize, end: usize) -> Self {
        Self { start, end }
    }

    /// Returns the first partition in the range.
    pub fn start(&self) -> usize {
        self.start
    }

    /// Returns the exclusive end partition of the range.
    pub fn end(&self) -> usize {
        self.end
    }
}

/// Groups partitions into consecutive chunks of at most `partition_group_size` partitions.
///
/// For example, 10 partitions with a group size of 3 yield the groups
/// `[0..3, 3..6, 6..9, 9..10]`.
///
/// - Grouping 0 partitions results in an empty `Vec`.
/// - The last group holds the remainder, so unbalanced groupings are possible: a group size
///   of 99 with 100 partitions yields `[0..99, 99..100]`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct PartitionGrouper {
    partition_group_size: usize,
}

impl PartitionGrouper {
    /// Creates a grouper that emits groups of at most `partition_group_size` partitions.
    ///
    /// # Panics
    ///
    /// Panics if `partition_group_size` is 0.
    pub fn new(partition_group_size: usize) -> Self {
        assert!(
            partition_group_size > 0,
            "partition groups cannot be size 0"
        );
        PartitionGrouper {
            partition_group_size,
        }
    }
}

impl Grouper for PartitionGrouper {
    /// Walks `0..num_partitions` in steps of the group size and emits one
    /// [`PartitionGroup`] per step; the final group is clamped to `num_partitions`.
    fn group(&self, num_partitions: usize) -> Vec<PartitionGroup> {
        (0..num_partitions)
            .step_by(self.partition_group_size)
            .map(|start| {
                // `start + size` can exceed `usize::MAX` for very large partition counts;
                // saturate before clamping so the end bound never overflows or wraps.
                let end = start
                    .saturating_add(self.partition_group_size)
                    .min(num_partitions);
                PartitionGroup { start, end }
            })
            .collect()
    }
}
67+
68+
#[cfg(test)]
mod tests {
    use super::*;

    // Groups `num_partitions` partitions with a `PartitionGrouper` of size `group_size` and
    // checks that the result equals the `expected` `(start, end)` ranges, in order.
    fn assert_groups(group_size: usize, num_partitions: usize, expected: &[(usize, usize)]) {
        let expected: Vec<PartitionGroup> = expected
            .iter()
            .map(|&(start, end)| PartitionGroup::new(start, end))
            .collect();
        let groups = PartitionGrouper::new(group_size).group(num_partitions);
        assert_eq!(groups, expected);
    }

    #[test]
    fn test_partition_grouper_basic() {
        assert_groups(4, 10, &[(0, 4), (4, 8), (8, 10)]);
    }

    #[test]
    fn test_partition_grouper_uneven() {
        assert_groups(2, 5, &[(0, 2), (2, 4), (4, 5)]);
    }

    // Pin the panic message so an unrelated panic cannot make this test pass.
    #[test]
    #[should_panic(expected = "partition groups cannot be size 0")]
    fn test_invalid_group_size() {
        let _ = PartitionGrouper::new(0);
    }

    #[test]
    fn test_num_partitions_smaller_than_group_size() {
        assert_groups(2, 1, &[(0, 1)]);
    }

    #[test]
    fn test_num_partitions_equal_to_group_size() {
        assert_groups(2, 2, &[(0, 2)]);
    }

    #[test]
    fn test_zero_partitions_to_group() {
        assert_groups(2, 0, &[]);
    }
}

src/isolator.rs

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ impl ExecutionPlan for PartitionIsolatorExec {
7373
}
7474

7575
fn with_new_children(
76-
self: std::sync::Arc<Self>,
77-
children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
78-
) -> Result<std::sync::Arc<dyn ExecutionPlan>> {
76+
self: Arc<Self>,
77+
children: Vec<Arc<dyn ExecutionPlan>>,
78+
) -> Result<Arc<dyn ExecutionPlan>> {
7979
// TODO: generalize this
8080
assert_eq!(children.len(), 1);
8181
Ok(Arc::new(Self::new(
@@ -87,7 +87,7 @@ impl ExecutionPlan for PartitionIsolatorExec {
8787
fn execute(
8888
&self,
8989
partition: usize,
90-
context: std::sync::Arc<datafusion::execution::TaskContext>,
90+
context: Arc<datafusion::execution::TaskContext>,
9191
) -> Result<SendableRecordBatchStream> {
9292
let config = context.session_config();
9393
let partition_group = &config
@@ -143,3 +143,75 @@ impl ExecutionPlan for PartitionIsolatorExec {
143143
output_stream
144144
}
145145
}
146+
147+
#[cfg(test)]
148+
// Unit tests for `PartitionIsolatorExec::execute`: one success case and the error cases for an
// out-of-range partition and for a session config without a `CtxPartitionGroup` extension.
mod tests {
149+
use super::*;
150+
use crate::{record_batch_exec::RecordBatchExec, vocab::CtxPartitionGroup};
151+
use arrow::array::{Int32Array, RecordBatch};
152+
use datafusion::{
153+
arrow::datatypes::{DataType, Field, Schema},
154+
prelude::SessionContext,
155+
};
156+
use std::sync::Arc;
157+
158+
// Builds the input plan for the tests: a `RecordBatchExec` wrapping a single batch with one
// non-nullable Int32 column "col1" holding the values 1, 2, 3.
fn create_test_record_batch_exec() -> Arc<dyn ExecutionPlan> {
159+
let schema = Arc::new(Schema::new(vec![Field::new(
160+
"col1",
161+
DataType::Int32,
162+
false,
163+
)]));
164+
let batch = RecordBatch::try_new(
165+
schema.clone(),
166+
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
167+
)
168+
.unwrap();
169+
Arc::new(RecordBatchExec::new(batch))
170+
}
171+
172+
#[test]
173+
fn test_partition_isolator_exec() {
174+
let input = create_test_record_batch_exec();
175+
let partition_count = 3;
176+
let isolator = PartitionIsolatorExec::new(input, partition_count);
177+
178+
// Test success case: valid partition with partition group
179+
let ctx = SessionContext::new();
180+
let partition_group = vec![0u64, 1u64, 2u64];
181+
// Register the partition group as a session-config extension. The inner scope ends the
// `state.write()` guard before `ctx.task_ctx()` is called below.
{
182+
let state = ctx.state_ref();
183+
let mut guard = state.write();
184+
let config = guard.config_mut();
185+
config.set_extension(Arc::new(CtxPartitionGroup(partition_group)));
186+
}
187+
188+
let task_context = ctx.task_ctx();
189+
190+
// Success case: execute valid partition
191+
let result = isolator.execute(0, task_context.clone());
192+
assert!(result.is_ok());
193+
194+
// Error case: try to execute partition beyond partition_count
195+
// (partition 4 is requested while the isolator was built with partition_count = 3)
let result = isolator.execute(4, task_context.clone());
196+
assert!(result.is_err());
197+
assert!(result.err().unwrap().to_string()
198+
.contains("Invalid partition 4 for PartitionIsolatorExec"));
199+
200+
// Error case: test without partition group extension
201+
// (a fresh SessionContext has no CtxPartitionGroup registered)
let empty_ctx = SessionContext::new();
202+
let empty_task_context = empty_ctx.task_ctx();
203+
204+
let result = isolator.execute(0, empty_task_context);
205+
assert!(result.is_err());
206+
assert!(result.err().unwrap().to_string()
207+
.contains("PartitionGroup not set in session config"));
208+
209+
// Empty task ctx
210+
// NOTE(review): unlike the checks above, this one only inspects the message when an error is
// returned; if `execute` returns Ok the `if let` is skipped and nothing is asserted. Confirm
// whether an `assert!(result.is_err())` is intended here.
let result = isolator.execute(1, SessionContext::new().task_ctx());
211+
if let Err(e) = result {
212+
assert!(e
213+
.to_string()
214+
.contains("PartitionGroup not set in session config"));
215+
}
216+
}
217+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub use proto::generated::protobuf;
2525
pub mod analyze;
2626
pub mod codec;
2727
pub mod customizer;
28+
pub mod distribution_strategy;
2829
pub mod explain;
2930
pub mod flight;
3031
pub mod friendly;

0 commit comments

Comments
 (0)