
Commit 80fa82e

BenHuddleston authored and daverigby committed
MB-34091: Manually ack seqnos for new secondChain nodes
It is possible for a new node (one that will exist in the new topology) to ack before ns_server gives us a new replication topology. ns_server does this so that we do not block SyncWrites on vBucket streaming to the new node, as this could take a long time and cause the SyncWrites to time out. However, this means that a new node can ack before it exists in our replication topology. Store a map of acks for nodes that we do not know about, and use this map when the replication topology is set to manually ack each new node at its highest acked seqno. This ensures that our SyncWrites do not time out waiting for the secondChain to be satisfied.

Change-Id: I480abf22878b30d321b3ffb4419f61975d33c5eb
Reviewed-on: http://review.couchbase.org/109575
Reviewed-by: Paolo Cocchi <[email protected]>
Reviewed-by: Dave Rigby <[email protected]>
Tested-by: Build Bot <[email protected]>
1 parent 2aead9c commit 80fa82e
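In outline, the fix works like the following minimal, standalone sketch: acks from nodes outside the current topology are queued rather than dropped, then replayed and erased when the new topology is set. SimpleDurabilityMonitor and its members are hypothetical simplified stand-ins, not the committed code (the real implementation is in ActiveDurabilityMonitor, below).

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>

struct SimpleDurabilityMonitor {
    // Nodes currently present in the replication topology (the chains).
    std::unordered_set<std::string> knownNodes;
    // Highest ack seen from each node that is not (yet) in the topology.
    std::unordered_map<std::string, int64_t> queuedSeqnoAcks;

    void seqnoAck(const std::string& node, int64_t seqno) {
        if (knownNodes.count(node)) {
            processAck(node, seqno);
        } else {
            // Unknown node: remember its highest ack so it can be replayed
            // when the new topology arrives.
            auto& queued = queuedSeqnoAcks[node];
            queued = std::max(queued, seqno);
        }
    }

    void setTopology(std::unordered_set<std::string> nodes) {
        knownNodes = std::move(nodes);
        // Manually ack any queued node that is now part of the topology.
        for (auto it = queuedSeqnoAcks.begin(); it != queuedSeqnoAcks.end();) {
            if (knownNodes.count(it->first)) {
                processAck(it->first, it->second);
                it = queuedSeqnoAcks.erase(it);
            } else {
                ++it;
            }
        }
    }

    void processAck(const std::string& node, int64_t seqno) {
        std::cout << node << " acked seqno " << seqno << "\n";
    }
};

int main() {
    SimpleDurabilityMonitor dm;
    dm.seqnoAck("newReplica", 5); // queued: node not yet in the topology
    dm.setTopology({"newReplica"}); // replayed: prints "newReplica acked seqno 5"
}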

File tree

7 files changed: +379, -5 lines changed


engines/ep/src/dcp/active_stream.cc

Lines changed: 15 additions & 0 deletions
@@ -1123,6 +1123,21 @@ uint32_t ActiveStream::setDead(end_stream_status_t status) {
     if (status != END_STREAM_DISCONNECTED) {
         notifyStreamReady();
     }
+
+    // Remove any unknown acks for the stream. Why here and not on
+    // destruction of the object? We could be replacing an existing
+    // DcpProducer with another. This old ActiveStream may then live on
+    // (owned by a backfill) and clear a seqno ack from a new ActiveStream.
+    if (supportSyncReplication()) {
+        auto vb = engine->getVBucket(vb_);
+        // Take the vb state lock so that we don't change the state of
+        // this vb
+        folly::SharedMutex::ReadHolder vbStateLh(vb->getStateLock());
+        if (vb->getState() == vbucket_state_active) {
+            vb->removeQueuedAckFromDM(name_);
+        }
+    }
+
     return 0;
 }
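The new setDead() logic holds the vBucket state lock shared while checking the state, so the vBucket cannot flip out of active between the check and the removal of the queued ack. A dependency-free sketch of that pattern, using std::shared_mutex in place of folly::SharedMutex; MiniVBucket and VBState are hypothetical stand-ins for the kv_engine types.

#include <iostream>
#include <shared_mutex>
#include <string>

enum class VBState { Active, Replica };

struct MiniVBucket {
    mutable std::shared_mutex stateLock;
    VBState state = VBState::Active;

    void removeQueuedAck(const std::string& node) {
        std::cout << "removing queued ack for " << node << "\n";
    }
};

// Mirrors the shape of ActiveStream::setDead(): hold the state lock shared
// so the state cannot change (e.g. active -> replica) between the check
// and the removal of the queued ack.
void onStreamDead(MiniVBucket& vb, const std::string& streamName) {
    std::shared_lock<std::shared_mutex> guard(vb.stateLock);
    if (vb.state == VBState::Active) {
        vb.removeQueuedAck(streamName);
    }
}

int main() {
    MiniVBucket vb;
    onStreamDead(vb, "test_producer");
}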

engines/ep/src/durability/active_durability_monitor.cc

Lines changed: 59 additions & 5 deletions
@@ -320,6 +320,10 @@ uint8_t ActiveDurabilityMonitor::getSecondChainMajority() const {
     return s->secondChain ? s->secondChain->majority : 0;
 }
 
+void ActiveDurabilityMonitor::removedQueuedAck(const std::string& node) {
+    state.wlock()->queuedSeqnoAcks.erase(node);
+}
+
 ActiveDurabilityMonitor::Container::iterator
 ActiveDurabilityMonitor::State::getNodeNext(const std::string& node) {
     Expects(firstChain.get());
@@ -431,20 +435,23 @@ void ActiveDurabilityMonitor::State::updateNodeAck(const std::string& node,
         firstChainPos.lastAckSeqno = seqno;
     }
 
+    bool secondChainFound = false;
     if (secondChain) {
         auto secondChainItr = secondChain->positions.find(node);
         if (secondChainItr != secondChain->positions.end()) {
+            secondChainFound = true;
             auto& secondChainPos =
                     const_cast<Position&>(secondChainItr->second);
             secondChainPos.lastAckSeqno = seqno;
         }
     }
 
-    // Just drop out of here if we don't find the node. We could be receiving an
-    // ack from a new replica that is not yet in the second chain. We don't want
-    // to make each sync write wait on a vBucket being (almost) fully
-    // transferred during a rebalance so ns_server deal with these by waiting
-    // for seqno persistence on the new replica.
+    if (!firstChainFound && !secondChainFound) {
+        // We didn't find the node in either of our chains, but we still need to
+        // track the ack for this node in case we are about to get a topology
+        // change in which this node will exist.
+        queuedSeqnoAcks[node] = seqno;
+    }
 }
@@ -617,6 +624,9 @@ void ActiveDurabilityMonitor::State::processSeqnoAck(const std::string& node,
                 std::to_string(lastTrackedSeqno) + "\"");
     }
 
+    // We should never ack for the active
+    Expects(firstChain->active != node);
+
     // Note: process up to the ack'ed seqno
     ActiveDurabilityMonitor::Container::iterator next;
     while ((next = getNodeNext(node)) != trackedWrites.end() &&
@@ -748,6 +758,50 @@ void ActiveDurabilityMonitor::State::setReplicationTopology(
     for (auto& write : trackedWrites) {
         write.resetTopology(*firstChain, secondChain.get());
    }
+
+    // Manually ack any nodes that did not previously exist in either chain
+    performQueuedAckForChain(*firstChain);
+
+    if (secondChain) {
+        performQueuedAckForChain(*secondChain);
+    }
+}
+
+void ActiveDurabilityMonitor::State::performQueuedAckForChain(
+        const DurabilityMonitor::ReplicationChain& chain) {
+    for (const auto& node : chain.positions) {
+        auto existingAck = queuedSeqnoAcks.find(node.first);
+        if (existingAck != queuedSeqnoAcks.end()) {
+            Container toCommit;
+            processSeqnoAck(existingAck->first, existingAck->second, toCommit);
+            // ======================= FIRST CHAIN =============================
+            // @TODO MB-34318 this should no longer be true and we will need
+            // to remove the pre-condition check.
+            //
+            // This is a little bit counter-intuitive. We may actually need to
+            // commit something post-topology change, however, because we have
+            // reset the ackCount of all in flight SyncWrites previously we
+            // should never ack here. If we had Replicas=1 then we would have
+            // already committed due to active ack or would require an active
+            // ack (PERSIST levels) to commit. So, if we do commit something as
+            // a result of a topology change it will only be done when we move
+            // the HighPreparedSeqno. The active can never exist in the
+            // queuedSeqnoAcks map so we should also never attempt to ack it
+            // here.
+            // ===================== SECOND CHAIN ==============================
+            // We don't expect any SyncWrite to currently need committing. Why?
+            // We require that a SyncWrite must satisfy both firstChain and
+            // secondChain. The SyncWrite should have already been committed
+            // if the firstChain is satisfied and we are under a vbState lock
+            // which will block seqno acks until this topology change has been
+            // completed.
+            Expects(toCommit.empty());
+
+            // Remove the existingAck, we don't need to track it any further as
+            // it is in a chain.
+            queuedSeqnoAcks.erase(existingAck);
+        }
+    }
 }
 
 void ActiveDurabilityMonitor::State::addSyncWrite(const void* cookie,
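The updateNodeAck() change above is the core of the fix: update whichever chain positions contain the node, and queue the ack only when the node appears in neither chain. A condensed sketch using plain maps in place of the real ReplicationChain/Position types; MiniState and its members are hypothetical stand-ins.

#include <cstdint>
#include <iostream>
#include <map>
#include <optional>
#include <string>
#include <unordered_map>

// node name -> lastAckSeqno
using ChainPositions = std::map<std::string, int64_t>;

struct MiniState {
    ChainPositions firstChain;
    std::optional<ChainPositions> secondChain;
    std::unordered_map<std::string, int64_t> queuedSeqnoAcks;

    void updateNodeAck(const std::string& node, int64_t seqno) {
        bool firstChainFound = false;
        auto first = firstChain.find(node);
        if (first != firstChain.end()) {
            firstChainFound = true;
            first->second = seqno;
        }

        bool secondChainFound = false;
        if (secondChain) {
            auto second = secondChain->find(node);
            if (second != secondChain->end()) {
                secondChainFound = true;
                second->second = seqno;
            }
        }

        if (!firstChainFound && !secondChainFound) {
            // Unknown node: queue the ack for a future topology change
            // instead of silently dropping it.
            queuedSeqnoAcks[node] = seqno;
        }
    }
};

int main() {
    MiniState state;
    state.firstChain = {{"replica1", 0}};
    state.updateNodeAck("replica1", 3); // known: updates the chain position
    state.updateNodeAck("newNode", 5); // unknown: queued for later
    std::cout << "queued acks: " << state.queuedSeqnoAcks.size() << "\n"; // 1
}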

engines/ep/src/durability/active_durability_monitor.h

Lines changed: 24 additions & 0 deletions
@@ -199,6 +199,16 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
      */
     void checkForCommit();
 
+    /**
+     * We track acks for unknown nodes as they may precede a topology change
+     * that could cause a SyncWrite to timeout. We only receive these acks via
+     * DCP so we can remove any "unknown" ack for a given node when we close the
+     * ActiveStream serving it.
+     *
+     * @param node Node for which we wish to remove the unknown ack
+     */
+    void removedQueuedAck(const std::string& node);
+
 protected:
     void toOStream(std::ostream& os) const override;
@@ -384,6 +394,14 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
      */
     Container updateHighPreparedSeqno();
 
+    /**
+     * Perform the manual ack (from the map of queuedSeqnoAcks) that is
+     * required at rebalance for the given chain
+     *
+     * @param chain Chain for which we should manually ack nodes
+     */
+    void performQueuedAckForChain(const ReplicationChain& chain);
+
 private:
     /**
      * Advance the current Position (iterator and seqno).
@@ -436,6 +454,12 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
         std::chrono::milliseconds defaultTimeout = std::chrono::seconds(30);
 
         const ActiveDurabilityMonitor& adm;
+
+        // Map of node to seqno value for seqno acks that we have seen but
+        // do not exist in the current replication topology. They may be
+        // required to manually ack for a new node if we receive an ack before
+        // ns_server sends us a new replication topology.
+        std::unordered_map<std::string, Monotonic<int64_t>> queuedSeqnoAcks;
     };
 
     // The VBucket owning this DurabilityMonitor instance
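Note that queuedSeqnoAcks stores its values as Monotonic<int64_t>, so a stale (lower) seqno ack can never overwrite a newer queued one. A simplified stand-in for that wrapper, assuming a policy that rejects non-increasing writes (the real kv_engine template, from platform, is policy-configurable; this is illustrative only).

#include <cstdint>
#include <stdexcept>

// Simplified stand-in for kv_engine's Monotonic<T>, with a hard-coded
// throwing policy for strictly increasing writes.
template <typename T>
class MonotonicSketch {
public:
    explicit MonotonicSketch(T initial = T{}) : value(initial) {}

    MonotonicSketch& operator=(T newValue) {
        if (newValue <= value) {
            // Reject non-increasing writes so a stale (lower) seqno ack
            // can never overwrite a newer one.
            throw std::logic_error("MonotonicSketch: non-monotonic write");
        }
        value = newValue;
        return *this;
    }

    operator T() const {
        return value;
    }

private:
    T value;
};

int main() {
    MonotonicSketch<int64_t> lastAck; // starts at 0
    lastAck = 5; // fine: strictly increasing
    // lastAck = 3; // would throw: stale ack rejected
}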

engines/ep/src/vbucket.cc

Lines changed: 4 additions & 0 deletions
@@ -3556,3 +3556,7 @@ ENGINE_ERROR_CODE VBucket::checkDurabilityRequirements(const Item& item) {
 
     return ENGINE_SUCCESS;
 }
+
+void VBucket::removeQueuedAckFromDM(const std::string& node) {
+    getActiveDM().removedQueuedAck(node);
+}

engines/ep/src/vbucket.h

Lines changed: 8 additions & 0 deletions
@@ -1640,6 +1640,14 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
      */
     const DurabilityMonitor& getDurabilityMonitor() const;
 
+    /**
+     * Remove any queued acks for the given node from the Durability Monitor.
+     * (should be Active)
+     *
+     * @param node Name of the node for which we wish to remove the ack
+     */
+    void removeQueuedAckFromDM(const std::string& node);
+
     std::queue<queued_item> rejectQueue;
     std::unique_ptr<FailoverTable> failovers;

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 42 additions & 0 deletions
@@ -205,6 +205,48 @@ TEST_P(DurabilityActiveStreamTest, SendDcpAbort) {
     ASSERT_FALSE(resp);
 }
 
+TEST_P(DurabilityActiveStreamTest, RemoveUnknownSeqnoAckAtDestruction) {
+    auto vb = engine->getVBucket(vbid);
+
+    const auto key = makeStoredDocKey("key");
+    const auto& value = "value";
+    auto item = makePendingItem(
+            key,
+            value,
+            cb::durability::Requirements(cb::durability::Level::Majority,
+                                         1 /*timeout*/));
+    VBQueueItemCtx ctx;
+    ctx.durability =
+            DurabilityItemCtx{item->getDurabilityReqs(), nullptr /*cookie*/};
+
+    EXPECT_EQ(MutationStatus::WasClean, public_processSet(*vb, *item, ctx));
+    flushVBucketToDiskIfPersistent(vbid, 1);
+
+    // We don't include prepares in the numItems stat (should not exist in here)
+    EXPECT_EQ(0, vb->getNumItems());
+
+    // Our topology gives replica name as "replica" and our producer/stream has
+    // name "test_producer". Simulate a seqno ack by calling the vBucket level
+    // function.
+    vb->seqnoAcknowledged("test_producer", 1);
+
+    // An unknown seqno ack should not have committed the item
+    EXPECT_EQ(0, vb->getNumItems());
+
+    // Disconnect the ActiveStream
+    stream->setDead(END_STREAM_DISCONNECTED);
+
+    // If the seqno ack still existed in the queuedSeqnoAcks map then it would
+    // result in a commit on topology change
+    setVBucketStateAndRunPersistTask(
+            vbid,
+            vbucket_state_active,
+            {{"topology",
+              nlohmann::json::array(
+                      {{"active", "replica1", "test_producer"}})}});
+    EXPECT_EQ(0, vb->getNumItems());
+}
+
 void DurabilityPassiveStreamTest::SetUp() {
     SingleThreadedPassiveStreamTest::SetUp();
     consumer->enableSyncReplication();