Skip to content

Commit 2ffc46f

Browse files
BenHuddlestondaverigby
authored andcommitted
MB-35427: Remove ack for correct consumer
When we set the ActiveStream to dead we attempt to remove any queuedSeqnoAck from the DM's map. We currently attempt to remove the ack with the name of the ActiveStream which is not correct. We should instead remove the ack with the name of the consumer which is stored in the DcpProducer. Change-Id: Ie4308b114542d61f2ffd91f6e65375f88114fe54 Reviewed-on: http://review.couchbase.org/112919 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Dave Rigby <[email protected]>
1 parent 26ee724 commit 2ffc46f

File tree

5 files changed

+88
-7
lines changed

5 files changed

+88
-7
lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1185,11 +1185,33 @@ uint32_t ActiveStream::setDead(end_stream_status_t status) {
11851185
if (!vb) {
11861186
return 0;
11871187
}
1188+
1189+
// Get the consumer name from the producer so that we can clear the
1190+
// correct ack
1191+
std::string consumerName;
1192+
{
1193+
auto p = producerPtr.lock();
1194+
if (!p) {
1195+
log(spdlog::level::warn,
1196+
"Producer could not be locked when"
1197+
"attempting to clear queued seqno acks");
1198+
return 0;
1199+
}
1200+
consumerName = p->getConsumerName();
1201+
}
1202+
1203+
if (consumerName.empty()) {
1204+
log(spdlog::level::warn,
1205+
"Consumer name not found for producer when"
1206+
"attempting to clear queued seqno acks");
1207+
return 0;
1208+
}
1209+
11881210
// Take the vb state lock so that we don't change the state of
11891211
// this vb
11901212
folly::SharedMutex::ReadHolder vbStateLh(vb->getStateLock());
11911213
if (vb->getState() == vbucket_state_active) {
1192-
vb->removeQueuedAckFromDM(name_);
1214+
vb->removeQueuedAckFromDM(consumerName);
11931215
}
11941216
}
11951217

engines/ep/src/dcp/producer.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1827,3 +1827,7 @@ end_stream_status_t DcpProducer::mapEndStreamStatus(
18271827
}
18281828
return status;
18291829
}
1830+
1831+
std::string DcpProducer::getConsumerName() const {
1832+
return consumerName;
1833+
}

engines/ep/src/dcp/producer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,8 @@ class DcpProducer : public ConnHandler,
329329
std::shared_ptr<StreamContainer<std::shared_ptr<Stream>>> findStreams(
330330
Vbid vbid);
331331

