Skip to content

Commit 38ba836

Browse files
jimwwalker authored and daverigby committed
MB-29369: Obtain the streamMutex earlier in the snapshot task
Obtain the streamMutex and also check the stream is in-memory / takeover-send before the task increments the cursor.

Change-Id: I82ba9b959859921062f817f9f8e2c1452cb852e7
Reviewed-on: http://review.couchbase.org/93497
Reviewed-by: Dave Rigby <[email protected]>
Tested-by: Build Bot <[email protected]>
1 parent a09cd0b commit 38ba836

File tree

9 files changed

+263
-68
lines changed

9 files changed

+263
-68
lines changed

engines/ep/src/dcp/stream.cc

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,10 +1075,20 @@ void ActiveStreamCheckpointProcessorTask::addStats(const std::string& name,
10751075
}
10761076

10771077
void ActiveStream::nextCheckpointItemTask() {
1078+
// MB-29369: Obtain stream mutex here
1079+
LockHolder lh(streamMutex);
1080+
nextCheckpointItemTask(lh);
1081+
}
1082+
1083+
void ActiveStream::nextCheckpointItemTask(const LockHolder& streamMutex) {
10781084
VBucketPtr vbucket = engine->getVBucket(vb_);
10791085
if (vbucket) {
1080-
auto items = getOutstandingItems(*vbucket);
1081-
processItems(items);
1086+
// MB-29369: only run the task's work if the stream is in an in-memory
1087+
// phase (of which takeover is a variant).
1088+
if (isInMemory() || isTakeoverSend()) {
1089+
auto items = getOutstandingItems(*vbucket);
1090+
processItems(items, streamMutex);
1091+
}
10821092
} else {
10831093
/* The entity deleting the vbucket must set stream to dead,
10841094
calling setDead(END_STREAM_STATE) will cause deadlock because
@@ -1207,7 +1217,8 @@ std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
12071217
}
12081218
}
12091219

1210-
void ActiveStream::processItems(std::vector<queued_item>& items) {
1220+
void ActiveStream::processItems(std::vector<queued_item>& items,
1221+
const LockHolder& streamMutex) {
12111222
if (!items.empty()) {
12121223
bool mark = false;
12131224
if (items.front()->getOperation() == queue_op::checkpoint_start) {
@@ -1243,7 +1254,7 @@ void ActiveStream::processItems(std::vector<queued_item>& items) {
12431254
if (mutations.empty()) {
12441255
// If we only got checkpoint start or ends check to see if there are
12451256
// any more snapshots before pausing the stream.
1246-
nextCheckpointItemTask();
1257+
nextCheckpointItemTask(streamMutex);
12471258
} else {
12481259
snapshot(mutations, mark);
12491260
}
@@ -1267,19 +1278,6 @@ void ActiveStream::snapshot(std::deque<std::unique_ptr<DcpResponse>>& items,
12671278
return;
12681279
}
12691280

1270-
LockHolder lh(streamMutex);
1271-
1272-
if (!isActive() || isBackfilling()) {
1273-
// If stream was closed forcefully by the time the checkpoint items
1274-
// retriever task completed, or if we decided to switch the stream to
1275-
// backfill state from in-memory state, none of the acquired mutations
1276-
// should be added on the stream's readyQ. We must drop items in case
1277-
// we switch state from in-memory to backfill because we schedule
1278-
// backfill from lastReadSeqno + 1
1279-
items.clear();
1280-
return;
1281-
}
1282-
12831281
/* This assumes that all items in the "items deque" is put onto readyQ */
12841282
lastReadSeqno.store(lastReadSeqnoUnSnapshotted);
12851283

engines/ep/src/dcp/stream.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,8 @@ class ActiveStream : public Stream,
322322

323323
// Given a set of queued items, create mutation responses for each item,
324324
// and pass onto the producer associated with this stream.
325-
void processItems(std::vector<queued_item>& items);
325+
void processItems(std::vector<queued_item>& items,
326+
const LockHolder& streamMutex);
326327

327328
bool nextCheckpointItem();
328329

@@ -359,6 +360,12 @@ class ActiveStream : public Stream,
359360
virtual void registerCursor(CheckpointManager& chkptmgr,
360361
uint64_t lastProcessedSeqno);
361362

363+
/**
364+
* Unlocked variant of nextCheckpointItemTask; the caller must obtain
365+
* streamMutex and pass a reference to it
366+
* @param streamMutex reference to lockholder
367+
*/
368+
void nextCheckpointItemTask(const LockHolder& streamMutex);
362369

363370
/* Indicates that a backfill has been scheduled and has not yet completed.
364371
* Is protected (as opposed to private) for testing purposes.

engines/ep/tests/mock/mock_dcp_producer.cc

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919

2020
#include "mock_stream.h"
2121

22-
void MockDcpProducer::mockActiveStreamRequest(uint32_t flags,
23-
uint32_t opaque,
24-
VBucket& vb,
25-
uint64_t start_seqno,
26-
uint64_t end_seqno,
27-
uint64_t vbucket_uuid,
28-
uint64_t snap_start_seqno,
29-
uint64_t snap_end_seqno) {
22+
std::shared_ptr<MockActiveStream> MockDcpProducer::mockActiveStreamRequest(
23+
uint32_t flags,
24+
uint32_t opaque,
25+
VBucket& vb,
26+
uint64_t start_seqno,
27+
uint64_t end_seqno,
28+
uint64_t vbucket_uuid,
29+
uint64_t snap_start_seqno,
30+
uint64_t snap_end_seqno) {
3031
auto stream = std::make_shared<MockActiveStream>(
3132
static_cast<EventuallyPersistentEngine*>(&engine_),
3233
std::static_pointer_cast<MockDcpProducer>(shared_from_this()),
@@ -45,4 +46,5 @@ void MockDcpProducer::mockActiveStreamRequest(uint32_t flags,
4546
"failed to insert requested stream");
4647
}
4748
notifyStreamReady(vb.getId());
49+
return stream;
4850
}

engines/ep/tests/mock/mock_dcp_producer.h

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include "kv_bucket.h"
2424
#include "mock_dcp_backfill_mgr.h"
2525

26+
class MockActiveStream;
27+
2628
/*
2729
* Mock of the DcpProducer class. Wraps the real DcpProducer, but exposes
2830
* normally protected methods publicly for test purposes.
@@ -147,12 +149,13 @@ class MockDcpProducer : public DcpProducer {
147149
/**
148150
* Place a mock active stream into the producer
149151
*/
150-
void mockActiveStreamRequest(uint32_t flags,
151-
uint32_t opaque,
152-
VBucket& vb,
153-
uint64_t start_seqno,
154-
uint64_t end_seqno,
155-
uint64_t vbucket_uuid,
156-
uint64_t snap_start_seqno,
157-
uint64_t snap_end_seqno);
152+
std::shared_ptr<MockActiveStream> mockActiveStreamRequest(
153+
uint32_t flags,
154+
uint32_t opaque,
155+
VBucket& vb,
156+
uint64_t start_seqno,
157+
uint64_t end_seqno,
158+
uint64_t vbucket_uuid,
159+
uint64_t snap_start_seqno,
160+
uint64_t snap_end_seqno);
158161
};

engines/ep/tests/mock/mock_stream.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ class MockActiveStream : public ActiveStream {
6565
}
6666

6767
void public_processItems(std::vector<queued_item>& items) {
68-
processItems(items);
68+
LockHolder lh(streamMutex);
69+
processItems(items, lh);
6970
}
7071

7172
bool public_nextCheckpointItem() {

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ class DCPTest : public EventuallyPersistentEngineTest {
135135
stream->getCursorName(), 1, false, MustSendCheckpointEnd::NO))
136136
<< "Found an existing TAP cursor when attempting to register "
137137
"ours";
138+
stream->setActive();
138139
}
139140

