Skip to content

Commit 4c215b1

Browse files
authored
[opt](Arena)Release Arena memory earlier in pipeline operators. (#59045)
### What problem does this PR solve? Previously we put the Arena into the local state, which is only released when the fragment is freed. That was done because the shared state might use memory from the Arena. ```C++ RETURN_IF_ERROR( Base::_shared_state->aggregate_evaluators[i]->execute_batch_add_selected( block, Base::_parent->template cast<AggSinkOperatorX>() ._offsets_of_aggregate_states[i], _places.data(), _agg_arena_pool)); ``` However, this is actually wrong — we should place the Arena directly into the shared state so the Arena is released at task granularity instead of fragment granularity. ```C++ Status PipelineTask::finalize() { auto fragment = _fragment_context.lock(); if (!fragment) { return Status::OK(); } SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker()); RETURN_IF_ERROR(_state_transition(State::FINALIZED)); std::unique_lock<std::mutex> lc(_dependency_lock); _sink_shared_state.reset(); _op_shared_states.clear(); _shared_state_map.clear(); _block.reset(); _operators.clear(); _sink.reset(); _pipeline.reset(); return Status::OK(); } ``` For operators that don't require shared state, release it in close(). ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [x] No need to test or manual test. Explain why: - [x] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [x] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [x] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
1 parent cd1794b commit 4c215b1

13 files changed

+46
-44
lines changed

be/src/pipeline/dependency.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,9 @@ struct AggSharedState : public BasicSharedState {
382382
// Refresh the top limit heap with a new row
383383
void refresh_top_limit(size_t row_id, const vectorized::ColumnRawPtrs& key_columns);
384384

385+
vectorized::Arena agg_arena_pool;
386+
vectorized::Arena agg_profile_arena;
387+
385388
private:
386389
vectorized::MutableColumns _get_keys_hash_table();
387390

@@ -577,6 +580,7 @@ struct AnalyticSharedState : public BasicSharedState {
577580
std::mutex buffer_mutex;
578581
bool sink_eos = false;
579582
std::mutex sink_eos_lock;
583+
vectorized::Arena agg_arena_pool;
580584
};
581585

582586
struct JoinSharedState : public BasicSharedState {
@@ -699,6 +703,8 @@ struct SetSharedState : public BasicSharedState {
699703

700704
std::atomic<bool> ready_for_read = false;
701705

706+
vectorized::Arena arena;
707+
702708
/// called in setup_local_state
703709
Status hash_table_init();
704710
};

be/src/pipeline/exec/aggregation_sink_operator.cpp

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ Status AggSinkLocalState::open(RuntimeState* state) {
102102
}
103103

104104
if (Base::_shared_state->probe_expr_ctxs.empty()) {
105-
_agg_data->without_key =
106-
reinterpret_cast<vectorized::AggregateDataPtr>(_agg_profile_arena.aligned_alloc(
105+
_agg_data->without_key = reinterpret_cast<vectorized::AggregateDataPtr>(
106+
Base::_shared_state->agg_profile_arena.aligned_alloc(
107107
p._total_size_of_aggregate_states, p._align_aggregate_states));
108108

109109
if (p._is_merge) {
@@ -187,7 +187,7 @@ Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) {
187187
block,
188188
_agg_data->without_key + Base::_parent->template cast<AggSinkOperatorX>()
189189
._offsets_of_aggregate_states[i],
190-
_agg_arena_pool));
190+
Base::_shared_state->agg_arena_pool));
191191
}
192192
return Status::OK();
193193
}
@@ -207,7 +207,7 @@ size_t AggSinkLocalState::_memory_usage() const {
207207
return 0;
208208
}
209209
size_t usage = 0;
210-
usage += _agg_arena_pool.size();
210+
usage += Base::_shared_state->agg_arena_pool.size();
211211

212212
if (Base::_shared_state->aggregate_data_container) {
213213
usage += Base::_shared_state->aggregate_data_container->memory_usage();
@@ -240,7 +240,7 @@ void AggSinkLocalState::_update_memusage_with_serialized_key() {
240240
},
241241
[&](auto& agg_method) -> void {
242242
auto& data = *agg_method.hash_table;
243-
int64_t memory_usage_arena = _agg_arena_pool.size();
243+
int64_t memory_usage_arena = Base::_shared_state->agg_arena_pool.size();
244244
int64_t memory_usage_container =
245245
_shared_state->aggregate_data_container->memory_usage();
246246
int64_t hash_table_memory_usage = data.get_buffer_size_in_bytes();
@@ -321,16 +321,16 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
321321
_places.data(),
322322
Base::_parent->template cast<AggSinkOperatorX>()
323323
._offsets_of_aggregate_states[i],
324-
_deserialize_buffer.data(), column.get(), _agg_arena_pool,
325-
rows);
324+
_deserialize_buffer.data(), column.get(),
325+
Base::_shared_state->agg_arena_pool, rows);
326326
}
327327
} else {
328328
RETURN_IF_ERROR(
329329
Base::_shared_state->aggregate_evaluators[i]->execute_batch_add_selected(
330330
block,
331331
Base::_parent->template cast<AggSinkOperatorX>()
332332
._offsets_of_aggregate_states[i],
333-
_places.data(), _agg_arena_pool));
333+
_places.data(), Base::_shared_state->agg_arena_pool));
334334
}
335335
}
336336
} else {
@@ -375,15 +375,15 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
375375
_places.data(),
376376
Base::_parent->template cast<AggSinkOperatorX>()
377377
._offsets_of_aggregate_states[i],
378-
_deserialize_buffer.data(), column.get(), _agg_arena_pool,
379-
rows);
378+
_deserialize_buffer.data(), column.get(),
379+
Base::_shared_state->agg_arena_pool, rows);
380380
}
381381
} else {
382382
RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_batch_add(
383383
block,
384384
Base::_parent->template cast<AggSinkOperatorX>()
385385
._offsets_of_aggregate_states[i],
386-
_places.data(), _agg_arena_pool));
386+
_places.data(), Base::_shared_state->agg_arena_pool));
387387
}
388388
}
389389
}
@@ -423,20 +423,20 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
423423
_agg_data->without_key +
424424
Base::_parent->template cast<AggSinkOperatorX>()
425425
._offsets_of_aggregate_states[i],
426-
*column, _agg_arena_pool);
426+
*column, Base::_shared_state->agg_arena_pool);
427427
} else {
428428
RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add(
429429
block,
430430
_agg_data->without_key + Base::_parent->template cast<AggSinkOperatorX>()
431431
._offsets_of_aggregate_states[i],
432-
_agg_arena_pool));
432+
Base::_shared_state->agg_arena_pool));
433433
}
434434
}
435435
return Status::OK();
436436
}
437437

