Skip to content

Commit 4597def

Browse files
committed
[BP] MB-58961: Remove 'vb' arg from ActiveStream::getOutstandingItems()
Change-Id: I427936055e0be7e2a91ccd236a5debeece243d86 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/198873 Well-Formed: Restriction Checker Tested-by: Paolo Cocchi <[email protected]> Reviewed-by: Trond Norbye <[email protected]>
1 parent fa4e166 commit 4597def

File tree

5 files changed

+54
-48
lines changed

5 files changed

+54
-48
lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -991,36 +991,41 @@ void ActiveStream::nextCheckpointItemTask() {
991991

992992
void ActiveStream::nextCheckpointItemTask(
993993
const std::lock_guard<std::mutex>& streamMutex) {
994-
VBucketPtr vbucket = engine->getVBucket(vb_);
995-
if (vbucket) {
996-
auto producer = producerPtr.lock();
997-
if (!producer) {
998-
return;
999-
}
994+
if (!producerPtr.lock()) {
995+
// Nothing to do, the connection is being shut down
996+
return;
997+
}
1000998

1001-
// MB-29369: only run the task's work if the stream is in an in-memory
1002-
// phase (of which takeover is a variant).
1003-
if (isInMemory() || isTakeoverSend()) {
1004-
auto res = getOutstandingItems(*vbucket);
1005-
processItems(res, streamMutex);
1006-
}
1007-
} else {
1008-
/* The entity deleting the vbucket must set stream to dead,
1009-
calling setDead(cb::mcbp::DcpStreamEndStatus::StateChanged) will
1010-
cause deadlock because it will try to grab streamMutex which is
1011-
already acquired at this point here */
999+
// MB-29369: only run the task's work if the stream is in an in-memory
1000+
// phase (of which takeover is a variant).
1001+
if (!(isInMemory() || isTakeoverSend())) {
10121002
return;
10131003
}
1004+
1005+
auto res = getOutstandingItems();
1006+
if (res.isEmpty()) {
1007+
return;
1008+
}
1009+
1010+
processItems(res, streamMutex);
10141011
}
10151012

1016-
ActiveStream::OutstandingItemsResult ActiveStream::getOutstandingItems(
1017-
VBucket& vb) {
1013+
ActiveStream::OutstandingItemsResult ActiveStream::getOutstandingItems() {
10181014
OutstandingItemsResult result;
1015+
auto vb = engine->getVBucket(vb_);
1016+
if (!vb) {
1017+
// The entity deleting the vbucket must set stream to dead,
1018+
// calling setDead(cb::mcbp::DcpStreamEndStatus::StateChanged) will
1019+
// cause deadlock because it will try to grab streamMutex which is
1020+
// already acquired at this point here
1021+
return {};
1022+
}
1023+
10191024
// Commencing item processing - set guard flag.
10201025
chkptItemsExtractionInProgress.store(true);
10211026

10221027
auto _begin_ = std::chrono::steady_clock::now();
1023-
const auto itemsForCursor = vb.checkpointManager->getNextItemsForCursor(
1028+
const auto itemsForCursor = vb->checkpointManager->getNextItemsForCursor(
10241029
cursor.lock().get(), result.items);
10251030
engine->getEpStats().dcpCursorsGetItemsHisto.add(
10261031
std::chrono::duration_cast<std::chrono::microseconds>(
@@ -1072,7 +1077,7 @@ ActiveStream::OutstandingItemsResult ActiveStream::getOutstandingItems(
10721077
result.historical = itemsForCursor.historical;
10731078

10741079
result.visibleSeqno = itemsForCursor.visibleSeqno;
1075-
if (vb.checkpointManager->hasClosedCheckpointWhichCanBeRemoved()) {
1080+
if (vb->checkpointManager->hasClosedCheckpointWhichCanBeRemoved()) {
10761081
engine->getKVBucket()->wakeUpCheckpointMemRecoveryTask();
10771082
}
10781083
return result;

engines/ep/src/dcp/active_stream.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,10 @@ class ActiveStream : public Stream,
351351
OutstandingItemsResult();
352352
~OutstandingItemsResult();
353353

354+
bool isEmpty() const {
355+
return items.empty();
356+
}
357+
354358
/**
355359
* Optional state required when sending a checkpoint of type Disk
356360
* (i.e. when a Producer streams a disk-snapshot from memory.
@@ -505,13 +509,10 @@ class ActiveStream : public Stream,
505509
void notifyEmptyBackfill_UNLOCKED(uint64_t lastSeenSeqno);
506510

507511
/**
508-
* @param vb reference to the associated vbucket
509-
*
510512
* @return the outstanding items for the stream's checkpoint cursor and
511513
* checkpoint type.
512514
*/
513-
virtual ActiveStream::OutstandingItemsResult getOutstandingItems(
514-
VBucket& vb);
515+
virtual ActiveStream::OutstandingItemsResult getOutstandingItems();
515516

516517
/**
517518
* Given a set of queued items, create mutation response for each item,

engines/ep/tests/mock/mock_stream.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ class MockActiveStream : public ActiveStream {
4141
std::optional<std::string_view> jsonFilter = {});
4242

4343
// Expose underlying protected ActiveStream methods as public
44-
OutstandingItemsResult public_getOutstandingItems(VBucket& vb) {
45-
return getOutstandingItems(vb);
44+
OutstandingItemsResult public_getOutstandingItems() {
45+
return getOutstandingItems();
4646
}
4747

4848
void public_processItems(OutstandingItemsResult& result) {
@@ -134,9 +134,9 @@ class MockActiveStream : public ActiveStream {
134134
state_ = state;
135135
}
136136

137-
OutstandingItemsResult getOutstandingItems(VBucket& vb) override {
137+
OutstandingItemsResult getOutstandingItems() override {
138138
preGetOutstandingItemsCallback();
139-
return ActiveStream::getOutstandingItems(vb);
139+
return ActiveStream::getOutstandingItems();
140140
}
141141

142142
/// A callback to allow tests to inject code before we access the checkpoint

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ void DurabilityActiveStreamTest::testSendDcpPrepare() {
117117
EXPECT_EQ(value, (*it)->getValue()->to_s());
118118

119119
// We must have ckpt-start + Prepare
120-
auto outstandingItemsResult = stream->public_getOutstandingItems(*vb);
120+
auto outstandingItemsResult = stream->public_getOutstandingItems();
121121
ASSERT_EQ(2, outstandingItemsResult.items.size());
122122
ASSERT_EQ(queue_op::checkpoint_start,
123123
outstandingItemsResult.items.at(0)->getOperation());
@@ -273,7 +273,7 @@ void DurabilityActiveStreamTest::testSendCompleteSyncWrite(Resolution res) {
273273
}
274274

275275
// Fetch items via DCP stream.
276-
auto outstandingItemsResult = stream->public_getOutstandingItems(*vb);
276+
auto outstandingItemsResult = stream->public_getOutstandingItems();
277277
uint64_t expectedVisibleSeqno = 0;
278278
switch (res) {
279279
case Resolution::Commit:
@@ -839,7 +839,7 @@ TEST_P(DurabilityActiveStreamTest,
839839
}
840840
vb->notifyActiveDMOfLocalSyncWrite();
841841

842-
auto items = stream->getOutstandingItems(*vb);
842+
auto items = stream->getOutstandingItems();
843843
stream->public_processItems(items);
844844
stream->consumeBackfillItems(*producer, 1);
845845
stream->public_nextQueuedItem(*producer);
@@ -865,7 +865,7 @@ TEST_P(DurabilityActiveStreamTest,
865865
}
866866
vb->notifyActiveDMOfLocalSyncWrite();
867867

868-
items = stream->getOutstandingItems(*vb);
868+
items = stream->getOutstandingItems();
869869
stream->public_processItems(items);
870870
stream->consumeBackfillItems(*producer, 3);
871871
stream->public_nextQueuedItem(*producer);
@@ -4648,7 +4648,7 @@ void DurabilityPromotionStreamTest::testDiskCheckpointStreamedAsDiskSnapshot() {
46484648
// returns only items from contiguous checkpoints of the same type.
46494649
// Given that in CM we have checkpoints 1_Disk + 2_Memory, then the next
46504650
// returns only the entire 1_Disk.
4651-
auto outItems = stream->public_getOutstandingItems(*vb);
4651+
auto outItems = stream->public_getOutstandingItems();
46524652
ASSERT_EQ(4, outItems.items.size());
46534653
ASSERT_EQ(queue_op::checkpoint_start, outItems.items.at(0)->getOperation());
46544654
ASSERT_EQ(queue_op::pending_sync_write,
@@ -4691,7 +4691,7 @@ void DurabilityPromotionStreamTest::testDiskCheckpointStreamedAsDiskSnapshot() {
46914691

46924692
// Simulate running the checkpoint processor task again, now we process
46934693
// the second and the third checkpoints (both type:memory)
4694-
outItems = stream->public_getOutstandingItems(*vb);
4694+
outItems = stream->public_getOutstandingItems();
46954695
ASSERT_EQ(7, outItems.items.size());
46964696
ASSERT_EQ(queue_op::checkpoint_start, outItems.items.at(0)->getOperation());
46974697
// set_vbucket_state is from changing to active in the middle of this test
@@ -4894,7 +4894,7 @@ void DurabilityPromotionStreamTest::
48944894
// First CheckpointProcessorTask run
48954895
// Get items from CM, expect Memory{M:1}:
48964896
// ckpt-start + M:1 + ckpt-end
4897-
auto outItems = activeStream->public_getOutstandingItems(*vb);
4897+
auto outItems = activeStream->public_getOutstandingItems();
48984898
ASSERT_EQ(3, outItems.items.size());
48994899
ASSERT_EQ(queue_op::checkpoint_start, outItems.items.at(0)->getOperation());
49004900
ASSERT_EQ(queue_op::mutation, outItems.items.at(1)->getOperation());
@@ -4927,7 +4927,7 @@ void DurabilityPromotionStreamTest::
49274927
//
49284928
// !! NOTE: This is the important part of the test !!
49294929
// Before this patch we do not get any ckpt-start from CM
4930-
outItems = activeStream->public_getOutstandingItems(*vb);
4930+
outItems = activeStream->public_getOutstandingItems();
49314931
ASSERT_EQ(4, outItems.items.size());
49324932
ASSERT_EQ(queue_op::checkpoint_start, outItems.items.at(0)->getOperation());
49334933
ASSERT_EQ(queue_op::pending_sync_write,
@@ -4975,7 +4975,7 @@ void DurabilityPromotionStreamTest::
49754975
// Third CheckpointProcessorTask run
49764976
// Get items from CM, expect Memory{set-vbs:4, M:4}:
49774977
// ckpt-start + set-vbs:4 + M:4 + ckpt-end
4978-
outItems = activeStream->public_getOutstandingItems(*vb);
4978+
outItems = activeStream->public_getOutstandingItems();
49794979
ASSERT_EQ(3, outItems.items.size());
49804980
ASSERT_EQ(queue_op::checkpoint_start, outItems.items.at(0)->getOperation());
49814981
ASSERT_EQ(queue_op::set_vbucket_state,
@@ -5121,7 +5121,7 @@ void DurabilityPromotionStreamTest::
51215121
// CheckpointProcessorTask runs
51225122
// Get items from CM, expect Disk{PRE:1, M:2}:
51235123
// {CS, PRE:1, M:2, CE}
5124-
auto outItems = activeStream->public_getOutstandingItems(*vb);
5124+
auto outItems = activeStream->public_getOutstandingItems();
51255125
ASSERT_EQ(4, outItems.items.size());
51265126

51275127
ASSERT_EQ(queue_op::checkpoint_start, outItems.items.at(0)->getOperation());
@@ -5160,7 +5160,7 @@ void DurabilityPromotionStreamTest::
51605160
// CheckpointProcessorTask runs again
51615161
// Get items from CM, expect Disk{PRE:3, M:4}:
51625162
// {CS, PRE:3, M:4, CE}
5163-
outItems = activeStream->public_getOutstandingItems(*vb);
5163+
outItems = activeStream->public_getOutstandingItems();
51645164
ASSERT_EQ(4, outItems.items.size());
51655165

51665166
ASSERT_EQ(queue_op::checkpoint_start, outItems.items.at(0)->getOperation());
@@ -5298,7 +5298,7 @@ TEST_P(DurabilityPromotionStreamTest,
52985298
// Get items from CM, expect Disk{M:1} as only one Disk checkpoint can be
52995299
// retrieved at a time
53005300
auto vb = store->getVBucket(vbid);
5301-
auto outItems = activeStream->public_getOutstandingItems(*vb);
5301+
auto outItems = activeStream->public_getOutstandingItems();
53025302

53035303
// Push items into the Stream::readyQ
53045304
activeStream->public_processItems(outItems);
@@ -5321,7 +5321,7 @@ TEST_P(DurabilityPromotionStreamTest,
53215321

53225322
// CheckpointProcessorTask runs
53235323
// Get items from CM, expect Disk{A:3}
5324-
outItems = activeStream->public_getOutstandingItems(*vb);
5324+
outItems = activeStream->public_getOutstandingItems();
53255325
activeStream->public_processItems(outItems);
53265326

53275327
// 7)
@@ -5425,7 +5425,7 @@ TEST_P(DurabilityPromotionStreamTest,
54255425
// Get items from CM, expect Disk{M:1} as only one Disk checkpoint can be
54265426
// retrieved at a time
54275427
auto vb = store->getVBucket(vbid);
5428-
auto outItems = activeStream->public_getOutstandingItems(*vb);
5428+
auto outItems = activeStream->public_getOutstandingItems();
54295429

54305430
// Push items into the Stream::readyQ
54315431
activeStream->public_processItems(outItems);
@@ -5449,7 +5449,7 @@ TEST_P(DurabilityPromotionStreamTest,
54495449

54505450
// CheckpointProcessorTask runs
54515451
// Get items from CM, expect Disk{A:3}
5452-
outItems = activeStream->public_getOutstandingItems(*vb);
5452+
outItems = activeStream->public_getOutstandingItems();
54535453
activeStream->public_processItems(outItems);
54545454

54555455
// 7)
@@ -5516,7 +5516,7 @@ TEST_P(DurabilityPromotionStreamTest, ReplicaDeadActiveCanCommitPrepare) {
55165516

55175517
// Push items into the Stream::readyQ
55185518
auto vb = store->getVBucket(vbid);
5519-
auto outItems = activeStream->public_getOutstandingItems(*vb);
5519+
auto outItems = activeStream->public_getOutstandingItems();
55205520
activeStream->public_processItems(outItems);
55215521

55225522
// readyQ also contains SnapshotMarker
@@ -5540,7 +5540,7 @@ TEST_P(DurabilityPromotionStreamTest, ReplicaDeadActiveCanCommitPrepare) {
55405540
flushVBucketToDiskIfPersistent(vbid, 1);
55415541

55425542
// Push items into the Stream::readyQ
5543-
outItems = activeStream->public_getOutstandingItems(*vb);
5543+
outItems = activeStream->public_getOutstandingItems();
55445544
activeStream->public_processItems(outItems);
55455545

55465546
resp = activeStream->public_nextQueuedItem(*producer);
@@ -6057,7 +6057,7 @@ TEST_P(DurabilityActiveStreamTest, inMemoryMultipleMarkers) {
60576057

60586058
// We should get items from two checkpoints which will make processItems
60596059
// generate two markers
6060-
auto items = stream->public_getOutstandingItems(*vb);
6060+
auto items = stream->public_getOutstandingItems();
60616061
stream->public_processItems(items);
60626062

60636063
// marker, prepare, marker, mutation, prepare

engines/ep/tests/module_tests/dcp_stream_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -655,7 +655,7 @@ TEST_P(StreamTest, test_mb17766) {
655655
<< "nextCheckpointItem() should initially be true.";
656656

657657
// Get the set of outstanding items
658-
auto items = stream->public_getOutstandingItems(*vb0);
658+
auto items = stream->public_getOutstandingItems();
659659

660660
// REGRESSION CHECK: nextCheckpointItem() should still return true
661661
EXPECT_TRUE(stream->public_nextCheckpointItem(*producer))

0 commit comments

Comments
 (0)