Skip to content

Commit 0a8ea27

Browse files
committed
MB-29287: Merge branch 'couchbase/5.1.0' into 'couchbase/spock'
* couchbase/5.1.0: MB-29287: Test that takeover finds all items MB-29287: Give each ActiveStream a unique cursor name Change-Id: I26962e40fac08dd6df229fdc8cd7be5654403036
2 parents a52ce0a + 787236b commit 0a8ea27

File tree

11 files changed

+351
-42
lines changed

11 files changed

+351
-42
lines changed

engines/ep/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ TARGET_LINK_LIBRARIES(ep cJSON JSON_checker couchstore ${EP_FORESTDB_LIB}
242242
# so simpler / quicker just to link them into a single executable).
243243
ADD_EXECUTABLE(ep-engine_ep_unit_tests
244244
tests/mock/mock_dcp.cc
245+
tests/mock/mock_dcp_producer.cc
245246
tests/mock/mock_ephemeral_vb.cc
246247
tests/mock/mock_synchronous_ep_engine.cc
247248
tests/module_tests/atomic_unordered_map_test.cc

engines/ep/src/dcp/producer.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -997,12 +997,12 @@ void DcpProducer::closeStreamDueToRollback(uint16_t vbucket) {
997997
}
998998

999999
bool DcpProducer::handleSlowStream(uint16_t vbid,
1000-
const std::string &name) {
1000+
const std::string& cursorName) {
10011001
if (supportsCursorDropping) {
10021002
auto stream = findStream(vbid);
10031003
if (stream) {
1004-
if (stream->getName().compare(name) == 0) {
1005-
ActiveStream* as = static_cast<ActiveStream*>(stream.get());
1004+
ActiveStream* as = static_cast<ActiveStream*>(stream.get());
1005+
if (as->getCursorName() == cursorName) {
10061006
return as->handleSlowStream();
10071007
}
10081008
}

engines/ep/src/dcp/producer.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,17 @@ class DcpProducer : public ConnHandler {
104104

105105
void closeStreamDueToRollback(uint16_t vbucket);
106106

107-
/* This function handles a stream that is detected as slow by the checkpoint
108-
remover. Currently we handle the slow stream by switching from in-memory
109-
to backfilling */
110-
bool handleSlowStream(uint16_t vbid, const std::string &name);
107+
/**
108+
* This function handles a stream that is detected as slow by the checkpoint
109+
* remover. Currently we handle the slow stream by switching from in-memory
110+
* to backfilling.
111+
*
112+
* @param vbid vbucket the checkpoint-remover is processing
113+
* @param cursorName the cursor name registered in the checkpoint manager
114+
* which is slow.
115+
* @return true if the cursor was removed from the checkpoint manager
116+
*/
117+
bool handleSlowStream(uint16_t vbid, const std::string& cursorName);
111118

112119
void closeAllStreams();
113120

engines/ep/src/dcp/stream.cc

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
268268
chkptItemsExtractionInProgress(false),
269269
includeValue(includeVal),
270270
includeXattributes(includeXattrs),
271-
filter(std::move(filter)) {
271+
filter(std::move(filter)),
272+
cursorName(n + std::to_string(cursorUID.fetch_add(1))) {
272273
const char* type = "";
273274
if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
274275
type = "takeover ";
@@ -364,7 +365,7 @@ void ActiveStream::registerCursor(CheckpointManager& chkptmgr,
364365
uint64_t lastProcessedSeqno) {
365366
try {
366367
CursorRegResult result = chkptmgr.registerCursorBySeqno(
367-
name_, lastProcessedSeqno, MustSendCheckpointEnd::NO);
368+
cursorName, lastProcessedSeqno, MustSendCheckpointEnd::NO);
368369
/*
369370
* MB-22960: Due to cursor dropping we re-register the replication
370371
* cursor only during backfill when we mark the disk snapshot. However
@@ -814,8 +815,9 @@ void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie,
814815
add_stat, cookie);
815816

816817
size_t vb_items = vb.getNumItems();
817-
size_t chk_items = vb_items > 0 ?
818-
vb.checkpointManager.getNumItemsForCursor(name_) : 0;
818+
size_t chk_items =
819+
vb_items > 0 ? vb.checkpointManager.getNumItemsForCursor(cursorName)
820+
: 0;
819821

820822
size_t del_items = 0;
821823
try {
@@ -866,7 +868,8 @@ DcpResponse* ActiveStream::nextQueuedItem() {
866868

867869
bool ActiveStream::nextCheckpointItem() {
868870
VBucketPtr vbucket = engine->getVBucket(vb_);
869-
if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(name_) > 0) {
871+
if (vbucket &&
872+
vbucket->checkpointManager.getNumItemsForCursor(cursorName) > 0) {
870873
// schedule this stream to build the next checkpoint
871874
producer->scheduleCheckpointProcessorTask(this);
872875
return true;
@@ -957,7 +960,7 @@ void ActiveStream::getOutstandingItems(VBucketPtr &vb,
957960
chkptItemsExtractionInProgress.store(true);
958961

959962
hrtime_t _begin_ = gethrtime();
960-
vb->checkpointManager.getAllItemsForCursor(name_, items);
963+
vb->checkpointManager.getAllItemsForCursor(cursorName, items);
961964
engine->getEpStats().dcpCursorsGetItemsHisto.add(
962965
(gethrtime() - _begin_) / 1000);
963966

@@ -1195,7 +1198,8 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
11951198
try {
11961199
std::tie(curChkSeqno, tryBackfill) =
11971200
vbucket->checkpointManager.registerCursorBySeqno(
1198-
name_, lastReadSeqno.load(),
1201+
cursorName,
1202+
lastReadSeqno.load(),
11991203
MustSendCheckpointEnd::NO);
12001204
} catch(std::exception& error) {
12011205
producer->getLogger().log(EXTENSION_LOG_WARNING,
@@ -1273,11 +1277,12 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
12731277
*/
12741278
try {
12751279
CursorRegResult result =
1276-
vbucket->checkpointManager.registerCursorBySeqno(
1277-
name_, lastReadSeqno.load(),
1278-
MustSendCheckpointEnd::NO);
1280+
vbucket->checkpointManager.registerCursorBySeqno(
1281+
cursorName,
1282+
lastReadSeqno.load(),
1283+
MustSendCheckpointEnd::NO);
12791284

1280-
curChkSeqno = result.first;
1285+
curChkSeqno = result.first;
12811286
} catch (std::exception& error) {
12821287
producer->getLogger().log(EXTENSION_LOG_WARNING,
12831288
"(vb %" PRIu16 ") Failed to register "
@@ -1469,7 +1474,7 @@ void ActiveStream::transitionState(StreamState newState) {
14691474
{
14701475
VBucketPtr vb = engine->getVBucket(vb_);
14711476
if (vb) {
1472-
vb->checkpointManager.removeCursor(name_);
1477+
vb->checkpointManager.removeCursor(cursorName);
14731478
}
14741479
break;
14751480
}
@@ -1493,8 +1498,8 @@ size_t ActiveStream::getItemsRemaining() {
14931498
// Items remaining is the sum of:
14941499
// (a) Items outstanding in checkpoints
14951500
// (b) Items pending in our readyQ, excluding any meta items.
1496-
return vbucket->checkpointManager.getNumItemsForCursor(name_) +
1497-
readyQ_non_meta_items;
1501+
return vbucket->checkpointManager.getNumItemsForCursor(cursorName) +
1502+
readyQ_non_meta_items;
14981503
}
14991504

15001505
uint64_t ActiveStream::getLastReadSeqno() const {
@@ -1534,7 +1539,7 @@ bool ActiveStream::dropCheckpointCursor_UNLOCKED() {
15341539
}
15351540
}
15361541
/* Drop the existing cursor */
1537-
return vbucket->checkpointManager.removeCursor(name_);
1542+
return vbucket->checkpointManager.removeCursor(cursorName);
15381543
}
15391544

15401545
EXTENSION_LOG_LEVEL ActiveStream::getTransitionStateLogLevel(
@@ -1576,6 +1581,8 @@ bool ActiveStream::queueResponse(DcpResponse* resp) const {
15761581
return true;
15771582
}
15781583

1584+
std::atomic<uint64_t> ActiveStream::cursorUID;
1585+
15791586
NotifierStream::NotifierStream(EventuallyPersistentEngine* e,
15801587
dcp_producer_t p,
15811588
const std::string& name,

engines/ep/src/dcp/stream.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,15 @@ class ActiveStream : public Stream {
299299
return currentSeparator;
300300
}
301301

302+
/// @return a const reference to the streams cursor name
303+
const std::string& getCursorName() const {
304+
return cursorName;
305+
}
306+
302307
protected:
303308
// Returns the outstanding items for the stream's checkpoint cursor.
304-
void getOutstandingItems(VBucketPtr &vb, std::vector<queued_item> &items);
309+
virtual void getOutstandingItems(VBucketPtr& vb,
310+
std::vector<queued_item>& items);
305311

306312
// Given a set of queued items, create mutation responses for each item,
307313
// and pass onto the producer associated with this stream.
@@ -490,6 +496,17 @@ class ActiveStream : public Stream {
490496
* The filter the stream will use to decide which keys should be transmitted
491497
*/
492498
std::unique_ptr<Collections::VB::Filter> filter;
499+
500+
/**
501+
* The name which uniquely identifies this stream's checkpoint cursor
502+
*/
503+
std::string cursorName;
504+
505+
/**
506+
* To ensure each stream gets a unique cursorName, we maintain a 'uid'
507+
* which is really just an incrementing uint64
508+
*/
509+
static std::atomic<uint64_t> cursorUID;
493510
};
494511

495512

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2018 Couchbase, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "mock_dcp_producer.h"
19+
20+
#include "mock_stream.h"
21+
22+
void MockDcpProducer::mockActiveStreamRequest(uint32_t flags,
23+
uint32_t opaque,
24+
VBucket& vb,
25+
uint64_t start_seqno,
26+
uint64_t end_seqno,
27+
uint64_t vbucket_uuid,
28+
uint64_t snap_start_seqno,
29+
uint64_t snap_end_seqno) {
30+
stream_t stream = new MockActiveStream(
31+
static_cast<EventuallyPersistentEngine*>(&engine_),
32+
this,
33+
flags,
34+
opaque,
35+
vb,
36+
start_seqno,
37+
end_seqno,
38+
vbucket_uuid,
39+
snap_start_seqno,
40+
snap_end_seqno);
41+
stream->setActive();
42+
if (!streams.insert(std::make_pair(vb.getId(), stream))) {
43+
throw std::logic_error(
44+
"MockDcpProducer::mockActiveStreamRequest "
45+
"failed to insert requested stream");
46+
}
47+
notifyStreamReady(vb.getId());
48+
}

engines/ep/tests/mock/mock_dcp_producer.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,18 @@ class MockDcpProducer: public DcpProducer {
129129
BackfillManager& getBFM() {
130130
return *backfillMgr;
131131
}
132+
133+
/**
134+
* Place a mock active stream into the producer
135+
*/
136+
void mockActiveStreamRequest(uint32_t flags,
137+
uint32_t opaque,
138+
VBucket& vb,
139+
uint64_t start_seqno,
140+
uint64_t end_seqno,
141+
uint64_t vbucket_uuid,
142+
uint64_t snap_start_seqno,
143+
uint64_t snap_end_seqno);
132144
};
133145

134146
using mock_dcp_producer_t = SingleThreadedRCPtr<MockDcpProducer>;

engines/ep/tests/mock/mock_stream.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#pragma once
1919

2020
#include "collections/vbucket_filter.h"
21+
#include "dcp/consumer.h"
22+
#include "dcp/producer.h"
2123
#include "dcp/stream.h"
2224
#include "tests/mock/mock_dcp_producer.h"
2325

@@ -148,6 +150,15 @@ class MockActiveStream : public ActiveStream {
148150
void setState(StreamState state) {
149151
state_ = state;
150152
}
153+
154+
virtual void getOutstandingItems(VBucketPtr& vb,
155+
std::vector<queued_item>& items) override {
156+
preGetOutstandingItemsCallback();
157+
ActiveStream::getOutstandingItems(vb, items);
158+
}
159+
160+
/// A callback to allow tests to inject code before we access the checkpoint
161+
std::function<void()> preGetOutstandingItemsCallback = [] { return; };
151162
};
152163

153164
/**

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,12 @@ class StreamTest : public DCPTest,
128128
includeXattrs);
129129

130130
EXPECT_FALSE(vb0->checkpointManager.registerCursor(
131-
producer->getName(),
132-
1, false,
133-
MustSendCheckpointEnd::NO))
134-
<< "Found an existing TAP cursor when attempting to register ours";
131+
static_cast<ActiveStream*>(stream.get())->getCursorName(),
132+
1,
133+
false,
134+
MustSendCheckpointEnd::NO))
135+
<< "Found an existing TAP cursor when attempting to register "
136+
"ours";
135137
}
136138

137139
void destroy_dcp_stream() {
@@ -2291,21 +2293,8 @@ class ActiveStreamChkptProcessorTaskTest : public SingleThreadedKVBucketTest {
22912293
}
22922294

22932295
void notifyAndStepToCheckpoint() {
2294-
auto vb = store->getVBucket(vbid);
2295-
ASSERT_NE(nullptr, vb.get());
2296-
2297-
producer->notifySeqnoAvailable(vbid, vb->getHighSeqno());
2298-
2299-
/* Step which will notify the checkpoint processor task */
2300-
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
2301-
EXPECT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
2302-
2303-
/* Run the task */
2304-
producer->getCheckpointSnapshotTask().run();
2305-
2306-
/* This time the step should return something that is read from the
2307-
checkpoint processor */
2308-
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
2296+
SingleThreadedKVBucketTest::notifyAndStepToCheckpoint(*producer,
2297+
producers.get());
23092298
}
23102299

23112300
const void* cookie;

0 commit comments

Comments
 (0)