From a6b600312bdd7283241bb6221ed8bc13082fdda3 Mon Sep 17 00:00:00 2001
From: hui lai
Date: Wed, 25 Feb 2026 11:11:32 +0800
Subject: [PATCH] [fix](load) fix quorum success invalid in move-memtable-on-sink
 load path (#60681)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Description

In `VTabletWriterV2`, the `_quorum_success()` function always returns `false` even when all streams have finished successfully, making the quorum success write feature effectively non-functional.

## Root Cause

In both `_build_tablet_node_mapping()` and `_incremental_open_streams()`, `_tablets_by_node[node].emplace(tablet_id)` is guarded by the `known_indexes` check:

```cpp
if (known_indexes.contains(index.index_id)) [[likely]] {
    continue;
}
_indexes_from_node[node].emplace_back(tablet);
_tablets_by_node[node].emplace(tablet_id);
known_indexes.insert(index.index_id);
```

The `known_indexes` set is shared across all partitions, tablets, and nodes. Once an `index_id` is inserted after processing the first tablet's first node, all subsequent tablets (and all other nodes of the same tablet) with the same `index_id` skip the `_tablets_by_node` update.

For example, with 1 index, 3 tablets [T1, T2, T3], and 3 replicas [N1, N2, N3]:
- Only `_tablets_by_node[N1] = {T1}` gets populated.
- T1 on N2/N3, and T2/T3 on all nodes, are skipped.

This causes `_quorum_success()` to compute `finished_tablets_replica` incorrectly:
- `finished_tablets_replica[T1] = 1` (only counted from N1)
- `finished_tablets_replica[T2] = 0`, `finished_tablets_replica[T3] = 0`

With a quorum requirement of 2 (for 3 replicas), the check always fails.

The `known_indexes` optimization was intended only for `_indexes_from_node` (to avoid sending duplicate schema info per index), but it incorrectly also blocked the `_tablets_by_node` population.

Note: vtablet_writer.cpp does NOT have this issue — its `_tablets_by_channel` is populated without the `known_indexes` guard.
## Fix

Move `_tablets_by_node[node].emplace(tablet_id)` before the `known_indexes` check in both `_build_tablet_node_mapping()` and `_incremental_open_streams()`, so that every tablet on every node is correctly recorded.
---
 be/src/runtime/load_stream.cpp                |  1 +
 be/src/vec/sink/writer/vtablet_writer_v2.cpp  |  4 +-
 .../stream_load/test_sink_tolerate.groovy     | 39 +++++++++++++++++++
 3 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index f94e7864377542..2e272e25c70efb 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -707,6 +707,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
         }
     } break;
     case PStreamHeader::CLOSE_LOAD: {
+        DBUG_EXECUTE_IF("LoadStream.close_load.block", DBUG_BLOCK);
        std::vector<int64_t> success_tablet_ids;
        FailedTablets failed_tablets;
        std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 666954744f18a6..898bb10c437824 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -102,11 +102,11 @@ Status VTabletWriterV2::_incremental_open_streams(
             tablet.set_tablet_id(tablet_id);
             new_backends.insert(node);
             _tablets_for_node[node].emplace(tablet_id, tablet);
+            _tablets_by_node[node].emplace(tablet_id);
             if (known_indexes.contains(index.index_id)) [[likely]] {
                 continue;
             }
             _indexes_from_node[node].emplace_back(tablet);
-            _tablets_by_node[node].emplace(tablet_id);
             known_indexes.insert(index.index_id);
             VLOG_DEBUG << "incremental open stream (" << partition->id << ", " << tablet_id
                        << ")";
@@ -343,11 +343,11 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
                 // ignore fake tablet for auto partition
                 continue;
             }
+            _tablets_by_node[node].emplace(tablet_id);
             if (known_indexes.contains(index.index_id)) [[likely]] {
                 continue;
             }
             _indexes_from_node[node].emplace_back(tablet);
-            _tablets_by_node[node].emplace(tablet_id);
             known_indexes.insert(index.index_id);
         }
         _build_tablet_replica_info(tablet_id, partition);
diff --git a/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
index c55d80a70cdec5..3e63b5fd02befb 100644
--- a/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
@@ -117,5 +117,44 @@ suite("test_sink_tolerate", "docker") {
         } finally {
             GetDebugPoint().disableDebugPointForAllBEs("TabletStream.add_segment.add_segment_failed")
         }
+
+        try {
+            // Enable close_load block on only the first BE (minor BE)
+            def firstBE = true
+            GetDebugPoint().operateDebugPointForAllBEs({ host, port ->
+                if (port == -1) return
+                if (firstBE) {
+                    firstBE = false
+                    GetDebugPoint().enableDebugPoint(host, port as int, NodeType.BE, "LoadStream.close_load.block")
+                }
+            })
+            streamLoad {
+                table "${tableName}"
+                set 'column_separator', '\t'
+                set 'columns', 'k1, k2, v2, v10, v11'
+                set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+                set 'strict_mode', 'true'
+                set 'memtable_on_sink_node', 'true'
+                file 'test_strict_mode.csv'
+                time 10000 // limit inflight 10s
+                check { result, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load result: ${result}".toString())
+                    def json = parseJson(result)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(2, json.NumberTotalRows)
+                    assertEquals(0, json.NumberFilteredRows)
+                    assertEquals(0, json.NumberUnselectedRows)
+                }
+            }
+            sql "sync"
+            def res = sql "select * from ${tableName}"
+            log.info("select result: ${res}".toString())
+            assertEquals(2, res.size())
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs("LoadStream.close_load.block")
+        }
     }
 }
\ No newline at end of file