Skip to content

Commit a35b2f3

Browse files
committed
MB-35378: Ignore seqno ack on producer without stream
A stream on a producer may be removed from the streams map if we have processed a close stream message and then stepped the producer to send a StreamEnd. The consumer may not yet have received and processed this StreamEnd but may have sent us a seqno ack. If this is the case, we should simply ignore the seqno ack. Change-Id: I728e39993f6f556db61697bb29dd78338023f2ab Reviewed-on: http://review.couchbase.org/112780 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 50a173e commit a35b2f3

File tree

3 files changed

+63
-12
lines changed

3 files changed

+63
-12
lines changed

engines/ep/src/dcp/producer.cc

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,25 +1044,38 @@ ENGINE_ERROR_CODE DcpProducer::seqno_acknowledged(uint32_t opaque,
10441044
" but we don't have a StreamContainer for that vb");
10451045
}
10461046

1047-
std::shared_ptr<ActiveStream> stream;
1048-
{
1049-
auto handle = rv->second->rlock();
1050-
1051-
// Producer for replication should only have one stream.
1052-
Expects(handle.size() == 1);
1053-
1054-
stream = dynamic_pointer_cast<ActiveStream>(handle.get());
1055-
Expects(stream.get());
1047+
// Search for an active stream with the same opaque as the response.
1048+
// Use find_if2 which will return the matching shared_ptr<Stream>
1049+
auto stream = find_if2([opaque](const StreamsMap::value_type& s) {
1050+
auto handle = s.second->rlock();
1051+
for (; !handle.end(); handle.next()) {
1052+
auto& stream = handle.get();
1053+
auto* as = dynamic_cast<ActiveStream*>(stream.get());
1054+
if (as && opaque == stream->getOpaque()) {
1055+
return stream; // return matching shared_ptr<Stream>
1056+
}
1057+
}
1058+
return ContainerElement{};
1059+
});
1060+
1061+
if (!stream) {
1062+
// No stream found, may be the case that we have just ended our
1063+
// stream and removed the stream from our map but the consumer is
1064+
// not yet aware and we have received a seqno ack. Just return
1065+
// success and ignore the ack.
1066+
return ENGINE_SUCCESS;
10561067
}
10571068

1058-
if (prepared_seqno > stream->getLastSentSeqno()) {
1069+
ActiveStream* as = static_cast<ActiveStream*>(stream.get());
1070+
1071+
if (prepared_seqno > as->getLastSentSeqno()) {
10591072
throw std::logic_error(
10601073
"Replica acked seqno:" + std::to_string(prepared_seqno) +
10611074
" greater than last sent seqno:" +
1062-
std::to_string(stream->getLastSentSeqno()));
1075+
std::to_string(as->getLastSentSeqno()));
10631076
}
10641077

1065-
return stream->seqnoAck(consumerName, prepared_seqno);
1078+
return as->seqnoAck(consumerName, prepared_seqno);
10661079
}
10671080

10681081
bool DcpProducer::handleResponse(const protocol_binary_response_header* resp) {

engines/ep/tests/mock/mock_dcp_producer.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ std::shared_ptr<Stream> MockDcpProducer::findStream(Vbid vbid) {
9191
auto rv = streams.find(vbid.get());
9292
if (rv != streams.end()) {
9393
auto handle = rv->second->rlock();
94+
// An empty StreamContainer for this vbid is allowed
95+
if (handle.size() == 0) {
96+
return nullptr;
97+
}
98+
9499
if (handle.size() != 1) {
95100
throw std::logic_error(
96101
"MockDcpProducer::findStream against producer with many "

engines/ep/tests/module_tests/dcp_stream_test.cc

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1203,6 +1203,39 @@ TEST_P(StreamTest, ProcessItemsCheckpointStartIsLastItem) {
12031203
EXPECT_EQ(DcpResponse::Event::Mutation, readyQ.front()->getEvent());
12041204
}
12051205

1206+
TEST_P(StreamTest, ProducerReceivesSeqnoAckForErasedStream) {
1207+
create_dcp_producer(0, /*flags*/
1208+
IncludeValue::Yes,
1209+
IncludeXattrs::Yes,
1210+
{{"send_stream_end_on_client_close_stream", "true"},
1211+
{"enable_synchronous_replication", "true"},
1212+
{"consumer_name", "replica1"}});
1213+
1214+
// Need to do a stream request to put the stream in the producers map
1215+
ASSERT_EQ(ENGINE_SUCCESS, doStreamRequest(*producer).status);
1216+
1217+
// Close the stream to start the removal process
1218+
EXPECT_EQ(ENGINE_SUCCESS, producer->closeStream(0 /*opaque*/, vbid));
1219+
1220+
// Stream should still exist, but should be dead
1221+
auto stream = producer->findStream(vbid);
1222+
EXPECT_TRUE(stream);
1223+
EXPECT_FALSE(stream->isActive());
1224+
1225+
// Step the stream on, this should remove the stream from the producer's
1226+
// StreamsMap
1227+
MockDcpMessageProducers producers(engine);
1228+
EXPECT_EQ(ENGINE_SUCCESS, producer->step(&producers));
1229+
EXPECT_EQ(cb::mcbp::ClientOpcode::DcpStreamEnd, producers.last_op);
1230+
1231+
// Stream should no longer exist in the map
1232+
EXPECT_FALSE(producer->findStream(vbid));
1233+
1234+
EXPECT_EQ(ENGINE_SUCCESS,
1235+
producer->seqno_acknowledged(
1236+
0 /*opaque*/, vbid, 1 /*prepareSeqno*/));
1237+
}
1238+
12061239
class CacheCallbackTest : public StreamTest {
12071240
protected:
12081241
void SetUp() override {

0 commit comments

Comments
 (0)