140141
void destroy_dcp_stream() {
@@ -1104,36 +1105,6 @@ TEST_P(StreamTest, MB17653_ItemsRemaining) {
11041105
destroy_dcp_stream();
11051106
}
11061107

1107-
TEST_P(StreamTest, test_mb18625) {
1108-
// Add an item.
1109-
store_item(vbid, "key", "value");
1110-
1111-
setup_dcp_stream();
1112-
1113-
// Should start with nextCheckpointItem() returning true.
1114-
EXPECT_TRUE(stream->public_nextCheckpointItem())
1115-
<< "nextCheckpointItem() should initially be true.";
1116-
1117-
// Get the set of outstanding items
1118-
auto items = stream->public_getOutstandingItems(*vb0);
1119-
1120-
// Set stream to DEAD to simulate a close stream request
1121-
stream->setDead(END_STREAM_CLOSED);
1122-
1123-
// Process the set of items retrieved from checkpoint queues previously
1124-
stream->public_processItems(items);
1125-
1126-
// Retrieve the next message in the stream's readyQ
1127-
auto op = stream->public_nextQueuedItem();
1128-
EXPECT_EQ(DcpResponse::Event::StreamEnd, op->getEvent())
1129-
<< "Expected the STREAM_END message";
1130-
1131-
// Expect no other message to be queued after stream end message
1132-
EXPECT_EQ(0, (stream->public_readyQ()).size())
1133-
<< "Expected no more messages in the readyQ";
1134-
destroy_dcp_stream();
1135-
}
1136-
11371108
/* Stream items from a DCP backfill */
11381109
TEST_P(StreamTest, BackfillOnly) {
11391110
/* Add 3 items */
@@ -1372,7 +1343,6 @@ TEST_P(StreamTest, CursorDroppingBasicNotAllowedStates) {
13721343

13731344
/* Transition stream to takeoverSend state and expect cursor dropping call
13741345
to fail */
1375-
stream->transitionStateToBackfilling();
13761346
stream->transitionStateToTakeoverSend();
13771347
EXPECT_FALSE(stream->public_handleSlowStream());
13781348

0 commit comments

Comments
 (0)