Skip to content

Commit 3541a6d

Browse files
branch-4.0: [fix](load) fix quorum success invalid in move-memtable-on-sink load path #60681 (#60820)
Cherry-picked from #60681 Co-authored-by: hui lai <laihui@selectdb.com>
1 parent 7362354 commit 3541a6d

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

be/src/runtime/load_stream.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
707707
}
708708
} break;
709709
case PStreamHeader::CLOSE_LOAD: {
710+
DBUG_EXECUTE_IF("LoadStream.close_load.block", DBUG_BLOCK);
710711
std::vector<int64_t> success_tablet_ids;
711712
FailedTablets failed_tablets;
712713
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());

be/src/vec/sink/writer/vtablet_writer_v2.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,11 @@ Status VTabletWriterV2::_incremental_open_streams(
102102
tablet.set_tablet_id(tablet_id);
103103
new_backends.insert(node);
104104
_tablets_for_node[node].emplace(tablet_id, tablet);
105+
_tablets_by_node[node].emplace(tablet_id);
105106
if (known_indexes.contains(index.index_id)) [[likely]] {
106107
continue;
107108
}
108109
_indexes_from_node[node].emplace_back(tablet);
109-
_tablets_by_node[node].emplace(tablet_id);
110110
known_indexes.insert(index.index_id);
111111
VLOG_DEBUG << "incremental open stream (" << partition->id << ", " << tablet_id
112112
<< ")";
@@ -343,11 +343,11 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
343343
// ignore fake tablet for auto partition
344344
continue;
345345
}
346+
_tablets_by_node[node].emplace(tablet_id);
346347
if (known_indexes.contains(index.index_id)) [[likely]] {
347348
continue;
348349
}
349350
_indexes_from_node[node].emplace_back(tablet);
350-
_tablets_by_node[node].emplace(tablet_id);
351351
known_indexes.insert(index.index_id);
352352
}
353353
_build_tablet_replica_info(tablet_id, partition);

regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,5 +117,44 @@ suite("test_sink_tolerate", "docker") {
117117
} finally {
118118
GetDebugPoint().disableDebugPointForAllBEs("TabletStream.add_segment.add_segment_failed")
119119
}
120+
121+
try {
122+
// Enable close_load block on only the first BE (minor BE)
123+
def firstBE = true
124+
GetDebugPoint().operateDebugPointForAllBEs({ host, port ->
125+
if (port == -1) return
126+
if (firstBE) {
127+
firstBE = false
128+
GetDebugPoint().enableDebugPoint(host, port as int, NodeType.BE, "LoadStream.close_load.block")
129+
}
130+
})
131+
streamLoad {
132+
table "${tableName}"
133+
set 'column_separator', '\t'
134+
set 'columns', 'k1, k2, v2, v10, v11'
135+
set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
136+
set 'strict_mode', 'true'
137+
set 'memtable_on_sink_node', 'true'
138+
file 'test_strict_mode.csv'
139+
time 10000 // limit inflight 10s
140+
check { result, exception, startTime, endTime ->
141+
if (exception != null) {
142+
throw exception
143+
}
144+
log.info("Stream load result: ${result}".toString())
145+
def json = parseJson(result)
146+
assertEquals("success", json.Status.toLowerCase())
147+
assertEquals(2, json.NumberTotalRows)
148+
assertEquals(0, json.NumberFilteredRows)
149+
assertEquals(0, json.NumberUnselectedRows)
150+
}
151+
}
152+
sql "sync"
153+
def res = sql "select * from ${tableName}"
154+
log.info("select result: ${res}".toString())
155+
assertEquals(2, res.size())
156+
} finally {
157+
GetDebugPoint().disableDebugPointForAllBEs("LoadStream.close_load.block")
158+
}
120159
}
121160
}

0 commit comments

Comments
 (0)