Skip to content

Commit 787236b

Browse files
committed
MB-29287: Merge commit '3102e467' into 'couchbase/5.1.0'
* commit '3102e467432dbd949c857cee75b66c0f477be2be': MB-29287: Test that takeover finds all items Note this indvidual merge commit is part of merging watson_ep into couchbase/5.1.0 Change-Id: I17c257bee2e9d413e15f07d5273d6badf334e36b
2 parents 987e3a2 + 3102e46 commit 787236b

File tree

8 files changed

+293
-16
lines changed

8 files changed

+293
-16
lines changed

engines/ep/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ TARGET_LINK_LIBRARIES(ep cJSON JSON_checker couchstore ${EP_FORESTDB_LIB}
242242
# so simpler / quicker just to link them into a single executable).
243243
ADD_EXECUTABLE(ep-engine_ep_unit_tests
244244
tests/mock/mock_dcp.cc
245+
tests/mock/mock_dcp_producer.cc
245246
tests/mock/mock_ephemeral_vb.cc
246247
tests/mock/mock_synchronous_ep_engine.cc
247248
tests/module_tests/atomic_unordered_map_test.cc

engines/ep/src/dcp/stream.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,8 @@ class ActiveStream : public Stream {
306306

307307
protected:
308308
// Returns the outstanding items for the stream's checkpoint cursor.
309-
void getOutstandingItems(VBucketPtr &vb, std::vector<queued_item> &items);
309+
virtual void getOutstandingItems(VBucketPtr& vb,
310+
std::vector<queued_item>& items);
310311

311312
// Given a set of queued items, create mutation responses for each item,
312313
// and pass onto the producer associated with this stream.
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2018 Couchbase, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "mock_dcp_producer.h"
19+
20+
#include "mock_stream.h"
21+
22+
void MockDcpProducer::mockActiveStreamRequest(uint32_t flags,
23+
uint32_t opaque,
24+
VBucket& vb,
25+
uint64_t start_seqno,
26+
uint64_t end_seqno,
27+
uint64_t vbucket_uuid,
28+
uint64_t snap_start_seqno,
29+
uint64_t snap_end_seqno) {
30+
stream_t stream = new MockActiveStream(
31+
static_cast<EventuallyPersistentEngine*>(&engine_),
32+
this,
33+
flags,
34+
opaque,
35+
vb,
36+
start_seqno,
37+
end_seqno,
38+
vbucket_uuid,
39+
snap_start_seqno,
40+
snap_end_seqno);
41+
stream->setActive();
42+
if (!streams.insert(std::make_pair(vb.getId(), stream))) {
43+
throw std::logic_error(
44+
"MockDcpProducer::mockActiveStreamRequest "
45+
"failed to insert requested stream");
46+
}
47+
notifyStreamReady(vb.getId());
48+
}

engines/ep/tests/mock/mock_dcp_producer.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,18 @@ class MockDcpProducer: public DcpProducer {
129129
BackfillManager& getBFM() {
130130
return *backfillMgr;
131131
}
132+
133+
/**
134+
* Place a mock active stream into the producer
135+
*/
136+
void mockActiveStreamRequest(uint32_t flags,
137+
uint32_t opaque,
138+
VBucket& vb,
139+
uint64_t start_seqno,
140+
uint64_t end_seqno,
141+
uint64_t vbucket_uuid,
142+
uint64_t snap_start_seqno,
143+
uint64_t snap_end_seqno);
132144
};
133145

134146
using mock_dcp_producer_t = SingleThreadedRCPtr<MockDcpProducer>;

engines/ep/tests/mock/mock_stream.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#pragma once
1919

2020
#include "collections/vbucket_filter.h"
21+
#include "dcp/consumer.h"
22+
#include "dcp/producer.h"
2123
#include "dcp/stream.h"
2224
#include "tests/mock/mock_dcp_producer.h"
2325

@@ -148,6 +150,15 @@ class MockActiveStream : public ActiveStream {
148150
void setState(StreamState state) {
149151
state_ = state;
150152
}
153+
154+
virtual void getOutstandingItems(VBucketPtr& vb,
155+
std::vector<queued_item>& items) override {
156+
preGetOutstandingItemsCallback();
157+
ActiveStream::getOutstandingItems(vb, items);
158+
}
159+
160+
/// A callback to allow tests to inject code before we access the checkpoint
161+
std::function<void()> preGetOutstandingItemsCallback = [] { return; };
151162
};
152163

153164
/**

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2293,21 +2293,8 @@ class ActiveStreamChkptProcessorTaskTest : public SingleThreadedKVBucketTest {
22932293
}
22942294

22952295
void notifyAndStepToCheckpoint() {
2296-
auto vb = store->getVBucket(vbid);
2297-
ASSERT_NE(nullptr, vb.get());
2298-
2299-
producer->notifySeqnoAvailable(vbid, vb->getHighSeqno());
2300-
2301-
/* Step which will notify the checkpoint processor task */
2302-
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
2303-
EXPECT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
2304-
2305-
/* Run the task */
2306-
producer->getCheckpointSnapshotTask().run();
2307-
2308-
/* This time the step should return something that is read from the
2309-
checkpoint processor */
2310-
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
2296+
SingleThreadedKVBucketTest::notifyAndStepToCheckpoint(*producer,
2297+
producers.get());
23112298
}
23122299

23132300
const void* cookie;

engines/ep/tests/module_tests/evp_store_single_threaded_test.cc

100644100755
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717

1818
#include "evp_store_single_threaded_test.h"
19+
#include "../mock/mock_dcp.h"
1920
#include "../mock/mock_dcp_consumer.h"
2021
#include "../mock/mock_dcp_producer.h"
2122
#include "../mock/mock_global_task.h"
@@ -24,6 +25,7 @@
2425
#include "dcp/dcpconnmap.h"
2526
#include "ep_time.h"
2627
#include "evp_store_test.h"
28+
#include "failover-table.h"
2729
#include "fakes/fake_executorpool.h"
2830
#include "programs/engine_testapp/mock_server.h"
2931
#include "taskqueue.h"
@@ -135,6 +137,25 @@ void SingleThreadedKVBucketTest::runReadersUntilWarmedUp() {
135137
}
136138
}
137139

140+
void SingleThreadedKVBucketTest::notifyAndStepToCheckpoint(
141+
MockDcpProducer& producer, dcp_message_producers* producers) {
142+
auto vb = store->getVBucket(vbid);
143+
ASSERT_NE(nullptr, vb.get());
144+
145+
producer.notifySeqnoAvailable(vbid, vb->getHighSeqno());
146+
147+
/* Step which will notify the checkpoint processor task */
148+
EXPECT_EQ(ENGINE_SUCCESS, producer.step(producers));
149+
EXPECT_EQ(1, producer.getCheckpointSnapshotTask().queueSize());
150+
151+
/* Run the task */
152+
producer.getCheckpointSnapshotTask().run();
153+
154+
/* This time the step should return something that is read from the
155+
checkpoint processor */
156+
EXPECT_EQ(ENGINE_WANT_MORE, producer.step(producers));
157+
}
158+
138159
/*
139160
* The following test checks to see if we call handleSlowStream when in a
140161
* backfilling state, but the backfillTask is not running, we
@@ -1751,3 +1772,194 @@ TEST_F(SingleThreadedEPBucketTest, mb25273) {
17511772
EXPECT_EQ(3, gv.item->getCas());
17521773
EXPECT_EQ(value.size(), gv.item->getValue()->vlength());
17531774
}
1775+
1776+
extern uint8_t dcp_last_op;
1777+
extern std::string dcp_last_key;
1778+
1779+
class MB_29287 : public SingleThreadedEPBucketTest {
1780+
public:
1781+
void SetUp() override {
1782+
SingleThreadedEPBucketTest::SetUp();
1783+
cookie = create_mock_cookie();
1784+
setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
1785+
// 1. Mock producer
1786+
producer = new MockDcpProducer(*engine, cookie, "test_producer", 0, {});
1787+
1788+
producers = get_dcp_producers(
1789+
reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
1790+
reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
1791+
auto vb = store->getVBuckets().getBucket(vbid);
1792+
ASSERT_NE(nullptr, vb.get());
1793+
// 2. Mock active stream
1794+
producer->mockActiveStreamRequest(0, // flags
1795+
1, // opaque
1796+
*vb,
1797+
0, // start_seqno
1798+
~0, // end_seqno
1799+
0, // vbucket_uuid,
1800+
0, // snap_start_seqno,
1801+
0); // snap_end_seqno,
1802+
1803+
store_item(vbid, makeStoredDocKey("1"), "value1");
1804+
store_item(vbid, makeStoredDocKey("2"), "value2");
1805+
store_item(vbid, makeStoredDocKey("3"), "value3");
1806+
flush_vbucket_to_disk(vbid, 3);
1807+
notifyAndStepToCheckpoint(*producer, producers.get());
1808+
1809+
for (int i = 0; i < 3; i++) { // 1, 2 and 3
1810+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
1811+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
1812+
}
1813+
1814+
store_item(vbid, makeStoredDocKey("4"), "value4");
1815+
1816+
auto stream = producer->findStream(vbid);
1817+
auto* mockStream = static_cast<MockActiveStream*>(stream.get());
1818+
mockStream->preGetOutstandingItemsCallback =
1819+
std::bind(&MB_29287::closeAndRecreateStream, this);
1820+
1821+
// call next - get success (nothing ready, but task has been scheduled)
1822+
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
1823+
1824+
// Run the snapshot task and step (triggering
1825+
// preGetOutstandingItemsCallback)
1826+
notifyAndStepToCheckpoint(*producer, producers.get());
1827+
}
1828+
1829+
void TearDown() override {
1830+
destroy_mock_cookie(cookie);
1831+
producer->closeAllStreams();
1832+
producer->cancelCheckpointCreatorTask();
1833+
producer.reset();
1834+
SingleThreadedEPBucketTest::TearDown();
1835+
}
1836+
1837+
void closeAndRecreateStream() {
1838+
// Without the fix, 5 will be lost
1839+
store_item(vbid, makeStoredDocKey("5"), "don't lose me");
1840+
producer->closeStream(1, 0);
1841+
auto vb = store->getVBuckets().getBucket(vbid);
1842+
ASSERT_NE(nullptr, vb.get());
1843+
producer->mockActiveStreamRequest(DCP_ADD_STREAM_FLAG_TAKEOVER,
1844+
1, // opaque
1845+
*vb,
1846+
3, // start_seqno
1847+
~0, // end_seqno
1848+
vb->failovers->getLatestUUID(),
1849+
3, // snap_start_seqno
1850+
~0); // snap_end_seqno
1851+
}
1852+
1853+
const void* cookie = nullptr;
1854+
mock_dcp_producer_t producer;
1855+
std::unique_ptr<dcp_message_producers> producers;
1856+
};
1857+
1858+
// Stream takeover with no more writes
1859+
TEST_F(MB_29287, dataloss_end) {
1860+
auto stream = producer->findStream(vbid);
1861+
auto* as = static_cast<ActiveStream*>(stream.get());
1862+
1863+
EXPECT_TRUE(stream->isTakeoverSend());
1864+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
1865+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
1866+
dcp_last_op = 0;
1867+
EXPECT_EQ("4", dcp_last_key);
1868+
1869+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
1870+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
1871+
dcp_last_op = 0;
1872+
EXPECT_EQ("5", dcp_last_key);
1873+
1874+
// Snapshot received
1875+
as->snapshotMarkerAckReceived();
1876+
1877+
// set-vb-state now underway
1878+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
1879+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE, dcp_last_op);
1880+
1881+
// Move stream to pending and vb to dead
1882+
as->setVBucketStateAckRecieved();
1883+
1884+
// Cannot store anymore items
1885+
store_item(vbid,
1886+
makeStoredDocKey("K6"),
1887+
"value6",
1888+
0,
1889+
{cb::engine_errc::not_my_vbucket});
1890+
1891+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
1892+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE, dcp_last_op);
1893+
as->setVBucketStateAckRecieved();
1894+
EXPECT_TRUE(!stream->isActive());
1895+
1896+
auto vb = store->getVBuckets().getBucket(vbid);
1897+
ASSERT_NE(nullptr, vb.get());
1898+
// Have persistence cursor only (dcp now closed down)
1899+
EXPECT_EQ(1, vb->checkpointManager.getNumOfCursors());
1900+
}
1901+
1902+
// takeover when more writes occur
1903+
TEST_F(MB_29287, dataloss_hole) {
1904+
auto stream = producer->findStream(vbid);
1905+
auto* as = static_cast<ActiveStream*>(stream.get());
1906+
1907+
store_item(vbid, makeStoredDocKey("6"), "value6");
1908+
1909+
EXPECT_TRUE(stream->isTakeoverSend());
1910+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
1911+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
1912+
dcp_last_op = 0;
1913+
EXPECT_EQ("4", dcp_last_key);
1914+
1915+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
1916+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
1917+
dcp_last_op = 0;
1918+
EXPECT_EQ("5", dcp_last_key);
1919+
1920+
// Snapshot received
1921+
as->snapshotMarkerAckReceived();
1922+
1923+
// More data in the checkpoint (key 6)
1924+
1925+
// call next - get success (nothing ready, but task has been scheduled)
1926+
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
1927+
1928+
// Run the snapshot task and step
1929+
notifyAndStepToCheckpoint(*producer, producers.get());
1930+
1931+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
1932+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
1933+
EXPECT_EQ("6", dcp_last_key);
1934+
1935+
// Snapshot received
1936+
as->snapshotMarkerAckReceived();
1937+
1938+
// Now send
1939+
EXPECT_TRUE(stream->isTakeoverSend());
1940+
1941+
// set-vb-state now underway
1942+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
1943+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE, dcp_last_op);
1944+
dcp_last_op = 0;
1945+
1946+
// Move stream to pending and vb to dead
1947+
as->setVBucketStateAckRecieved();
1948+
1949+
// Cannot store anymore items
1950+
store_item(vbid,
1951+
makeStoredDocKey("K6"),
1952+
"value6",
1953+
0,
1954+
{cb::engine_errc::not_my_vbucket});
1955+
1956+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
1957+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE, dcp_last_op);
1958+
as->setVBucketStateAckRecieved();
1959+
EXPECT_TRUE(!stream->isActive());
1960+
1961+
auto vb = store->getVBuckets().getBucket(vbid);
1962+
ASSERT_NE(nullptr, vb.get());
1963+
// Have persistence cursor only (dcp now closed down)
1964+
EXPECT_EQ(1, vb->checkpointManager.getNumOfCursors());
1965+
}

engines/ep/tests/module_tests/evp_store_single_threaded_test.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626

2727
class MockActiveStreamWithOverloadedRegisterCursor;
2828

29+
class MockDcpProducer;
30+
2931
/*
3032
* A subclass of KVBucketTest which uses a fake ExecutorPool,
3133
* which will not spawn ExecutorThreads and hence not run any tasks
@@ -80,6 +82,9 @@ class SingleThreadedKVBucketTest : public KVBucketTest {
8082
return ENGINE_SUCCESS;
8183
}
8284

85+
void notifyAndStepToCheckpoint(MockDcpProducer& producer,
86+
dcp_message_producers* producers);
87+
8388
SingleThreadedExecutorPool* task_executor;
8489
};
8590

0 commit comments

Comments
 (0)