diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 626e174045b7df..036ca3a90de067 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -382,6 +382,9 @@ struct AggSharedState : public BasicSharedState { // Refresh the top limit heap with a new row void refresh_top_limit(size_t row_id, const vectorized::ColumnRawPtrs& key_columns); + vectorized::Arena agg_arena_pool; + vectorized::Arena agg_profile_arena; + private: vectorized::MutableColumns _get_keys_hash_table(); @@ -580,6 +583,7 @@ struct AnalyticSharedState : public BasicSharedState { std::mutex buffer_mutex; bool sink_eos = false; std::mutex sink_eos_lock; + vectorized::Arena agg_arena_pool; }; struct JoinSharedState : public BasicSharedState { @@ -702,6 +706,8 @@ struct SetSharedState : public BasicSharedState { std::atomic ready_for_read = false; + vectorized::Arena arena; + /// called in setup_local_state Status hash_table_init(); }; diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index f666a4122c0a7d..63648ab81466be 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -102,8 +102,8 @@ Status AggSinkLocalState::open(RuntimeState* state) { } if (Base::_shared_state->probe_expr_ctxs.empty()) { - _agg_data->without_key = - reinterpret_cast(_agg_profile_arena.aligned_alloc( + _agg_data->without_key = reinterpret_cast( + Base::_shared_state->agg_profile_arena.aligned_alloc( p._total_size_of_aggregate_states, p._align_aggregate_states)); if (p._is_merge) { @@ -187,7 +187,7 @@ Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) { block, _agg_data->without_key + Base::_parent->template cast() ._offsets_of_aggregate_states[i], - _agg_arena_pool)); + Base::_shared_state->agg_arena_pool)); } return Status::OK(); } @@ -207,7 +207,7 @@ size_t AggSinkLocalState::_memory_usage() const { return 0; } size_t usage = 0; - usage += _agg_arena_pool.size(); + usage += Base::_shared_state->agg_arena_pool.size(); if (Base::_shared_state->aggregate_data_container) { usage += Base::_shared_state->aggregate_data_container->memory_usage(); @@ -240,7 +240,7 @@ void AggSinkLocalState::_update_memusage_with_serialized_key() { }, [&](auto& agg_method) -> void { auto& data = *agg_method.hash_table; - int64_t memory_usage_arena = _agg_arena_pool.size(); + int64_t memory_usage_arena = Base::_shared_state->agg_arena_pool.size(); int64_t memory_usage_container = _shared_state->aggregate_data_container->memory_usage(); int64_t hash_table_memory_usage = data.get_buffer_size_in_bytes(); @@ -321,8 +321,8 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b _places.data(), Base::_parent->template cast() ._offsets_of_aggregate_states[i], - _deserialize_buffer.data(), column.get(), _agg_arena_pool, - rows); + _deserialize_buffer.data(), column.get(), + Base::_shared_state->agg_arena_pool, rows); } } else { RETURN_IF_ERROR( @@ -330,7 +330,7 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b block, Base::_parent->template cast() ._offsets_of_aggregate_states[i], - _places.data(), _agg_arena_pool)); + _places.data(), Base::_shared_state->agg_arena_pool)); } } } else { @@ -375,15 +375,15 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b _places.data(), Base::_parent->template cast() ._offsets_of_aggregate_states[i], - _deserialize_buffer.data(), column.get(), _agg_arena_pool, - rows); + _deserialize_buffer.data(), column.get(), + Base::_shared_state->agg_arena_pool, rows); } } else { RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_batch_add( block, Base::_parent->template cast() ._offsets_of_aggregate_states[i], - _places.data(), _agg_arena_pool)); + _places.data(), Base::_shared_state->agg_arena_pool)); } } } @@ -423,20 +423,20 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) { _agg_data->without_key + Base::_parent->template cast() ._offsets_of_aggregate_states[i], - *column, _agg_arena_pool); + *column, Base::_shared_state->agg_arena_pool); } else { RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add( block, _agg_data->without_key + Base::_parent->template cast() ._offsets_of_aggregate_states[i], - _agg_arena_pool)); + Base::_shared_state->agg_arena_pool)); } } return Status::OK(); } void AggSinkLocalState::_update_memusage_without_key() { - int64_t arena_memory_usage = _agg_arena_pool.size(); + int64_t arena_memory_usage = Base::_shared_state->agg_arena_pool.size(); COUNTER_SET(_memory_used_counter, arena_memory_usage); COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage); } @@ -487,7 +487,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block* block, Base::_parent->template cast() ._offsets_of_aggregate_states[i], - _places.data(), _agg_arena_pool)); + _places.data(), Base::_shared_state->agg_arena_pool)); } } else { auto do_aggregate_evaluators = [&] { @@ -496,7 +496,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block* block, Base::_parent->template cast() ._offsets_of_aggregate_states[i], - _places.data(), _agg_arena_pool)); + _places.data(), Base::_shared_state->agg_arena_pool)); } return Status::OK(); }; @@ -550,8 +550,8 @@ void AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p agg_method.init_serialized_keys(key_columns, num_rows); auto creator = [this](const auto& ctor, auto& key, auto& origin) { - HashMethodType::try_presis_key_and_origin(key, origin, - _agg_arena_pool); + HashMethodType::try_presis_key_and_origin( + key, origin, Base::_shared_state->agg_arena_pool); auto mapped = Base::_shared_state->aggregate_data_container->append_data( origin); @@ -563,7 +563,7 @@ void AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p }; auto creator_for_null_key = [&](auto& mapped) { - mapped = _agg_arena_pool.aligned_alloc( + mapped = Base::_shared_state->agg_arena_pool.aligned_alloc( Base::_parent->template cast() ._total_size_of_aggregate_states, Base::_parent->template cast() @@ -627,8 +627,8 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData auto creator = [&](const auto& ctor, auto& key, auto& origin) { try { - HashMethodType::try_presis_key_and_origin(key, origin, - _agg_arena_pool); + HashMethodType::try_presis_key_and_origin( + key, origin, Base::_shared_state->agg_arena_pool); _shared_state->refresh_top_limit(i, key_columns); auto mapped = _shared_state->aggregate_data_container->append_data( @@ -647,7 +647,7 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData }; auto creator_for_null_key = [&](auto& mapped) { - mapped = _agg_arena_pool.aligned_alloc( + mapped = Base::_shared_state->agg_arena_pool.aligned_alloc( Base::_parent->template cast() ._total_size_of_aggregate_states, Base::_parent->template cast() @@ -909,7 +909,7 @@ Status AggSinkOperatorX::reset_hash_table(RuntimeState* state) { auto& ss = *local_state.Base::_shared_state; RETURN_IF_ERROR(ss.reset_hash_table()); local_state._serialize_key_arena_memory_usage->set((int64_t)0); - local_state._agg_arena_pool.clear(true); + local_state.Base::_shared_state->agg_arena_pool.clear(true); return Status::OK(); } diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index e0dcb3884dc9ad..a1454c114c6d12 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -122,8 +122,6 @@ class AggSinkLocalState : public PipelineXSinkLocalState { vectorized::Block _preagg_block = vectorized::Block(); AggregatedDataVariants* _agg_data = nullptr; - vectorized::Arena _agg_arena_pool; - vectorized::Arena _agg_profile_arena; std::unique_ptr _executor = nullptr; diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 24e13d5c0ec4ca..989a7cd478fb9b 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -514,7 +514,8 @@ Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block) SCOPED_TIMER(_deserialize_data_timer); Base::_shared_state->aggregate_evaluators[i]->function()->deserialize_and_merge_vec( _places.data(), _shared_state->offsets_of_aggregate_states[i], - _deserialize_buffer.data(), column.get(), _agg_arena_pool, rows); + _deserialize_buffer.data(), column.get(), Base::_shared_state->agg_arena_pool, + rows); } } @@ -558,7 +559,8 @@ void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place agg_method.init_serialized_keys(key_columns, num_rows); auto creator = [this](const auto& ctor, auto& key, auto& origin) { - HashMethodType::try_presis_key_and_origin(key, origin, _agg_arena_pool); + HashMethodType::try_presis_key_and_origin( + key, origin, Base::_shared_state->agg_arena_pool); auto mapped = Base::_shared_state->aggregate_data_container->append_data( origin); @@ -570,7 +572,7 @@ void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place }; auto creator_for_null_key = [&](auto& mapped) { - mapped = _agg_arena_pool.aligned_alloc( + mapped = Base::_shared_state->agg_arena_pool.aligned_alloc( _shared_state->total_size_of_aggregate_states, _shared_state->align_aggregate_states); auto st = _create_agg_status(mapped); @@ -595,8 +597,9 @@ void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place _memory_usage_container, static_cast( _shared_state->aggregate_data_container->memory_usage())); - COUNTER_SET(_memory_usage_arena, - static_cast(_agg_arena_pool.size())); + COUNTER_SET( + _memory_usage_arena, + static_cast(Base::_shared_state->agg_arena_pool.size())); }}, _shared_state->agg_data->method_variant); } diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index d2cff32246a302..1bf4edabf5d9b4 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -68,7 +68,6 @@ class AggLocalState MOCK_REMOVE(final) : public PipelineXLocalState _places; - vectorized::Arena _agg_arena_pool; std::vector _deserialize_buffer; RuntimeProfile::Counter* _get_results_timer = nullptr; diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 37a6cd5fe51dd6..5e777b4dfe590c 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -168,8 +168,8 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) { _range_between_expr_ctxs[i]->root()->data_type()->create_column(); } - _fn_place_ptr = _agg_arena_pool.aligned_alloc(p._total_size_of_aggregate_states, - p._align_aggregate_states); + _fn_place_ptr = _shared_state->agg_arena_pool.aligned_alloc(p._total_size_of_aggregate_states, + p._align_aggregate_states); _create_agg_status(); return Status::OK(); } @@ -388,13 +388,14 @@ void AnalyticSinkLocalState::_execute_for_function(int64_t partition_start, int6 _agg_functions[i]->function()->execute_function_with_incremental( partition_start, partition_end, frame_start, frame_end, _fn_place_ptr + _offsets_of_aggregate_states[i], agg_columns.data(), - _agg_arena_pool, false, false, false, &_use_null_result[i], + _shared_state->agg_arena_pool, false, false, false, &_use_null_result[i], &_could_use_previous_result[i]); } else { _agg_functions[i]->function()->add_range_single_place( partition_start, partition_end, frame_start, frame_end, _fn_place_ptr + _offsets_of_aggregate_states[i], agg_columns.data(), - _agg_arena_pool, &(_use_null_result[i]), &_could_use_previous_result[i]); + _shared_state->agg_arena_pool, &(_use_null_result[i]), + &_could_use_previous_result[i]); } } } diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 4e7d82679f3028..6b00386ac3da5d 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -133,7 +133,6 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState _agg_functions; std::vector _offsets_of_aggregate_states; std::vector _result_column_nullable_flags; diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 1331a9e38ae2c7..6363355db7308e 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -60,9 +60,7 @@ DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta OperatorXBase* parent) : PipelineXLocalState(state, parent), batch_size(state->batch_size()), - _agg_arena_pool(std::make_unique()), _agg_data(std::make_unique()), - _agg_profile_arena(std::make_unique()), _child_block(vectorized::Block::create_unique()), _aggregated_block(vectorized::Block::create_unique()) {} @@ -459,6 +457,8 @@ Status DistinctStreamingAggLocalState::close(RuntimeState* state) { } } _cache_block.clear(); + + _arena.clear(); return Base::close(state); } diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index 3a2ecf5ac4fec4..50c43e956cbeac 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -72,11 +72,9 @@ class DistinctStreamingAggLocalState final : public PipelineXLocalState _agg_arena_pool = nullptr; std::unique_ptr _agg_data = nullptr; // group by k1,k2 vectorized::VExprContextSPtrs _probe_expr_ctxs; - std::unique_ptr _agg_profile_arena = nullptr; std::unique_ptr _child_block = nullptr; bool _child_eos = false; bool _reach_limit = false; diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index d1580b4f3ac2c6..ae2024f831de96 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -128,7 +128,7 @@ Status SetSinkOperatorX::_process_build_block( if constexpr (!std::is_same_v) { vectorized::HashTableBuild hash_table_build_process(&local_state, uint32_t(rows), raw_ptrs, state); - st = hash_table_build_process(arg, local_state._arena); + st = hash_table_build_process(arg, local_state._shared_state->arena); } else { LOG(FATAL) << "FATAL: uninited hash table"; __builtin_unreachable(); diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index b076564202f26f..2df817f6ab4f7e 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -57,7 +57,6 @@ class SetSinkLocalState final : public PipelineXSinkLocalState { vectorized::MutableBlock _mutable_block; // every child has its result expr list vectorized::VExprContextSPtrs _child_exprs; - vectorized::Arena _arena; RuntimeProfile::Counter* _merge_block_timer = nullptr; RuntimeProfile::Counter* _build_timer = nullptr; diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index df943e642243cc..9ac8507cc56a26 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -80,7 +80,6 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE = StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent), _agg_data(std::make_unique()), - _agg_profile_arena(std::make_unique()), _child_block(vectorized::Block::create_unique()), _pre_aggregated_block(vectorized::Block::create_unique()) {} @@ -760,6 +759,7 @@ Status StreamingAggLocalState::close(RuntimeState* state) { _agg_data->method_variant); } _close_with_serialized_key(); + _agg_arena_pool.clear(true); return Base::close(state); } diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h index 663431d26b0927..001ac300970ad0 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_operator.h @@ -94,7 +94,6 @@ class StreamingAggLocalState MOCK_REMOVE(final) : public PipelineXLocalState _aggregate_evaluators; // group by k1,k2 vectorized::VExprContextSPtrs _probe_expr_ctxs; - std::unique_ptr _agg_profile_arena = nullptr; std::unique_ptr _aggregate_data_container = nullptr; bool _should_limit_output = false; bool _reach_limit = false;