438438
void AggSinkLocalState::_update_memusage_without_key() {
439-
int64_t arena_memory_usage = _agg_arena_pool.size();
439+
int64_t arena_memory_usage = Base::_shared_state->agg_arena_pool.size();
440440
COUNTER_SET(_memory_used_counter, arena_memory_usage);
441441
COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
442442
}
@@ -487,7 +487,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
487487
block,
488488
Base::_parent->template cast<AggSinkOperatorX>()
489489
._offsets_of_aggregate_states[i],
490-
_places.data(), _agg_arena_pool));
490+
_places.data(), Base::_shared_state->agg_arena_pool));
491491
}
492492
} else {
493493
auto do_aggregate_evaluators = [&] {
@@ -496,7 +496,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
496496
block,
497497
Base::_parent->template cast<AggSinkOperatorX>()
498498
._offsets_of_aggregate_states[i],
499-
_places.data(), _agg_arena_pool));
499+
_places.data(), Base::_shared_state->agg_arena_pool));
500500
}
501501
return Status::OK();
502502
};
@@ -550,8 +550,8 @@ void AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
550550
agg_method.init_serialized_keys(key_columns, num_rows);
551551

552552
auto creator = [this](const auto& ctor, auto& key, auto& origin) {
553-
HashMethodType::try_presis_key_and_origin(key, origin,
554-
_agg_arena_pool);
553+
HashMethodType::try_presis_key_and_origin(
554+
key, origin, Base::_shared_state->agg_arena_pool);
555555
auto mapped =
556556
Base::_shared_state->aggregate_data_container->append_data(
557557
origin);
@@ -563,7 +563,7 @@ void AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
563563
};
564564

