
Commit 9764f7a

feat: add automatic block-level partition shuffle
Add heuristic-based block-level shuffle for better load balancing when tables have few segments relative to cluster size.

Changes:
- Add BlockMod shuffle kind for block-level distribution
- Add auto_block_shuffle_threshold setting (default=5, 0 to disable)
- When segment_count < nodes * threshold, use block-level shuffle
- Each executor filters blocks by block_idx % num_executors == executor_idx
- Add info logging for shuffle strategy selection
- Preserve partition kind during reshuffle to prevent data duplication
1 parent a29733a commit 9764f7a
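
For orientation, here is a minimal, self-contained sketch of the selection heuristic described above. ShuffleKind and pick_shuffle_kind are hypothetical stand-ins for PartitionsShuffleKind and the selection logic in FuseTable (see read_partitions.rs below); the comparison mirrors the commit message.

#[derive(Debug)]
enum ShuffleKind {
    Mod,      // segment-level modulo: preserves per-node cache affinity
    BlockMod, // block-level modulo: broadcast segments, split by block index
}

// Hypothetical stand-in for the selection logic in FuseTable::read_partitions.
fn pick_shuffle_kind(segment_count: usize, nodes: usize, threshold: usize) -> ShuffleKind {
    if threshold > 0 && nodes > 1 && segment_count < nodes * threshold {
        ShuffleKind::BlockMod
    } else {
        ShuffleKind::Mod
    }
}

fn main() {
    // 8 segments on 4 nodes with the default threshold of 5: 8 < 20, so BlockMod.
    println!("{:?}", pick_shuffle_kind(8, 4, 5));
    // 100 segments on 4 nodes: 100 >= 20, so the segment-level default.
    println!("{:?}", pick_shuffle_kind(100, 4, 5));
}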

9 files changed: +93 −10 lines

src/query/catalog/src/plan/partition.rs
Lines changed: 7 additions & 2 deletions

@@ -103,6 +103,8 @@ pub enum PartitionsShuffleKind {
     ConsistentHash,
     // Bind the Partition to executor by partition.rand() order.
     Rand,
+    // Bind the Partition to executor by block-level modulo (block_idx % num_executors)
+    BlockMod,
     // Bind the Partition to executor by broadcast
     BroadcastCluster,
     // Bind the Partition to warehouse executor by broadcast
@@ -194,13 +196,16 @@ impl Partitions {
                 parts
             }
             // the executors will be all nodes in the warehouse if a query is BroadcastWarehouse.
-            PartitionsShuffleKind::BroadcastCluster | PartitionsShuffleKind::BroadcastWarehouse => {
+            PartitionsShuffleKind::BlockMod
+            | PartitionsShuffleKind::BroadcastCluster
+            | PartitionsShuffleKind::BroadcastWarehouse => {
                 return Ok(executors_sorted
                     .into_iter()
                    .map(|executor| {
                        (
                            executor.id.clone(),
-                            Partitions::create(PartitionsShuffleKind::Seq, self.partitions.clone()),
+                            // Preserve the original kind so executors know how to handle it
+                            Partitions::create(self.kind.clone(), self.partitions.clone()),
                        )
                    })
                    .collect());
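
With this change, a BlockMod partition set is broadcast whole: every executor receives the same partition list with the kind preserved, and deduplication happens later, per block. A minimal sketch of that ownership rule (blocks_for_executor is a hypothetical helper, assuming block_idx is the block's index within the scan):

// Hypothetical helper: a block belongs to the executor whose slot equals
// block_idx % num_executors, so broadcast partitions are still read exactly once.
fn blocks_for_executor(num_blocks: usize, num_executors: usize, slot: usize) -> Vec<usize> {
    (0..num_blocks).filter(|idx| idx % num_executors == slot).collect()
}

fn main() {
    // 10 blocks across 3 executors: the slots partition the block indexes,
    // which is why broadcasting the same partition list duplicates no data.
    for slot in 0..3 {
        println!("executor {slot}: {:?}", blocks_for_executor(10, 3, slot));
    }
}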

src/query/catalog/tests/it/partitions.rs
Lines changed: 3 additions & 3 deletions

@@ -254,14 +254,14 @@ fn test_partition_reshuffle() {
         writeln!(file, "{:?}", e2_parts.len()).unwrap();
     }

-    // Broadcast.
+    // BlockMod.
     {
-        let partitions = gen_parts(PartitionsShuffleKind::BroadcastCluster, 3);
+        let partitions = gen_parts(PartitionsShuffleKind::BlockMod, 3);
         let shuffle = partitions.reshuffle(executors_2.clone()).unwrap();

         writeln!(
             file,
-            "PartitionsShuffleKind::Broadcast: 3 partitions of 2 executors"
+            "PartitionsShuffleKind::BlockMod: 3 partitions of 2 executors"
         )
         .unwrap();
         let e1_parts = shuffle.get(&executors_2[0].id).unwrap();

src/query/catalog/tests/it/testdata/partition-reshuffle.txt
Lines changed: 3 additions & 3 deletions

@@ -27,6 +27,6 @@ Partitions { kind: Seq, partitions: [{"type":"fuse_lazy","loc":"4"}, {"type":"fu
 PartitionsShuffleKind::Rand: 11 partitions of 2 executors
 5
 6
-PartitionsShuffleKind::Broadcast: 3 partitions of 2 executors
-Partitions { kind: Seq, partitions: [{"type":"fuse_lazy","loc":"0"}, {"type":"fuse_lazy","loc":"1"}, {"type":"fuse_lazy","loc":"2"}] }
-Partitions { kind: Seq, partitions: [{"type":"fuse_lazy","loc":"0"}, {"type":"fuse_lazy","loc":"1"}, {"type":"fuse_lazy","loc":"2"}] }
+PartitionsShuffleKind::BlockMod: 3 partitions of 2 executors
+Partitions { kind: BlockMod, partitions: [{"type":"fuse_lazy","loc":"0"}, {"type":"fuse_lazy","loc":"1"}, {"type":"fuse_lazy","loc":"2"}] }
+Partitions { kind: BlockMod, partitions: [{"type":"fuse_lazy","loc":"0"}, {"type":"fuse_lazy","loc":"1"}, {"type":"fuse_lazy","loc":"2"}] }

src/query/service/tests/it/storages/fuse/pruning_pipeline.rs
Lines changed: 1 addition & 0 deletions

@@ -94,6 +94,7 @@ async fn apply_snapshot_pruning(
         cache_key,
         segment_locs.len(),
         0,
+        false, // use_block_level_shuffle
     )?;
     prune_pipeline.set_max_threads(1);
     prune_pipeline.set_on_init(move || {

src/query/settings/src/settings_default.rs
Lines changed: 7 additions & 0 deletions

@@ -1314,6 +1314,13 @@ impl DefaultSettings {
             scope: SettingScope::Both,
             range: Some(SettingRange::Numeric(0..=1)),
         }),
+        ("auto_block_shuffle_threshold", DefaultSettingValue {
+            value: UserSettingValue::UInt64(5),
+            desc: "Threshold for automatic block-level shuffle. When segment_count < cluster_nodes * threshold, uses block-level distribution (block_idx % num_executors) instead of segment-level. Set to 0 to disable.",
+            mode: SettingMode::Both,
+            scope: SettingScope::Both,
+            range: Some(SettingRange::Numeric(0..=100)),
+        }),
         ("enable_prune_pipeline", DefaultSettingValue {
             value: UserSettingValue::UInt64(1),
             desc: "Enable pruning pipeline",

src/query/settings/src/settings_getter_setter.rs
Lines changed: 4 additions & 0 deletions

@@ -976,6 +976,10 @@ impl Settings {
         Ok(self.try_get_u64("enable_distributed_pruning")? == 1)
     }

+    pub fn get_auto_block_shuffle_threshold(&self) -> Result<usize> {
+        Ok(self.try_get_u64("auto_block_shuffle_threshold")? as usize)
+    }
+
     pub fn get_persist_materialized_cte(&self) -> Result<bool> {
         Ok(self.try_get_u64("persist_materialized_cte")? != 0)
     }

src/query/storages/basic/src/memory_table.rs
Lines changed: 1 addition & 1 deletion

@@ -208,7 +208,7 @@ impl Table for MemoryTable {
         let parts = vec![MemoryPartInfo::create()];
         return Ok((
             statistics,
-            Partitions::create(PartitionsShuffleKind::BroadcastCluster, parts),
+            Partitions::create(PartitionsShuffleKind::BlockMod, parts),
         ));
     }

src/query/storages/fuse/src/operations/read_partitions.rs
Lines changed: 49 additions & 1 deletion

@@ -55,6 +55,7 @@ use databend_storages_common_index::NgramArgs;
 use databend_storages_common_pruner::BlockMetaIndex;
 use databend_storages_common_pruner::TopNPruner;
 use databend_storages_common_table_meta::meta::BlockMeta;
+use databend_storages_common_table_meta::meta::BlockSlotDescription;
 use databend_storages_common_table_meta::meta::ColumnStatistics;
 use databend_storages_common_table_meta::meta::column_oriented_segment::BLOCK_SIZE;
 use databend_storages_common_table_meta::meta::column_oriented_segment::BLOOM_FILTER_INDEX_LOCATION;
@@ -169,6 +170,27 @@ impl FuseTable {
             segments.push(FuseLazyPartInfo::create(idx, segment_location))
         }

+        // Determine shuffle kind based on heuristic:
+        // Use block-level shuffle when segment count is small relative to cluster size
+        let threshold = ctx.get_settings().get_auto_block_shuffle_threshold()?;
+        let shuffle_kind =
+            if threshold > 0 && nodes_num > 1 && segment_len < nodes_num * threshold {
+                // Block-level shuffle: broadcast all segments to all executors,
+                // each executor filters blocks by block_idx % num_executors == executor_idx
+                info!(
+                    "Using BlockMod shuffle: segments={}, nodes={}, threshold={}",
+                    segment_len, nodes_num, threshold
+                );
+                PartitionsShuffleKind::BlockMod
+            } else {
+                // Default: use Mod shuffle kind for cache affinity at segment level
+                info!(
+                    "Using Mod shuffle: segments={}, nodes={}, threshold={}",
+                    segment_len, nodes_num, threshold
+                );
+                PartitionsShuffleKind::Mod
+            };
+
         return Ok((
             PartStatistics::new_estimated(
                 Some(snapshot_loc),
@@ -177,7 +199,7 @@ impl FuseTable {
                 segment_len,
                 segment_len,
             ),
-            Partitions::create(PartitionsShuffleKind::Mod, segments),
+            Partitions::create(shuffle_kind, segments),
         ));
     }

@@ -284,6 +306,9 @@ impl FuseTable {

         let (segment_tx, segment_rx) = async_channel::bounded(max_io_requests);

+        // Check if we should use block-level shuffle based on the partition kind
+        let use_block_level_shuffle = plan.parts.kind == PartitionsShuffleKind::BlockMod;
+
         match segment_format {
             FuseSegmentFormat::Row => {
                 self.prune_segments_with_pipeline(
@@ -295,6 +320,7 @@ impl FuseTable {
                     derterministic_cache_key.clone(),
                     lazy_init_segments.len(),
                     plan_id,
+                    use_block_level_shuffle,
                 )?;
             }
             FuseSegmentFormat::Column => {
@@ -420,6 +446,7 @@ impl FuseTable {
         derterministic_cache_key: Option<String>,
         partitions_total: usize,
         plan_id: u32,
+        use_block_level_shuffle: bool,
     ) -> Result<()> {
         let max_threads = ctx.get_settings().get_max_threads()? as usize;
         prune_pipeline.add_source(
@@ -524,6 +551,26 @@ impl FuseTable {
             .filter(|p| p.order_by.is_empty() && p.filters.is_none())
             .and_then(|p| p.limit);
         let enable_prune_cache = ctx.get_settings().get_enable_prune_cache()?;
+
+        // Compute block slot for block-level shuffle
+        let block_slot = if use_block_level_shuffle {
+            let cluster = ctx.get_cluster();
+            if !cluster.is_empty() {
+                // Sort nodes by cache_id for deterministic ordering
+                let mut nodes = cluster.nodes.clone();
+                nodes.sort_by(|a, b| a.cache_id.cmp(&b.cache_id));
+                let num_slots = nodes.len();
+                let local_id = &cluster.local_id;
+                // Find the local node's position in the sorted list
+                let slot = nodes.iter().position(|n| &n.id == local_id).unwrap_or(0) as u32;
+                Some(BlockSlotDescription { num_slots, slot })
+            } else {
+                None
+            }
+        } else {
+            None
+        };
+
         let send_part_state = Arc::new(SendPartState::create(
             derterministic_cache_key,
             limit,
@@ -540,6 +587,7 @@ impl FuseTable {
                     pruner.table_schema.clone(),
                     send_part_state.clone(),
                     enable_prune_cache,
+                    block_slot.clone(),
                 )
             })?;
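
The block_slot computation above works only if every node independently derives the same (num_slots, slot) view; sorting by cache_id gives that determinism without any coordination. A simplified sketch of the idea, using plain string ids in place of the real cluster node type (an assumption for illustration):

// Simplified: sort ids deterministically; the local node's rank in the sorted
// list is its slot, and the list length is the slot count.
fn compute_slot(mut node_ids: Vec<String>, local_id: &str) -> (usize, u32) {
    node_ids.sort();
    let num_slots = node_ids.len();
    let slot = node_ids.iter().position(|id| id == local_id).unwrap_or(0) as u32;
    (num_slots, slot)
}

fn main() {
    let nodes = vec!["node-b".into(), "node-a".into(), "node-c".into()];
    // Every node runs the same computation and agrees: node-b is slot 1 of 3.
    assert_eq!(compute_slot(nodes, "node-b"), (3, 1));
}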

src/query/storages/fuse/src/pruning_pipeline/send_part_info_sink.rs
Lines changed: 18 additions & 0 deletions

@@ -41,6 +41,7 @@ use databend_storages_common_cache::CacheAccessor;
 use databend_storages_common_cache::CachedObject;
 use databend_storages_common_pruner::BlockMetaIndex;
 use databend_storages_common_table_meta::meta::BlockMeta;
+use databend_storages_common_table_meta::meta::BlockSlotDescription;
 use parking_lot::Mutex;

 use crate::FuseTable;
@@ -134,6 +135,7 @@ pub struct SendPartInfoSink {
     statistics: PartStatistics,
     send_part_state: Arc<SendPartState>,
     enable_cache: bool,
+    block_slot: Option<BlockSlotDescription>,
 }

 impl SendPartInfoSink {
@@ -145,6 +147,7 @@ impl SendPartInfoSink {
         schema: TableSchemaRef,
         send_part_state: Arc<SendPartState>,
         enable_cache: bool,
+        block_slot: Option<BlockSlotDescription>,
     ) -> Result<ProcessorPtr> {
         let partitions = Partitions::default();
         let statistics = PartStatistics::default();
@@ -159,6 +162,7 @@ impl SendPartInfoSink {
                 statistics,
                 send_part_state,
                 enable_cache,
+                block_slot,
             },
         )))
     }
@@ -228,6 +232,13 @@ impl SendPartInfoSink {
         let mut parts = Vec::with_capacity(block_metas.len());

         for (block_meta_index, block_meta) in block_metas.iter() {
+            // Block-level partition shuffle: filter blocks by block_idx % num_slots == slot
+            if let Some(BlockSlotDescription { num_slots, slot }) = &self.block_slot {
+                if block_meta_index.block_idx % num_slots != *slot as usize {
+                    continue;
+                }
+            }
+
             let rows = block_meta.row_count as usize;
             let previous_limit = self.send_part_state.limit.fetch_sub(
                 rows.min(self.send_part_state.limit.load(Ordering::SeqCst)),
@@ -275,6 +286,13 @@ impl SendPartInfoSink {
         };

         for (block_meta_index, block_meta) in block_metas.iter() {
+            // Block-level partition shuffle: filter blocks by block_idx % num_slots == slot
+            if let Some(BlockSlotDescription { num_slots, slot }) = &self.block_slot {
+                if block_meta_index.block_idx % num_slots != *slot as usize {
+                    continue;
+                }
+            }
+
             let rows = block_meta.row_count as usize;
             let previous_limit = self.send_part_state.limit.fetch_sub(
                 rows.min(self.send_part_state.limit.load(Ordering::SeqCst)),
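
The same modulo filter guards both block-meta loops above. A self-contained sketch of the invariant it provides, with a hypothetical stand-in for BlockSlotDescription: when a slot description is present, each block index passes the filter on exactly one executor; when it is None, everything passes.

// Hypothetical stand-in mirroring the sink's filter.
struct BlockSlotDescription {
    num_slots: usize,
    slot: u32,
}

fn keep_block(block_idx: usize, block_slot: &Option<BlockSlotDescription>) -> bool {
    match block_slot {
        // Keep only blocks that land in this executor's slot.
        Some(BlockSlotDescription { num_slots, slot }) => {
            block_idx % num_slots == *slot as usize
        }
        // No block-level shuffle: keep every block.
        None => true,
    }
}

fn main() {
    let slots: Vec<Option<BlockSlotDescription>> = (0..3u32)
        .map(|slot| Some(BlockSlotDescription { num_slots: 3, slot }))
        .collect();
    for block_idx in 0..9 {
        // Exactly one of the three executors keeps each block.
        let owners = slots.iter().filter(|d| keep_block(block_idx, d)).count();
        assert_eq!(owners, 1);
    }
}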
