diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp index ee3c3c8055ef55..362de4fb42b73c 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp @@ -26,7 +26,16 @@ namespace doris::pipeline { std::string MultiCastDataStreamSinkLocalState::name_suffix() { auto* parent = static_cast(_parent); - return fmt::format(operator_name_suffix, parent->operator_id()); + auto& dest_ids = parent->dests_id(); + std::string result = "("; + for (size_t i = 0; i < dest_ids.size(); ++i) { + if (i > 0) { + result += ", "; + } + result += fmt::format("dest_id={}", dest_ids[i]); + } + result += ")"; + return fmt::format(result + operator_name_suffix, parent->operator_id()); } std::shared_ptr MultiCastDataStreamSinkOperatorX::create_shared_state() const { diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 77cd468efe267e..1d5caa6d64459c 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1139,8 +1139,9 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS OperatorPtr source_op; // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline source_op = std::make_shared( - multi_cast_node_id, i, pool, thrift_sink.multi_cast_stream_sink.sinks[i], - row_desc, /*operator_id=*/source_id); + /*node_id*/ source_id, /*consumer_id*/ i, pool, + thrift_sink.multi_cast_stream_sink.sinks[i], row_desc, + /*operator_id=*/source_id); RETURN_IF_ERROR(new_pipeline->add_operator( source_op, params.__isset.parallel_instances ? params.parallel_instances : 0)); // 2. create and set sink operator of data stream sender for new pipeline