Merged

34 commits
85cea16
refactor: support row/bucket shuffle for aggregation
dqhl76 Dec 16, 2025
672b885
fmt
dqhl76 Dec 25, 2025
fc733c1
enable_experiment_aggregate = 1
dqhl76 Dec 25, 2025
df270a0
license check
dqhl76 Dec 25, 2025
40737d1
make clippy happy
dqhl76 Dec 25, 2025
70710f5
make clippy happy
dqhl76 Dec 25, 2025
4075b59
try fix hang when merge join is after agg
dqhl76 Dec 29, 2025
f427bb6
fmt
dqhl76 Dec 29, 2025
d499dc8
fmt
dqhl76 Dec 29, 2025
85d6c6e
temp save
dqhl76 Dec 29, 2025
d2f3352
simplify AggregateShuffleMode
dqhl76 Dec 29, 2025
29cbbfb
simplify new partial aggregate
dqhl76 Dec 29, 2025
017cfe4
not finish
dqhl76 Dec 29, 2025
6db6e57
save
dqhl76 Dec 30, 2025
75d85b3
save
dqhl76 Dec 30, 2025
c6dd2c1
fix
dqhl76 Dec 31, 2025
e7fc7d9
fix
dqhl76 Dec 31, 2025
156608f
row
dqhl76 Dec 31, 2025
ac3d821
bucket
dqhl76 Jan 1, 2026
4bbc54d
finish
dqhl76 Jan 3, 2026
307c3b7
fmt + add test
dqhl76 Jan 3, 2026
b342d56
clean useless branch
dqhl76 Jan 3, 2026
7cc05d6
try support final aggregate spill
dqhl76 Jan 4, 2026
b7b9866
feat: support final aggregate spill
dqhl76 Jan 5, 2026
01ce590
forget on_finish for output triggered finish
dqhl76 Jan 5, 2026
c80335f
shuffle mode determination for not cluster aggregation
dqhl76 Jan 5, 2026
0be800a
add more test
dqhl76 Jan 5, 2026
0b255dd
fix
dqhl76 Jan 5, 2026
82c47d5
add more test
dqhl76 Jan 6, 2026
73888eb
remove debug info
dqhl76 Jan 7, 2026
748c33c
disable for now, prefer enable in another PR
dqhl76 Jan 7, 2026
5c6a707
Merge branch 'main' into aggregate-24
dqhl76 Jan 7, 2026
8007f90
Merge remote-tracking branch 'upstream/main' into aggregate-24
dqhl76 Jan 13, 2026
c221289
Merge branch 'main' into aggregate-24
zhang2014 Jan 14, 2026
27 changes: 27 additions & 0 deletions src/query/expression/src/aggregate/mod.rs
@@ -85,6 +85,33 @@ impl Default for HashTableConfig {
}

impl HashTableConfig {
pub fn new_experiment_partial(
radix_bits: u64,
node_nums: usize,
active_threads: usize,
) -> Self {
let capacity = if node_nums != 1 {
131072 * (2 << node_nums)
} else {
let total_shared_cache_size = active_threads * L3_CACHE_SIZE;
let cache_per_active_thread =
L1_CACHE_SIZE + L2_CACHE_SIZE + total_shared_cache_size / active_threads;
let size_per_entry = (8_f64 * LOAD_FACTOR) as usize;
(cache_per_active_thread / size_per_entry).next_power_of_two()
};

// not support payload growth when `enable_experiment_aggregate` = 1
HashTableConfig {
current_max_radix_bits: Arc::new(AtomicU64::new(radix_bits)),
initial_radix_bits: radix_bits,
max_radix_bits: radix_bits,
repartition_radix_bits_incr: 0,
partial_agg: true,
max_partial_capacity: capacity,
..Default::default()
}
}

pub fn with_initial_radix_bits(mut self, initial_radix_bits: u64) -> Self {
self.initial_radix_bits = initial_radix_bits;
self.current_max_radix_bits = Arc::new(AtomicU64::new(initial_radix_bits));
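For readers skimming the diff: the single-node branch of `new_experiment_partial` above sizes the partial hash table from a per-thread cache budget instead of relying on radix repartitioning (note `repartition_radix_bits_incr: 0`). A standalone sketch of that arithmetic is below; the cache-size and load-factor constants are illustrative placeholders, not the crate's actual values.

```rust
// Placeholder constants standing in for the ones used by HashTableConfig;
// the real values live in databend_common_expression and may differ.
const L1_CACHE_SIZE: usize = 32 * 1024;
const L2_CACHE_SIZE: usize = 1024 * 1024;
const L3_CACHE_SIZE: usize = 32 * 1024 * 1024;
const LOAD_FACTOR: f64 = 1.5;

/// Mirrors the single-node branch of `new_experiment_partial`: give each
/// active thread a table capacity that roughly fits its share of cache.
fn single_node_capacity(active_threads: usize) -> usize {
    let total_shared_cache_size = active_threads * L3_CACHE_SIZE;
    let cache_per_active_thread =
        L1_CACHE_SIZE + L2_CACHE_SIZE + total_shared_cache_size / active_threads;
    let size_per_entry = (8_f64 * LOAD_FACTOR) as usize;
    (cache_per_active_thread / size_per_entry).next_power_of_two()
}

fn main() {
    // With the placeholder constants above, 16 threads yield a ~4M-entry capacity.
    println!("{}", single_node_capacity(16));
}
```

The multi-node branch skips this cache heuristic and instead scales a fixed 131072-entry base by `2 << node_nums`.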
1 change: 1 addition & 0 deletions src/query/service/src/physical_plans/mod.rs
@@ -63,6 +63,7 @@ pub use databend_common_sql::executor::physical_plans::window::*;
pub use physical_add_stream_column::AddStreamColumn;
pub use physical_aggregate_expand::AggregateExpand;
pub use physical_aggregate_final::AggregateFinal;
pub use physical_aggregate_final::AggregateShuffleMode;
pub use physical_aggregate_partial::AggregatePartial;
pub use physical_async_func::AsyncFunction;
pub use physical_broadcast::BroadcastSink;
407 changes: 195 additions & 212 deletions src/query/service/src/physical_plans/physical_aggregate_final.rs

Large diffs are not rendered by default.

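The `AggregateShuffleMode` type exported above is defined in this file, whose diff is collapsed. Judging only from its uses further down (`matches!(self.shuffle_mode, AggregateShuffleMode::Row)` and `self.shuffle_mode.determine_radix_bits()`), a plausible shape is sketched below; the `Bucket` variant name comes from the PR title, and the radix-bit values are pure assumptions.

```rust
// Illustrative only: the real definition and its determine_radix_bits logic
// live in physical_aggregate_final.rs and are not shown in this diff.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AggregateShuffleMode {
    /// Shuffle individual rows: one partition per cluster node.
    Row,
    /// Shuffle pre-aggregated buckets: 2^radix_bits partitions.
    Bucket,
}

impl AggregateShuffleMode {
    /// Assumed behaviour: choose the initial radix bits for the partial hash table.
    pub fn determine_radix_bits(&self) -> u64 {
        match self {
            AggregateShuffleMode::Row => 0,    // placeholder value
            AggregateShuffleMode::Bucket => 8, // placeholder value
        }
    }
}
```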
70 changes: 45 additions & 25 deletions src/query/service/src/physical_plans/physical_aggregate_partial.rs
@@ -25,7 +25,6 @@ use databend_common_expression::DataSchemaRef;
use databend_common_expression::DataSchemaRefExt;
use databend_common_expression::HashTableConfig;
use databend_common_expression::LimitType;
use databend_common_expression::MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM;
use databend_common_expression::SortColumnDescription;
use databend_common_expression::types::DataType;
use databend_common_functions::aggregates::AggregateFunctionFactory;
@@ -38,10 +37,10 @@ use databend_common_sql::executor::physical_plans::SortDesc;
use databend_common_storage::DataOperator;
use itertools::Itertools;

use crate::clusters::ClusterHelper;
use crate::physical_plans::explain::PlanStatsInfo;
use crate::physical_plans::format::AggregatePartialFormatter;
use crate::physical_plans::format::PhysicalFormat;
use crate::physical_plans::physical_aggregate_final::AggregateShuffleMode;
use crate::physical_plans::physical_plan::IPhysicalPlan;
use crate::physical_plans::physical_plan::PhysicalPlan;
use crate::physical_plans::physical_plan::PhysicalPlanMeta;
@@ -65,6 +64,9 @@ pub struct AggregatePartial {
pub rank_limit: Option<(Vec<SortDesc>, usize)>,
// Only used for explain
pub stat_info: Option<PlanStatsInfo>,

// Only used when enable_experiment_aggregate is true
pub shuffle_mode: AggregateShuffleMode,
}

#[typetag::serde]
@@ -165,6 +167,7 @@ impl IPhysicalPlan for AggregatePartial {
group_by_display: self.group_by_display.clone(),
rank_limit: self.rank_limit.clone(),
stat_info: self.stat_info.clone(),
shuffle_mode: self.shuffle_mode.clone(),
})
}

@@ -175,8 +178,8 @@ impl IPhysicalPlan for AggregatePartial {
let max_block_bytes = builder.settings.get_max_block_bytes()? as usize;
let max_threads = builder.settings.get_max_threads()?;
let max_spill_io_requests = builder.settings.get_max_spill_io_requests()?;

let enable_experiment_aggregate = builder.settings.get_enable_experiment_aggregate()?;
let cluster = &builder.ctx.get_cluster();

let params = PipelineBuilder::build_aggregator_params(
self.input.output_schema()?,
@@ -197,12 +200,21 @@ impl IPhysicalPlan for AggregatePartial {

let schema_before_group_by = params.input_schema.clone();

// Need a global atomic to read the max current radix bits hint
let partial_agg_config = if !builder.is_exchange_parent() {
HashTableConfig::default().with_partial(true, max_threads as usize)
let partial_agg_config = if enable_experiment_aggregate {
let radix_bits = self.shuffle_mode.determine_radix_bits();
HashTableConfig::new_experiment_partial(
radix_bits,
cluster.nodes.len(),
max_threads as usize,
)
} else {
HashTableConfig::default()
.cluster_with_partial(true, builder.ctx.get_cluster().nodes.len())
// Need a global atomic to read the max current radix bits hint
if !builder.is_exchange_parent() {
HashTableConfig::default().with_partial(true, max_threads as usize)
} else {
HashTableConfig::default()
.cluster_with_partial(true, builder.ctx.get_cluster().nodes.len())
}
};

// For rank limit, we can filter data using sort with rank before partial.
@@ -215,23 +227,18 @@ impl IPhysicalPlan for AggregatePartial {
}

if params.enable_experiment_aggregate {
let cluster = &builder.ctx.get_cluster();
let streams_num = if !builder.is_exchange_parent() {
1
} else {
let is_row_shuffle = matches!(self.shuffle_mode, AggregateShuffleMode::Row);
let bucket_num = if is_row_shuffle {
cluster.nodes.len()
} else {
2_usize.pow(partial_agg_config.initial_radix_bits as u32)
};
let local_pos = cluster.ordered_index();
let shared_partition_streams = (0..streams_num)
.map(|_| {
SharedPartitionStream::new(
builder.main_pipeline.output_len(),
max_block_rows,
max_block_bytes,
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
)
})
.collect::<Vec<_>>();
let shared_partition_streams = SharedPartitionStream::new(
builder.main_pipeline.output_len(),
max_block_rows,
max_block_bytes,
bucket_num,
);

builder.main_pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(
Expand All @@ -242,7 +249,8 @@ impl IPhysicalPlan for AggregatePartial {
params.clone(),
partial_agg_config.clone(),
shared_partition_streams.clone(),
local_pos,
bucket_num,
is_row_shuffle,
)?,
))
})?;
@@ -276,7 +284,19 @@ impl IPhysicalPlan for AggregatePartial {
})?;
}

builder.exchange_injector = AggregateInjector::create(builder.ctx.clone(), params.clone());
builder.exchange_injector = if params.enable_experiment_aggregate {
AggregateInjector::<true>::create(
builder.ctx.clone(),
params.clone(),
self.shuffle_mode.clone(),
)
} else {
AggregateInjector::<false>::create(
builder.ctx.clone(),
params.clone(),
self.shuffle_mode.clone(),
)
};
Ok(())
}
}
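Net effect of the partial-plan changes above: the per-stream `Vec<SharedPartitionStream>` collapses into a single stream whose partition count depends on the shuffle mode rather than on `MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM`. A condensed sketch of that decision, with `cluster_nodes` and `initial_radix_bits` standing in for `cluster.nodes.len()` and `partial_agg_config.initial_radix_bits`:

```rust
/// Stand-in for the shuffle-mode decision in build_pipeline: row shuffle
/// partitions by destination node, bucket shuffle by radix bucket.
#[derive(Clone, Copy)]
enum ShuffleMode {
    Row,
    Bucket,
}

fn partition_count(mode: ShuffleMode, cluster_nodes: usize, initial_radix_bits: u32) -> usize {
    match mode {
        // One partition per node: each row is routed to the node owning its hash.
        ShuffleMode::Row => cluster_nodes,
        // One partition per radix bucket: 2^initial_radix_bits buckets.
        ShuffleMode::Bucket => 2_usize.pow(initial_radix_bits),
    }
}

fn main() {
    assert_eq!(partition_count(ShuffleMode::Row, 3, 4), 3);
    assert_eq!(partition_count(ShuffleMode::Bucket, 3, 4), 16);
}
```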