Skip to content

Commit 0ec389d

Browse files
Refactor partition groups and add tests
This change adds a new abstraction called a `Grouper` in `distribution_strategy.rs`, which is responsible for grouping partitions into partition groups. The new `distribution_strategy` module can also hold other groupers (splitters) and, in the future, distribution strategies such as round robin. A "partition group" is now a half-open range [starting_partition, ending_partition) instead of a Vec, which improves space efficiency. In the proto layer, the `DDTask` proto still uses a Vec, so we simply expand the range into a Vec when creating `DDTask`s in `planning.rs::assign_to_workers`. Testing: unit tests in `isolator.rs`; unit tests for `build_replacement` in `planning.rs`; unit tests for the grouper in `distribution_strategy.rs`.
1 parent 07e26de commit 0ec389d

File tree

6 files changed

+281
-47
lines changed

6 files changed

+281
-47
lines changed

src/codec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ mod test {
305305
let schema = create_test_schema();
306306
let part = Partitioning::UnknownPartitioning(2);
307307
let reader_exec = Arc::new(DDStageReaderExec::try_new(part, schema, 1).unwrap());
308-
let exec = Arc::new(PartitionIsolatorExec::new(reader_exec, 4));
308+
let exec = Arc::new(PartitionIsolatorExec::new(reader_exec, 2));
309309

310310
verify_round_trip(exec);
311311
}

src/distribution_strategy.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/// A `Grouper` is used to group partitions.
pub trait Grouper {
    /// Splits the partitions `0..num_partitions` into [`PartitionGroup`]s.
    fn split(&self, num_partitions: usize) -> Vec<PartitionGroup>;
}

/// A range of partitions `[start, end)`.
///
/// Storing only the two bounds is more space efficient than a vector of u64s.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PartitionGroup {
    start: usize,
    end: usize,
}

impl PartitionGroup {
    /// Creates the group `[start, end)`. The bounds are stored exactly as given.
    pub fn new(start: usize, end: usize) -> Self {
        Self { start, end }
    }

    /// Returns the first partition in the range.
    pub fn start(&self) -> usize {
        self.start
    }

    /// Returns the exclusive end partition of the range.
    pub fn end(&self) -> usize {
        self.end
    }
}

/// A [`Grouper`] that produces consecutive groups of at most `partition_group_size` partitions.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct PartitionGrouper {
    partition_group_size: usize,
}

impl PartitionGrouper {
    /// Creates a grouper that emits groups of at most `partition_group_size` partitions.
    ///
    /// # Panics
    ///
    /// Panics if `partition_group_size` is 0.
    pub fn new(partition_group_size: usize) -> Self {
        assert!(
            partition_group_size > 0,
            "partition groups cannot be size 0"
        );
        PartitionGrouper {
            partition_group_size,
        }
    }
}

impl Grouper for PartitionGrouper {
    /// Splits `0..num_partitions` into consecutive groups of `partition_group_size` partitions.
    ///
    /// The last group is shorter when `num_partitions` is not a multiple of the group size, and
    /// the result is empty when `num_partitions` is 0.
    fn split(&self, num_partitions: usize) -> Vec<PartitionGroup> {
        (0..num_partitions)
            .step_by(self.partition_group_size)
            .map(|start| {
                // `start + size` can exceed `usize::MAX` for very large inputs. Saturate instead
                // of overflowing (panic in debug, wrap-around in release), then clamp to the
                // total number of partitions.
                let end = start
                    .saturating_add(self.partition_group_size)
                    .min(num_partitions);
                PartitionGroup { start, end }
            })
            .collect()
    }
}
58+
59+
#[cfg(test)]
mod tests {
    use super::*;

    /// Ten partitions in groups of four: two full groups and a shorter trailing group.
    #[test]
    fn test_partition_grouper_basic() {
        let grouper = PartitionGrouper::new(4);
        let groups = grouper.split(10);

        let expected = vec![
            PartitionGroup { start: 0, end: 4 },
            PartitionGroup { start: 4, end: 8 },
            PartitionGroup { start: 8, end: 10 },
        ];

        assert_eq!(groups, expected);
    }

    /// An odd partition count leaves a final group holding a single partition.
    #[test]
    fn test_partition_grouper_uneven() {
        let grouper = PartitionGrouper::new(2);
        let groups = grouper.split(5);

        let expected = vec![
            PartitionGroup { start: 0, end: 2 },
            PartitionGroup { start: 2, end: 4 },
            PartitionGroup { start: 4, end: 5 },
        ];

        assert_eq!(groups, expected);
    }

