Skip to content

Commit a6312a6

Browse files
BenHuddlestondaverigby
authored andcommitted
MB-35429: Move preparedSeqno check to ActiveStream::seqnoAck
Currently we check if we should process a seqno ack based on whether or not the preparedSeqno is less than our ActiveStream's lastSentSeqno. This should only be done if the stream is alive as we should never attempt to process seqno acks for any dead Stream. Change-Id: Iac72210a4bca390ea19013c7d43a8082fbb44a5d Reviewed-on: http://review.couchbase.org/112971 Reviewed-by: James Harrison <[email protected]> Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent edeaeb7 commit a6312a6

File tree

4 files changed

+25
-37
lines changed

4 files changed

+25
-37
lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1792,6 +1792,14 @@ ENGINE_ERROR_CODE ActiveStream::seqnoAck(const std::string& consumerName,
17921792
return ENGINE_SUCCESS;
17931793
}
17941794

1795+
if (preparedSeqno > getLastSentSeqno()) {
1796+
throw std::logic_error(
1797+
vb_.to_string() + " replica \"" + consumerName +
1798+
"\" acked seqno:" + std::to_string(preparedSeqno) +
1799+
" which is greater than last sent seqno:" +
1800+
std::to_string(getLastSentSeqno()));
1801+
}
1802+
17951803
return vb->seqnoAcknowledged(
17961804
vbStateLh, consumerName, preparedSeqno);
17971805
} // end stream mutex lock scope

engines/ep/src/dcp/producer.cc

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,14 +1067,6 @@ ENGINE_ERROR_CODE DcpProducer::seqno_acknowledged(uint32_t opaque,
10671067
}
10681068

10691069
ActiveStream* as = static_cast<ActiveStream*>(stream.get());
1070-
1071-
if (prepared_seqno > as->getLastSentSeqno()) {
1072-
throw std::logic_error(
1073-
"Replica acked seqno:" + std::to_string(prepared_seqno) +
1074-
" greater than last sent seqno:" +
1075-
std::to_string(as->getLastSentSeqno()));
1076-
}
1077-
10781070
return as->seqnoAck(consumerName, prepared_seqno);
10791071
}
10801072

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -453,23 +453,11 @@ TEST_P(DurabilityActiveStreamTest, BackfillAbort) {
453453
}
454454

455455
TEST_P(DurabilityActiveStreamTest, RemoveUnknownSeqnoAckAtDestruction) {
456-
auto vb = engine->getVBucket(vbid);
457-
458-
const auto key = makeStoredDocKey("key");
459-
const auto& value = "value";
460-
auto item = makePendingItem(
461-
key,
462-
value,
463-
cb::durability::Requirements(cb::durability::Level::Majority,
464-
1 /*timeout*/));
465-
VBQueueItemCtx ctx;
466-
ctx.durability =
467-
DurabilityItemCtx{item->getDurabilityReqs(), nullptr /*cookie*/};
468-
469-
EXPECT_EQ(MutationStatus::WasClean, public_processSet(*vb, *item, ctx));
456+
testSendDcpPrepare();
470457
flushVBucketToDiskIfPersistent(vbid, 1);
471458

472459
// We don't include prepares in the numItems stat (should not exist in here)
460+
auto vb = engine->getVBucket(vbid);
473461
EXPECT_EQ(0, vb->getNumItems());
474462

475463
// Our topology gives replica name as "replica" an our producer/stream has
@@ -503,23 +491,11 @@ TEST_P(DurabilityActiveStreamTest, RemoveUnknownSeqnoAckAtDestruction) {
503491
}
504492

505493
TEST_P(DurabilityActiveStreamTest, RemoveCorrectQueuedAckAtStreamSetDead) {
506-
auto vb = engine->getVBucket(vbid);
507-
508-
const auto key = makeStoredDocKey("key");
509-
const auto& value = "value";
510-
auto item = makePendingItem(
511-
key,
512-
value,
513-
cb::durability::Requirements(cb::durability::Level::Majority,
514-
1 /*timeout*/));
515-
VBQueueItemCtx ctx;
516-
ctx.durability =
517-
DurabilityItemCtx{item->getDurabilityReqs(), nullptr /*cookie*/};
518-
519-
EXPECT_EQ(MutationStatus::WasClean, public_processSet(*vb, *item, ctx));
494+
testSendDcpPrepare();
520495
flushVBucketToDiskIfPersistent(vbid, 1);
521496

522497
// We don't include prepares in the numItems stat (should not exist in here)
498+
auto vb = engine->getVBucket(vbid);
523499
EXPECT_EQ(0, vb->getNumItems());
524500

525501
// Our topology gives replica name as "replica" an our producer/stream has
@@ -544,6 +520,17 @@ TEST_P(DurabilityActiveStreamTest, RemoveCorrectQueuedAckAtStreamSetDead) {
544520
producer->scheduleCheckpointProcessorTask();
545521
stream->setActive();
546522

523+
// Process items to ensure that lastSentSeqno is GE the seqno that we will
524+
// ack
525+
stream->transitionStateToBackfilling();
526+
ASSERT_TRUE(stream->isBackfilling());
527+
528+
auto& bfm = producer->getBFM();
529+
bfm.backfill();
530+
bfm.backfill();
531+
EXPECT_EQ(2, stream->public_readyQSize());
532+
stream->consumeBackfillItems(2);
533+
547534
// Should not throw a monotonic exception as the ack should have been
548535
// removed by setDead.
549536
stream->seqnoAck(producer->getConsumerName(), 1);

engines/ep/tests/module_tests/dcp_stream_sync_repl_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -770,7 +770,8 @@ TEST_P(DcpStreamSyncReplPersistentTest, ProducerAllowsSeqnoAckLEQToLastSent) {
770770
try {
771771
producer->seqno_acknowledged(0, vbid, 4);
772772
} catch (const std::logic_error& e) {
773-
EXPECT_TRUE(std::string(e.what()).find("Replica acked seqno") !=
773+
EXPECT_TRUE(std::string(e.what()).find(
774+
"replica \"replica1\" acked seqno") !=
774775
std::string::npos);
775776
thrown = true;
776777
}

0 commit comments

Comments
 (0)