Skip to content

Commit a6b6003

Browse files
sollhui (Your Name)
authored and committed
[fix](load) fix quorum success invalid in move-memtable-on-sink load path (#60681)
## Description In `VTabletWriterV2`, the `_quorum_success()` function always returns `false` even when all streams have finished successfully, making the quorum success write feature effectively non-functional. ## Root Cause In both `_build_tablet_node_mapping()` and `_incremental_open_streams()`, `_tablets_by_node[node].emplace(tablet_id)` is guarded by the `known_indexes` check: ```cpp if (known_indexes.contains(index.index_id)) [[likely]] { continue; } _indexes_from_node[node].emplace_back(tablet); _tablets_by_node[node].emplace(tablet_id); known_indexes.insert(index.index_id); ``` The known_indexes set is shared across all partitions, tablets, and nodes. Once an index_id is inserted after processing the first tablet's first node, all subsequent tablets (and all other nodes of the same tablet) with the same index_id skip the _tablets_by_node update. For example, with 1 index, 3 tablets [T1, T2, T3], and 3 replicas [N1, N2, N3]: Only _tablets_by_node[N1] = {T1} gets populated T1 on N2/N3, and T2/T3 on all nodes are skipped This causes _quorum_success() to compute finished_tablets_replica incorrectly: finished_tablets_replica[T1] = 1 (only counted from N1) finished_tablets_replica[T2] = 0, finished_tablets_replica[T3] = 0 With a quorum requirement of 2 (for 3 replicas), the check always fails. The known_indexes optimization was intended only for _indexes_from_node (to avoid sending duplicate schema info per index), but it incorrectly also blocked the _tablets_by_node population. Note: vtablet_writer.cpp does NOT have this issue — its _tablets_by_channel is populated without the known_indexes guard. ## Fix Move _tablets_by_node[node].emplace(tablet_id) before the known_indexes check in both _build_tablet_node_mapping() and _incremental_open_streams(), so that every tablet on every node is correctly recorded.
1 parent a133581 commit a6b6003

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

be/src/runtime/load_stream.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
707707
}
708708
} break;
709709
case PStreamHeader::CLOSE_LOAD: {
710+
DBUG_EXECUTE_IF("LoadStream.close_load.block", DBUG_BLOCK);
710711
std::vector<int64_t> success_tablet_ids;
711712
FailedTablets failed_tablets;
712713
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());

be/src/vec/sink/writer/vtablet_writer_v2.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,11 @@ Status VTabletWriterV2::_incremental_open_streams(
102102
tablet.set_tablet_id(tablet_id);
103103
new_backends.insert(node);
104104
_tablets_for_node[node].emplace(tablet_id, tablet);
105+
_tablets_by_node[node].emplace(tablet_id);
105106
if (known_indexes.contains(index.index_id)) [[likely]] {
106107
continue;
107108
}
108109
_indexes_from_node[node].emplace_back(tablet);
109-
_tablets_by_node[node].emplace(tablet_id);
110110
known_indexes.insert(index.index_id);
111111
VLOG_DEBUG << "incremental open stream (" << partition->id << ", " << tablet_id
112112
<< ")";
@@ -343,11 +343,11 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
343343
// ignore fake tablet for auto partition
344344
continue;
345345
}
346+
_tablets_by_node[node].emplace(tablet_id);
346347
if (known_indexes.contains(index.index_id)) [[likely]] {
347348
continue;
348349
}
349350
_indexes_from_node[node].emplace_back(tablet);
350-
_tablets_by_node[node].emplace(tablet_id);
351351
known_indexes.insert(index.index_id);
352352
}
353353
_build_tablet_replica_info(tablet_id, partition);

regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,5 +117,44 @@ suite("test_sink_tolerate", "docker") {
117117
} finally {
118118
GetDebugPoint().disableDebugPointForAllBEs("TabletStream.add_segment.add_segment_failed")
119119
}
120+
121+
try {
122+
// Enable close_load block on only the first BE (minor BE)
123+
def firstBE = true
124+
GetDebugPoint().operateDebugPointForAllBEs({ host, port ->
125+
if (port == -1) return
126+
if (firstBE) {
127+
firstBE = false
128+
GetDebugPoint().enableDebugPoint(host, port as int, NodeType.BE, "LoadStream.close_load.block")
129+
}
130+
})
131+
streamLoad {
132+
table "${tableName}"
133+
set 'column_separator', '\t'
134+
set 'columns', 'k1, k2, v2, v10, v11'
135+
set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
136+
set 'strict_mode', 'true'
137+
set 'memtable_on_sink_node', 'true'
138+
file 'test_strict_mode.csv'
139+
time 10000 // limit inflight 10s
140+
check { result, exception, startTime, endTime ->
141+
if (exception != null) {
142+
throw exception
143+
}
144+
log.info("Stream load result: ${result}".toString())
145+
def json = parseJson(result)
146+
assertEquals("success", json.Status.toLowerCase())
147+
assertEquals(2, json.NumberTotalRows)
148+
assertEquals(0, json.NumberFilteredRows)
149+
assertEquals(0, json.NumberUnselectedRows)
150+
}
151+
}
152+
sql "sync"
153+
def res = sql "select * from ${tableName}"
154+
log.info("select result: ${res}".toString())
155+
assertEquals(2, res.size())
156+
} finally {
157+
GetDebugPoint().disableDebugPointForAllBEs("LoadStream.close_load.block")
158+
}
120159
}
121160
}

0 commit comments

Comments (0)