Skip to content

Commit edeaeb7

Browse files
committed
MB-35429: Ack correct PassiveStream
The seqno ack code incorrectly assumes that we have only one Consumer in the vbConns "map". This may not be the case if we have a Consumer with a dead PassiveStream and another Consumer with an alive one. Instead of attempting to ack for only the first Consumer found, ack for every Consumer found, but only if its PassiveStream is alive. Change-Id: If4adc3a6cb73ea3befaa23ad7bfd33bced0b7a65 Reviewed-on: http://review.couchbase.org/112961 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent c8ee0a8 commit edeaeb7

File tree

3 files changed

+85
-18
lines changed

3 files changed

+85
-18
lines changed

engines/ep/src/dcp/dcpconnmap.cc

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -431,39 +431,43 @@ void DcpConnMap::notifyVBConnections(Vbid vbid, uint64_t bySeqno) {
431431
}
432432

433433
void DcpConnMap::seqnoAckVBPassiveStream(Vbid vbid, int64_t seqno) {
434-
std::shared_ptr<DcpConsumer> conn;
434+
std::vector<std::shared_ptr<DcpConsumer>> conns;
435435
{ // Locking scope for vbConnsLocks[index]
436436
size_t index = vbid.get() % vbConnLockNum;
437437
std::lock_guard<std::mutex> lg(vbConnLocks[index]);
438438

439-
// Note: logically we can have only one Consumer per VBucket, but I keep
440-
// using the existing vbConns mapping for now (originally added for
441-
// tracking only Producers).
439+
// Note: logically we should only have one Consumer per vBucket but
440+
// we may keep around old Consumers with either no PassiveStream for
441+
// this vBucket or a dead PassiveStream. We need to search the list of
442+
// ConnHandlers for the Consumer with the alive PassiveStream for this
443+
// vBucket.
442444
for (auto& weakPtr : vbConns[vbid.get()]) {
443445
auto connection = weakPtr.lock();
444446
if (!connection) {
445447
continue;
446448
}
447449
auto consumer = dynamic_pointer_cast<DcpConsumer>(connection);
448450
if (consumer) {
449-
// MB-34437
450-
// We should only have one consumer per vb (and we only care
451-
// about consumers in this function) so we can now unlock the
452-
// vbConnsLock to prevent a lock order inversion with
453-
// PassiveStreamMap RWLock as ownership of the actual object is
454-
// maintained by the shared_ptr.
455-
conn = consumer;
456-
break;
451+
// We need to search for the alive stream in the Consumers but
452+
// can't do so inside the vbConnLock due to lock order inversion
453+
// so store the Consumers for later processing.
454+
conns.push_back(std::move(consumer));
457455
}
458456
}
459457
}
460458

461-
// Note: Sync Repl enabled at Consumer only if Producer supports it.
462-
// This is to prevent that 6.5 Consumers send DCP_SEQNO_ACK to
463-
// pre-6.5 Producers (e.g., topology change in a 6.5 cluster
464-
// where a new pre-6.5 Active is elected).
465-
if (conn && conn->isSyncReplicationEnabled()) {
466-
conn->seqnoAckStream(vbid, seqno);
459+
for (auto& conn : conns) {
460+
// Note: Sync Repl enabled at Consumer only if Producer supports it.
461+
// This is to prevent that 6.5 Consumers send DCP_SEQNO_ACK to
462+
// pre-6.5 Producers (e.g., topology change in a 6.5 cluster
463+
// where a new pre-6.5 Active is elected).
464+
if (conn->isSyncReplicationEnabled() && conn->isStreamPresent(vbid)) {
465+
// Ideally, we would verify here that we only ack a single consumer,
466+
// however, that is not possible without adding additional locking
467+
// to ensure that no consumer can add new streams whilst this loop
468+
// runs.
469+
conn->seqnoAckStream(vbid, seqno);
470+
}
467471
}
468472
}
469473

engines/ep/src/dcp/passive_stream.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -711,6 +711,10 @@ ENGINE_ERROR_CODE PassiveStream::processPrepare(
711711
void PassiveStream::seqnoAck(int64_t seqno) {
712712
{
713713
LockHolder lh(streamMutex);
714+
if (!isActive()) {
715+
return;
716+
}
717+
714718
pushToReadyQ(
715719
std::make_unique<SeqnoAcknowledgement>(opaque_, vb_, seqno));
716720
}

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1548,6 +1548,65 @@ TEST_P(ConnectionTest, ConsumerWithConsumerNameEnablesSyncRepl) {
15481548
destroy_mock_cookie(cookie);
15491549
}
15501550

1551+
TEST_P(ConnectionTest, AckCorrectPassiveStream) {
1552+
ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
1553+
1554+
const void* cookie1 = create_mock_cookie();
1555+
Vbid vbid = Vbid(0);
1556+
1557+
// Create our first consumer and enable SyncReplication so that we can ack
1558+
auto consumer1 =
1559+
std::make_shared<MockDcpConsumer>(*engine, cookie1, "consumer1");
1560+
consumer1->enableSyncReplication();
1561+
1562+
// Add our stream and accept to mimic real scenario and ensure it is alive
1563+
ASSERT_EQ(ENGINE_SUCCESS,
1564+
consumer1->addStream(0 /*opaque*/, vbid, 0 /*flags*/));
1565+
MockPassiveStream* stream1 = static_cast<MockPassiveStream*>(
1566+
(consumer1->getVbucketStream(vbid)).get());
1567+
stream1->acceptStream(cb::mcbp::Status::Success, 0 /*opaque*/);
1568+
ASSERT_TRUE(stream1->isActive());
1569+
1570+
ASSERT_EQ(2, stream1->public_readyQ().size());
1571+
auto resp = stream1->public_popFromReadyQ();
1572+
EXPECT_EQ(DcpResponse::Event::StreamReq, resp->getEvent());
1573+
resp = stream1->public_popFromReadyQ();
1574+
EXPECT_EQ(DcpResponse::Event::AddStream, resp->getEvent());
1575+
EXPECT_EQ(0, stream1->public_readyQ().size());
1576+
1577+
// Now close the stream to transition to dead
1578+
consumer1->closeStream(0 /*opaque*/, vbid);
1579+
ASSERT_FALSE(stream1->isActive());
1580+
1581+
// Add a new consumer and new PassiveStream
1582+
const void* cookie2 = create_mock_cookie();
1583+
auto consumer2 =
1584+
std::make_shared<MockDcpConsumer>(*engine, cookie2, "consumer2");
1585+
consumer2->enableSyncReplication();
1586+
ASSERT_EQ(ENGINE_SUCCESS,
1587+
consumer2->addStream(1 /*opaque*/, vbid, 0 /*flags*/));
1588+
MockPassiveStream* stream2 = static_cast<MockPassiveStream*>(
1589+
(consumer2->getVbucketStream(vbid)).get());
1590+
stream2->acceptStream(cb::mcbp::Status::Success, 1 /*opaque*/);
1591+
ASSERT_TRUE(stream2->isActive());
1592+
1593+
EXPECT_EQ(2, stream2->public_readyQ().size());
1594+
resp = stream2->public_popFromReadyQ();
1595+
EXPECT_EQ(DcpResponse::Event::StreamReq, resp->getEvent());
1596+
resp = stream2->public_popFromReadyQ();
1597+
EXPECT_EQ(DcpResponse::Event::AddStream, resp->getEvent());
1598+
EXPECT_EQ(0, stream2->public_readyQ().size());
1599+
1600+
// When we ack we should hit the active stream (stream2)
1601+
engine->getDcpConnMap().seqnoAckVBPassiveStream(vbid, 1);
1602+
EXPECT_EQ(0, stream1->public_readyQ().size());
1603+
EXPECT_EQ(1, stream2->public_readyQ().size());
1604+
1605+
ASSERT_EQ(ENGINE_SUCCESS, consumer2->closeStream(1 /*opaque*/, vbid));
1606+
destroy_mock_cookie(cookie1);
1607+
destroy_mock_cookie(cookie2);
1608+
}
1609+
15511610
/*
15521611
* The following tests that when the disk_backfill_queue configuration is
15531612
* set to false on receiving a snapshot marker it does not move into the

0 commit comments

Comments
 (0)