Skip to content

Commit 25fe243

Browse files
Refactor partition groups and add tests
This change adds a new abstraction called a `Grouper` in `distribution_strategy.rs`, which is responsible for grouping partitions into partition groups. The new `distribution_strategy` module can also hold other groupers and, in the future, distribution strategies such as round robin. A "partition group" is now a half-open range [starting_partition, ending_partition) instead of a Vec, which is more space efficient. In the proto layer, the `DDTask` proto still uses a Vec, so we simply expand the range into a Vec when creating the tasks in `planning.rs::assign_to_workers`. Testing: unit tests in `isolator.rs`; unit tests for `build_replacement` in `planning.rs`; unit tests for the grouper in `distribution_strategy.rs`.
1 parent 07e26de commit 25fe243

File tree

6 files changed

+264
-46
lines changed

6 files changed

+264
-46
lines changed

src/codec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ mod test {
305305
let schema = create_test_schema();
306306
let part = Partitioning::UnknownPartitioning(2);
307307
let reader_exec = Arc::new(DDStageReaderExec::try_new(part, schema, 1).unwrap());
308-
let exec = Arc::new(PartitionIsolatorExec::new(reader_exec, 4));
308+
let exec = Arc::new(PartitionIsolatorExec::new(reader_exec, 2));
309309

310310
verify_round_trip(exec);
311311
}

src/distribution_strategy.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/// Strategy for dividing a contiguous run of partitions into [`PartitionGroup`]s.
pub trait Grouper {
    /// Splits the partitions `0..num_partitions` into groups.
    fn split(&self, num_partitions: usize) -> Vec<PartitionGroup>;
}

/// A half-open range `[start, end)` of partition indices.
///
/// Only the two bounds are stored, which is more space efficient than a vector listing every
/// partition index in the group.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PartitionGroup {
    start: usize,
    end: usize,
}

impl PartitionGroup {
    /// Creates a group covering `[start, end)`. The bounds are stored as given.
    pub fn new(start: usize, end: usize) -> Self {
        Self { start, end }
    }

    /// First partition in the range (inclusive).
    pub fn start(&self) -> usize {
        self.start
    }

    /// End of the range (exclusive).
    pub fn end(&self) -> usize {
        self.end
    }
}

/// A [`Grouper`] that produces consecutive groups of a fixed maximum size.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct PartitionGrouper {
    partition_group_size: usize,
}

impl PartitionGrouper {
    /// Creates a grouper that emits groups of at most `partition_group_size` partitions.
    ///
    /// # Panics
    ///
    /// Panics if `partition_group_size` is 0.
    pub fn new(partition_group_size: usize) -> Self {
        assert!(partition_group_size > 0, "partition groups cannot be size 0");
        PartitionGrouper {
            partition_group_size,
        }
    }
}

impl Grouper for PartitionGrouper {
    /// Splits `0..num_partitions` into consecutive groups of `partition_group_size` partitions.
    ///
    /// The last group is shorter when `num_partitions` is not a multiple of the group size, and
    /// the result is empty when `num_partitions` is 0.
    fn split(&self, num_partitions: usize) -> Vec<PartitionGroup> {
        (0..num_partitions)
            .step_by(self.partition_group_size)
            .map(|start| {
                // `start + size` can exceed `usize::MAX` for very large group sizes; saturate
                // instead of overflowing, then clamp to the number of partitions.
                let end = start
                    .saturating_add(self.partition_group_size)
                    .min(num_partitions);
                PartitionGroup { start, end }
            })
            .collect()
    }
}
55+
56+
57+
#[cfg(test)]
mod tests {
    use super::*;

    // Ten partitions in groups of four: two full groups plus a short trailing group.
    #[test]
    fn test_partition_grouper_basic() {
        let grouper = PartitionGrouper::new(4);
        let groups = grouper.split(10);

        let expected = vec![
            PartitionGroup::new(0, 4),
            PartitionGroup::new(4, 8),
            PartitionGroup::new(8, 10),
        ];

        assert_eq!(groups, expected);
    }

    // Five partitions in groups of two: the last group holds a single partition.
    #[test]
    fn test_partition_grouper_uneven() {
        let grouper = PartitionGrouper::new(2);
        let groups = grouper.split(5);

        let expected = vec![
            PartitionGroup::new(0, 2),
            PartitionGroup::new(2, 4),
            PartitionGroup::new(4, 5),
        ];

        assert_eq!(groups, expected);
    }

