Skip to content

Commit 49d69f5

Browse files
committed
Merge branch 'couchbase/spock' into 'couchbase/vulcan'
* 'couchbase/spock': MB-29381: Allow DCP rollback on vbuckets in pending state MB-29287: Test that takeover finds all items MB-29287: Give each ActiveStream a unique cursor name Change-Id: I98a2fdde67c2c003504da8219112434e1f2f1b0a
2 parents 259277f + b8da5d3 commit 49d69f5

File tree

11 files changed

+318
-35
lines changed

11 files changed

+318
-35
lines changed

engines/ep/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS)
268268
# so simpler / quicker just to link them into a single executable).
269269
ADD_EXECUTABLE(ep-engine_ep_unit_tests
270270
tests/mock/mock_dcp.cc
271+
tests/mock/mock_dcp_producer.cc
271272
tests/mock/mock_ephemeral_vb.cc
272273
tests/mock/mock_synchronous_ep_engine.cc
273274
tests/module_tests/atomic_unordered_map_test.cc

engines/ep/src/dcp/producer.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,12 +1047,12 @@ void DcpProducer::closeStreamDueToRollback(uint16_t vbucket) {
10471047
}
10481048

10491049
bool DcpProducer::handleSlowStream(uint16_t vbid,
1050-
const std::string &name) {
1050+
const std::string& cursorName) {
10511051
if (supportsCursorDropping) {
10521052
auto stream = findStream(vbid);
10531053
if (stream) {
1054-
if (stream->getName().compare(name) == 0) {
1055-
ActiveStream* as = static_cast<ActiveStream*>(stream.get());
1054+
ActiveStream* as = static_cast<ActiveStream*>(stream.get());
1055+
if (as->getCursorName() == cursorName) {
10561056
return as->handleSlowStream();
10571057
}
10581058
}

engines/ep/src/dcp/producer.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,17 @@ class DcpProducer : public ConnHandler,
101101

102102
void closeStreamDueToRollback(uint16_t vbucket);
103103

104-
/* This function handles a stream that is detected as slow by the checkpoint
105-
remover. Currently we handle the slow stream by switching from in-memory
106-
to backfilling */
107-
bool handleSlowStream(uint16_t vbid, const std::string &name);
104+
/**
105+
* This function handles a stream that is detected as slow by the checkpoint
106+
* remover. Currently we handle the slow stream by switching from in-memory
107+
* to backfilling.
108+
*
109+
* @param vbid vbucket the checkpoint-remover is processing
110+
* @param cursorName the cursor name registered in the checkpoint manager
111+
* which is slow.
112+
* @return true if the cursor was removed from the checkpoint manager
113+
*/
114+
bool handleSlowStream(uint16_t vbid, const std::string& cursorName);
108115

109116
void closeAllStreams();
110117

engines/ep/src/dcp/stream.cc

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,8 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
271271
includeValue(includeVal),
272272
includeXattributes(includeXattrs),
273273
includeDeleteTime(includeDeleteTime),
274-
filter(filter, manifest) {
274+
filter(filter, manifest),
275+
cursorName(n + std::to_string(cursorUID.fetch_add(1))) {
275276
const char* type = "";
276277
if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
277278
type = "takeover ";
@@ -376,7 +377,7 @@ void ActiveStream::registerCursor(CheckpointManager& chkptmgr,
376377
uint64_t lastProcessedSeqno) {
377378
try {
378379
CursorRegResult result = chkptmgr.registerCursorBySeqno(
379-
name_, lastProcessedSeqno, MustSendCheckpointEnd::NO);
380+
cursorName, lastProcessedSeqno, MustSendCheckpointEnd::NO);
380381
/*
381382
* MB-22960: Due to cursor dropping we re-register the replication
382383
* cursor only during backfill when we mark the disk snapshot. However
@@ -885,8 +886,9 @@ void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie,
885886

886887
size_t vb_items = vb.getNumItems();
887888
size_t chk_items =
888-
vb_items > 0 ? vb.checkpointManager->getNumItemsForCursor(name_)
889-
: 0;
889+
vb_items > 0
890+
? vb.checkpointManager->getNumItemsForCursor(cursorName)
891+
: 0;
890892

891893
size_t del_items = 0;
892894
try {
@@ -945,7 +947,7 @@ std::unique_ptr<DcpResponse> ActiveStream::nextQueuedItem() {
945947
bool ActiveStream::nextCheckpointItem() {
946948
VBucketPtr vbucket = engine->getVBucket(vb_);
947949
if (vbucket &&
948-
vbucket->checkpointManager->getNumItemsForCursor(name_) > 0) {
950+
vbucket->checkpointManager->getNumItemsForCursor(cursorName) > 0) {
949951
// schedule this stream to build the next checkpoint
950952
auto producer = producerPtr.lock();
951953
if (!producer) {
@@ -1092,7 +1094,7 @@ std::vector<queued_item> ActiveStream::getOutstandingItems(VBucket& vb) {
10921094
chkptItemsExtractionInProgress.store(true);
10931095

10941096
auto _begin_ = ProcessClock::now();
1095-
vb.checkpointManager->getAllItemsForCursor(name_, items);
1097+
vb.checkpointManager->getAllItemsForCursor(cursorName, items);
10961098
engine->getEpStats().dcpCursorsGetItemsHisto.add(
10971099
std::chrono::duration_cast<std::chrono::microseconds>(
10981100
ProcessClock::now() - _begin_));
@@ -1451,7 +1453,7 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
14511453
try {
14521454
std::tie(curChkSeqno, tryBackfill) =
14531455
vbucket->checkpointManager->registerCursorBySeqno(
1454-
name_,
1456+
cursorName,
14551457
lastReadSeqno.load(),
14561458
MustSendCheckpointEnd::NO);
14571459
} catch(std::exception& error) {
@@ -1553,7 +1555,7 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
15531555
try {
15541556
CursorRegResult result =
15551557
vbucket->checkpointManager->registerCursorBySeqno(
1556-
name_,
1558+
cursorName,
15571559
lastReadSeqno.load(),
15581560
MustSendCheckpointEnd::NO);
15591561

@@ -1777,7 +1779,7 @@ size_t ActiveStream::getItemsRemaining() {
17771779
// Items remaining is the sum of:
17781780
// (a) Items outstanding in checkpoints
17791781
// (b) Items pending in our readyQ, excluding any meta items.
1780-
return vbucket->checkpointManager->getNumItemsForCursor(name_) +
1782+
return vbucket->checkpointManager->getNumItemsForCursor(cursorName) +
17811783
readyQ_non_meta_items;
17821784
}
17831785

@@ -1826,7 +1828,7 @@ bool ActiveStream::dropCheckpointCursor_UNLOCKED() {
18261828
notifyStreamReady();
18271829
}
18281830
/* Drop the existing cursor */
1829-
return vbucket->checkpointManager->removeCursor(name_);
1831+
return vbucket->checkpointManager->removeCursor(cursorName);
18301832
}
18311833

18321834
EXTENSION_LOG_LEVEL ActiveStream::getTransitionStateLogLevel(
@@ -1864,10 +1866,12 @@ void ActiveStream::notifyStreamReady(bool force) {
18641866
void ActiveStream::removeCheckpointCursor() {
18651867
VBucketPtr vb = engine->getVBucket(vb_);
18661868
if (vb) {
1867-
vb->checkpointManager->removeCursor(name_);
1869+
vb->checkpointManager->removeCursor(cursorName);
18681870
}
18691871
}
18701872

1873+
std::atomic<uint64_t> ActiveStream::cursorUID;
1874+
18711875
NotifierStream::NotifierStream(EventuallyPersistentEngine* e,
18721876
std::shared_ptr<DcpProducer> p,
18731877
const std::string& name,

engines/ep/src/dcp/stream.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,13 +307,18 @@ class ActiveStream : public Stream,
307307
return currentSeparator;
308308
}
309309

310+
/// @return a const reference to the streams cursor name
311+
const std::string& getCursorName() const {
312+
return cursorName;
313+
}
314+
310315
protected:
311316
/**
312317
* @param vb reference to the associated vbucket
313318
*
314319
* @return the outstanding items for the stream's checkpoint cursor.
315320
*/
316-
std::vector<queued_item> getOutstandingItems(VBucket& vb);
321+
virtual std::vector<queued_item> getOutstandingItems(VBucket& vb);
317322

318323
// Given a set of queued items, create mutation responses for each item,
319324
// and pass onto the producer associated with this stream.
@@ -516,6 +521,17 @@ class ActiveStream : public Stream,
516521
* The filter the stream will use to decide which keys should be transmitted
517522
*/
518523
Collections::VB::Filter filter;
524+
525+
/**
526+
* The name which uniquely identifies this stream's checkpoint cursor
527+
*/
528+
std::string cursorName;
529+
530+
/**
531+
* To ensure each stream gets a unique cursorName, we maintain a 'uid'
532+
* which is really just an incrementing uint64
533+
*/
534+
static std::atomic<uint64_t> cursorUID;
519535
};
520536

521537

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2018 Couchbase, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "mock_dcp_producer.h"
19+
20+
#include "mock_stream.h"
21+
22+
void MockDcpProducer::mockActiveStreamRequest(uint32_t flags,
23+
uint32_t opaque,
24+
VBucket& vb,
25+
uint64_t start_seqno,
26+
uint64_t end_seqno,
27+
uint64_t vbucket_uuid,
28+
uint64_t snap_start_seqno,
29+
uint64_t snap_end_seqno) {
30+
auto stream = std::make_shared<MockActiveStream>(
31+
static_cast<EventuallyPersistentEngine*>(&engine_),
32+
std::static_pointer_cast<MockDcpProducer>(shared_from_this()),
33+
flags,
34+
opaque,
35+
vb,
36+
start_seqno,
37+
end_seqno,
38+
vbucket_uuid,
39+
snap_start_seqno,
40+
snap_end_seqno);
41+
stream->setActive();
42+
if (!streams.insert(std::make_pair(vb.getId(), stream))) {
43+
throw std::logic_error(
44+
"MockDcpProducer::mockActiveStreamRequest "
45+
"failed to insert requested stream");
46+
}
47+
notifyStreamReady(vb.getId());
48+
}

engines/ep/tests/mock/mock_dcp_producer.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,4 +143,16 @@ class MockDcpProducer : public DcpProducer {
143143
size_t getBytesOutstanding() const {
144144
return log.getBytesOutstanding();
145145
}
146+
147+
/**
148+
* Place a mock active stream into the producer
149+
*/
150+
void mockActiveStreamRequest(uint32_t flags,
151+
uint32_t opaque,
152+
VBucket& vb,
153+
uint64_t start_seqno,
154+
uint64_t end_seqno,
155+
uint64_t vbucket_uuid,
156+
uint64_t snap_start_seqno,
157+
uint64_t snap_end_seqno);
146158
};

engines/ep/tests/mock/mock_stream.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#pragma once
1919

2020
#include "collections/vbucket_filter.h"
21+
#include "dcp/consumer.h"
22+
#include "dcp/producer.h"
2123
#include "dcp/stream.h"
2224
#include "tests/mock/mock_dcp_producer.h"
2325

@@ -147,6 +149,14 @@ class MockActiveStream : public ActiveStream {
147149
void setState(StreamState state) {
148150
state_ = state;
149151
}
152+
153+
virtual std::vector<queued_item> getOutstandingItems(VBucket& vb) override {
154+
preGetOutstandingItemsCallback();
155+
return ActiveStream::getOutstandingItems(vb);
156+
}
157+
158+
/// A callback to allow tests to inject code before we access the checkpoint
159+
std::function<void()> preGetOutstandingItemsCallback = [] { return; };
150160
};
151161

152162
/**

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ class DCPTest : public EventuallyPersistentEngineTest {
132132
includeXattrs);
133133

134134
EXPECT_FALSE(vb0->checkpointManager->registerCursor(
135-
producer->getName(), 1, false, MustSendCheckpointEnd::NO))
135+
stream->getCursorName(), 1, false, MustSendCheckpointEnd::NO))
136136
<< "Found an existing TAP cursor when attempting to register "
137137
"ours";
138138
}
@@ -3225,21 +3225,8 @@ class ActiveStreamChkptProcessorTaskTest : public SingleThreadedKVBucketTest {
32253225
}
32263226

32273227
void notifyAndStepToCheckpoint() {
3228-
auto vb = store->getVBucket(vbid);
3229-
ASSERT_NE(nullptr, vb.get());
3230-
3231-
producer->notifySeqnoAvailable(vbid, vb->getHighSeqno());
3232-
3233-
/* Step which will notify the checkpoint processor task */
3234-
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
3235-
EXPECT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
3236-
3237-
/* Run the task */
3238-
producer->getCheckpointSnapshotTask().run();
3239-
3240-
/* This time the step should return something that is read from the
3241-
checkpoint processor */
3242-
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3228+
SingleThreadedKVBucketTest::notifyAndStepToCheckpoint(*producer,
3229+
*producers);
32433230
}
32443231

32453232
const void* cookie;

0 commit comments

Comments
 (0)