Skip to content

Commit 5360816

Browse files
committed
MB-35226: Do not process seqno ack after stream is dead
We should not ack after a stream is dead because logically we should not process any other messages. This previously could have manifested in a case where we receive a seqno ack for a replica that does not exist in the replication topology; that ack would have been added to the queuedSeqnoAcks after the stream was dead. Setting the stream to dead removes the queuedSeqnoAck, so we would keep a queuedSeqnoAck for a stream that no longer exists. If this replica were then to reconnect, stream up to, and ack the same sequence number, then we would fire a monotonic invariant exception. Change-Id: I976b4a1dedde58cde351ea543aca94e0f6370957 Reviewed-on: http://review.couchbase.org/112714 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 4d03e6e commit 5360816

File tree

10 files changed

+114
-32
lines changed

10 files changed

+114
-32
lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1746,3 +1746,32 @@ bool ActiveStream::removeCheckpointCursor() {
17461746
}
17471747
return false;
17481748
}
1749+
1750+
ENGINE_ERROR_CODE ActiveStream::seqnoAck(const std::string& consumerName,
1751+
uint64_t preparedSeqno) {
1752+
VBucketPtr vb = engine->getVBucket(vb_);
1753+
if (!vb) {
1754+
return ENGINE_NOT_MY_VBUCKET;
1755+
}
1756+
1757+
// Take the vb state lock so that we don't change the state of
1758+
// this vb. Done before the streamMutex is acquired to prevent a lock order
1759+
// inversion.
1760+
{
1761+
folly::SharedMutex::ReadHolder vbStateLh(vb->getStateLock());
1762+
1763+
// Locked with the streamMutex to ensure that we cannot race with a
1764+
// stream end
1765+
{
1766+
LockHolder lh(streamMutex);
1767+
1768+
// We cannot ack something on a dead stream.
1769+
if (!isActive()) {
1770+
return ENGINE_SUCCESS;
1771+
}
1772+
1773+
return vb->seqnoAcknowledged(
1774+
vbStateLh, consumerName, preparedSeqno);
1775+
} // end stream mutex lock scope
1776+
} // end vb state lock scope
1777+
}

engines/ep/src/dcp/active_stream.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "collections/vbucket_filter.h"
2121
#include "dcp/stream.h"
22+
#include <memcached/engine_error.h>
2223
#include <spdlog/common.h>
2324

2425
class CheckpointManager;
@@ -199,6 +200,15 @@ class ActiveStream : public Stream,
199200
std::vector<queued_item> items;
200201
};
201202