565565
auto creator_for_null_key = [&](auto& mapped) {
566-
mapped = _agg_arena_pool.aligned_alloc(
566+
mapped = Base::_shared_state->agg_arena_pool.aligned_alloc(
567567
Base::_parent->template cast<AggSinkOperatorX>()
568568
._total_size_of_aggregate_states,
569569
Base::_parent->template cast<AggSinkOperatorX>()
@@ -627,8 +627,8 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
627627

628628
auto creator = [&](const auto& ctor, auto& key, auto& origin) {
629629
try {
630-
HashMethodType::try_presis_key_and_origin(key, origin,
631-
_agg_arena_pool);
630+
HashMethodType::try_presis_key_and_origin(
631+
key, origin, Base::_shared_state->agg_arena_pool);
632632
_shared_state->refresh_top_limit(i, key_columns);
633633
auto mapped =
634634
_shared_state->aggregate_data_container->append_data(
@@ -647,7 +647,7 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
647647
};
648648

649649
auto creator_for_null_key = [&](auto& mapped) {
650-
mapped = _agg_arena_pool.aligned_alloc(
650+
mapped = Base::_shared_state->agg_arena_pool.aligned_alloc(
651651
Base::_parent->template cast<AggSinkOperatorX>()
652652
._total_size_of_aggregate_states,
653653
Base::_parent->template cast<AggSinkOperatorX>()
@@ -909,7 +909,7 @@ Status AggSinkOperatorX::reset_hash_table(RuntimeState* state) {
909909
auto& ss = *local_state.Base::_shared_state;
910910
RETURN_IF_ERROR(ss.reset_hash_table());
911911
local_state._serialize_key_arena_memory_usage->set((int64_t)0);
912-
local_state._agg_arena_pool.clear(true);
912+
local_state.Base::_shared_state->agg_arena_pool.clear(true);
913913
return Status::OK();
914914
}
915915

be/src/pipeline/exec/aggregation_sink_operator.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,6 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
122122
vectorized::Block _preagg_block = vectorized::Block();
123123

124124
AggregatedDataVariants* _agg_data = nullptr;
125-
vectorized::Arena _agg_arena_pool;
126-
vectorized::Arena _agg_profile_arena;
127125

128126
std::unique_ptr<ExecutorBase> _executor = nullptr;
129127

be/src/pipeline/exec/aggregation_source_operator.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,8 @@ Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
514514
SCOPED_TIMER(_deserialize_data_timer);
515515
Base::_shared_state->aggregate_evaluators[i]->function()->deserialize_and_merge_vec(
516516
_places.data(), _shared_state->offsets_of_aggregate_states[i],
517-
_deserialize_buffer.data(), column.get(), _agg_arena_pool, rows);
517+
_deserialize_buffer.data(), column.get(), Base::_shared_state->agg_arena_pool,
518+
rows);
518519
}
519520
}
520521

@@ -558,7 +559,8 @@ void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
558559
agg_method.init_serialized_keys(key_columns, num_rows);
559560

560561
auto creator = [this](const auto& ctor, auto& key, auto& origin) {
561-
HashMethodType::try_presis_key_and_origin(key, origin, _agg_arena_pool);
562+
HashMethodType::try_presis_key_and_origin(
563+
key, origin, Base::_shared_state->agg_arena_pool);
562564
auto mapped =
563565
Base::_shared_state->aggregate_data_container->append_data(
564566
origin);
@@ -570,7 +572,7 @@ void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
570572
};
571573

572574
auto creator_for_null_key = [&](auto& mapped) {
573-
mapped = _agg_arena_pool.aligned_alloc(
575+
mapped = Base::_shared_state->agg_arena_pool.aligned_alloc(
574576
_shared_state->total_size_of_aggregate_states,
575577
_shared_state->align_aggregate_states);
576578
auto st = _create_agg_status(mapped);
@@ -595,8 +597,9 @@ void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
595597
_memory_usage_container,
596598
static_cast<int64_t>(
597599
_shared_state->aggregate_data_container->memory_usage()));
598-
COUNTER_SET(_memory_usage_arena,
599-
static_cast<int64_t>(_agg_arena_pool.size()));
600+
COUNTER_SET(
601+
_memory_usage_arena,
602+
static_cast<int64_t>(Base::_shared_state->agg_arena_pool.size()));
600603
}},
601604
_shared_state->agg_data->method_variant);
602605
}

be/src/pipeline/exec/aggregation_source_operator.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ class AggLocalState MOCK_REMOVE(final) : public PipelineXLocalState<AggSharedSta
6868
vectorized::ColumnRawPtrs& key_columns, uint32_t num_rows);
6969

7070
vectorized::PODArray<vectorized::AggregateDataPtr> _places;
71-
vectorized::Arena _agg_arena_pool;
7271
std::vector<char> _deserialize_buffer;
7372

7473
RuntimeProfile::Counter* _get_results_timer = nullptr;

be/src/pipeline/exec/analytic_sink_operator.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,8 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) {
168168
_range_between_expr_ctxs[i]->root()->data_type()->create_column();
169169
}
170170

