Skip to content

Commit 5d6525e

Browse files
BenHuddlestondaverigby
authored andcommitted
MB-35361: Send seqno ack on stream (re)connection
If we have a vBucket with non-zero HPS accepting a passive stream then we should send a SeqnoAcknowledgement to the active node to ensure that we can commit any in-flight SyncWrites. Consider the case where we have 1 replica and a durable write: 1) Replica receives prepare from active 2) Replica disconnects from active before acking 3) Replica attempts to ack but cannot due to lack of stream 4) Replica reconnects 5) Replica sends seqno ack If we did not do step 5 then we may never complete the given prepare. In the general case we would just timeout, but if this prepare had been warmed up or grandfathered into an ADM then it would have an infinite timeout and we would forever block this key if the replica never received a disk snapshot or any other durable writes. Change-Id: Ib54a0dda79745fdecfbce0a7517c6a468c25202a Reviewed-on: http://review.couchbase.org/112781 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 6206a83 commit 5d6525e

File tree

3 files changed

+71
-2
lines changed

3 files changed

+71
-2
lines changed

engines/ep/src/dcp/passive_stream.cc

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,13 +161,28 @@ std::string PassiveStream::getStreamTypeName() const {
161161
void PassiveStream::acceptStream(cb::mcbp::Status status, uint32_t add_opaque) {
162162
std::unique_lock<std::mutex> lh(streamMutex);
163163
if (isPending()) {
164+
pushToReadyQ(std::make_unique<AddStreamResponse>(
165+
add_opaque, opaque_, status));
164166
if (status == cb::mcbp::Status::Success) {
167+
// Before we receive/process anything else, send a seqno ack if we
168+
// are a stream for a pre-existing vBucket to ensure that the
169+
// replica can commit any in-flight SyncWrites if no further
170+
// SyncWrites are done and no disk snapshots processed by this
171+
// replica.
172+
{
173+
auto consumer = consumerPtr.lock();
174+
if (consumer && consumer->isSyncReplicationEnabled()) {
175+
auto vb = engine->getVBucket(vb_);
176+
if (vb && vb->getHighPreparedSeqno() != 0) {
177+
pushToReadyQ(std::make_unique<SeqnoAcknowledgement>(
178+
opaque_, vb_, vb->getHighPreparedSeqno()));
179+
}
180+
}
181+
}
165182
transitionState(StreamState::AwaitingFirstSnapshotMarker);
166183
} else {
167184
transitionState(StreamState::Dead);
168185
}
169-
pushToReadyQ(std::make_unique<AddStreamResponse>(
170-
add_opaque, opaque_, status));
171186
lh.unlock();
172187
notifyStreamReady();
173188
}

engines/ep/tests/mock/mock_dcp_consumer.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,13 @@ class MockDcpConsumer: public DcpConsumer {
133133
supportsSyncReplication = true;
134134
}
135135

136+
/**
137+
* Disable SyncRepl for testing
138+
*/
139+
void disableSyncReplication() {
140+
supportsSyncReplication = false;
141+
}
142+
136143
/**
137144
*
138145
* Map from the opaque used to create a stream to the internal opaque

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,53 @@ void DurabilityPassiveStreamTest::TearDown() {
716716
SingleThreadedPassiveStreamTest::TearDown();
717717
}
718718

719+
TEST_P(DurabilityPassiveStreamTest, SendSeqnoAckOnStreamAcceptance) {
720+
// 1) Put something in the vBucket as we won't send a seqno ack if there are
721+
// no items
722+
testReceiveDcpPrepare();
723+
724+
consumer->closeAllStreams();
725+
uint32_t opaque = 0;
726+
consumer->addStream(opaque, vbid, 0 /*flags*/);
727+
stream = static_cast<MockPassiveStream*>(
728+
(consumer->getVbucketStream(vbid)).get());
729+
stream->acceptStream(cb::mcbp::Status::Success, opaque);
730+
731+
EXPECT_EQ(3, stream->public_readyQ().size());
732+
auto resp = stream->public_popFromReadyQ();
733+
EXPECT_EQ(DcpResponse::Event::StreamReq, resp->getEvent());
734+
resp = stream->public_popFromReadyQ();
735+
EXPECT_EQ(DcpResponse::Event::AddStream, resp->getEvent());
736+
resp = stream->public_popFromReadyQ();
737+
EXPECT_EQ(DcpResponse::Event::SeqnoAcknowledgement, resp->getEvent());
738+
const auto& ack = static_cast<SeqnoAcknowledgement&>(*resp);
739+
EXPECT_EQ(1, ack.getPreparedSeqno());
740+
}
741+
742+
TEST_P(DurabilityPassiveStreamTest,
743+
NoSeqnoAckOnStreamAcceptanceIfNotSupported) {
744+
consumer->disableSyncReplication();
745+
746+
// 1) Put something in the vBucket as we won't send a seqno ack if there are
747+
// no items
748+
testReceiveDcpPrepare();
749+
750+
consumer->closeAllStreams();
751+
uint32_t opaque = 0;
752+
consumer->addStream(opaque, vbid, 0 /*flags*/);
753+
stream = static_cast<MockPassiveStream*>(
754+
(consumer->getVbucketStream(vbid)).get());
755+
stream->acceptStream(cb::mcbp::Status::Success, opaque);
756+
757+
ASSERT_EQ(2, stream->public_readyQ().size());
758+
auto resp = stream->public_popFromReadyQ();
759+
EXPECT_EQ(DcpResponse::Event::StreamReq, resp->getEvent());
760+
resp = stream->public_popFromReadyQ();
761+
EXPECT_EQ(DcpResponse::Event::AddStream, resp->getEvent());
762+
resp = stream->public_popFromReadyQ();
763+
EXPECT_FALSE(resp);
764+
}
765+
719766
void DurabilityPassiveStreamTest::
720767
testReceiveMutationOrDeletionInsteadOfCommitWhenStreamingFromDisk(
721768
DocumentState docState) {

0 commit comments

Comments
 (0)