Skip to content

Commit 9e12645

Browse files
committed
MB-58961: Update AS::lastReadSeqno for empty in-memory snapshot
Let's consider this case: - 2 collections, default + fruit - a single ActiveStream that filters on fruit In the case where the stream ends up processing a checkpoint like: [e:4 cs:4 m(keyD4):4 m(keyD5):5) (ie, all mutations in the checkpoint are in collection default and so they are filtered out at stream), then we do move the stream cursor in checkpoint but we miss to update lastReadSeqno=5. That misbehaviour creates the preconditions for a possible subsequent error that might end up in a Producer disconnection. That is detailed in the new test in this patch. Change-Id: Ia0b097f19286c5e1f50a82eb0fee841e722f89b0 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/199966 Tested-by: Paolo Cocchi <[email protected]> Well-Formed: Restriction Checker Reviewed-by: Jim Walker <[email protected]>
1 parent c029c0f commit 9e12645

File tree

3 files changed

+168
-1
lines changed

3 files changed

+168
-1
lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1425,6 +1425,25 @@ void ActiveStream::processItems(
14251425
sendSnapshotAndSeqnoAdvanced(outstandingItemsResult,
14261426
highNonVisibleSeqno.value(),
14271427
highNonVisibleSeqno.value());
1428+
} else if (lastReadSeqno < curChkSeqno) {
1429+
// So here we are in the case where:
1430+
// - We have moved the DCP cursor and pulled some items
1431+
// - There was some non-meta items (as curChkSeqno has been
1432+
// bumped)
1433+
// - We have filtered out some item (lastReadSeqno not aligned
1434+
// to curChkSeqno)
1435+
// - Actually we have filtered all the items and we have skipped
1436+
// the call to snapshot().
1437+
//
1438+
// We need to bump lastReadSeqno.
1439+
// The local newLastReadSeqno variable is updated with all
1440+
// seqnos that belong the stream, regardless of whether they are
1441+
// filtered out by the stream filter. That's the quantity that
1442+
// we normally use in the snapshot() golden-path for updating
1443+
// AS::lastReadSeqno. Used here with the same semantic.
1444+
if (lastReadSeqno < newLastReadSeqno) {
1445+
lastReadSeqno = newLastReadSeqno;
1446+
}
14281447
}
14291448
}
14301449
// if we've processed past the stream's end seqno then transition to the
@@ -1437,7 +1456,7 @@ void ActiveStream::processItems(
14371456
// After the snapshot has been processed, check if the filter is now empty
14381457
// a stream with an empty filter does nothing but self close
14391458
if (filter.empty()) {
1440-
// Filter is now empty empty, so endStream
1459+
// Filter is now empty, so endStream
14411460
endStream(cb::mcbp::DcpStreamEndStatus::FilterEmpty);
14421461
}
14431462

engines/ep/src/dcp/active_stream.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,14 @@ class ActiveStream : public Stream,
303303

304304
uint64_t getLastSentSeqno() const;
305305

306+
uint64_t getCurChkSeqno() const {
307+
return curChkSeqno;
308+
}
309+
310+
uint64_t getLastSentSnapEndSeqno() const {
311+
return lastSentSnapEndSeqno;
312+
}
313+
306314
// Defined in active_stream_impl.h to remove the need to include the
307315
// producer header here
308316
template <typename... Args>

engines/ep/tests/module_tests/dcp_stream_test.cc

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5201,6 +5201,146 @@ TEST_P(SingleThreadedActiveStreamTest,
52015201
EXPECT_EQ(DcpResponse::Event::Mutation, readyQ.back()->getEvent());
52025202
}
52035203

5204+
TEST_P(SingleThreadedActiveStreamTest, MB_58961) {
5205+
// No OSOBackfill in Ephemeral
5206+
if (ephemeral()) {
5207+
GTEST_SKIP();
5208+
}
5209+
5210+
// Create a collection
5211+
CollectionsManifest manifest;
5212+
const auto& cFruit = CollectionEntry::fruit;
5213+
manifest.add(
5214+
cFruit, cb::NoExpiryLimit, true /*history*/, ScopeEntry::defaultS);
5215+
auto& vb = *store->getVBucket(vbid);
5216+
vb.updateFromManifest(Collections::Manifest{std::string{manifest}});
5217+
ASSERT_EQ(1, vb.getHighSeqno());
5218+
5219+
// seqno:2 in cFruit
5220+
const std::string value("value");
5221+
store_item(vbid, makeStoredDocKey("keyF", cFruit), value);
5222+
ASSERT_EQ(2, vb.getHighSeqno());
5223+
5224+
// Add some data to the default collection
5225+
const auto& cDefault = CollectionEntry::defaultC;
5226+
store_item(vbid, makeStoredDocKey("keyD", cDefault), value);
5227+
ASSERT_EQ(3, vb.getHighSeqno());
5228+
5229+
// [e:1 cs:1 se:1 m(keyF):2 m(keyD):3)
5230+
5231+
// Ensure new stream will backfill
5232+
stream->public_getOutstandingItems(vb);
5233+
removeCheckpoint(vb);
5234+
5235+
// [e:4 cs:4)
5236+
5237+
// Ensure OSOBackfill is triggered
5238+
engine->getConfiguration().setDcpOsoBackfill("enabled");
5239+
producer->setOutOfOrderSnapshots(OutOfOrderSnapshots::YesWithSeqnoAdvanced);
5240+
5241+
// Stream filters on cFruit
5242+
recreateStream(
5243+
vb,
5244+
true,
5245+
fmt::format(R"({{"collections":["{:x}"]}})", uint32_t(cFruit.uid)));
5246+
ASSERT_TRUE(stream);
5247+
// Pushed to backfill
5248+
ASSERT_TRUE(stream->isBackfilling());
5249+
5250+
// [e:4 cs:4)
5251+
// ^
5252+
5253+
auto& readyQ = stream->public_readyQ();
5254+
ASSERT_EQ(0, readyQ.size());
5255+
5256+
// Run the OSO backfill
5257+
runBackfill(); // push markers and data
5258+
ASSERT_EQ(5, readyQ.size());
5259+
5260+
auto resp = stream->public_nextQueuedItem(*producer);
5261+
EXPECT_EQ(DcpResponse::Event::OSOSnapshot, resp->getEvent());
5262+
5263+
resp = stream->public_nextQueuedItem(*producer);
5264+
EXPECT_EQ(DcpResponse::Event::SystemEvent, resp->getEvent());
5265+
EXPECT_EQ(1, resp->getBySeqno());
5266+
5267+
resp = stream->public_nextQueuedItem(*producer);
5268+
EXPECT_EQ(DcpResponse::Event::Mutation, resp->getEvent());
5269+
EXPECT_EQ(2, resp->getBySeqno());
5270+
5271+
// Note: seqno:3 is in cDefault, filtered out, SeqnoAdvance(3) sent
5272+
resp = stream->public_nextQueuedItem(*producer);
5273+
EXPECT_EQ(DcpResponse::Event::SeqnoAdvanced, resp->getEvent());
5274+
EXPECT_EQ(3, resp->getBySeqno());
5275+
5276+
resp = stream->public_nextQueuedItem(*producer);
5277+
EXPECT_EQ(DcpResponse::Event::OSOSnapshot, resp->getEvent());
5278+
5279+
EXPECT_EQ(0, stream->getLastSentSnapEndSeqno());
5280+
EXPECT_EQ(2, stream->getLastBackfilledSeqno());
5281+
EXPECT_EQ(3, stream->getLastSentSeqno());
5282+
EXPECT_EQ(3, stream->getLastReadSeqno());
5283+
EXPECT_EQ(4, stream->getCurChkSeqno());
5284+
5285+
ASSERT_FALSE(stream->public_nextQueuedItem(*producer));
5286+
GMockDcpMsgProducers producers;
5287+
EXPECT_EQ(cb::engine_errc::would_block, producer->step(producers));
5288+
ASSERT_TRUE(stream->isInMemory());
5289+
5290+
// [e:4 cs:4)
5291+
// ^
5292+
5293+
store_item(vbid, makeStoredDocKey("keyD4", cDefault), value);
5294+
ASSERT_EQ(4, vb.getHighSeqno());
5295+
store_item(vbid, makeStoredDocKey("keyD5", cDefault), value);
5296+
ASSERT_EQ(5, vb.getHighSeqno());
5297+
5298+
// [e:4 cs:4 m(keyD4):4 m(keyD5):5)
5299+
// ^
5300+
5301+
// Move inMemory stream.
5302+
// Note: Before the fix we do move the cursor but we miss to advance the
5303+
// stream
5304+
ASSERT_EQ(0, readyQ.size());
5305+
runCheckpointProcessor(*producer, producers);
5306+
// Note: We don't send any snapshot when all items are filtrered out
5307+
EXPECT_EQ(0, readyQ.size());
5308+
5309+
// [e:4 cs:4 m(keyD4):4 m(keyD5):5)
5310+
// ^
5311+
5312+
EXPECT_EQ(0, stream->getLastSentSnapEndSeqno());
5313+
EXPECT_EQ(2, stream->getLastBackfilledSeqno());
5314+
EXPECT_EQ(3, stream->getLastSentSeqno());
5315+
EXPECT_EQ(5, stream->getLastReadSeqno()); // Before the fix: 3
5316+
EXPECT_EQ(5, stream->getCurChkSeqno());
5317+
5318+
// CursorDrop + backfill
5319+
ASSERT_TRUE(stream->handleSlowStream());
5320+
ASSERT_TRUE(stream->isInMemory());
5321+
5322+
// [e:4 cs:4 m(keyD4):4 m(keyD5):5)
5323+
// x
5324+
5325+
// Before the fix for MB-58961 this step triggers:
5326+
//
5327+
// libc++abi: terminating due to uncaught exception of type
5328+
// boost::exception_detail::error_info_injector<std::logic_error>:Monotonic<y>
5329+
// (ActiveStream(test_producer->test_consumer (vb:0))::curChkSeqno)
5330+
// invariant failed: new value (4) breaks invariant on current value (5)
5331+
//
5332+
// The reason for the failure in MB-58961 is that we miss to update
5333+
// AS::lastReadSeqno when we process the "empty-by-filter" snapshot before
5334+
// CursorDrop.
5335+
// By that:
5336+
// (a) lastReadSeqno stays at 3
5337+
// (b) In the subsequent AS::scheduleBackfill(lastReadSeqno:3) call we
5338+
// re-register the cursor at cs:4 and we try to reset curChkSeqno (5) by
5339+
// (4).
5340+
EXPECT_EQ(cb::engine_errc::would_block, producer->step(producers));
5341+
EXPECT_FALSE(stream->isBackfilling());
5342+
}
5343+
52045344
INSTANTIATE_TEST_SUITE_P(AllBucketTypes,
52055345
SingleThreadedActiveStreamTest,
52065346
STParameterizedBucketTest::allConfigValues(),

0 commit comments

Comments
 (0)