    // The partition count is an exact multiple of the group size: no short trailing group.
    #[test]
    fn test_partition_grouper_even() {
        let groups = PartitionGrouper::new(3).split(6);

        let expected = vec![PartitionGroup::new(0, 3), PartitionGroup::new(3, 6)];

        assert_eq!(groups, expected);
    }

    // No partitions means no groups.
    #[test]
    fn test_partition_grouper_empty() {
        let groups = PartitionGrouper::new(4).split(0);

        assert!(groups.is_empty());
    }

    // A group size larger than the partition count yields one group clamped to the count.
    #[test]
    fn test_partition_grouper_group_larger_than_input() {
        let groups = PartitionGrouper::new(8).split(3);

        assert_eq!(groups, vec![PartitionGroup::new(0, 3)]);
    }

    // A zero group size is rejected by the constructor.
    #[test]
    #[should_panic(expected = "partition groups cannot be size 0")]
    fn test_invalid_group_size() {
        PartitionGrouper::new(0);
    }
}

src/isolator.rs

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ impl ExecutionPlan for PartitionIsolatorExec {
7373
}
7474

7575
fn with_new_children(
76-
self: std::sync::Arc<Self>,
77-
children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
78-
) -> Result<std::sync::Arc<dyn ExecutionPlan>> {
76+
self: Arc<Self>,
77+
children: Vec<Arc<dyn ExecutionPlan>>,
78+
) -> Result<Arc<dyn ExecutionPlan>> {
7979
// TODO: generalize this
8080
assert_eq!(children.len(), 1);
8181
Ok(Arc::new(Self::new(
@@ -87,7 +87,7 @@ impl ExecutionPlan for PartitionIsolatorExec {
8787
fn execute(
8888
&self,
8989
partition: usize,
90-
context: std::sync::Arc<datafusion::execution::TaskContext>,
90+
context: Arc<datafusion::execution::TaskContext>,
9191
) -> Result<SendableRecordBatchStream> {
9292
let config = context.session_config();
9393
let partition_group = &config
@@ -143,3 +143,66 @@ impl ExecutionPlan for PartitionIsolatorExec {
143143
output_stream
144144
}
145145
}
146+
147+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{record_batch_exec::RecordBatchExec, vocab::CtxPartitionGroup};
    use arrow::array::{Int32Array, RecordBatch};
    use datafusion::{
        arrow::datatypes::{DataType, Field, Schema},
        prelude::SessionContext,
    };
    use std::sync::Arc;

    // Builds a plan over a single record batch with one non-nullable Int32 column, used as the
    // isolator's input.
    fn create_test_record_batch_exec() -> Arc<dyn ExecutionPlan> {
        let fields = vec![Field::new("col1", DataType::Int32, false)];
        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();
        Arc::new(RecordBatchExec::new(batch))
    }

    // Exercises `execute` for a valid partition, an out-of-range partition, and a session
    // config that carries no partition group extension.
    #[test]
    fn test_partition_isolator_exec() {
        let partition_count = 3;
        let isolator = PartitionIsolatorExec::new(create_test_record_batch_exec(), partition_count);

        // Register the partition group on the session config; the write guard is released at
        // the end of this statement.
        let ctx = SessionContext::new();
        ctx.state_ref()
            .write()
            .config_mut()
            .set_extension(Arc::new(CtxPartitionGroup(vec![0u64, 1u64, 2u64])));

        let task_context = ctx.task_ctx();

        // Valid partition with the extension present.
        let in_range = isolator.execute(0, task_context.clone());
        assert!(in_range.is_ok());

        // Partition index beyond partition_count.
        let out_of_range = isolator.execute(4, task_context.clone());
        assert!(out_of_range.is_err());
        if let Some(message) = out_of_range.err().map(|e| e.to_string()) {
            assert!(message.contains("Invalid partition 4 for PartitionIsolatorExec"));
        }

        // Fresh context without the partition group extension.
        let empty_task_context = SessionContext::new().task_ctx();
        let missing_group = isolator.execute(0, empty_task_context);
        assert!(missing_group.is_err());
        if let Some(message) = missing_group.err().map(|e| e.to_string()) {
            assert!(message.contains("PartitionGroup not set in session config"));
        }
    }
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub use proto::generated::protobuf;
2525
pub mod analyze;
2626
pub mod codec;
2727
pub mod customizer;
28+
pub mod distribution_strategy;
2829
pub mod explain;
2930
pub mod flight;
3031
pub mod friendly;

0 commit comments

Comments
 (0)