Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,9 @@ struct AggSharedState : public BasicSharedState {
// Refresh the top limit heap with a new row
void refresh_top_limit(size_t row_id, const vectorized::ColumnRawPtrs& key_columns);

vectorized::Arena agg_arena_pool;
vectorized::Arena agg_profile_arena;

private:
vectorized::MutableColumns _get_keys_hash_table();

Expand Down Expand Up @@ -580,6 +583,7 @@ struct AnalyticSharedState : public BasicSharedState {
std::mutex buffer_mutex;
bool sink_eos = false;
std::mutex sink_eos_lock;
vectorized::Arena agg_arena_pool;
};

struct JoinSharedState : public BasicSharedState {
Expand Down Expand Up @@ -702,6 +706,8 @@ struct SetSharedState : public BasicSharedState {

std::atomic<bool> ready_for_read = false;

vectorized::Arena arena;

/// called in setup_local_state
Status hash_table_init();
};
Expand Down
46 changes: 23 additions & 23 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ Status AggSinkLocalState::open(RuntimeState* state) {
}

if (Base::_shared_state->probe_expr_ctxs.empty()) {
_agg_data->without_key =
reinterpret_cast<vectorized::AggregateDataPtr>(_agg_profile_arena.aligned_alloc(
_agg_data->without_key = reinterpret_cast<vectorized::AggregateDataPtr>(
Base::_shared_state->agg_profile_arena.aligned_alloc(
p._total_size_of_aggregate_states, p._align_aggregate_states));

if (p._is_merge) {
Expand Down Expand Up @@ -187,7 +187,7 @@ Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) {
block,
_agg_data->without_key + Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
_agg_arena_pool));
Base::_shared_state->agg_arena_pool));
}
return Status::OK();
}
Expand All @@ -207,7 +207,7 @@ size_t AggSinkLocalState::_memory_usage() const {
return 0;
}
size_t usage = 0;
usage += _agg_arena_pool.size();
usage += Base::_shared_state->agg_arena_pool.size();

if (Base::_shared_state->aggregate_data_container) {
usage += Base::_shared_state->aggregate_data_container->memory_usage();
Expand Down Expand Up @@ -240,7 +240,7 @@ void AggSinkLocalState::_update_memusage_with_serialized_key() {
},
[&](auto& agg_method) -> void {
auto& data = *agg_method.hash_table;
int64_t memory_usage_arena = _agg_arena_pool.size();
int64_t memory_usage_arena = Base::_shared_state->agg_arena_pool.size();
int64_t memory_usage_container =
_shared_state->aggregate_data_container->memory_usage();
int64_t hash_table_memory_usage = data.get_buffer_size_in_bytes();
Expand Down Expand Up @@ -321,16 +321,16 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
_places.data(),
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
_deserialize_buffer.data(), column.get(), _agg_arena_pool,
rows);
_deserialize_buffer.data(), column.get(),
Base::_shared_state->agg_arena_pool, rows);
}
} else {
RETURN_IF_ERROR(
Base::_shared_state->aggregate_evaluators[i]->execute_batch_add_selected(
block,
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
_places.data(), _agg_arena_pool));
_places.data(), Base::_shared_state->agg_arena_pool));
}
}
} else {
Expand Down Expand Up @@ -375,15 +375,15 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
_places.data(),
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
_deserialize_buffer.data(), column.get(), _agg_arena_pool,
rows);
_deserialize_buffer.data(), column.get(),
Base::_shared_state->agg_arena_pool, rows);
}
} else {
RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_batch_add(
block,
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
_places.data(), _agg_arena_pool));
_places.data(), Base::_shared_state->agg_arena_pool));
}
}
}
Expand Down Expand Up @@ -423,20 +423,20 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
_agg_data->without_key +
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
*column, _agg_arena_pool);
*column, Base::_shared_state->agg_arena_pool);
} else {
RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add(
block,
_agg_data->without_key + Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
_agg_arena_pool));
Base::_shared_state->agg_arena_pool));
}
}
return Status::OK();
}

void AggSinkLocalState::_update_memusage_without_key() {
int64_t arena_memory_usage = _agg_arena_pool.size();
int64_t arena_memory_usage = Base::_shared_state->agg_arena_pool.size();
COUNTER_SET(_memory_used_counter, arena_memory_usage);
COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
}
Expand Down Expand Up @@ -487,7 +487,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
block,
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
_places.data(), _agg_arena_pool));
_places.data(), Base::_shared_state->agg_arena_pool));
}
} else {
auto do_aggregate_evaluators = [&] {
Expand All @@ -496,7 +496,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
block,
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
_places.data(), _agg_arena_pool));
_places.data(), Base::_shared_state->agg_arena_pool));
}
return Status::OK();
};
Expand Down Expand Up @@ -550,8 +550,8 @@ void AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
agg_method.init_serialized_keys(key_columns, num_rows);