203+
/**
204+
* Process a seqno ack against this stream.
205+
*
206+
* @param consumerName the name of the consumer acking
207+
* @param preparedSeqno the seqno that the consumer is acking
208+
*/
209+
ENGINE_ERROR_CODE seqnoAck(const std::string& consumerName,
210+
uint64_t preparedSeqno);
211+
202212
protected:
203213
/**
204214
* @param vb reference to the associated vbucket

engines/ep/src/dcp/producer.cc

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,29 +1044,25 @@ ENGINE_ERROR_CODE DcpProducer::seqno_acknowledged(uint32_t opaque,
10441044
" but we don't have a StreamContainer for that vb");
10451045
}
10461046

1047-
uint64_t lastSentSeqno;
1047+
std::shared_ptr<ActiveStream> stream;
10481048
{
10491049
auto handle = rv->second->rlock();
10501050

10511051
// Producer for replication should only have one stream.
10521052
Expects(handle.size() == 1);
10531053

1054-
auto* s = dynamic_cast<ActiveStream*>(handle.get().get());
1055-
1056-
// We should not have received a seqno ack if the stream
1057-
// is not an ActiveStream
1058-
Expects(s);
1059-
lastSentSeqno = s->getLastSentSeqno();
1054+
stream = dynamic_pointer_cast<ActiveStream>(handle.get());
1055+
Expects(stream.get());
10601056
}
10611057

1062-
if (prepared_seqno > lastSentSeqno) {
1058+
if (prepared_seqno > stream->getLastSentSeqno()) {
10631059
throw std::logic_error(
10641060
"Replica acked seqno:" + std::to_string(prepared_seqno) +
10651061
" greater than last sent seqno:" +
1066-
std::to_string(lastSentSeqno));
1062+
std::to_string(stream->getLastSentSeqno()));
10671063
}
10681064

1069-
return vb->seqnoAcknowledged(consumerName, prepared_seqno);
1065+
return stream->seqnoAck(consumerName, prepared_seqno);
10701066
}
10711067

10721068
bool DcpProducer::handleResponse(const protocol_binary_response_header* resp) {

engines/ep/src/durability/active_durability_monitor.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,7 @@ void ActiveDurabilityMonitor::State::updateNodeAck(const std::string& node,
860860
// track the ack for this node in case we are about to get a topology
861861
// change in which this node will exist.
862862
queuedSeqnoAcks[node] = seqno;
863+
queuedSeqnoAcks[node].setLabel("queuedSeqnoAck: " + node);
863864
}
864865
}
865866

engines/ep/src/vbucket.cc

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3820,12 +3820,10 @@ bool VBucket::isLogicallyNonExistent(
38203820
cHandle.isLogicallyDeleted(v.getBySeqno());
38213821
}
38223822

3823-
ENGINE_ERROR_CODE VBucket::seqnoAcknowledged(const std::string& replicaId,
3824-
uint64_t preparedSeqno) {
3825-
// Take the state lock to ensure that we don't change state whilst
3826-
// processing a seqno ack.
3827-
folly::SharedMutex::ReadHolder rlh(getStateLock());
3828-
3823+
ENGINE_ERROR_CODE VBucket::seqnoAcknowledged(
3824+
const folly::SharedMutex::ReadHolder& vbStateLock,
3825+
const std::string& replicaId,
3826+
uint64_t preparedSeqno) {
38293827
// We may receive an ack after we have set a vBucket to dead during a
38303828
// takeover; just ignore it.
38313829
if (getState() == vbucket_state_dead) {

engines/ep/src/vbucket.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1668,11 +1668,14 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
16681668
* Inform the vBucket that sequence number(s) have been acknowledged by
16691669
* a replica node.
16701670
*
1671+
* @param vbStateLock read lock on the vBucket state.
16711672
* @param replicaId The replica node which has acknowledged.
16721673
* @param preparedSeqno The sequence number the replica has prepared up to.
16731674
*/
1674-
ENGINE_ERROR_CODE seqnoAcknowledged(const std::string& replicaId,
1675-
uint64_t preparedSeqno);
1675+
ENGINE_ERROR_CODE seqnoAcknowledged(
1676+
const folly::SharedMutex::ReadHolder& vbStateLock,
1677+
const std::string& replicaId,
1678+
uint64_t preparedSeqno);
16761679

