Skip to content

Commit 3102e46

Browse files
jimwwalker authored and daverigby committed
MB-29287: Test that takeover finds all items
Using the single-threaded test harness we can demonstrate the window in which a close-stream followed by a create-stream will lose items. By triggering the close/re-create from within the snapshot processor after it has a handle on the stream which will be closed, and before it accesses the items, we will transfer items into the dead stream and prevent the new takeover stream from obtaining them. Change-Id: I896a7cbd3d32419576294ea88c288b097759a362 Reviewed-on: http://review.couchbase.org/93123 Well-Formed: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 3ae001d commit 3102e46

File tree

11 files changed

+315
-23
lines changed

11 files changed

+315
-23
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ TARGET_LINK_LIBRARIES(ep cJSON JSON_checker couchstore forestdb
185185
# so simpler / quicker just to link them into a single executable).
186186
ADD_EXECUTABLE(ep-engine_ep_unit_tests
187187
tests/mock/mock_dcp.cc
188+
tests/mock/mock_dcp_producer.cc
188189
tests/module_tests/checkpoint_test.cc
189190
tests/module_tests/defragmenter_test.cc
190191
tests/module_tests/ep_unit_tests_main.cc

src/dcp/producer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ class DcpProducer : public Producer {
237237

238238
ExTask checkpointCreatorTask;
239239

240-
private:
240+
protected:
241241

242242

243243
DcpResponse* getNextItem();

src/dcp/stream.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ class ActiveStream : public Stream {
264264

265265
protected:
266266
// Returns the outstanding items for the stream's checkpoint cursor.
267-
void getOutstandingItems(RCPtr<VBucket> &vb, std::vector<queued_item> &items);
267+
virtual void getOutstandingItems(RCPtr<VBucket> &vb, std::vector<queued_item> &items);
268268

269269
// Given a set of queued items, create mutation responses for each item,
270270
// and pass onto the producer associated with this stream.

tests/mock/mock_dcp_producer.cc

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2018 Couchbase, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "mock_dcp_producer.h"
19+
20+
#include "mock_stream.h"
21+
22+
/**
 * Test helper: constructs a MockActiveStream from the supplied parameters,
 * marks it active, registers it in `streams` under `vbucket` and then calls
 * notifyStreamReady() for that vbucket.
 *
 * @throws std::logic_error if the insertion into `streams` is rejected.
 */
void MockDcpProducer::mockActiveStreamRequest(uint32_t flags,
23+
uint32_t opaque,
24+
uint16_t vbucket,
25+
uint64_t start_seqno,
26+
uint64_t end_seqno,
27+
uint64_t vbucket_uuid,
28+
uint64_t snap_start_seqno,
29+
uint64_t snap_end_seqno) {
30+
stream_t stream = new MockActiveStream(
31+
static_cast<EventuallyPersistentEngine*>(&engine_),
32+
this,
33+
getName(),
34+
flags,
35+
opaque,
36+
vbucket,
37+
start_seqno,
38+
end_seqno,
39+
vbucket_uuid,
40+
snap_start_seqno,
41+
snap_end_seqno);
42+
stream->setActive();
43+
auto insertResult = streams.insert(std::make_pair(vbucket, stream));
44+
// insert() signals failure through .second == false - presumably because
// an entry for this vbucket already exists (TODO confirm container type).
// Treat that as a bug in the calling test rather than silently replacing.
if (!insertResult.second) {
45+
throw std::logic_error("MockDcpProducer::mockActiveStreamRequest "
46+
"failed to insert requested stream");
47+
}
48+
notifyStreamReady(vbucket);
49+
}

tests/mock/mock_dcp_producer.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,30 @@ class MockDcpProducer: public DcpProducer {
6060
return *static_cast<ActiveStreamCheckpointProcessorTask*>(
6161
checkpointCreatorTask.get());
6262
}
63+
64+
/**
65+
* Finds the stream for a given vbucket
*
* @param vbid key to look up in `streams`
* @return the stream stored under vbid, or a default-constructed (empty)
*         SingleThreadedRCPtr<Stream> when `streams` has no such entry;
*         callers must check before dereferencing.
66+
*/
67+
SingleThreadedRCPtr<Stream> findStream(uint16_t vbid) {
68+
auto it = streams.find(vbid);
69+
if (it != streams.end()) {
70+
return it->second;
71+
} else {
72+
return SingleThreadedRCPtr<Stream>();
73+
}
74+
}
75+
76+
/**
77+
* Place a mock active stream into the producer
*
* The arguments are forwarded to the MockActiveStream constructor (see the
* definition in mock_dcp_producer.cc). Throws std::logic_error if the new
* stream cannot be inserted into `streams`.
78+
*/
79+
void mockActiveStreamRequest(uint32_t flags,
80+
uint32_t opaque,
81+
uint16_t vbucket,
82+
uint64_t start_seqno,
83+
uint64_t end_seqno,
84+
uint64_t vbucket_uuid,
85+
uint64_t snap_start_seqno,
86+
uint64_t snap_end_seqno);
6387
};
6488

6589
using mock_dcp_producer_t = SingleThreadedRCPtr<MockDcpProducer>;

tests/mock/mock_stream.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#pragma once
1919

20+
#include "dcp/consumer.h"
21+
#include "dcp/producer.h"
2022
#include "dcp/stream.h"
2123

2224
/*
@@ -90,4 +92,13 @@ class MockActiveStream : public ActiveStream {
9092
/// Test helper: requests a transition of this stream to STREAM_BACKFILLING
/// by calling transitionState() directly.
void transitionStateToBackfilling() {
9193
transitionState(STREAM_BACKFILLING);
9294
}
95+
96+
/// Test override: invokes preGetOutstandingItemsCallback first, then defers
/// to ActiveStream::getOutstandingItems with the same arguments. This lets a
/// test run arbitrary code immediately before the base implementation runs.
virtual void getOutstandingItems(RCPtr<VBucket>& vb,
97+
std::vector<queued_item>& items) override {
98+
preGetOutstandingItemsCallback();
99+
ActiveStream::getOutstandingItems(vb, items);
100+
}
101+
102+
/// A callback to allow tests to inject code before we access the checkpoint
103+
std::function<void()> preGetOutstandingItemsCallback = [] { return; };
93104
};

tests/module_tests/dcp_test.cc

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -973,21 +973,8 @@ class ActiveStreamChkptProcessorTaskTest : public SingleThreadedEPStoreTest {
973973
}
974974

975975
void notifyAndStepToCheckpoint() {
976-
auto vb = store->getVBucket(vbid);
977-
ASSERT_NE(nullptr, vb.get());
978-
979-
producer->notifySeqnoAvailable(vbid, vb->getHighSeqno());
980-
981-
/* Step which will notify the checkpoint processor task */
982-
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
983-
EXPECT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
984-
985-
/* Run the task */
986-
producer->getCheckpointSnapshotTask().run();
987-
988-
/* This time the step should return something that is read from the
989-
checkpoint processor */
990-
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
976+
SingleThreadedEPStoreTest::notifyAndStepToCheckpoint(*producer,
977+
producers.get());
991978
}
992979

993980
const void* cookie;

tests/module_tests/evp_store_single_threaded_test.cc

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,34 @@
1717

1818
#include "evp_store_single_threaded_test.h"
1919

20+
#include "../mock/mock_dcp.h"
2021
#include "../mock/mock_dcp_producer.h"
2122
#include "../mock/mock_dcp_consumer.h"
2223
#include "../mock/mock_stream.h"
24+
25+
#include "failover-table.h"
2326
#include "programs/engine_testapp/mock_server.h"
2427

28+
/**
 * Drives `producer` through one checkpoint-snapshot cycle for `vbid`:
 * notifies it of the vbucket's high seqno, steps once (expecting
 * ENGINE_SUCCESS and exactly one queued entry on the checkpoint snapshot
 * task), runs that task, then steps again expecting ENGINE_WANT_MORE.
 *
 * @param producer  mock producer to notify and step
 * @param producers message-producer callbacks handed to producer.step()
 */
void SingleThreadedEPStoreTest::notifyAndStepToCheckpoint(
29+
MockDcpProducer& producer, dcp_message_producers* producers) {
30+
31+
auto vb = store->getVBucket(vbid);
32+
// Note: a failing ASSERT_* returns from this helper only; the calling test
// keeps running unless it checks for a fatal failure itself.
ASSERT_NE(nullptr, vb.get());
33+
34+
producer.notifySeqnoAvailable(vbid, vb->getHighSeqno());
35+
36+
/* Step which will notify the checkpoint processor task */
37+
EXPECT_EQ(ENGINE_SUCCESS, producer.step(producers));
38+
EXPECT_EQ(1, producer.getCheckpointSnapshotTask().queueSize());
39+
40+
/* Run the task */
41+
producer.getCheckpointSnapshotTask().run();
42+
43+
/* This time the step should return something that is read from the
44+
checkpoint processor */
45+
EXPECT_EQ(ENGINE_WANT_MORE, producer.step(producers));
46+
}
47+
2548
/*
2649
* The following test checks to see if we call handleSlowStream when in a
2750
* backfilling state, but the backfillTask is not running, we
@@ -674,3 +697,194 @@ TEST_F(SingleThreadedEPStoreTest, MB18953_taskWake) {
674697
EXPECT_EQ(1, task_executor->getNumReadyTasks(NONIO_TASK_IDX));
675698
runNextTask(lpNonioQ, "TestTask DefragmenterTask"); // lptask goes second
676699
}
700+
701+
extern uint8_t dcp_last_op;
702+
extern std::string dcp_last_key;
703+
704+
/**
 * Fixture for MB-29287. SetUp() creates a mock producer with one active
 * stream, streams keys 1-3, stores key 4, and then arranges for
 * closeAndRecreateStream() to run from the stream's
 * preGetOutstandingItemsCallback, i.e. just before the stream reads its
 * outstanding items. The tests then check what the re-created takeover
 * stream delivers.
 */
class MB_29287 : public SingleThreadedEPStoreTest {
705+
public:
706+
void SetUp() override {
707+
SingleThreadedEPStoreTest::SetUp();
708+
cookie = create_mock_cookie();
709+
setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
710+
// 1. Mock producer
711+
producer = new MockDcpProducer(*engine,
712+
cookie,
713+
"test_producer",
714+
/*notifyOnly*/ false);
715+
716+
producers = get_dcp_producers(
717+
reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
718+
reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
719+
720+
// 2. Mock active stream
721+
producer->mockActiveStreamRequest(0, // flags
722+
1, // opaque
723+
vbid,
724+
0, // start_seqno
725+
~0, // end_seqno
726+
0, // vbucket_uuid,
727+
0, // snap_start_seqno,
728+
0); // snap_end_seqno,
729+
730+
// 3. Store three items, flush them, and drain them from the stream
store_item(vbid, "1", "value1");
731+
store_item(vbid, "2", "value2");
732+
store_item(vbid, "3", "value3");
733+
flush_vbucket_to_disk(vbid, 3);
734+
notifyAndStepToCheckpoint(*producer, producers.get());
735+
736+
for (int i = 0; i < 3; i++) { // 1, 2 and 3
737+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
738+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
739+
}
740+
741+
// 4. Key 4 is stored before the close/re-create is triggered
store_item(vbid, "4", "value4");
742+
743+
auto stream = producer->findStream(vbid);
744+
// NOTE(review): findStream() can return an empty pointer; the result is
// cast and dereferenced unchecked, relying on the stream created above.
auto* mockStream = static_cast<MockActiveStream*>(stream.get());
745+
mockStream->preGetOutstandingItemsCallback =
746+
std::bind(&MB_29287::closeAndRecreateStream, this);
747+
748+
// call next - get success (nothing ready, but task has been scheduled)
749+
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
750+
751+
// Run the snapshot task and step (triggering
752+
// preGetOutstandingItemsCallback)
753+
notifyAndStepToCheckpoint(*producer, producers.get());
754+
}
755+
756+
// NOTE(review): the cookie is destroyed before the producer that was
// constructed with it is closed and reset - confirm the producer does not
// touch the cookie during closeAllStreams()/reset().
void TearDown() override {
757+
destroy_mock_cookie(cookie);
758+
producer->closeAllStreams();
759+
producer->cancelCheckpointCreatorTask();
760+
producer.reset();
761+
SingleThreadedEPStoreTest::TearDown();
762+
}
763+
764+
/// Installed as preGetOutstandingItemsCallback by SetUp(): stores key 5,
/// closes the existing stream and registers a takeover stream that starts
/// from seqno 3.
void closeAndRecreateStream() {
765+
// Without the fix, 5 will be lost
766+
store_item(vbid, "5", "don't lose me");
767+
// NOTE(review): presumably (opaque=1, vbucket=0); the vbucket is the
// literal 0 rather than vbid - TODO confirm they are the same value.
producer->closeStream(1, 0);
768+
auto vb = store->getVBuckets().getBucket(vbid);
769+
ASSERT_NE(nullptr, vb.get());
770+
producer->mockActiveStreamRequest(DCP_ADD_STREAM_FLAG_TAKEOVER,
771+
1, // opaque
772+
vbid,
773+
3, // start_seqno
774+
~0, // end_seqno
775+
vb->failovers->getLatestUUID(),
776+
3, // snap_start_seqno
777+
~0); // snap_end_seqno
778+
}
779+
780+
const void* cookie = nullptr;
781+
mock_dcp_producer_t producer;
782+
std::unique_ptr<dcp_message_producers> producers;
783+
};
784+
785+
// Stream takeover with no more writes
786+
// Runs after the fixture's SetUp() has already closed and re-created the
// stream as a takeover stream. Expects keys 4 and 5 to be delivered, then the
// two set-vbucket-state messages, and finally only one checkpoint cursor left.
TEST_F(MB_29287, dataloss_end) {
787+
auto stream = producer->findStream(vbid);
788+
auto* as = static_cast<ActiveStream*>(stream.get());
789+
790+
EXPECT_EQ(STREAM_TAKEOVER_SEND, stream->getState());
791+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
792+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
793+
// Reset so the next opcode check cannot pass on a stale value
dcp_last_op = 0;
794+
EXPECT_EQ("4", dcp_last_key);
795+
796+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
797+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
798+
dcp_last_op = 0;
799+
EXPECT_EQ("5", dcp_last_key);
800+
801+
// Snapshot received
802+
as->snapshotMarkerAckReceived();
803+
804+
// set-vb-state now underway
805+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
806+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE, dcp_last_op);
807+
// NOTE(review): dcp_last_op is not reset here (dataloss_hole does reset it
// at this point), so the second SET_VBUCKET_STATE check below could be
// satisfied by this stale value.
808+
// Move stream to pending and vb to dead
809+
as->setVBucketStateAckRecieved();
810+
811+
// Cannot store any more items
812+
store_item(vbid,
813+
"K6",
814+
"value6",
815+
ENGINE_NOT_MY_VBUCKET);
816+
817+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
818+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE, dcp_last_op);
819+
as->setVBucketStateAckRecieved();
820+
EXPECT_TRUE(!stream->isActive());
821+
822+
auto vb = store->getVBuckets().getBucket(vbid);
823+
ASSERT_NE(nullptr, vb.get());
824+
// Have persistence cursor only (dcp now closed down)
825+
EXPECT_EQ(1, vb->checkpointManager.getNumOfCursors());
826+
}
827+
828+
// takeover when more writes occur
829+
// Same starting point as dataloss_end, but key 6 is stored after the stream
// was re-created. Expects keys 4 and 5, then (after another snapshot-task
// cycle) key 6, before the set-vbucket-state hand-over completes.
TEST_F(MB_29287, dataloss_hole) {
830+
auto stream = producer->findStream(vbid);
831+
auto* as = static_cast<ActiveStream*>(stream.get());
832+
833+
store_item(vbid, "6", "value6");
834+
835+
EXPECT_EQ(STREAM_TAKEOVER_SEND, stream->getState());
836+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
837+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
838+
// Reset so the next opcode check cannot pass on a stale value
dcp_last_op = 0;
839+
EXPECT_EQ("4", dcp_last_key);
840+
841+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
842+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
843+
dcp_last_op = 0;
844+
EXPECT_EQ("5", dcp_last_key);
845+
846+
// Snapshot received
847+
as->snapshotMarkerAckReceived();
848+
849+
// More data in the checkpoint (key 6)
850+
851+
// call next - get success (nothing ready, but task has been scheduled)
852+
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
853+
854+
// Run the snapshot task and step
855+
notifyAndStepToCheckpoint(*producer, producers.get());
856+
857+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
858+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
859+
EXPECT_EQ("6", dcp_last_key);
860+
861+
// Snapshot received
862+
as->snapshotMarkerAckReceived();
863+
864+
// Now send
865+
EXPECT_EQ(STREAM_TAKEOVER_SEND, stream->getState());
866+
867+
// set-vb-state now underway
868+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
869+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE, dcp_last_op);
870+
dcp_last_op = 0;
871+
872+
// Move stream to pending and vb to dead
873+
as->setVBucketStateAckRecieved();
874+
875+
// Cannot store any more items
876+
store_item(vbid,
877+
"K6",
878+
"value6",
879+
ENGINE_NOT_MY_VBUCKET);
880+
881+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
882+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE, dcp_last_op);
883+
as->setVBucketStateAckRecieved();
884+
EXPECT_TRUE(!stream->isActive());
885+
886+
auto vb = store->getVBuckets().getBucket(vbid);
887+
ASSERT_NE(nullptr, vb.get());
888+
// Have persistence cursor only (dcp now closed down)
889+
EXPECT_EQ(1, vb->checkpointManager.getNumOfCursors());
890+
}

0 commit comments

Comments
 (0)