diff --git a/be/src/pipeline/exec/materialization_opertor.cpp b/be/src/pipeline/exec/materialization_opertor.cpp index 51f07d65002a51..20b1994f6628d5 100644 --- a/be/src/pipeline/exec/materialization_opertor.cpp +++ b/be/src/pipeline/exec/materialization_opertor.cpp @@ -56,6 +56,7 @@ void MaterializationSharedState::get_block(vectorized::Block* block) { Status MaterializationSharedState::merge_multi_response() { std::unordered_map> block_maps; + for (int i = 0; i < block_order_results.size(); ++i) { for (auto& [backend_id, rpc_struct] : rpc_struct_map) { vectorized::Block partial_block; @@ -175,12 +176,22 @@ Status MaterializationSharedState::create_muiltget_result(const vectorized::Colu rpc_struct->second.request.mutable_request_block_descs(i)->add_file_id( row_location.file_id); block_order[j] = row_location.backend_id; + + // Count rows per backend + _backend_rows_count[row_location.backend_id]++; } else { block_order[j] = 0; } } } + // Update max rows per backend + for (const auto& [_, row_count] : _backend_rows_count) { + if (row_count > _max_rows_per_backend) { + _max_rows_per_backend = row_count; + } + } + eos = child_eos; if (eos && gc_id_map) { for (auto& [_, rpc_struct] : rpc_struct_map) { @@ -361,6 +372,8 @@ Status MaterializationOperator::push(RuntimeState* state, vectorized::Block* in_ if (local_state._materialization_state.need_merge_block) { SCOPED_TIMER(local_state._merge_response_timer); RETURN_IF_ERROR(local_state._materialization_state.merge_multi_response()); + local_state._max_rows_per_backend_counter->set( + (int64_t)local_state._materialization_state._max_rows_per_backend); } } diff --git a/be/src/pipeline/exec/materialization_opertor.h b/be/src/pipeline/exec/materialization_opertor.h index cdb197d101dad0..a456374d09f939 100644 --- a/be/src/pipeline/exec/materialization_opertor.h +++ b/be/src/pipeline/exec/materialization_opertor.h @@ -66,6 +66,11 @@ struct MaterializationSharedState { std::vector> block_order_results; // backend id => . std::map> backend_profile_info_string; + + // Store the maximum number of rows processed by a single backend in the current batch + uint32_t _max_rows_per_backend = 0; + // Store the number of rows processed by each backend + std::unordered_map _backend_rows_count; // backend_id => rows_count }; class MaterializationLocalState final : public PipelineXLocalState { @@ -80,6 +85,8 @@ class MaterializationLocalState final : public PipelineXLocalState {