Skip to content

Commit fe63640

Browse files
authored
[fix](move-memtable) tolerate non-open streams in close wait (#44680)
Related PR: #44344 `VTabletWriterV2::_select_streams()` is already checking if there is enough downstream BE to meet the replication requirements. `VTabletWriterV2::close()` should tolerate those non-open streams on close wait. Debug point `VTabletWriterV2._open_streams.skip_two_backends` is added along with `VTabletWriterV2._open_streams.skip_one_backend` to check this behavior.
1 parent 5f7a4a8 commit fe63640

File tree

3 files changed

+23
-18
lines changed

3 files changed

+23
-18
lines changed

be/src/vec/sink/load_stream_stub.cpp

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -219,11 +219,7 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64
219219
add_failed_tablet(tablet_id, _status);
220220
return _status;
221221
}
222-
DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
223-
if (segment_id != 0) {
224-
return Status::OK();
225-
}
226-
});
222+
DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); });
227223
PStreamHeader header;
228224
header.set_src_id(_src_id);
229225
*header.mutable_load_id() = _load_id;
@@ -246,11 +242,7 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64
246242
add_failed_tablet(tablet_id, _status);
247243
return _status;
248244
}
249-
DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
250-
if (segment_id != 0) {
251-
return Status::OK();
252-
}
253-
});
245+
DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); });
254246
PStreamHeader header;
255247
header.set_src_id(_src_id);
256248
*header.mutable_load_id() = _load_id;
@@ -340,6 +332,10 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i
340332

341333
Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
342334
DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
335+
if (!_is_open.load()) {
336+
// we don't need to close wait on non-open streams
337+
return Status::OK();
338+
}
343339
if (!_is_closing.load()) {
344340
return _status;
345341
}

be/src/vec/sink/writer/vtablet_writer_v2.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,14 +269,20 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
269269
}
270270

271271
Status VTabletWriterV2::_open_streams() {
272-
bool fault_injection_skip_be = true;
272+
int fault_injection_skip_be = 0;
273273
bool any_backend = false;
274274
bool any_success = false;
275275
for (auto& [dst_id, _] : _tablets_for_node) {
276276
auto streams = _load_stream_map->get_or_create(dst_id);
277277
DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", {
278-
if (fault_injection_skip_be) {
279-
fault_injection_skip_be = false;
278+
if (fault_injection_skip_be < 1) {
279+
fault_injection_skip_be++;
280+
continue;
281+
}
282+
});
283+
DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
284+
if (fault_injection_skip_be < 2) {
285+
fault_injection_skip_be++;
280286
continue;
281287
}
282288
});

regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,30 +75,33 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") {
7575
file "baseall.txt"
7676
}
7777

78-
def load_with_injection = { injection, error_msg->
78+
def load_with_injection = { injection, error_msg, success=false->
7979
try {
8080
sql "truncate table test"
8181
GetDebugPoint().enableDebugPointForAllBEs(injection)
8282
sql "insert into test select * from baseall where k1 <= 3"
83+
assertTrue(success, String.format("Expected Exception '%s', actual success", error_msg))
8384
} catch(Exception e) {
8485
logger.info(e.getMessage())
85-
assertTrue(e.getMessage().contains(error_msg))
86+
assertTrue(e.getMessage().contains(error_msg), e.toString())
8687
} finally {
8788
GetDebugPoint().disableDebugPointForAllBEs(injection)
8889
}
8990
}
9091

9192
// StreamSinkFileWriter appendv write segment failed one replica
9293
// success
93-
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "sucess")
94+
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "sucess", true)
9495
// StreamSinkFileWriter appendv write segment failed two replica
9596
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", "add segment failed")
9697
// StreamSinkFileWriter appendv write segment failed all replica
9798
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "failed to send segment data to any replicas")
9899
// test segment num check when LoadStreamStub missed tail segments
99-
load_with_injection("LoadStreamStub.only_send_segment_0", "segment num mismatch")
100+
load_with_injection("LoadStreamStub.skip_send_segment", "segment num mismatch")
100101
// test one backend open failure
101-
load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", "success")
102+
load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", "success", true)
103+
// test two backend open failure
104+
load_with_injection("VTabletWriterV2._open_streams.skip_two_backends", "not enough streams 1/3")
102105
sql """ set enable_memtable_on_sink_node=false """
103106
}
104107
}

0 commit comments

Comments
 (0)