auto creator = [this](const auto& ctor, auto& key, auto& origin) {
HashMethodType::try_presis_key_and_origin(key, origin,
_agg_arena_pool);
HashMethodType::try_presis_key_and_origin(
key, origin, Base::_shared_state->agg_arena_pool);
auto mapped =
Base::_shared_state->aggregate_data_container->append_data(
origin);
Expand All @@ -563,7 +563,7 @@ void AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
};

auto creator_for_null_key = [&](auto& mapped) {
mapped = _agg_arena_pool.aligned_alloc(
mapped = Base::_shared_state->agg_arena_pool.aligned_alloc(
Base::_parent->template cast<AggSinkOperatorX>()
._total_size_of_aggregate_states,
Base::_parent->template cast<AggSinkOperatorX>()
Expand Down Expand Up @@ -627,8 +627,8 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData

auto creator = [&](const auto& ctor, auto& key, auto& origin) {
try {
HashMethodType::try_presis_key_and_origin(key, origin,
_agg_arena_pool);
HashMethodType::try_presis_key_and_origin(
key, origin, Base::_shared_state->agg_arena_pool);
_shared_state->refresh_top_limit(i, key_columns);
auto mapped =
_shared_state->aggregate_data_container->append_data(
Expand All @@ -647,7 +647,7 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
};

auto creator_for_null_key = [&](auto& mapped) {
mapped = _agg_arena_pool.aligned_alloc(
mapped = Base::_shared_state->agg_arena_pool.aligned_alloc(
Base::_parent->template cast<AggSinkOperatorX>()
._total_size_of_aggregate_states,
Base::_parent->template cast<AggSinkOperatorX>()
Expand Down Expand Up @@ -909,7 +909,7 @@ Status AggSinkOperatorX::reset_hash_table(RuntimeState* state) {
auto& ss = *local_state.Base::_shared_state;
RETURN_IF_ERROR(ss.reset_hash_table());
local_state._serialize_key_arena_memory_usage->set((int64_t)0);
local_state._agg_arena_pool.clear(true);
local_state.Base::_shared_state->agg_arena_pool.clear(true);
return Status::OK();
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
vectorized::Block _preagg_block = vectorized::Block();

AggregatedDataVariants* _agg_data = nullptr;
vectorized::Arena _agg_arena_pool;
vectorized::Arena _agg_profile_arena;

std::unique_ptr<ExecutorBase> _executor = nullptr;

Expand Down
13 changes: 8 additions & 5 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,8 @@ Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
SCOPED_TIMER(_deserialize_data_timer);
Base::_shared_state->aggregate_evaluators[i]->function()->deserialize_and_merge_vec(
_places.data(), _shared_state->offsets_of_aggregate_states[i],
_deserialize_buffer.data(), column.get(), _agg_arena_pool, rows);
_deserialize_buffer.data(), column.get(), Base::_shared_state->agg_arena_pool,
rows);
}
}

Expand Down Expand Up @@ -558,7 +559,8 @@ void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
agg_method.init_serialized_keys(key_columns, num_rows);

auto creator = [this](const auto& ctor, auto& key, auto& origin) {
HashMethodType::try_presis_key_and_origin(key, origin, _agg_arena_pool);
HashMethodType::try_presis_key_and_origin(
key, origin, Base::_shared_state->agg_arena_pool);
auto mapped =
Base::_shared_state->aggregate_data_container->append_data(
origin);
Expand All @@ -570,7 +572,7 @@ void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
};

auto creator_for_null_key = [&](auto& mapped) {
mapped = _agg_arena_pool.aligned_alloc(
mapped = Base::_shared_state->agg_arena_pool.aligned_alloc(
_shared_state->total_size_of_aggregate_states,
_shared_state->align_aggregate_states);
auto st = _create_agg_status(mapped);
Expand All @@ -595,8 +597,9 @@ void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
_memory_usage_container,
static_cast<int64_t>(
_shared_state->aggregate_data_container->memory_usage()));
COUNTER_SET(_memory_usage_arena,
static_cast<int64_t>(_agg_arena_pool.size()));
COUNTER_SET(
_memory_usage_arena,
static_cast<int64_t>(Base::_shared_state->agg_arena_pool.size()));
}},
_shared_state->agg_data->method_variant);
}
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/aggregation_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ class AggLocalState MOCK_REMOVE(final) : public PipelineXLocalState<AggSharedSta
vectorized::ColumnRawPtrs& key_columns, uint32_t num_rows);

vectorized::PODArray<vectorized::AggregateDataPtr> _places;
vectorized::Arena _agg_arena_pool;
std::vector<char> _deserialize_buffer;

RuntimeProfile::Counter* _get_results_timer = nullptr;
Expand Down
9 changes: 5 additions & 4 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) {
_range_between_expr_ctxs[i]->root()->data_type()->create_column();
}

_fn_place_ptr = _agg_arena_pool.aligned_alloc(p._total_size_of_aggregate_states,
p._align_aggregate_states);
_fn_place_ptr = _shared_state->agg_arena_pool.aligned_alloc(p._total_size_of_aggregate_states,
p._align_aggregate_states);
_create_agg_status();
return Status::OK();
}
Expand Down Expand Up @@ -388,13 +388,14 @@ void AnalyticSinkLocalState::_execute_for_function(int64_t partition_start, int6
_agg_functions[i]->function()->execute_function_with_incremental(
partition_start, partition_end, frame_start, frame_end,
_fn_place_ptr + _offsets_of_aggregate_states[i], agg_columns.data(),
_agg_arena_pool, false, false, false, &_use_null_result[i],
_shared_state->agg_arena_pool, false, false, false, &_use_null_result[i],
&_could_use_previous_result[i]);
} else {
_agg_functions[i]->function()->add_range_single_place(
partition_start, partition_end, frame_start, frame_end,
_fn_place_ptr + _offsets_of_aggregate_states[i], agg_columns.data(),
_agg_arena_pool, &(_use_null_result[i]), &_could_use_previous_result[i]);
_shared_state->agg_arena_pool, &(_use_null_result[i]),
&_could_use_previous_result[i]);
}
}
}
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat
size_t _agg_functions_size = 0;
bool _agg_functions_created = false;
vectorized::AggregateDataPtr _fn_place_ptr = nullptr;
vectorized::Arena _agg_arena_pool;
std::vector<vectorized::AggFnEvaluator*> _agg_functions;
std::vector<size_t> _offsets_of_aggregate_states;
std::vector<bool> _result_column_nullable_flags;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
OperatorXBase* parent)
: PipelineXLocalState<FakeSharedState>(state, parent),
batch_size(state->batch_size()),
_agg_arena_pool(std::make_unique<vectorized::Arena>()),
_agg_data(std::make_unique<DistinctDataVariants>()),
_agg_profile_arena(std::make_unique<vectorized::Arena>()),
_child_block(vectorized::Block::create_unique()),
_aggregated_block(vectorized::Block::create_unique()) {}