    /// A partition count that is an exact multiple of the group size yields only full groups.
    #[test]
    fn test_partition_grouper_exact_multiple() {
        let groups = PartitionGrouper::new(3).split(6);

        let expected = vec![
            PartitionGroup { start: 0, end: 3 },
            PartitionGroup { start: 3, end: 6 },
        ];

        assert_eq!(groups, expected);
    }

    /// Splitting zero partitions produces no groups.
    #[test]
    fn test_partition_grouper_zero_partitions() {
        let groups = PartitionGrouper::new(4).split(0);

        assert!(groups.is_empty());
    }

    /// A group size larger than the partition count yields one group covering everything.
    #[test]
    fn test_partition_grouper_group_larger_than_input() {
        let groups = PartitionGrouper::new(8).split(3);

        assert_eq!(groups, vec![PartitionGroup { start: 0, end: 3 }]);
    }

    /// A group size of zero is rejected by the constructor's assertion.
    #[test]
    #[should_panic(expected = "partition groups cannot be size 0")]
    fn test_invalid_group_size() {
        PartitionGrouper::new(0);
    }
}

src/isolator.rs

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ impl ExecutionPlan for PartitionIsolatorExec {
7373
}
7474

7575
fn with_new_children(
76-
self: std::sync::Arc<Self>,
77-
children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
78-
) -> Result<std::sync::Arc<dyn ExecutionPlan>> {
76+
self: Arc<Self>,
77+
children: Vec<Arc<dyn ExecutionPlan>>,
78+
) -> Result<Arc<dyn ExecutionPlan>> {
7979
// TODO: generalize this
8080
assert_eq!(children.len(), 1);
8181
Ok(Arc::new(Self::new(
@@ -87,7 +87,7 @@ impl ExecutionPlan for PartitionIsolatorExec {
8787
fn execute(
8888
&self,
8989
partition: usize,
90-
context: std::sync::Arc<datafusion::execution::TaskContext>,
90+
context: Arc<datafusion::execution::TaskContext>,
9191
) -> Result<SendableRecordBatchStream> {
9292
let config = context.session_config();
9393
let partition_group = &config
@@ -143,3 +143,73 @@ impl ExecutionPlan for PartitionIsolatorExec {
143143
output_stream
144144
}
145145
}
146+
147+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{record_batch_exec::RecordBatchExec, vocab::CtxPartitionGroup};
    use arrow::array::{Int32Array, RecordBatch};
    use datafusion::{
        arrow::datatypes::{DataType, Field, Schema},
        prelude::SessionContext,
    };
    use std::sync::Arc;

    /// Builds a `RecordBatchExec` over one batch with a single non-nullable Int32 column
    /// named "col1" holding the values 1, 2, 3.
    fn create_test_record_batch_exec() -> Arc<dyn ExecutionPlan> {
        let column = Field::new("col1", DataType::Int32, false);
        let schema = Arc::new(Schema::new(vec![column]));
        let values = Int32Array::from(vec![1, 2, 3]);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap();
        Arc::new(RecordBatchExec::new(batch))
    }

    #[test]
    fn test_partition_isolator_exec() {
        let partition_count = 3;
        let isolator = PartitionIsolatorExec::new(create_test_record_batch_exec(), partition_count);

        // Register a partition group on the session config so `execute` can find it.
        let ctx = SessionContext::new();
        {
            let state = ctx.state_ref();
            let mut guard = state.write();
            guard
                .config_mut()
                .set_extension(Arc::new(CtxPartitionGroup(vec![0u64, 1u64, 2u64])));
        }
        let task_context = ctx.task_ctx();

        // Success case: a valid partition with the partition group present.
        let valid = isolator.execute(0, Arc::clone(&task_context));
        assert!(valid.is_ok());

        // Error case: a partition index beyond `partition_count`.
        match isolator.execute(4, task_context) {
            Ok(_) => panic!("expected an error for an out-of-range partition"),
            Err(e) => assert!(e
                .to_string()
                .contains("Invalid partition 4 for PartitionIsolatorExec")),
        }

        // Error case: a session without the partition group extension.
        let empty_task_context = SessionContext::new().task_ctx();
        match isolator.execute(0, empty_task_context) {
            Ok(_) => panic!("expected an error when no partition group is configured"),
            Err(e) => assert!(e
                .to_string()
                .contains("PartitionGroup not set in session config")),
        }
    }
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub use proto::generated::protobuf;
2525
pub mod analyze;
2626
pub mod codec;
2727
pub mod customizer;
28+
pub mod distribution_strategy;
2829
pub mod explain;
2930
pub mod flight;
3031
pub mod friendly;

0 commit comments

Comments
 (0)