Skip to content

Commit 3ae001d

Browse files
jimwwalkerdaverigby
authored andcommitted
MB-29287: Give each ActiveStream a unique cursor name
To ensure that when the background snapshot processor task runs and we have closed/created the ActiveStream, if the task gets a handle on the closed stream we must be sure it does not obtain items destined for the new stream. Previously with each ActiveStream just using the name of its producer, the closed stream was able to drain items which the new stream needed. Adding a monotonic atomic to the ActiveStream class and appending a new value to the name we use for the cursor ensures concurrent streams cannot interfere with each other. Change-Id: Ie05092490a75c656c344425850eba00043019e96 Reviewed-on: http://review.couchbase.org/93112 Well-Formed: Build Bot <[email protected]> Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 20c6ce3 commit 3ae001d

File tree

5 files changed

+50
-23
lines changed

5 files changed

+50
-23
lines changed

src/dcp/producer.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -799,12 +799,12 @@ void DcpProducer::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
799799
}
800800

801801
bool DcpProducer::handleSlowStream(uint16_t vbid,
802-
const std::string &name) {
802+
const std::string& cursorName) {
803803
if (supportsCursorDropping) {
804804
stream_t stream = findStreamByVbid(vbid);
805805
if (stream) {
806-
if (stream->getName().compare(name) == 0) {
807-
ActiveStream* as = static_cast<ActiveStream*>(stream.get());
806+
ActiveStream* as = static_cast<ActiveStream*>(stream.get());
807+
if (as->getCursorName() == cursorName) {
808808
as->handleSlowStream();
809809
return true;
810810
}

src/dcp/producer.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,17 @@ class DcpProducer : public Producer {
7272

7373
void vbucketStateChanged(uint16_t vbucket, vbucket_state_t state);
7474

75-
/* This function handles a stream that is detected as slow by the checkpoint
76-
remover. Currently we handle the slow stream by switching from in-memory
77-
to backfilling */
78-
bool handleSlowStream(uint16_t vbid, const std::string &name);
75+
/**
76+
* This function handles a stream that is detected as slow by the checkpoint
77+
* remover. Currently we handle the slow stream by switching from in-memory
78+
* to backfilling.
79+
*
80+
* @param vbid vbucket the checkpoint-remover is processing
81+
* @param cursorName the cursor name registered in the checkpoint manager
82+
* which is slow.
83+
* @return true if the cursor was removed from the checkpoint manager
84+
*/
85+
bool handleSlowStream(uint16_t vbid, const std::string& cursorName);
7986

8087
void closeAllStreams();
8188

src/dcp/stream.cc

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,8 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
182182
engine(e), producer(p),
183183
payloadType((flags & DCP_ADD_STREAM_FLAG_NO_VALUE) ? KEY_ONLY :
184184
KEY_VALUE),
185-
lastSentSnapEndSeqno(0), chkptItemsExtractionInProgress(false) {
186-
185+
lastSentSnapEndSeqno(0), chkptItemsExtractionInProgress(false),
186+
cursorName(n + std::to_string(cursorUID.fetch_add(1))) {
187187
const char* type = "";
188188
if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
189189
type = "takeover ";
@@ -338,7 +338,7 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
338338
try {
339339
CursorRegResult result =
340340
vb->checkpointManager.registerCursorBySeqno(
341-
name_, chkCursorSeqno,
341+
cursorName, chkCursorSeqno,
342342
MustSendCheckpointEnd::NO);
343343

344344
curChkSeqno = result.first;
@@ -696,7 +696,7 @@ void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
696696
item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
697697
size_t vb_items = vb->getNumItems(iep);
698698
size_t chk_items = vb_items > 0 ?
699-
vb->checkpointManager.getNumItemsForCursor(name_) : 0;
699+
vb->checkpointManager.getNumItemsForCursor(cursorName) : 0;
700700

701701
size_t del_items = 0;
702702
try {
@@ -747,7 +747,7 @@ DcpResponse* ActiveStream::nextQueuedItem() {
747747

748748
bool ActiveStream::nextCheckpointItem() {
749749
RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
750-
if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(name_) > 0) {
750+
if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(cursorName) > 0) {
751751
// schedule this stream to build the next checkpoint
752752
producer->scheduleCheckpointProcessorTask(this);
753753
return true;
@@ -837,7 +837,7 @@ void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
837837
// Commencing item processing - set guard flag.
838838
chkptItemsExtractionInProgress.store(true);
839839

840-
vb->checkpointManager.getAllItemsForCursor(name_, items);
840+
vb->checkpointManager.getAllItemsForCursor(cursorName, items);
841841
if (vb->checkpointManager.getNumCheckpoints() > 1) {
842842
engine->getEpStore()->wakeUpCheckpointRemover();
843843
}
@@ -1047,7 +1047,8 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
10471047
try {
10481048
std::tie(curChkSeqno, tryBackfill) =
10491049
vbucket->checkpointManager.registerCursorBySeqno(
1050-
name_, lastReadSeqno.load(),
1050+
cursorName,
1051+
lastReadSeqno.load(),
10511052
MustSendCheckpointEnd::NO);
10521053
} catch(std::exception& error) {
10531054
producer->getLogger().log(EXTENSION_LOG_WARNING,
@@ -1125,11 +1126,12 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
11251126
*/
11261127
try {
11271128
CursorRegResult result =
1128-
vbucket->checkpointManager.registerCursorBySeqno(
1129-
name_, lastReadSeqno.load(),
1130-
MustSendCheckpointEnd::NO);
1129+
vbucket->checkpointManager.registerCursorBySeqno(
1130+
cursorName,
1131+
lastReadSeqno.load(),
1132+
MustSendCheckpointEnd::NO);
11311133

1132-
curChkSeqno = result.first;
1134+
curChkSeqno = result.first;
11331135
} catch (std::exception& error) {
11341136
producer->getLogger().log(EXTENSION_LOG_WARNING,
11351137
"(vb %" PRIu16 ") Failed to register "
@@ -1307,7 +1309,7 @@ void ActiveStream::transitionState(stream_state_t newState) {
13071309
{
13081310
RCPtr<VBucket> vb = engine->getVBucket(vb_);
13091311
if (vb) {
1310-
vb->checkpointManager.removeCursor(name_);
1312+
vb->checkpointManager.removeCursor(cursorName);
13111313
}
13121314
break;
13131315
}
@@ -1330,8 +1332,8 @@ size_t ActiveStream::getItemsRemaining() {
13301332
// Items remaining is the sum of:
13311333
// (a) Items outstanding in checkpoints
13321334
// (b) Items pending in our readyQ, excluding any meta items.
1333-
return vbucket->checkpointManager.getNumItemsForCursor(name_) +
1334-
readyQ_non_meta_items;
1335+
return vbucket->checkpointManager.getNumItemsForCursor(cursorName) +
1336+
readyQ_non_meta_items;
13351337
}
13361338

13371339
uint64_t ActiveStream::getLastReadSeqno() const {
@@ -1377,9 +1379,11 @@ void ActiveStream::dropCheckpointCursor_UNLOCKED()
13771379
}
13781380
}
13791381
/* Drop the existing cursor */
1380-
vbucket->checkpointManager.removeCursor(name_);
1382+
vbucket->checkpointManager.removeCursor(cursorName);
13811383
}
13821384

1385+
std::atomic<uint64_t> ActiveStream::cursorUID;
1386+
13831387
NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
13841388
const std::string &name, uint32_t flags,
13851389
uint32_t opaque, uint16_t vb, uint64_t st_seqno,

src/dcp/stream.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,11 @@ class ActiveStream : public Stream {
257257
in-memory to backfilling */
258258
void handleSlowStream();
259259

260+
/// @return a const reference to the streams cursor name
261+
const std::string& getCursorName() const {
262+
return cursorName;
263+
}
264+
260265
protected:
261266
// Returns the outstanding items for the stream's checkpoint cursor.
262267
void getOutstandingItems(RCPtr<VBucket> &vb, std::vector<queued_item> &items);
@@ -381,6 +386,17 @@ class ActiveStream : public Stream {
381386
items are added to the readyQ */
382387
AtomicValue<bool> chkptItemsExtractionInProgress;
383388

389+
/**
390+
* The name which uniquely identifies this stream's checkpoint cursor
391+
*/
392+
std::string cursorName;
393+
394+
/**
395+
* To ensure each stream gets a unique cursorName, we maintain a 'uid'
396+
* which is really just an incrementing uint64
397+
*/
398+
static std::atomic<uint64_t> cursorUID;
399+
384400
};
385401

386402

tests/module_tests/dcp_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class StreamTest : public DCPTest {
103103
vb0 = engine->getVBucket(0);
104104
EXPECT_TRUE(vb0) << "Failed to get valid VBucket object for id 0";
105105
EXPECT_FALSE(vb0->checkpointManager.registerCursor(
106-
producer->getName(),
106+
static_cast<ActiveStream*>(stream.get())->getCursorName(),
107107
1, false,
108108
MustSendCheckpointEnd::NO))
109109
<< "Found an existing TAP cursor when attempting to register ours";

0 commit comments

Comments
 (0)