diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 0a735184b0b483..893f456446a491 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -757,6 +757,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* } } break; case PStreamHeader::CLOSE_LOAD: { + DBUG_EXECUTE_IF("LoadStream.close_load.block", DBUG_BLOCK); std::vector<int64_t> success_tablet_ids; FailedTablets failed_tablets; std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end()); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index dad781059e5a92..04901208096fdc 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -104,11 +104,11 @@ Status VTabletWriterV2::_incremental_open_streams( tablet.set_tablet_id(tablet_id); new_backends.insert(node); _tablets_for_node[node].emplace(tablet_id, tablet); + _tablets_by_node[node].emplace(tablet_id); if (known_indexes.contains(index.index_id)) [[likely]] { continue; } _indexes_from_node[node].emplace_back(tablet); - _tablets_by_node[node].emplace(tablet_id); known_indexes.insert(index.index_id); VLOG_DEBUG << "incremental open stream (" << partition->id << ", " << tablet_id << ")"; @@ -347,11 +347,11 @@ Status VTabletWriterV2::_build_tablet_node_mapping() { // ignore fake tablet for auto partition continue; } + _tablets_by_node[node].emplace(tablet_id); if (known_indexes.contains(index.index_id)) [[likely]] { continue; } _indexes_from_node[node].emplace_back(tablet); - _tablets_by_node[node].emplace(tablet_id); known_indexes.insert(index.index_id); } _build_tablet_replica_info(tablet_id, partition); diff --git a/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy index c55d80a70cdec5..3e63b5fd02befb 100644 --- 
a/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy +++ b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy @@ -117,5 +117,44 @@ suite("test_sink_tolerate", "docker") { } finally { GetDebugPoint().disableDebugPointForAllBEs("TabletStream.add_segment.add_segment_failed") } + + try { + // Enable close_load block on only the first BE (minor BE) + def firstBE = true + GetDebugPoint().operateDebugPointForAllBEs({ host, port -> + if (port == -1) return + if (firstBE) { + firstBE = false + GetDebugPoint().enableDebugPoint(host, port as int, NodeType.BE, "LoadStream.close_load.block") + } + }) + streamLoad { + table "${tableName}" + set 'column_separator', '\t' + set 'columns', 'k1, k2, v2, v10, v11' + set 'partitions', 'partition_a, partition_b, partition_c, partition_d' + set 'strict_mode', 'true' + set 'memtable_on_sink_node', 'true' + file 'test_strict_mode.csv' + time 10000 // limit inflight 10s + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(2, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + sql "sync" + def res = sql "select * from ${tableName}" + log.info("select result: ${res}".toString()) + assertEquals(2, res.size()) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("LoadStream.close_load.block") + } } } \ No newline at end of file