Skip to content

Commit 05ebb6d

Browse files
committed
Merge 'ep-engine/watson' into 'kv-engine/watson_ep'
* ep-engine/watson: MB-29381: Allow DCP rollback on vbuckets in pending state MB-29287: Test that takeover finds all items MB-29287: Give each ActiveStream a unique cursor name Change-Id: I8753ecdaa8ebd5f52f3c319542da346e3eaeb45c
2 parents 9e750e3 + f4982c3 commit 05ebb6d

15 files changed

+391
-59
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ TARGET_LINK_LIBRARIES(ep cJSON JSON_checker couchstore forestdb
185185
# so simpler / quicker just to link them into a single executable).
186186
ADD_EXECUTABLE(ep-engine_ep_unit_tests
187187
tests/mock/mock_dcp.cc
188+
tests/mock/mock_dcp_producer.cc
188189
tests/module_tests/checkpoint_test.cc
189190
tests/module_tests/defragmenter_test.cc
190191
tests/module_tests/ep_unit_tests_main.cc

src/dcp/producer.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -799,12 +799,12 @@ void DcpProducer::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
799799
}
800800

801801
bool DcpProducer::handleSlowStream(uint16_t vbid,
802-
const std::string &name) {
802+
const std::string& cursorName) {
803803
if (supportsCursorDropping) {
804804
stream_t stream = findStreamByVbid(vbid);
805805
if (stream) {
806-
if (stream->getName().compare(name) == 0) {
807-
ActiveStream* as = static_cast<ActiveStream*>(stream.get());
806+
ActiveStream* as = static_cast<ActiveStream*>(stream.get());
807+
if (as->getCursorName() == cursorName) {
808808
as->handleSlowStream();
809809
return true;
810810
}

src/dcp/producer.h

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,17 @@ class DcpProducer : public Producer {
7272

7373
void vbucketStateChanged(uint16_t vbucket, vbucket_state_t state);
7474

75-
/* This function handles a stream that is detected as slow by the checkpoint
76-
remover. Currently we handle the slow stream by switching from in-memory
77-
to backfilling */
78-
bool handleSlowStream(uint16_t vbid, const std::string &name);
75+
/**
76+
* This function handles a stream that is detected as slow by the checkpoint
77+
* remover. Currently we handle the slow stream by switching from in-memory
78+
* to backfilling.
79+
*
80+
* @param vbid vbucket the checkpoint-remover is processing
81+
* @param cursorName the cursor name registered in the checkpoint manager
82+
* which is slow.
83+
* @return true if the cursor was removed from the checkpoint manager
84+
*/
85+
bool handleSlowStream(uint16_t vbid, const std::string& cursorName);
7986

8087
void closeAllStreams();
8188

@@ -230,7 +237,7 @@ class DcpProducer : public Producer {
230237

231238
ExTask checkpointCreatorTask;
232239

233-
private:
240+
protected:
234241

235242

236243
DcpResponse* getNextItem();

src/dcp/stream.cc

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,8 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
182182
engine(e), producer(p),
183183
payloadType((flags & DCP_ADD_STREAM_FLAG_NO_VALUE) ? KEY_ONLY :
184184
KEY_VALUE),
185-
lastSentSnapEndSeqno(0), chkptItemsExtractionInProgress(false) {
186-
185+
lastSentSnapEndSeqno(0), chkptItemsExtractionInProgress(false),
186+
cursorName(n + std::to_string(cursorUID.fetch_add(1))) {
187187
const char* type = "";
188188
if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
189189
type = "takeover ";
@@ -338,7 +338,7 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
338338
try {
339339
CursorRegResult result =
340340
vb->checkpointManager.registerCursorBySeqno(
341-
name_, chkCursorSeqno,
341+
cursorName, chkCursorSeqno,
342342
MustSendCheckpointEnd::NO);
343343

344344
curChkSeqno = result.first;
@@ -696,7 +696,7 @@ void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
696696
item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
697697
size_t vb_items = vb->getNumItems(iep);
698698
size_t chk_items = vb_items > 0 ?
699-
vb->checkpointManager.getNumItemsForCursor(name_) : 0;
699+
vb->checkpointManager.getNumItemsForCursor(cursorName) : 0;
700700

701701
size_t del_items = 0;
702702
try {
@@ -747,7 +747,7 @@ DcpResponse* ActiveStream::nextQueuedItem() {
747747

748748
bool ActiveStream::nextCheckpointItem() {
749749
RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
750-
if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(name_) > 0) {
750+
if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(cursorName) > 0) {
751751
// schedule this stream to build the next checkpoint
752752
producer->scheduleCheckpointProcessorTask(this);
753753
return true;
@@ -837,7 +837,7 @@ void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
837837
// Commencing item processing - set guard flag.
838838
chkptItemsExtractionInProgress.store(true);
839839

840-
vb->checkpointManager.getAllItemsForCursor(name_, items);
840+
vb->checkpointManager.getAllItemsForCursor(cursorName, items);
841841
if (vb->checkpointManager.getNumCheckpoints() > 1) {
842842
engine->getEpStore()->wakeUpCheckpointRemover();
843843
}
@@ -1047,7 +1047,8 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
10471047
try {
10481048
std::tie(curChkSeqno, tryBackfill) =
10491049
vbucket->checkpointManager.registerCursorBySeqno(
1050-
name_, lastReadSeqno.load(),
1050+
cursorName,
1051+
lastReadSeqno.load(),
10511052
MustSendCheckpointEnd::NO);
10521053
} catch(std::exception& error) {
10531054
producer->getLogger().log(EXTENSION_LOG_WARNING,
@@ -1125,11 +1126,12 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
11251126
*/
11261127
try {
11271128
CursorRegResult result =
1128-
vbucket->checkpointManager.registerCursorBySeqno(
1129-
name_, lastReadSeqno.load(),
1130-
MustSendCheckpointEnd::NO);
1129+
vbucket->checkpointManager.registerCursorBySeqno(
1130+
cursorName,
1131+
lastReadSeqno.load(),
1132+
MustSendCheckpointEnd::NO);
11311133

1132-
curChkSeqno = result.first;
1134+
curChkSeqno = result.first;
11331135
} catch (std::exception& error) {
11341136
producer->getLogger().log(EXTENSION_LOG_WARNING,
11351137
"(vb %" PRIu16 ") Failed to register "
@@ -1307,7 +1309,7 @@ void ActiveStream::transitionState(stream_state_t newState) {
13071309
{
13081310
RCPtr<VBucket> vb = engine->getVBucket(vb_);
13091311
if (vb) {
1310-
vb->checkpointManager.removeCursor(name_);
1312+
vb->checkpointManager.removeCursor(cursorName);
13111313
}
13121314
break;
13131315
}
@@ -1330,8 +1332,8 @@ size_t ActiveStream::getItemsRemaining() {
13301332
// Items remaining is the sum of:
13311333
// (a) Items outstanding in checkpoints
13321334
// (b) Items pending in our readyQ, excluding any meta items.
1333-
return vbucket->checkpointManager.getNumItemsForCursor(name_) +
1334-
readyQ_non_meta_items;
1335+
return vbucket->checkpointManager.getNumItemsForCursor(cursorName) +
1336+
readyQ_non_meta_items;
13351337
}
13361338

13371339
uint64_t ActiveStream::getLastReadSeqno() const {
@@ -1377,9 +1379,11 @@ void ActiveStream::dropCheckpointCursor_UNLOCKED()
13771379
}
13781380
}
13791381
/* Drop the existing cursor */
1380-
vbucket->checkpointManager.removeCursor(name_);
1382+
vbucket->checkpointManager.removeCursor(cursorName);
13811383
}
13821384

1385+
std::atomic<uint64_t> ActiveStream::cursorUID;
1386+
13831387
NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
13841388
const std::string &name, uint32_t flags,
13851389
uint32_t opaque, uint16_t vb, uint64_t st_seqno,

src/dcp/stream.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,9 +257,14 @@ class ActiveStream : public Stream {
257257
in-memory to backfilling */
258258
void handleSlowStream();
259259

260+
/// @return a const reference to the streams cursor name
261+
const std::string& getCursorName() const {
262+
return cursorName;
263+
}
264+
260265
protected:
261266
// Returns the outstanding items for the stream's checkpoint cursor.
262-
void getOutstandingItems(RCPtr<VBucket> &vb, std::vector<queued_item> &items);
267+
virtual void getOutstandingItems(RCPtr<VBucket> &vb, std::vector<queued_item> &items);
263268

264269
// Given a set of queued items, create mutation responses for each item,
265270
// and pass onto the producer associated with this stream.
@@ -381,6 +386,17 @@ class ActiveStream : public Stream {
381386
items are added to the readyQ */
382387
AtomicValue<bool> chkptItemsExtractionInProgress;
383388

389+
/**
390+
* The name which uniquely identifies this stream's checkpoint cursor
391+
*/
392+
std::string cursorName;
393+
394+
/**
395+
* To ensure each stream gets a unique cursorName, we maintain a 'uid'
396+
* which is really just an incrementing uint64
397+
*/
398+
static std::atomic<uint64_t> cursorUID;
399+
384400
};
385401

386402

src/ep.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4008,7 +4008,8 @@ TaskStatus EventuallyPersistentStore::rollback(uint16_t vbid,
40084008
}
40094009

40104010
ReaderLockHolder rlh(vb->getStateLock());
4011-
if (vb->getState() == vbucket_state_replica) {
4011+
if ((vb->getState() == vbucket_state_replica) ||
4012+
(vb->getState() == vbucket_state_pending)) {
40124013
uint64_t prevHighSeqno =
40134014
static_cast<uint64_t>(vb->checkpointManager.getHighSeqno());
40144015
if (rollbackSeqno != 0) {

tests/mock/mock_dcp_producer.cc

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2018 Couchbase, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "mock_dcp_producer.h"
19+
20+
#include "mock_stream.h"
21+
22+
void MockDcpProducer::mockActiveStreamRequest(uint32_t flags,
23+
uint32_t opaque,
24+
uint16_t vbucket,
25+
uint64_t start_seqno,
26+
uint64_t end_seqno,
27+
uint64_t vbucket_uuid,
28+
uint64_t snap_start_seqno,
29+
uint64_t snap_end_seqno) {
30+
stream_t stream = new MockActiveStream(
31+
static_cast<EventuallyPersistentEngine*>(&engine_),
32+
this,
33+
getName(),
34+
flags,
35+
opaque,
36+
vbucket,
37+
start_seqno,
38+
end_seqno,
39+
vbucket_uuid,
40+
snap_start_seqno,
41+
snap_end_seqno);
42+
stream->setActive();
43+
auto insertResult = streams.insert(std::make_pair(vbucket, stream));
44+
if (!insertResult.second) {
45+
throw std::logic_error("MockDcpProducer::mockActiveStreamRequest "
46+
"failed to insert requested stream");
47+
}
48+
notifyStreamReady(vbucket);
49+
}

tests/mock/mock_dcp_producer.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,30 @@ class MockDcpProducer: public DcpProducer {
6060
return *static_cast<ActiveStreamCheckpointProcessorTask*>(
6161
checkpointCreatorTask.get());
6262
}
63+
64+
/**
65+
* Finds the stream for a given vbucket
66+
*/
67+
SingleThreadedRCPtr<Stream> findStream(uint16_t vbid) {
68+
auto it = streams.find(vbid);
69+
if (it != streams.end()) {
70+
return it->second;
71+
} else {
72+
return SingleThreadedRCPtr<Stream>();
73+
}
74+
}
75+
76+
/**
77+
* Place a mock active stream into the producer
78+
*/
79+
void mockActiveStreamRequest(uint32_t flags,
80+
uint32_t opaque,
81+
uint16_t vbucket,
82+
uint64_t start_seqno,
83+
uint64_t end_seqno,
84+
uint64_t vbucket_uuid,
85+
uint64_t snap_start_seqno,
86+
uint64_t snap_end_seqno);
6387
};
6488

6589
using mock_dcp_producer_t = SingleThreadedRCPtr<MockDcpProducer>;

tests/mock/mock_stream.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#pragma once
1919

20+
#include "dcp/consumer.h"
21+
#include "dcp/producer.h"
2022
#include "dcp/stream.h"
2123

2224
/*
@@ -90,4 +92,13 @@ class MockActiveStream : public ActiveStream {
9092
void transitionStateToBackfilling() {
9193
transitionState(STREAM_BACKFILLING);
9294
}
95+
96+
virtual void getOutstandingItems(RCPtr<VBucket>& vb,
97+
std::vector<queued_item>& items) override {
98+
preGetOutstandingItemsCallback();
99+
ActiveStream::getOutstandingItems(vb, items);
100+
}
101+
102+
/// A callback to allow tests to inject code before we access the checkpoint
103+
std::function<void()> preGetOutstandingItemsCallback = [] { return; };
93104
};

tests/module_tests/dcp_test.cc

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class StreamTest : public DCPTest {
103103
vb0 = engine->getVBucket(0);
104104
EXPECT_TRUE(vb0) << "Failed to get valid VBucket object for id 0";
105105
EXPECT_FALSE(vb0->checkpointManager.registerCursor(
106-
producer->getName(),
106+
static_cast<ActiveStream*>(stream.get())->getCursorName(),
107107
1, false,
108108
MustSendCheckpointEnd::NO))
109109
<< "Found an existing TAP cursor when attempting to register ours";
@@ -973,21 +973,8 @@ class ActiveStreamChkptProcessorTaskTest : public SingleThreadedEPStoreTest {
973973
}
974974

975975
void notifyAndStepToCheckpoint() {
976-
auto vb = store->getVBucket(vbid);
977-
ASSERT_NE(nullptr, vb.get());
978-
979-
producer->notifySeqnoAvailable(vbid, vb->getHighSeqno());
980-
981-
/* Step which will notify the checkpoint processor task */
982-
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
983-
EXPECT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
984-
985-
/* Run the task */
986-
producer->getCheckpointSnapshotTask().run();
987-
988-
/* This time the step should return something that is read from the
989-
checkpoint processor */
990-
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
976+
SingleThreadedEPStoreTest::notifyAndStepToCheckpoint(*producer,
977+
producers.get());
991978
}
992979

993980
const void* cookie;

0 commit comments

Comments
 (0)