16771680
/**
16781681
* Notify the DurabilityMonitor that the Flusher has persisted all the

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ void DurabilityActiveStreamTest::testSendCompleteSyncWrite(Resolution res) {
198198
// Majority Prepare), simulating a SeqnoAck received from replica
199199
// satisfies Durability Requirements and triggers Commit. So, the
200200
// following indirectly calls VBucket::commit
201-
vb->seqnoAcknowledged(replica, prepareSeqno);
201+
stream->seqnoAck(replica, prepareSeqno);
202202
// Note: At FE we have an exact item count only at persistence.
203203
auto evictionType = std::get<1>(GetParam());
204204
if (evictionType == "value_only" || !persistent()) {
@@ -474,14 +474,21 @@ TEST_P(DurabilityActiveStreamTest, RemoveUnknownSeqnoAckAtDestruction) {
474474
// Our topology gives replica name as "replica" and our producer/stream has
475475
// name "test_producer". Simulate a seqno ack by calling the vBucket level
476476
// function.
477-
vb->seqnoAcknowledged("test_producer", 1);
477+
stream->seqnoAck("test_producer", 1);
478478

479479
// An unknown seqno ack should not have committed the item
480480
EXPECT_EQ(0, vb->getNumItems());
481481

482482
// Disconnect the ActiveStream
483483
stream->setDead(END_STREAM_DISCONNECTED);
484484

485+
// Attempt to ack the seqno again. The stream is dead so we should not
486+
// process the ack although we return SUCCESS to avoid tearing down any
487+
// connections. We verify that the seqno ack does not exist in the map
488+
// by performing the topology change that would commit the prepare if it
489+
// did.
490+
EXPECT_EQ(ENGINE_SUCCESS, stream->seqnoAck("test_producer", 1));
491+
485492
// If the seqno ack still existed in the queuedSeqnoAcks map then it would
486493
// result in a commit on topology change
487494
setVBucketStateAndRunPersistTask(

engines/ep/tests/module_tests/evp_store_durability_test.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1259,7 +1259,10 @@ TEST_P(DurabilityEPBucketTest, ActiveLocalNotifyPersistedSeqno) {
12591259

12601260
// Replica acks disk-seqno
12611261
EXPECT_EQ(ENGINE_SUCCESS,
1262-
vb->seqnoAcknowledged("replica", 3 /*preparedSeqno*/));
1262+
vb->seqnoAcknowledged(
1263+
folly::SharedMutex::ReadHolder(vb->getStateLock()),
1264+
"replica",
1265+
3 /*preparedSeqno*/));
12631266
// Active has not persisted, so Durability Requirements not satisfied yet
12641267
checkPending();
12651268