Expand Down Expand Up @@ -459,6 +457,8 @@ Status DistinctStreamingAggLocalState::close(RuntimeState* state) {
}
}
_cache_block.clear();

_arena.clear();
return Base::close(state);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,9 @@ class DistinctStreamingAggLocalState final : public PipelineXLocalState<FakeShar
bool _should_expand_hash_table = true;
bool _stop_emplace_flag = false;
const int batch_size;
std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
std::unique_ptr<DistinctDataVariants> _agg_data = nullptr;
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
std::unique_ptr<vectorized::Block> _child_block = nullptr;
bool _child_eos = false;
bool _reach_limit = false;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/set_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
vectorized::HashTableBuild<HashTableCtxType, is_intersect>
hash_table_build_process(&local_state, uint32_t(rows), raw_ptrs, state);
st = hash_table_build_process(arg, local_state._arena);
st = hash_table_build_process(arg, local_state._shared_state->arena);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/set_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class SetSinkLocalState final : public PipelineXSinkLocalState<SetSharedState> {
vectorized::MutableBlock _mutable_block;
// every child has its result expr list
vectorized::VExprContextSPtrs _child_exprs;
vectorized::Arena _arena;

RuntimeProfile::Counter* _merge_block_timer = nullptr;
RuntimeProfile::Counter* _build_timer = nullptr;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, OperatorXBase* parent)
: Base(state, parent),
_agg_data(std::make_unique<AggregatedDataVariants>()),
_agg_profile_arena(std::make_unique<vectorized::Arena>()),
_child_block(vectorized::Block::create_unique()),
_pre_aggregated_block(vectorized::Block::create_unique()) {}

Expand Down Expand Up @@ -760,6 +759,7 @@ Status StreamingAggLocalState::close(RuntimeState* state) {
_agg_data->method_variant);
}
_close_with_serialized_key();
_agg_arena_pool.clear(true);
return Base::close(state);
}

Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/streaming_aggregation_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ class StreamingAggLocalState MOCK_REMOVE(final) : public PipelineXLocalState<Fak
std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
std::unique_ptr<AggregateDataContainer> _aggregate_data_container = nullptr;
bool _should_limit_output = false;
bool _reach_limit = false;
Expand Down
Loading