Skip to content

Commit 194b4ea

Browse files
committed
Revert "[BP] MB-58961: Remove 'vb' arg from ActiveStream::getOutstandingItems()"
This reverts commit 4597def. Reason for revert: That commit causes MB-59089. Change-Id: Ic5f77ad337a01c93e7c83796bfa24d3137afdd87 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/198136 Reviewed-by: Vesko Karaganev <[email protected]> Well-Formed: Restriction Checker Tested-by: Paolo Cocchi <[email protected]>
1 parent 8a4014c commit 194b4ea

File tree

5 files changed

+48
-54
lines changed

5 files changed

+48
-54
lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -988,41 +988,36 @@ void ActiveStream::nextCheckpointItemTask() {
988988

989989
void ActiveStream::nextCheckpointItemTask(
990990
const std::lock_guard<std::mutex>& streamMutex) {
991-
if (!producerPtr.lock()) {
992-
// Nothing to do, the connection is being shut down
993-
return;
994-
}
995-
996-
// MB-29369: only run the task's work if the stream is in an in-memory
997-
// phase (of which takeover is a variant).
998-
if (!(isInMemory() || isTakeoverSend())) {
999-
return;
1000-
}
991+
VBucketPtr vbucket = engine->getVBucket(vb_);
992+
if (vbucket) {
993+
auto producer = producerPtr.lock();
994+
if (!producer) {
995+
return;
996+
}
1001997

1002-
auto res = getOutstandingItems();
1003-
if (res.isEmpty()) {
998+
// MB-29369: only run the task's work if the stream is in an in-memory
999+
// phase (of which takeover is a variant).
1000+
if (isInMemory() || isTakeoverSend()) {
1001+
auto res = getOutstandingItems(*vbucket);
1002+
processItems(res, streamMutex);
1003+
}
1004+
} else {
1005+
/* The entity deleting the vbucket must set stream to dead,
1006+
calling setDead(cb::mcbp::DcpStreamEndStatus::StateChanged) will
1007+
cause deadlock because it will try to grab streamMutex which is
1008+
already acquired at this point here */
10041009
return;
10051010
}
1006-
1007-
processItems(res, streamMutex);
10081011
}
10091012

1010-
ActiveStream::OutstandingItemsResult ActiveStream::getOutstandingItems() {
1013+
ActiveStream::OutstandingItemsResult ActiveStream::getOutstandingItems(
1014+
VBucket& vb) {
10111015
OutstandingItemsResult result;
1012-
auto vb = engine->getVBucket(vb_);
1013-
if (!vb) {
1014-
// The entity deleting the vbucket must set stream to dead,
1015-
// calling setDead(cb::mcbp::DcpStreamEndStatus::StateChanged) will
1016-
// cause deadlock because it will try to grab streamMutex which is
1017-
// already acquired at this point here
1018-
return {};
1019-
}
1020-
10211016
// Commencing item processing - set guard flag.
10221017
chkptItemsExtractionInProgress.store(true);
10231018

10241019
auto _begin_ = std::chrono::steady_clock::now();
1025-
const auto itemsForCursor = vb->checkpointManager->getNextItemsForCursor(
1020+
const auto itemsForCursor = vb.checkpointManager->getNextItemsForCursor(
10261021
cursor.lock().get(), result.items);
10271022
engine->getEpStats().dcpCursorsGetItemsHisto.add(
10281023
std::chrono::duration_cast<std::chrono::microseconds>(
@@ -1074,7 +1069,7 @@ ActiveStream::OutstandingItemsResult ActiveStream::getOutstandingItems() {
10741069
result.historical = itemsForCursor.historical;
10751070

10761071
result.visibleSeqno = itemsForCursor.visibleSeqno;
1077-
if (vb->checkpointManager->hasClosedCheckpointWhichCanBeRemoved()) {
1072+
if (vb.checkpointManager->hasClosedCheckpointWhichCanBeRemoved()) {
10781073
engine->getKVBucket()->wakeUpCheckpointMemRecoveryTask();
10791074
}
10801075
return result;

engines/ep/src/dcp/active_stream.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -351,10 +351,6 @@ class ActiveStream : public Stream,
351351
OutstandingItemsResult();
352352
~OutstandingItemsResult();
353353

354-
bool isEmpty() const {
355-
return items.empty();
356-
}
357-
358354
/**
359355
* Optional state required when sending a checkpoint of type Disk
360356
* (i.e. when a Producer streams a disk-snapshot from memory.
@@ -509,10 +505,13 @@ class ActiveStream : public Stream,
509505
void notifyEmptyBackfill_UNLOCKED(uint64_t lastSeenSeqno);
510506

511507
/**
508+
* @param vb reference to the associated vbucket
509+
*
512510
* @return the outstanding items for the stream's checkpoint cursor and
513511
* checkpoint type.
514512
*/
515-
virtual ActiveStream::OutstandingItemsResult getOutstandingItems();
513+
virtual ActiveStream::OutstandingItemsResult getOutstandingItems(
514+
VBucket& vb);
516515

517516
/**
518517
* Given a set of queued items, create mutation response for each item,

engines/ep/tests/mock/mock_stream.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ class MockActiveStream : public ActiveStream {
4141
std::optional<std::string_view> jsonFilter = {});
4242

4343
// Expose underlying protected ActiveStream methods as public
44-
OutstandingItemsResult public_getOutstandingItems() {
45-
return getOutstandingItems();
44+
OutstandingItemsResult public_getOutstandingItems(VBucket& vb) {
45+
return getOutstandingItems(vb);
4646
}
4747

4848
void public_processItems(OutstandingItemsResult& result) {
@@ -134,9 +134,9 @@ class MockActiveStream : public ActiveStream {
134134
state_ = state;
135135
}
136136

137-
OutstandingItemsResult getOutstandingItems() override {
137+
OutstandingItemsResult getOutstandingItems(VBucket& vb) override {
138138
preGetOutstandingItemsCallback();
139-
return ActiveStream::getOutstandingItems();
139+
return ActiveStream::getOutstandingItems(vb);
140140
}
141141

142142
/// A callback to allow tests to inject code before we access the checkpoint

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ void DurabilityActiveStreamTest::testSendDcpPrepare() {
117117
EXPECT_EQ(value, (*it)->getValue()->to_s());
118118

119119
// We must have ckpt-start + Prepare
120-
auto outstandingItemsResult = stream->public_getOutstandingItems();
120+
auto outstandingItemsResult = stream->public_getOutstandingItems(*vb);
121121
ASSERT_EQ(2, outstandingItemsResult.items.size());
122122
ASSERT_EQ(queue_op::checkpoint_start,
123123
outstandingItemsResult.items.at(0)->getOperation());
@@ -273,7 +273,7 @@ void DurabilityActiveStreamTest::testSendCompleteSyncWrite(Resolution res) {
273273
}
274274

275275
// Fetch items via DCP stream.
276-
auto outstandingItemsResult = stream->public_getOutstandingItems();
276+
auto outstandingItemsResult = stream->public_getOutstandingItems(*vb);
277277
uint64_t expectedVisibleSeqno = 0;
278278
switch (res) {
279279
case Resolution::Commit:
@@ -839,7 +839,7 @@ TEST_P(DurabilityActiveStreamTest,
839839
}
840840
vb->notifyActiveDMOfLocalSyncWrite();
841841

842-
auto items = stream->getOutstandingItems();
842+
auto items = stream->getOutstandingItems(*vb);
843843
stream->public_processItems(items);
844844
stream->consumeBackfillItems(*producer, 1);
845845
stream->public_nextQueuedItem(*producer);
@@ -865,7 +865,7 @@ TEST_P(DurabilityActiveStreamTest,
865865
}
866866
vb->notifyActiveDMOfLocalSyncWrite();
867867

868-
items = stream->getOutstandingItems();
868+
items = stream->getOutstandingItems(*vb);
869869
stream->public_processItems(items);
870870
stream->consumeBackfillItems(*producer, 3);
871871
stream->public_nextQueuedItem(*producer);
@@ -4648,7 +4648,7 @@ void DurabilityPromotionStreamTest::testDiskCheckpointStreamedAsDiskSnapshot() {
46484648
// returns only items from contiguous checkpoints of the same type.
46494649
// Given that in CM we have checkpoints 1_Disk + 2_Memory, then the next
46504650
// returns only the entire 1_Disk.
4651-
auto outItems = stream->public_getOutstandingItems();
4651+
auto outItems = stream->public_getOutstandingItems(*vb);
46524652
ASSERT_EQ(4, outItems.items.size());
46534653
ASSERT_EQ(queue_op::checkpoint_start, outItems.items.at(0)->getOperation());
46544654
ASSERT_EQ(queue_op::pending_sync_write,
@@ -4691,7 +4691,7 @@ void DurabilityPromotionStreamTest::testDiskCheckpointStreamedAsDiskSnapshot() {
46914691

46924692
// Simulate running the checkpoint processor task again, now we process
46934693
// the second and the third checkpoints (both type:memory)
4694-
outItems = stream->public_getOutstandingItems();
4694+
outItems = stream->public_getOutstandingItems(*vb);
46954695
ASSERT_EQ(7, outItems.items.size());
46964696
ASSERT_EQ(queue_op::checkpoint_start, outItems.items.at(0)->getOperation());
46974697
// set_vbucket_state is from changing to active in the middle of this test
@@ -4894,7 +4894,7 @@ void DurabilityPromotionStreamTest::
48944894
// First CheckpointProcessorTask run
48954895
// Get items from CM, expect Memory{M:1}:
48964896
// ckpt-start + M:1 + ckpt-end
4897-
auto outItems = activeStream->public_getOutstandingItems();
4897+
auto outItems = activeStream->public_getOutstandingItems(*vb);
48984898
ASSERT_EQ(3, outItems.items.size());
48994899
ASSERT_EQ(queue_op::checkpoint_start, outItems.items.at(0)->getOperation());
49004900
ASSERT_EQ(queue_op::mutation, outItems.items.at(1)->getOperation());
@@ -4927,7 +4927,7 @@ void DurabilityPromotionStreamTest::
49274927
//
49284928
// !! NOTE: This is the important part of the test !!
49294929
// Before this patch we do not get any ckpt-start from CM
4930-
outItems = activeStream->public_getOutstandingItems();
4930+
outItems = activeStream->public_getOutstandingItems(*vb);
49314931
ASSERT_EQ(4, outItems.items.size());
49324932
ASSERT_EQ(queue_op::checkpoint_start, outItems.items.at(0)->getOperation());
49334933
ASSERT_EQ(queue_op::pending_sync_write,
@@ -4975,7 +4975,7 @@ void DurabilityPromotionStreamTest::
49754975
// Third CheckpointProcessorTask run
49764976
// Get items from CM, expect Memory{set-vbs:4, M:4}:
49774977
// ckpt-start + set-vbs:4 + M:4 + ckpt-end
4978-
outItems = activeStream->public_getOutstandingItems();
4978+
outItems = activeStream->public_getOutstandingItems(*vb);
49794979
ASSERT_EQ(3, outItems.items.size());
49804980
ASSERT_EQ(queue_op::checkpoint_start, outItems.items.at(0)->getOperation());
49814981
ASSERT_EQ(queue_op::set_vbucket_state,
@@ -5121,7 +5121,7 @@ void DurabilityPromotionStreamTest::
51215121
// CheckpointProcessorTask runs
51225122
// Get items from CM, expect Disk{PRE:1, M:2}:
51235123
// {CS, PRE:1, M:2, CE}
5124-
auto outItems = activeStream->public_getOutstandingItems();
5124+
auto outItems = activeStream->public_getOutstandingItems(*vb);
51255125
ASSERT_EQ(4, outItems.items.size());
51265126

51275127
ASSERT_EQ(queue_op::checkpoint_start, outItems.items.at(0)->getOperation());
@@ -5160,7 +5160,7 @@ void DurabilityPromotionStreamTest::
51605160
// CheckpointProcessorTask runs again
51615161
// Get items from CM, expect Disk{PRE:3, M:4}:
51625162
// {CS, PRE:3, M:4, CE}
5163-
outItems = activeStream->public_getOutstandingItems();
5163+
outItems = activeStream->public_getOutstandingItems(*vb);
51645164
ASSERT_EQ(4, outItems.items.size());
51655165

51665166
ASSERT_EQ(queue_op::checkpoint_start, outItems.items.at(0)->getOperation());
@@ -5298,7 +5298,7 @@ TEST_P(DurabilityPromotionStreamTest,
52985298
// Get items from CM, expect Disk{M:1} as only one Disk checkpoint can be
52995299
// retrieved at a time
53005300
auto vb = store->getVBucket(vbid);
5301-
auto outItems = activeStream->public_getOutstandingItems();
5301+
auto outItems = activeStream->public_getOutstandingItems(*vb);
53025302

53035303
// Push items into the Stream::readyQ
53045304
activeStream->public_processItems(outItems);
@@ -5321,7 +5321,7 @@ TEST_P(DurabilityPromotionStreamTest,
53215321

53225322
// CheckpointProcessorTask runs
53235323
// Get items from CM, expect Disk{A:3}
5324-
outItems = activeStream->public_getOutstandingItems();
5324+
outItems = activeStream->public_getOutstandingItems(*vb);
53255325
activeStream->public_processItems(outItems);
53265326

53275327
// 7)
@@ -5425,7 +5425,7 @@ TEST_P(DurabilityPromotionStreamTest,
54255425
// Get items from CM, expect Disk{M:1} as only one Disk checkpoint can be
54265426
// retrieved at a time
54275427
auto vb = store->getVBucket(vbid);
5428-
auto outItems = activeStream->public_getOutstandingItems();
5428+
auto outItems = activeStream->public_getOutstandingItems(*vb);
54295429

54305430
// Push items into the Stream::readyQ
54315431
activeStream->public_processItems(outItems);
@@ -5449,7 +5449,7 @@ TEST_P(DurabilityPromotionStreamTest,
54495449

54505450
// CheckpointProcessorTask runs
54515451
// Get items from CM, expect Disk{A:3}
5452-
outItems = activeStream->public_getOutstandingItems();
5452+
outItems = activeStream->public_getOutstandingItems(*vb);
54535453
activeStream->public_processItems(outItems);
54545454

54555455
// 7)
@@ -5516,7 +5516,7 @@ TEST_P(DurabilityPromotionStreamTest, ReplicaDeadActiveCanCommitPrepare) {
55165516

55175517
// Push items into the Stream::readyQ
55185518
auto vb = store->getVBucket(vbid);
5519-
auto outItems = activeStream->public_getOutstandingItems();
5519+
auto outItems = activeStream->public_getOutstandingItems(*vb);
55205520
activeStream->public_processItems(outItems);
55215521

55225522
// readyQ also contains SnapshotMarker
@@ -5540,7 +5540,7 @@ TEST_P(DurabilityPromotionStreamTest, ReplicaDeadActiveCanCommitPrepare) {
55405540
flushVBucketToDiskIfPersistent(vbid, 1);
55415541

55425542
// Push items into the Stream::readyQ
5543-
outItems = activeStream->public_getOutstandingItems();
5543+
outItems = activeStream->public_getOutstandingItems(*vb);
55445544
activeStream->public_processItems(outItems);
55455545

55465546
resp = activeStream->public_nextQueuedItem(*producer);
@@ -6057,7 +6057,7 @@ TEST_P(DurabilityActiveStreamTest, inMemoryMultipleMarkers) {
60576057

60586058
// We should get items from two checkpoints which will make processItems
60596059
// generate two markers
6060-
auto items = stream->public_getOutstandingItems();
6060+
auto items = stream->public_getOutstandingItems(*vb);
60616061
stream->public_processItems(items);
60626062

60636063
// marker, prepare, marker, mutation, prepare

engines/ep/tests/module_tests/dcp_stream_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -655,7 +655,7 @@ TEST_P(StreamTest, test_mb17766) {
655655
<< "nextCheckpointItem() should initially be true.";
656656

657657
// Get the set of outstanding items
658-
auto items = stream->public_getOutstandingItems();
658+
auto items = stream->public_getOutstandingItems(*vb0);
659659

660660
// REGRESSION CHECK: nextCheckpointItem() should still return true
661661
EXPECT_TRUE(stream->public_nextCheckpointItem(*producer))

0 commit comments

Comments
 (0)