Skip to content

Commit 76cca8f

Browse files
authored
fix(query): try fix hang for cluster aggregate (#18655)
* fix(query): try fix hang for cluster aggregate * fix(query): try fix hang for cluster aggregate * fix(query): try fix hang for cluster aggregate * fix(query): try fix hang for cluster aggregate * fix(query): try fix hang for cluster aggregate * fix(query): try fix hang for cluster aggregate * fix(query): try fix hang for cluster aggregate
1 parent e99e622 commit 76cca8f

File tree

6 files changed

+21
-10
lines changed

6 files changed

+21
-10
lines changed

src/query/service/src/physical_plans/physical_aggregate_final.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,19 @@ impl IPhysicalPlan for AggregateFinal {
186186

187187
self.input.build_pipeline(builder)?;
188188

189+
// For distributed plans, since we are unaware of the data size processed by other nodes,
190+
// we estimate the parallelism based on the worst-case scenario.
191+
let after_group_parallel = match self.input.is_distributed_plan() {
192+
true => builder.settings.get_max_threads()? as usize,
193+
false => builder.main_pipeline.output_len(),
194+
};
195+
189196
builder.exchange_injector = old_inject;
190197
build_partition_bucket(
191198
&mut builder.main_pipeline,
192199
params.clone(),
193200
max_restore_worker,
201+
after_group_parallel,
194202
)
195203
}
196204
}

src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,7 @@ pub fn build_partition_bucket(
580580
pipeline: &mut Pipeline,
581581
params: Arc<AggregatorParams>,
582582
max_restore_worker: u64,
583+
after_worker: usize,
583584
) -> Result<()> {
584585
let input_nums = pipeline.output_len();
585586
let transform = NewTransformPartitionBucket::create(input_nums, params.clone())?;
@@ -610,7 +611,7 @@ pub fn build_partition_bucket(
610611
)?))
611612
})?;
612613

613-
pipeline.try_resize(input_nums)?;
614+
pipeline.try_resize(after_worker)?;
614615

615616
Ok(())
616617
}

src/query/service/src/servers/flight/v1/exchange/exchange_transform.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ impl ExchangeTransform {
4848

4949
// exchange writer sink and resize and exchange reader
5050
let len = params.destination_ids.len();
51-
let local_pipe = if params.allow_adjust_parallelism {
51+
let local_pipe = if params.allow_adjust_parallelism
52+
&& params.exchange_injector.exchange_sorting().is_none()
53+
{
5254
ctx.get_settings().get_max_threads()? as usize
5355
} else {
5456
1

tests/sqllogictests/suites/ee/02_ee_aggregating_index/02_0001_agg_index_projected_scan.test

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,11 @@ export function finish(state) {
106106
}
107107
$$;
108108

109-
query IIR
109+
query I rowsort
110110
SELECT MAX(a), MIN(b), weighted_avg(a,b) from t group by b;
111111
----
112-
2 2 1.3333334
113112
1 1 1.0
113+
2 2 1.3333334
114114

115115
query IIR
116116
SELECT MAX(a), MIN(b), weighted_avg(a,b) from t;

tests/sqllogictests/suites/query/cte/basic_r_cte.test

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ select concat('城市',rn::varchar) city from t1 where rn<=5;
254254

255255
statement ok
256256
insert into train
257-
select concat('G',row_number()over()::varchar),c1.city,c2.city, n from city c1, city c2, (select 600 n union select 800 union select 1200 union select 1600) a ;
257+
select concat('G',row_number()over()::varchar),city_1,city_2, n from (SELECT c1.city as city_1,c2.city as city_2, n FROM city c1, city c2, (select 600 n union select 800 union select 1200 union select 1600) a order by c1.city,c2.city, n);
258258

259259
statement ok
260260
insert into passenger
@@ -281,7 +281,7 @@ select
281281
from
282282
t0,(select 1 n union all select 2);
283283
----
284-
236700 473200 210000
284+
224100 448000 210000
285285

286286
statement ok
287287
CREATE OR REPLACE table t_tree (id int, parent int, name string);

tests/sqllogictests/suites/query/window_function/window_bound.test

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -267,23 +267,23 @@ SELECT a, DENSE_RANK() OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND UNB
267267
6 4
268268
7 5
269269

270-
query I
270+
query I rowsort
271271
SELECT DISTINCT lead((861, FALSE, FALSE), 9, (849, TRUE, TRUE)) OVER (
272272
PARTITION BY 15560425903542832284, 965871850213131579
273273
ORDER BY 13746504519650342222, 5897530378272856518 ASC NULLS FIRST)
274274
FROM range(100, 12000000, 467);
275275
----
276-
(861,0,0)
277276
(849,1,1)
277+
(861,0,0)
278278

279-
query II
279+
query II rowsort
280280
SELECT DISTINCT lead((861, FALSE, FALSE), 9, (849, TRUE, TRUE)) OVER (
281281
PARTITION BY 15560425903542832284, 965871850213131579
282282
ORDER BY 13746504519650342222, 5897530378272856518 ASC NULLS FIRST)
283283
FROM range(100, 120000000, 467);
284284
----
285-
(861,0,0)
286285
(849,1,1)
286+
(861,0,0)
287287

288288
statement ok
289289
DROP DATABASE test_window_bound;

0 commit comments

Comments
 (0)