Skip to content

Commit 987e3a2

Browse files
committed
MB-29287: Merge commit '3ae001df' into couchbase/5.1.0
* commit '3ae001df43b4be5dc8e71c7f2f78cb3fa3e26633': MB-29287: Give each ActiveStream a unique cursor name Note this indvidual merge commit is part of merging watson_ep into couchbase/5.1.0 Change-Id: Ia55cd4dbb10deed2224f33257264465960d2a762
2 parents 43bc123 + 3ae001d commit 987e3a2

File tree

5 files changed

+58
-26
lines changed

5 files changed

+58
-26
lines changed

engines/ep/src/dcp/producer.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -997,12 +997,12 @@ void DcpProducer::closeStreamDueToRollback(uint16_t vbucket) {
997997
}
998998

999999
bool DcpProducer::handleSlowStream(uint16_t vbid,
1000-
const std::string &name) {
1000+
const std::string& cursorName) {
10011001
if (supportsCursorDropping) {
10021002
auto stream = findStream(vbid);
10031003
if (stream) {
1004-
if (stream->getName().compare(name) == 0) {
1005-
ActiveStream* as = static_cast<ActiveStream*>(stream.get());
1004+
ActiveStream* as = static_cast<ActiveStream*>(stream.get());
1005+
if (as->getCursorName() == cursorName) {
10061006
return as->handleSlowStream();
10071007
}
10081008
}

engines/ep/src/dcp/producer.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,17 @@ class DcpProducer : public ConnHandler {
104104

105105
void closeStreamDueToRollback(uint16_t vbucket);
106106

107-
/* This function handles a stream that is detected as slow by the checkpoint
108-
remover. Currently we handle the slow stream by switching from in-memory
109-
to backfilling */
110-
bool handleSlowStream(uint16_t vbid, const std::string &name);
107+
/**
108+
* This function handles a stream that is detected as slow by the checkpoint
109+
* remover. Currently we handle the slow stream by switching from in-memory
110+
* to backfilling.
111+
*
112+
* @param vbid vbucket the checkpoint-remover is processing
113+
* @param cursorName the cursor name registered in the checkpoint manager
114+
* which is slow.
115+
* @return true if the cursor was removed from the checkpoint manager
116+
*/
117+
bool handleSlowStream(uint16_t vbid, const std::string& cursorName);
111118

112119
void closeAllStreams();
113120

engines/ep/src/dcp/stream.cc

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
268268
chkptItemsExtractionInProgress(false),
269269
includeValue(includeVal),
270270
includeXattributes(includeXattrs),
271-
filter(std::move(filter)) {
271+
filter(std::move(filter)),
272+
cursorName(n + std::to_string(cursorUID.fetch_add(1))) {
272273
const char* type = "";
273274
if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
274275
type = "takeover ";
@@ -364,7 +365,7 @@ void ActiveStream::registerCursor(CheckpointManager& chkptmgr,
364365
uint64_t lastProcessedSeqno) {
365366
try {
366367
CursorRegResult result = chkptmgr.registerCursorBySeqno(
367-
name_, lastProcessedSeqno, MustSendCheckpointEnd::NO);
368+
cursorName, lastProcessedSeqno, MustSendCheckpointEnd::NO);
368369
/*
369370
* MB-22960: Due to cursor dropping we re-register the replication
370371
* cursor only during backfill when we mark the disk snapshot. However
@@ -814,8 +815,9 @@ void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie,
814815
add_stat, cookie);
815816

816817
size_t vb_items = vb.getNumItems();
817-
size_t chk_items = vb_items > 0 ?
818-
vb.checkpointManager.getNumItemsForCursor(name_) : 0;
818+
size_t chk_items =
819+
vb_items > 0 ? vb.checkpointManager.getNumItemsForCursor(cursorName)
820+
: 0;
819821

820822
size_t del_items = 0;
821823
try {
@@ -866,7 +868,8 @@ DcpResponse* ActiveStream::nextQueuedItem() {
866868

867869
bool ActiveStream::nextCheckpointItem() {
868870
VBucketPtr vbucket = engine->getVBucket(vb_);
869-
if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(name_) > 0) {
871+
if (vbucket &&
872+
vbucket->checkpointManager.getNumItemsForCursor(cursorName) > 0) {
870873
// schedule this stream to build the next checkpoint
871874
producer->scheduleCheckpointProcessorTask(this);
872875
return true;
@@ -957,7 +960,7 @@ void ActiveStream::getOutstandingItems(VBucketPtr &vb,
957960
chkptItemsExtractionInProgress.store(true);
958961

959962
hrtime_t _begin_ = gethrtime();
960-
vb->checkpointManager.getAllItemsForCursor(name_, items);
963+
vb->checkpointManager.getAllItemsForCursor(cursorName, items);
961964
engine->getEpStats().dcpCursorsGetItemsHisto.add(
962965
(gethrtime() - _begin_) / 1000);
963966

@@ -1195,7 +1198,8 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
11951198
try {
11961199
std::tie(curChkSeqno, tryBackfill) =
11971200
vbucket->checkpointManager.registerCursorBySeqno(
1198-
name_, lastReadSeqno.load(),
1201+
cursorName,
1202+
lastReadSeqno.load(),
11991203
MustSendCheckpointEnd::NO);
12001204
} catch(std::exception& error) {
12011205
producer->getLogger().log(EXTENSION_LOG_WARNING,
@@ -1273,11 +1277,12 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
12731277
*/
12741278
try {
12751279
CursorRegResult result =
1276-
vbucket->checkpointManager.registerCursorBySeqno(
1277-
name_, lastReadSeqno.load(),
1278-
MustSendCheckpointEnd::NO);
1280+
vbucket->checkpointManager.registerCursorBySeqno(
1281+
cursorName,
1282+
lastReadSeqno.load(),
1283+
MustSendCheckpointEnd::NO);
12791284

1280-
curChkSeqno = result.first;
1285+
curChkSeqno = result.first;
12811286
} catch (std::exception& error) {
12821287
producer->getLogger().log(EXTENSION_LOG_WARNING,
12831288
"(vb %" PRIu16 ") Failed to register "
@@ -1469,7 +1474,7 @@ void ActiveStream::transitionState(StreamState newState) {
14691474
{
14701475
VBucketPtr vb = engine->getVBucket(vb_);
14711476
if (vb) {
1472-
vb->checkpointManager.removeCursor(name_);
1477+
vb->checkpointManager.removeCursor(cursorName);
14731478
}
14741479
break;
14751480
}
@@ -1493,8 +1498,8 @@ size_t ActiveStream::getItemsRemaining() {
14931498
// Items remaining is the sum of:
14941499
// (a) Items outstanding in checkpoints
14951500
// (b) Items pending in our readyQ, excluding any meta items.
1496-
return vbucket->checkpointManager.getNumItemsForCursor(name_) +
1497-
readyQ_non_meta_items;
1501+
return vbucket->checkpointManager.getNumItemsForCursor(cursorName) +
1502+
readyQ_non_meta_items;
14981503
}
14991504

15001505
uint64_t ActiveStream::getLastReadSeqno() const {
@@ -1534,7 +1539,7 @@ bool ActiveStream::dropCheckpointCursor_UNLOCKED() {
15341539
}
15351540
}
15361541
/* Drop the existing cursor */
1537-
return vbucket->checkpointManager.removeCursor(name_);
1542+
return vbucket->checkpointManager.removeCursor(cursorName);
15381543
}
15391544

15401545
EXTENSION_LOG_LEVEL ActiveStream::getTransitionStateLogLevel(
@@ -1576,6 +1581,8 @@ bool ActiveStream::queueResponse(DcpResponse* resp) const {
15761581
return true;
15771582
}
15781583

1584+
std::atomic<uint64_t> ActiveStream::cursorUID;
1585+
15791586
NotifierStream::NotifierStream(EventuallyPersistentEngine* e,
15801587
dcp_producer_t p,
15811588
const std::string& name,

engines/ep/src/dcp/stream.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,11 @@ class ActiveStream : public Stream {
299299
return currentSeparator;
300300
}
301301

302+
/// @return a const reference to the streams cursor name
303+
const std::string& getCursorName() const {
304+
return cursorName;
305+
}
306+
302307
protected:
303308
// Returns the outstanding items for the stream's checkpoint cursor.
304309
void getOutstandingItems(VBucketPtr &vb, std::vector<queued_item> &items);
@@ -490,6 +495,17 @@ class ActiveStream : public Stream {
490495
* The filter the stream will use to decide which keys should be transmitted
491496
*/
492497
std::unique_ptr<Collections::VB::Filter> filter;
498+
499+
/**
500+
* The name which uniquely identifies this stream's checkpoint cursor
501+
*/
502+
std::string cursorName;
503+
504+
/**
505+
* To ensure each stream gets a unique cursorName, we maintain a 'uid'
506+
* which is really just an incrementing uint64
507+
*/
508+
static std::atomic<uint64_t> cursorUID;
493509
};
494510

495511

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,12 @@ class StreamTest : public DCPTest,
128128
includeXattrs);
129129

130130
EXPECT_FALSE(vb0->checkpointManager.registerCursor(
131-
producer->getName(),
132-
1, false,
133-
MustSendCheckpointEnd::NO))
134-
<< "Found an existing TAP cursor when attempting to register ours";
131+
static_cast<ActiveStream*>(stream.get())->getCursorName(),
132+
1,
133+
false,
134+
MustSendCheckpointEnd::NO))
135+
<< "Found an existing TAP cursor when attempting to register "
136+
"ours";
135137
}
136138

137139
void destroy_dcp_stream() {

0 commit comments

Comments
 (0)