Skip to content

Commit 7a1653b

Browse files
authored
[Exec](profile) add max row count per backend in materialization profile (#59728)
- Add `_backend_rows_count` to `MaterializationSharedState` to count the number of rows assigned to each backend during block distribution.
- Introduce `_max_rows_per_backend` to record the maximum row count handled by any single backend in the current batch.
- Compute `_max_rows_per_backend` in `create_muiltget_result()` after row assignment, and publish it in `MaterializationOperator::push()` once `merge_multi_response()` has succeeded.
- Expose this metric via a new runtime profile counter `MaxRowsPerBackend` in `MaterializationLocalState`.
- This helps monitor data skew across backends and aids performance debugging for materialization-heavy queries.
1 parent 01a6423 commit 7a1653b

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

be/src/pipeline/exec/materialization_opertor.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ void MaterializationSharedState::get_block(vectorized::Block* block) {
5656

5757
Status MaterializationSharedState::merge_multi_response() {
5858
std::unordered_map<int64_t, std::pair<vectorized::Block, int>> block_maps;
59+
5960
for (int i = 0; i < block_order_results.size(); ++i) {
6061
for (auto& [backend_id, rpc_struct] : rpc_struct_map) {
6162
vectorized::Block partial_block;
@@ -175,12 +176,22 @@ Status MaterializationSharedState::create_muiltget_result(const vectorized::Colu
175176
rpc_struct->second.request.mutable_request_block_descs(i)->add_file_id(
176177
row_location.file_id);
177178
block_order[j] = row_location.backend_id;
179+
180+
// Count rows per backend
181+
_backend_rows_count[row_location.backend_id]++;
178182
} else {
179183
block_order[j] = 0;
180184
}
181185
}
182186
}
183187

188+
// Update max rows per backend
189+
for (const auto& [_, row_count] : _backend_rows_count) {
190+
if (row_count > _max_rows_per_backend) {
191+
_max_rows_per_backend = row_count;
192+
}
193+
}
194+
184195
eos = child_eos;
185196
if (eos && gc_id_map) {
186197
for (auto& [_, rpc_struct] : rpc_struct_map) {
@@ -361,6 +372,8 @@ Status MaterializationOperator::push(RuntimeState* state, vectorized::Block* in_
361372
if (local_state._materialization_state.need_merge_block) {
362373
SCOPED_TIMER(local_state._merge_response_timer);
363374
RETURN_IF_ERROR(local_state._materialization_state.merge_multi_response());
375+
local_state._max_rows_per_backend_counter->set(
376+
(int64_t)local_state._materialization_state._max_rows_per_backend);
364377
}
365378
}
366379

be/src/pipeline/exec/materialization_opertor.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ struct MaterializationSharedState {
6666
std::vector<std::vector<int64_t>> block_order_results;
6767
// backend id => <rpc profile info string key, rpc profile info string value>.
6868
std::map<int64_t, std::map<std::string, fmt::memory_buffer>> backend_profile_info_string;
69+
70+
// Store the maximum number of rows processed by a single backend in the current batch
71+
uint32_t _max_rows_per_backend = 0;
72+
// Store the number of rows processed by each backend
73+
std::unordered_map<int64_t, uint32_t> _backend_rows_count; // backend_id => rows_count
6974
};
7075

7176
class MaterializationLocalState final : public PipelineXLocalState<FakeSharedState> {
@@ -80,6 +85,8 @@ class MaterializationLocalState final : public PipelineXLocalState<FakeSharedSta
8085
RETURN_IF_ERROR(Base::init(state, info));
8186
_max_rpc_timer = ADD_TIMER_WITH_LEVEL(custom_profile(), "MaxRpcTime", 2);
8287
_merge_response_timer = ADD_TIMER_WITH_LEVEL(custom_profile(), "MergeResponseTime", 2);
88+
_max_rows_per_backend_counter =
89+
ADD_COUNTER_WITH_LEVEL(custom_profile(), "MaxRowsPerBackend", TUnit::UNIT, 2);
8390
return Status::OK();
8491
}
8592

@@ -93,6 +100,7 @@ class MaterializationLocalState final : public PipelineXLocalState<FakeSharedSta
93100
MaterializationSharedState _materialization_state;
94101
RuntimeProfile::Counter* _max_rpc_timer = nullptr;
95102
RuntimeProfile::Counter* _merge_response_timer = nullptr;
103+
RuntimeProfile::Counter* _max_rows_per_backend_counter = nullptr;
96104
};
97105

98106
class MaterializationOperator final : public StatefulOperatorX<MaterializationLocalState> {

0 commit comments

Comments
 (0)