Skip to content

Commit 390ef9c

Browse files
committed
fix ut:
1 parent fc6eb67 commit 390ef9c

File tree

3 files changed

+29
-29
lines changed

3 files changed

+29
-29
lines changed

be/src/pipeline/exec/streaming_aggregation_operator.cpp

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -313,23 +313,22 @@ bool StreamingAggLocalState::_should_not_do_pre_agg(size_t rows) {
313313
const auto spill_streaming_agg_mem_limit = p._spill_streaming_agg_mem_limit;
314314
const bool used_too_much_memory =
315315
spill_streaming_agg_mem_limit > 0 && _memory_usage() > spill_streaming_agg_mem_limit;
316-
std::visit(
317-
vectorized::Overload {
318-
[&](std::monostate& arg) {
319-
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
320-
},
321-
[&](auto& agg_method) {
322-
auto& hash_tbl = *agg_method.hash_table;
323-
/// If too much memory is used during the pre-aggregation stage,
324-
/// it is better to output the data directly without performing further aggregation.
325-
// do not try to do agg, just init and serialize directly return the out_block
326-
if (used_too_much_memory || (hash_tbl.add_elem_size_overflow(rows) &&
327-
!_should_expand_preagg_hash_tables())) {
328-
SCOPED_TIMER(_streaming_agg_timer);
329-
ret_flag = true;
330-
}
331-
}},
332-
_agg_data->method_variant);
316+
std::visit(vectorized::Overload {
317+
[&](std::monostate& arg) {
318+
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
319+
},
320+
[&](auto& agg_method) {
321+
auto& hash_tbl = *agg_method.hash_table;
322+
/// If too much memory is used during the pre-aggregation stage,
323+
/// it is better to output the data directly without performing further aggregation.
324+
// do not try to do agg, just init and serialize directly return the out_block
325+
if (used_too_much_memory || (hash_tbl.add_elem_size_overflow(rows) &&
326+
!_should_expand_preagg_hash_tables())) {
327+
SCOPED_TIMER(_streaming_agg_timer);
328+
ret_flag = true;
329+
}
330+
}},
331+
_agg_data->method_variant);
333332

334333
return ret_flag;
335334
}

be/test/pipeline/operator/streaming_agg_operator_test.cpp

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ TEST_F(StreamingAggOperatorTest, test1) {
109109
false));
110110
op->_pool = &pool;
111111
op->_needs_finalize = false;
112-
op->_is_merge = false;
113112

114113
EXPECT_TRUE(op->set_child(child_op));
115114

@@ -157,7 +156,9 @@ TEST_F(StreamingAggOperatorTest, test1) {
157156
EXPECT_TRUE(op->need_more_input_data(state.get()));
158157
}
159158

160-
{ EXPECT_TRUE(local_state->close(state.get()).ok()); }
159+
{
160+
EXPECT_TRUE(local_state->close(state.get()).ok());
161+
}
161162
}
162163

163164
TEST_F(StreamingAggOperatorTest, test2) {
@@ -166,7 +167,6 @@ TEST_F(StreamingAggOperatorTest, test2) {
166167
false));
167168
op->_pool = &pool;
168169
op->_needs_finalize = false;
169-
op->_is_merge = false;
170170

171171
EXPECT_TRUE(op->set_child(child_op));
172172

@@ -234,7 +234,9 @@ TEST_F(StreamingAggOperatorTest, test2) {
234234
EXPECT_EQ(block.rows(), 3);
235235
}
236236

237-
{ EXPECT_TRUE(local_state->close(state.get()).ok()); }
237+
{
238+
EXPECT_TRUE(local_state->close(state.get()).ok());
239+
}
238240
}
239241

240242
TEST_F(StreamingAggOperatorTest, test3) {
@@ -243,7 +245,6 @@ TEST_F(StreamingAggOperatorTest, test3) {
243245
false));
244246
op->_pool = &pool;
245247
op->_needs_finalize = false;
246-
op->_is_merge = false;
247248

248249
EXPECT_TRUE(op->set_child(child_op));
249250

@@ -314,7 +315,9 @@ TEST_F(StreamingAggOperatorTest, test3) {
314315
EXPECT_EQ(block.rows(), 3);
315316
}
316317

317-
{ EXPECT_TRUE(local_state->close(state.get()).ok()); }
318+
{
319+
EXPECT_TRUE(local_state->close(state.get()).ok());
320+
}
318321
}
319322

320323
TEST_F(StreamingAggOperatorTest, test4) {
@@ -323,7 +326,6 @@ TEST_F(StreamingAggOperatorTest, test4) {
323326
std::make_shared<DataTypeBitMap>(), false));
324327
op->_pool = &pool;
325328
op->_needs_finalize = false;
326-
op->_is_merge = false;
327329

328330
EXPECT_TRUE(op->set_child(child_op));
329331

@@ -406,7 +408,9 @@ TEST_F(StreamingAggOperatorTest, test4) {
406408
// << "Expected: " << res_block.dump_data() << ", but got: " << block.dump_data();
407409
}
408410

409-
{ EXPECT_TRUE(local_state->close(state.get()).ok()); }
411+
{
412+
EXPECT_TRUE(local_state->close(state.get()).ok());
413+
}
410414
}
411415

412416
} // namespace doris::pipeline

regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,19 @@ suite("push_topn_to_agg") {
3232
explain{
3333
sql "select o_custkey, sum(o_shippriority) from orders group by o_custkey limit 4;"
3434
multiContains ("sortByGroupKey:true", 2)
35-
notContains("STREAMING")
3635
}
3736

3837
// when apply this opt, trun off STREAMING
3938
// limit -> proj -> agg,
4039
explain{
4140
sql "select sum(c_custkey), c_name from customer group by c_name limit 6;"
4241
multiContains ("sortByGroupKey:true", 2)
43-
notContains("STREAMING")
4442
}
4543

4644
// topn -> agg
4745
explain{
4846
sql "select o_custkey, sum(o_shippriority) from orders group by o_custkey order by o_custkey limit 8;"
4947
multiContains ("sortByGroupKey:true", 2)
50-
notContains("STREAMING")
5148
}
5249

5350
// order keys are part of group keys,
@@ -185,4 +182,4 @@ suite("push_topn_to_agg") {
185182
| planed with unknown column statistics |
186183
+--------------------------------------------------------------------------------+
187184
**/
188-
}
185+
}

0 commit comments

Comments
 (0)