engines/ep/tests/module_tests/evp_store_warmup_test.cc

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,10 @@ void DurabilityWarmupTest::testCommittedSyncWrite(vbucket_state_t vbState,
657657
{ // scoping vb - is invalid once resetEngineAndWarmup() is called.
658658
auto vb = engine->getVBucket(vbid);
659659
EXPECT_EQ(ENGINE_SUCCESS,
660-
vb->seqnoAcknowledged("replica", vb->getHighPreparedSeqno()));
660+
vb->seqnoAcknowledged(
661+
folly::SharedMutex::ReadHolder(vb->getStateLock()),
662+
"replica",
663+
vb->getHighPreparedSeqno()));
661664

662665
flush_vbucket_to_disk(vbid, 1);
663666
}
@@ -942,7 +945,11 @@ void DurabilityWarmupTest::testHCSPersistedAndLoadedIntoVBState() {
942945
// Complete the Prepare
943946
vb = store->getVBucket(vbid);
944947
ASSERT_TRUE(vb);
945-
EXPECT_EQ(ENGINE_SUCCESS, vb->seqnoAcknowledged("replica", preparedSeqno));
948+
EXPECT_EQ(ENGINE_SUCCESS,
949+
vb->seqnoAcknowledged(
950+
folly::SharedMutex::ReadHolder(vb->getStateLock()),
951+
"replica",
952+
preparedSeqno));
946953
sv = vb->ht.findForRead(key).storedValue;
947954
ASSERT_TRUE(sv);
948955
ASSERT_TRUE(sv->isCommitted());
@@ -1048,13 +1055,19 @@ TEST_P(DurabilityWarmupTest, CommittedWithAckAfterWarmup) {
10481055
flush_vbucket_to_disk(vbid);
10491056
{
10501057
auto vb = engine->getVBucket(vbid);
1051-
vb->seqnoAcknowledged("replica", 1);
1058+
vb->seqnoAcknowledged(
1059+
folly::SharedMutex::ReadHolder(vb->getStateLock()),
1060+
"replica",
1061+
1);
10521062
flush_vbucket_to_disk(vbid, 1);
10531063
}
10541064
resetEngineAndWarmup();
10551065
{
10561066
auto vb = engine->getVBucket(vbid);
1057-
vb->seqnoAcknowledged("replica", 1);
1067+
vb->seqnoAcknowledged(
1068+
folly::SharedMutex::ReadHolder(vb->getStateLock()),
1069+
"replica",
1070+
1);
10581071
}
10591072
}
10601073

@@ -1075,14 +1088,20 @@ TEST_P(DurabilityWarmupTest, WarmUpHPSAndHCSWithNonSeqnoSortedItems) {
10751088
flush_vbucket_to_disk(vbid, 2);
10761089
{
10771090
auto vb = engine->getVBucket(vbid);
1078-
vb->seqnoAcknowledged("replica", 2);
1091+
vb->seqnoAcknowledged(
1092+
folly::SharedMutex::ReadHolder(vb->getStateLock()),
1093+
"replica",
1094+
2);
10791095
flush_vbucket_to_disk(vbid, 2);
10801096
}
10811097
SCOPED_TRACE("B");
10821098
resetEngineAndWarmup();
10831099
{
10841100
auto vb = engine->getVBucket(vbid);
1085-
vb->seqnoAcknowledged("replica", 2);
1101+
vb->seqnoAcknowledged(
1102+
folly::SharedMutex::ReadHolder(vb->getStateLock()),
1103+
"replica",
1104+
2);
10861105
}
10871106
}
10881107

engines/ep/tests/module_tests/vbucket_durability_test.cc

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,10 @@ void VBucketDurabilityTest::testAddPrepareAndCommit(
148148
ckptMgr->clear(*vbucket, ckptMgr->getHighSeqno());
149149

150150
// Simulate replica and active seqno-ack
151-
vbucket->seqnoAcknowledged(replica1, writes.back().seqno);
151+
vbucket->seqnoAcknowledged(
152+
folly::SharedMutex::ReadHolder(vbucket->getStateLock()),
153+
replica1,
154+
writes.back().seqno);
152155
simulateLocalAck(writes.back().seqno);
153156

154157
int i = 0;
@@ -582,13 +585,19 @@ TEST_P(VBucketDurabilityTest, Active_Commit_MultipleReplicas) {
582585
checkPending();
583586

584587
// replica2 acks, Durability Requirements not satisfied yet
585-
vbucket->seqnoAcknowledged(replica2, preparedSeqno);
588+
vbucket->seqnoAcknowledged(
589+
folly::SharedMutex::ReadHolder(vbucket->getStateLock()),
590+
replica2,
591+
preparedSeqno);
586592
checkPending();
587593

588594
// replica3 acks, Durability Requirements satisfied
589595
// Note: ensure 1 Ckpt in CM, easier to inspect the CkptList after Commit
590596
ckptMgr->clear(*vbucket, ckptMgr->getHighSeqno());
591-
vbucket->seqnoAcknowledged(replica3, preparedSeqno);
597+
vbucket->seqnoAcknowledged(
598+
folly::SharedMutex::ReadHolder(vbucket->getStateLock()),
599+
replica3,
600+
preparedSeqno);
592601
checkCommitted();
593602
}
594603

@@ -666,7 +675,10 @@ TEST_P(VBucketDurabilityTest, Active_PendingSkippedAtEjectionAndCommit) {
666675
swCompleteTrace);
667676

668677
// Simulate replica and active seqno-ack
669-
vbucket->seqnoAcknowledged(replica1, preparedSeqno);
678+
vbucket->seqnoAcknowledged(
679+
folly::SharedMutex::ReadHolder(vbucket->getStateLock()),
680+
replica1,
681+
preparedSeqno);
670682
simulateLocalAck(preparedSeqno);
671683

672684
// Commit notified
@@ -1947,7 +1959,11 @@ TEST_P(VBucketDurabilityTest, IgnoreAckAtTakeoverDead) {
19471959
// VBState transitions from Replica to Active
19481960
vbucket->setState(vbucket_state_dead, nlohmann::json{});
19491961

1950-
EXPECT_EQ(ENGINE_SUCCESS, vbucket->seqnoAcknowledged("replica1", 3));
1962+
EXPECT_EQ(ENGINE_SUCCESS,
1963+
vbucket->seqnoAcknowledged(
1964+
folly::SharedMutex::ReadHolder(vbucket->getStateLock()),
1965+
"replica1",
1966+
3));
19511967
// We won't have crashed and we will have ignored the ack
19521968
EXPECT_EQ(seqnos.size(), adm.getNumTracked());
19531969
}

0 commit comments

Comments
 (0)