171-
_fn_place_ptr = _agg_arena_pool.aligned_alloc(p._total_size_of_aggregate_states,
172-
p._align_aggregate_states);
171+
_fn_place_ptr = _shared_state->agg_arena_pool.aligned_alloc(p._total_size_of_aggregate_states,
172+
p._align_aggregate_states);
173173
_create_agg_status();
174174
return Status::OK();
175175
}
@@ -388,13 +388,14 @@ void AnalyticSinkLocalState::_execute_for_function(int64_t partition_start, int6
388388
_agg_functions[i]->function()->execute_function_with_incremental(
389389
partition_start, partition_end, frame_start, frame_end,
390390
_fn_place_ptr + _offsets_of_aggregate_states[i], agg_columns.data(),
391-
_agg_arena_pool, false, false, false, &_use_null_result[i],
391+
_shared_state->agg_arena_pool, false, false, false, &_use_null_result[i],
392392
&_could_use_previous_result[i]);
393393
} else {
394394
_agg_functions[i]->function()->add_range_single_place(
395395
partition_start, partition_end, frame_start, frame_end,
396396
_fn_place_ptr + _offsets_of_aggregate_states[i], agg_columns.data(),
397-
_agg_arena_pool, &(_use_null_result[i]), &_could_use_previous_result[i]);
397+
_shared_state->agg_arena_pool, &(_use_null_result[i]),
398+
&_could_use_previous_result[i]);
398399
}
399400
}
400401
}

be/src/pipeline/exec/analytic_sink_operator.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat
133133
size_t _agg_functions_size = 0;
134134
bool _agg_functions_created = false;
135135
vectorized::AggregateDataPtr _fn_place_ptr = nullptr;
136-
vectorized::Arena _agg_arena_pool;
137136
std::vector<vectorized::AggFnEvaluator*> _agg_functions;
138137
std::vector<size_t> _offsets_of_aggregate_states;
139138
std::vector<bool> _result_column_nullable_flags;

be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,7 @@ DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
6060
OperatorXBase* parent)
6161
: PipelineXLocalState<FakeSharedState>(state, parent),
6262
batch_size(state->batch_size()),
63-
_agg_arena_pool(std::make_unique<vectorized::Arena>()),
6463
_agg_data(std::make_unique<DistinctDataVariants>()),
65-
_agg_profile_arena(std::make_unique<vectorized::Arena>()),
6664
_child_block(vectorized::Block::create_unique()),
6765
_aggregated_block(vectorized::Block::create_unique()) {}
6866

@@ -459,6 +457,8 @@ Status DistinctStreamingAggLocalState::close(RuntimeState* state) {
459457
}
460458
}
461459
_cache_block.clear();
460+
461+
_arena.clear();
462462
return Base::close(state);
463463
}
464464

be/src/pipeline/exec/distinct_streaming_aggregation_operator.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,9 @@ class DistinctStreamingAggLocalState final : public PipelineXLocalState<FakeShar
7272
bool _should_expand_hash_table = true;
7373
bool _stop_emplace_flag = false;
7474
const int batch_size;
75-
std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
7675
std::unique_ptr<DistinctDataVariants> _agg_data = nullptr;
7776
// group by k1,k2
7877
vectorized::VExprContextSPtrs _probe_expr_ctxs;
79-
std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
8078
std::unique_ptr<vectorized::Block> _child_block = nullptr;
8179
bool _child_eos = false;
8280
bool _reach_limit = false;

be/src/pipeline/exec/set_sink_operator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
128128
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
129129
vectorized::HashTableBuild<HashTableCtxType, is_intersect>
130130
hash_table_build_process(&local_state, uint32_t(rows), raw_ptrs, state);
131-
st = hash_table_build_process(arg, local_state._arena);
131+
st = hash_table_build_process(arg, local_state._shared_state->arena);
132132
} else {
133133
LOG(FATAL) << "FATAL: uninited hash table";
134134
__builtin_unreachable();

0 commit comments

Comments
 (0)