332+
std::string getConsumerName() const;
333+
332334
protected:
333335
/** We may disconnect if noop messages are enabled and the last time we
334336
* received any message (including a noop) exceeds the dcpTimeout.

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ void DurabilityActiveStreamTest::setUp(bool startCheckpointProcessorTask) {
4848

4949
// Enable SyncReplication and flow-control (Producer BufferLog)
5050
setupProducer({{"enable_synchronous_replication", "true"},
51-
{"connection_buffer_size", "52428800"}},
51+
{"connection_buffer_size", "52428800"},
52+
{"consumer_name", "test_consumer"}},
5253
startCheckpointProcessorTask);
5354
ASSERT_TRUE(stream->public_supportSyncReplication());
5455
}
@@ -474,7 +475,7 @@ TEST_P(DurabilityActiveStreamTest, RemoveUnknownSeqnoAckAtDestruction) {
474475
// Our topology gives replica name as "replica" an our producer/stream has
475476
// name "test_producer". Simulate a seqno ack by calling the vBucket level
476477
// function.
477-
stream->seqnoAck("test_producer", 1);
478+
stream->seqnoAck(producer->getConsumerName(), 1);
478479

479480
// An unknown seqno ack should not have committed the item
480481
EXPECT_EQ(0, vb->getNumItems());
@@ -487,7 +488,7 @@ TEST_P(DurabilityActiveStreamTest, RemoveUnknownSeqnoAckAtDestruction) {
487488
// connections. We verify that the seqno ack does not exist in the map
488489
// by performing the topology change that would commit the prepare if it
489490
// did.
490-
EXPECT_EQ(ENGINE_SUCCESS, stream->seqnoAck("test_producer", 1));
491+
EXPECT_EQ(ENGINE_SUCCESS, stream->seqnoAck(producer->getConsumerName(), 1));
491492

492493
// If the seqno ack still existed in the queuedSeqnoAcks map then it would
493494
// result in a commit on topology change
@@ -496,11 +497,60 @@ TEST_P(DurabilityActiveStreamTest, RemoveUnknownSeqnoAckAtDestruction) {
496497
vbucket_state_active,
497498
{{"topology",
498499
nlohmann::json::array(
499-
{{"active", "replica1", "test_producer"}})}});
500+
{{"active", "replica1", producer->getConsumerName()}})}});
500501

501502
EXPECT_EQ(0, vb->getNumItems());
502503
}
503504

505+
TEST_P(DurabilityActiveStreamTest, RemoveCorrectQueuedAckAtStreamSetDead) {
506+
auto vb = engine->getVBucket(vbid);
507+
508+
const auto key = makeStoredDocKey("key");
509+
const auto& value = "value";
510+
auto item = makePendingItem(
511+
key,
512+
value,
513+
cb::durability::Requirements(cb::durability::Level::Majority,
514+
1 /*timeout*/));
515+
VBQueueItemCtx ctx;
516+
ctx.durability =
517+
DurabilityItemCtx{item->getDurabilityReqs(), nullptr /*cookie*/};
518+
519+
EXPECT_EQ(MutationStatus::WasClean, public_processSet(*vb, *item, ctx));
520+
flushVBucketToDiskIfPersistent(vbid, 1);
521+
522+
// We don't include prepares in the numItems stat (should not exist in here)
523+
EXPECT_EQ(0, vb->getNumItems());
524+
525+
// Our topology gives replica name as "replica" an our producer/stream has
526+
// name "test_producer". Simulate a seqno ack by calling the vBucket level
527+
// function.
528+
stream->seqnoAck(producer->getConsumerName(), 1);
529+
530+
// Disconnect the ActiveStream. Should remove the queued seqno ack
531+
stream->setDead(END_STREAM_DISCONNECTED);
532+
533+
stream = std::make_shared<MockActiveStream>(engine.get(),
534+
producer,
535+
0 /*flags*/,
536+
0 /*opaque*/,
537+
*vb,
538+
0 /*st_seqno*/,
539+
~0 /*en_seqno*/,
540+
0x0 /*vb_uuid*/,
541+
0 /*snap_start_seqno*/,
542+
~0 /*snap_end_seqno*/);
543+
producer->createCheckpointProcessorTask();
544+
producer->scheduleCheckpointProcessorTask();
545+
stream->setActive();
546+
547+
// Should not throw a monotonic exception as the ack should have been
548+
// removed by setDead.
549+
stream->seqnoAck(producer->getConsumerName(), 1);
550+
551+
producer->cancelCheckpointCreatorTask();
552+
}
553+
504554
void DurabilityActiveStreamTest::setUpSendSetInsteadOfCommitTest() {
505555
auto vb = engine->getVBucket(vbid);
506556

engines/ep/tests/module_tests/dcp_stream_test.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1427,8 +1427,11 @@ void SingleThreadedActiveStreamTest::setupProducer(
14271427
// We don't set the startTask flag here because we will create the task
14281428
// manually. We do this because the producer actually creates the task on
14291429
// StreamRequest which we do not do because we want a MockActiveStream.
1430-
producer = std::make_shared<MockDcpProducer>(
1431-
*engine, cookie, "test_producer", flags, false /*startTask*/);
1430+
producer = std::make_shared<MockDcpProducer>(*engine,
1431+
cookie,
1432+
"test_producer->test_consumer",
1433+
flags,
1434+
false /*startTask*/);
14321435

14331436
if (startCheckpointProcessorTask &&
14341437
!producer->getCheckpointSnapshotTask()) {

0 commit comments

Comments
 (0)