Skip to content

Commit 38d49b1

Browse files
paolococchidaverigby
authored andcommitted
MB-33804 [SR]: Replica VBucket queues into PDM at DCP_PREPARE received
With this patch we wire up VBucket and DurabilityMonitor at Prepare received. VBucket queues received Prepares into the DM. Logically, Prepares at Replica will be removed at DCP_COMMIT or DCP_ABORT received (added in follow-up patches). Change-Id: I426f7b09eaa5f2652a7790ed55137fbcd0fb965d Reviewed-on: http://review.couchbase.org/107962 Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 92d00b1 commit 38d49b1

File tree

4 files changed

+97
-60
lines changed

4 files changed

+97
-60
lines changed

engines/ep/src/vbucket.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,12 +1058,16 @@ VBNotifyCtx VBucket::queueItem(queued_item& item, const VBQueueItemCtx& ctx) {
10581058
}
10591059
notifyCtx.bySeqno = item->getBySeqno();
10601060

1061-
// @todo-durability: Add support DurabilityMonitor at Replica
1062-
if (getState() == vbucket_state_active && item->isPending()) {
1061+
if (item->isPending()) {
10631062
// Register this mutation with the durability monitor.
10641063
Expects(ctx.durability.is_initialized());
1065-
const auto cookie = ctx.durability->cookie;
1066-
getActiveDM().addSyncWrite(cookie, item);
1064+
// @todo-durability: Add DurabilityMonitor support at vbstate Pending
1065+
// if necessary
1066+
if (state == vbucket_state_active) {
1067+
getActiveDM().addSyncWrite(ctx.durability->cookie, item);
1068+
} else if (state == vbucket_state_replica) {
1069+
getPassiveDM().addSyncWrite(item);
1070+
}
10671071
}
10681072

10691073
return notifyCtx;

engines/ep/tests/module_tests/vbucket_durability_test.cc

Lines changed: 72 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
#include "checkpoint_manager.h"
2222
#include "checkpoint_utils.h"
2323
#include "durability/active_durability_monitor.h"
24+
#include "durability/passive_durability_monitor.h"
2425
#include "test_helpers.h"
2526
#include "thread_gate.h"
27+
#include "vbucket_utils.h"
2628

2729
#include "../mock/mock_checkpoint_manager.h"
2830

@@ -36,18 +38,16 @@ void VBucketDurabilityTest::SetUp() {
3638
ckptMgr = static_cast<MockCheckpointManager*>(
3739
vbucket->checkpointManager.get());
3840
vbucket->setState(vbucket_state_active);
39-
40-
monitor = &vbucket->getActiveDM();
41-
ASSERT_TRUE(monitor);
42-
vbucket->setReplicationTopology(nlohmann::json::array({{active, replica}}));
43-
ASSERT_EQ(2, monitor->getFirstChainSize());
41+
vbucket->setReplicationTopology(
42+
nlohmann::json::array({{active, replica1}}));
43+
ASSERT_EQ(2, vbucket->getActiveDM().getFirstChainSize());
4444
}
4545

46-
size_t VBucketDurabilityTest::storeSyncWrites(
46+
void VBucketDurabilityTest::storeSyncWrites(
4747
const std::vector<SyncWriteSpec>& seqnos) {
4848
if (seqnos.empty()) {
4949
throw std::logic_error(
50-
"VBucketDurabilityTest::addSyncWrites: seqnos list is empty");
50+
"VBucketDurabilityTest::storeSyncWrites: seqnos list is empty");
5151
}
5252

5353
// @todo: For now this function is supposed to be called once per test,
@@ -68,9 +68,8 @@ size_t VBucketDurabilityTest::storeSyncWrites(
6868
ckptMgr->createSnapshot(seqnos.front().seqno, seqnos.back().seqno);
6969
EXPECT_EQ(1, ckptMgr->getNumCheckpoints());
7070

71-
size_t numStored = ht->getNumItems();
72-
size_t numCkptItems = ckptMgr->getNumItems();
73-
size_t numTracked = monitor->getNumTracked();
71+
const auto preHTCount = ht->getNumItems();
72+
const auto preCMCount = ckptMgr->getNumItems();
7473
for (auto write : seqnos) {
7574
auto item = Item(makeStoredDocKey("key" + std::to_string(write.seqno)),
7675
0 /*flags*/,
@@ -89,25 +88,24 @@ size_t VBucketDurabilityTest::storeSyncWrites(
8988
ctx.genBySeqno = GenerateBySeqno::No;
9089
ctx.durability = DurabilityItemCtx{item.getDurabilityReqs(), cookie};
9190

92-
EXPECT_EQ(MutationStatus::WasClean,
91+
ASSERT_EQ(MutationStatus::WasClean,
9392
public_processSet(item, 0 /*cas*/, ctx));
94-
95-
EXPECT_EQ(++numStored, ht->getNumItems());
96-
EXPECT_EQ(++numTracked, monitor->getNumTracked());
97-
EXPECT_EQ(++numCkptItems, ckptMgr->getNumItems());
9893
}
99-
return numStored;
94+
EXPECT_EQ(preHTCount + seqnos.size(), ht->getNumItems());
95+
EXPECT_EQ(preCMCount + seqnos.size(), ckptMgr->getNumItems());
10096
}
10197

10298
void VBucketDurabilityTest::simulateLocalAck(uint64_t seqno) {
10399
vbucket->setPersistenceSeqno(seqno);
104-
monitor->notifyLocalPersistence();
100+
vbucket->getActiveDM().notifyLocalPersistence();
105101
}
106102

107-
void VBucketDurabilityTest::testSyncWrites(
103+
void VBucketDurabilityTest::testAddPrepare(
108104
const std::vector<SyncWriteSpec>& writes) {
109-
auto numStored = storeSyncWrites(writes);
110-
ASSERT_EQ(writes.size(), numStored);
105+
{
106+
SCOPED_TRACE("");
107+
storeSyncWrites(writes);
108+
}
111109

112110
for (auto write : writes) {
113111
auto key = makeStoredDocKey("key" + std::to_string(write.seqno));
@@ -122,18 +120,23 @@ void VBucketDurabilityTest::testSyncWrites(
122120
CheckpointManagerTestIntrospector::public_getCheckpointList(
123121
*ckptMgr);
124122
ASSERT_EQ(1, ckptList.size());
125-
EXPECT_EQ(numStored, ckptList.front()->getNumItems());
123+
EXPECT_EQ(writes.size(), ckptList.front()->getNumItems());
126124
for (const auto& qi : *ckptList.front()) {
127125
if (!qi->isCheckPointMetaItem()) {
128126
EXPECT_EQ(queue_op::pending_sync_write, qi->getOperation());
129127
}
130128
}
129+
}
130+
131+
void VBucketDurabilityTest::testAddPrepareAndCommit(
132+
const std::vector<SyncWriteSpec>& writes) {
133+
testAddPrepare(writes);
131134

132135
// Simulate flush + checkpoint-removal
133136
ckptMgr->clear(*vbucket, 0 /*lastBySeqno*/);
134137

135138
// Simulate replica and active seqno-ack
136-
vbucket->seqnoAcknowledged(replica, writes.back().seqno);
139+
vbucket->seqnoAcknowledged(replica1, writes.back().seqno);
137140
simulateLocalAck(writes.back().seqno);
138141

139142
for (auto write : writes) {
@@ -148,34 +151,39 @@ void VBucketDurabilityTest::testSyncWrites(
148151
EXPECT_EQ(write.deletion, sv->isDeleted());
149152
}
150153

154+
const auto& ckptList =
155+
CheckpointManagerTestIntrospector::public_getCheckpointList(
156+
*ckptMgr);
151157
ASSERT_EQ(1, ckptList.size());
152-
EXPECT_EQ(numStored, ckptList.front()->getNumItems());
158+
EXPECT_EQ(writes.size(), ckptList.front()->getNumItems());
153159
for (const auto& qi : *ckptList.front()) {
154160
if (!qi->isCheckPointMetaItem()) {
155161
EXPECT_EQ(queue_op::commit_sync_write, qi->getOperation());
156162
}
157163
}
158164
}
159165

160-
TEST_P(VBucketDurabilityTest, SyncWrites_ContinuousSeqnos) {
161-
testSyncWrites({1, 2, 3});
166+
TEST_P(VBucketDurabilityTest, Active_AddPrepareAndCommit_ContinuousSeqnos) {
167+
testAddPrepareAndCommit({1, 2, 3});
162168
}
163169

164-
TEST_P(VBucketDurabilityTest, SyncWrites_ContinuousDeleteSeqnos) {
165-
testSyncWrites({{1, true}, {2, true}, {3, true}});
170+
TEST_P(VBucketDurabilityTest,
171+
Active_AddPrepareAndCommit_ContinuousDeleteSeqnos) {
172+
testAddPrepareAndCommit({{1, true}, {2, true}, {3, true}});
166173
}
167174

168-
TEST_P(VBucketDurabilityTest, SyncWrites_SparseSeqnos) {
169-
testSyncWrites({1, 3, 10, 20, 30});
175+
TEST_P(VBucketDurabilityTest, Active_AddPrepareAndCommit_SparseSeqnos) {
176+
testAddPrepareAndCommit({1, 3, 10, 20, 30});
170177
}
171178

172-
TEST_P(VBucketDurabilityTest, SyncWrites_SparseDeleteSeqnos) {
173-
testSyncWrites({{1, true}, {3, true}, {10, true}, {20, true}, {30, true}});
179+
TEST_P(VBucketDurabilityTest, Active_AddPrepareAndCommit_SparseDeleteSeqnos) {
180+
testAddPrepareAndCommit(
181+
{{1, true}, {3, true}, {10, true}, {20, true}, {30, true}});
174182
}
175183

176184
// Mix of Mutations and Deletions.
177-
TEST_P(VBucketDurabilityTest, SyncWrites_SparseMixedSeqnos) {
178-
testSyncWrites({{1, true}, 3, {10, true}, {20, true}, 30});
185+
TEST_P(VBucketDurabilityTest, Active_AddPrepareAndCommit_SparseMixedSeqnos) {
186+
testAddPrepareAndCommit({{1, true}, 3, {10, true}, {20, true}, 30});
179187
}
180188

181189
// Test cases which run in both Full and Value eviction
@@ -315,7 +323,7 @@ TEST(VBucketDurabilityTest, validateSetStateMetaTopologyNegative) {
315323
HasSubstr("chain[0] node[0] (active) cannot be null"));
316324
}
317325

318-
TEST_P(VBucketDurabilityTest, SetVBucketState_ClearTopologyAtReplica) {
326+
TEST_P(VBucketDurabilityTest, Replica_SetVBucketState_ClearTopology) {
319327
ASSERT_NE(nlohmann::json{}.dump(),
320328
vbucket->getReplicationTopology().dump());
321329

@@ -325,18 +333,15 @@ TEST_P(VBucketDurabilityTest, SetVBucketState_ClearTopologyAtReplica) {
325333
vbucket->getReplicationTopology().dump());
326334
}
327335

328-
TEST_P(VBucketDurabilityTest, MultipleReplicas) {
329-
const std::string active = "active";
330-
const std::string replica1 = "replica1";
331-
const std::string replica2 = "replica2";
332-
const std::string replica3 = "replica3";
333-
334-
monitor->setReplicationTopology(
336+
TEST_P(VBucketDurabilityTest, Active_Commit_MultipleReplicas) {
337+
auto& monitor = VBucketTestIntrospector::public_getActiveDM(*vbucket);
338+
monitor.setReplicationTopology(
335339
nlohmann::json::array({{active, replica1, replica2, replica3}}));
336-
ASSERT_EQ(4, monitor->getFirstChainSize());
340+
ASSERT_EQ(4, monitor.getFirstChainSize());
337341

338342
const int64_t preparedSeqno = 1;
339-
ASSERT_EQ(1, storeSyncWrites({preparedSeqno}));
343+
storeSyncWrites({preparedSeqno});
344+
ASSERT_EQ(1, monitor.getNumTracked());
340345

341346
auto key = makeStoredDocKey("key1");
342347

@@ -389,9 +394,12 @@ TEST_P(VBucketDurabilityTest, MultipleReplicas) {
389394
checkCommitted();
390395
}
391396

392-
TEST_P(VBucketDurabilityTest, PendingSkippedAtEjectionAndCommit) {
397+
TEST_P(VBucketDurabilityTest, Active_PendingSkippedAtEjectionAndCommit) {
393398
const int64_t preparedSeqno = 1;
394-
ASSERT_EQ(1, storeSyncWrites({preparedSeqno}));
399+
storeSyncWrites({preparedSeqno});
400+
ASSERT_EQ(1,
401+
VBucketTestIntrospector::public_getActiveDM(*vbucket)
402+
.getNumTracked());
395403

396404
auto key = makeStoredDocKey("key1");
397405
const auto& ckptList =
@@ -462,7 +470,7 @@ TEST_P(VBucketDurabilityTest, PendingSkippedAtEjectionAndCommit) {
462470
ASSERT_EQ(ENGINE_EINVAL, swCompleteTrace.status);
463471

464472
// Simulate replica and active seqno-ack
465-
vbucket->seqnoAcknowledged(replica, preparedSeqno);
473+
vbucket->seqnoAcknowledged(replica1, preparedSeqno);
466474
simulateLocalAck(preparedSeqno);
467475

468476
// Commit notified
@@ -533,8 +541,11 @@ TEST_P(VBucketDurabilityTest, NonPendingKeyAtAbort) {
533541
* 2) a queue_op::abort_sync_write item is enqueued into the CheckpointManager
534542
* 3) the abort_sync_write is not added to the DurabilityMonitor
535543
*/
536-
TEST_P(VBucketDurabilityTest, AbortSyncWrite_Active) {
537-
ASSERT_EQ(1, storeSyncWrites({1} /*seqnos*/));
544+
TEST_P(VBucketDurabilityTest, Active_AbortSyncWrite) {
545+
storeSyncWrites({1} /*seqno*/);
546+
ASSERT_EQ(1,
547+
VBucketTestIntrospector::public_getActiveDM(*vbucket)
548+
.getNumTracked());
538549

539550
auto key = makeStoredDocKey("key1");
540551

@@ -574,7 +585,8 @@ TEST_P(VBucketDurabilityTest, AbortSyncWrite_Active) {
574585
EXPECT_EQ("value", (*it)->getValue()->to_s());
575586

576587
// The Pending is tracked by the DurabilityMonitor
577-
EXPECT_EQ(1, monitor->getNumTracked());
588+
const auto& monitor = VBucketTestIntrospector::public_getActiveDM(*vbucket);
589+
EXPECT_EQ(1, monitor.getNumTracked());
578590

579591
// Note: ensure 1 Ckpt in CM, easier to inspect the CkptList after Commit
580592
ckptMgr->clear(*vbucket, 0 /*seqno*/);
@@ -623,7 +635,7 @@ TEST_P(VBucketDurabilityTest, AbortSyncWrite_Active) {
623635
// The Aborted item is not added for tracking.
624636
// Note: The Pending has not been removed as we are testing at VBucket
625637
// level, so num-tracked must be still 1.
626-
EXPECT_EQ(1, monitor->getNumTracked());
638+
EXPECT_EQ(1, monitor.getNumTracked());
627639
}
628640

629641
/*
@@ -646,7 +658,7 @@ TEST_P(VBucketDurabilityTest, AbortSyncWrite_Active) {
646658
* succeeds at fix. That is because the synchronization changes necessary
647659
* at fix invalidate the test.
648660
*/
649-
TEST_P(VBucketDurabilityTest, ParallelSet) {
661+
TEST_P(VBucketDurabilityTest, Active_ParallelSet) {
650662
const auto numThreads = 4;
651663
ThreadGate tg(numThreads);
652664
const auto threadLoad = 1000;
@@ -685,3 +697,12 @@ TEST_P(VBucketDurabilityTest, ParallelSet) {
685697
t.join();
686698
}
687699
}
700+
701+
TEST_P(VBucketDurabilityTest, Replica_AddPrepare) {
702+
vbucket->setState(vbucket_state_replica);
703+
const auto& monitor =
704+
VBucketTestIntrospector::public_getPassiveDM(*vbucket);
705+
ASSERT_EQ(0, monitor.getNumTracked());
706+
testAddPrepare({1, 2, 3} /*seqnos*/);
707+
ASSERT_EQ(3, monitor.getNumTracked());
708+
}

engines/ep/tests/module_tests/vbucket_durability_test.h

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,8 @@ class VBucketDurabilityTest
5151
* Store the given Sync mutations into VBucket
5252
*
5353
* @param writes the mutations to be added
54-
* @return the number of stored SyncWrites
5554
*/
56-
size_t storeSyncWrites(const std::vector<SyncWriteSpec>& writes);
55+
void storeSyncWrites(const std::vector<SyncWriteSpec>& writes);
5756

5857
/**
5958
* Simulate the local (active) seqno acknowledgement.
@@ -62,6 +61,15 @@ class VBucketDurabilityTest
6261
*/
6362
void simulateLocalAck(uint64_t seqno);
6463

64+
/**
65+
* Tests:
66+
* 1) mutations added to VBucket
67+
* 2) mutations in state "pending" in both HashTable and CheckpointManager
68+
*
69+
* @param writes the set of mutations to test
70+
*/
71+
void testAddPrepare(const std::vector<SyncWriteSpec>& writes);
72+
6573
/**
6674
* Tests the baseline progress of a set of SyncWrites in Vbucket:
6775
* 1) mutations added to VBucket
@@ -71,13 +79,14 @@ class VBucketDurabilityTest
7179
*
7280
* @param writes the set of mutations to test
7381
*/
74-
void testSyncWrites(const std::vector<SyncWriteSpec>& writes);
82+
void testAddPrepareAndCommit(const std::vector<SyncWriteSpec>& writes);
7583

7684
// All owned by VBucket
7785
HashTable* ht;
7886
MockCheckpointManager* ckptMgr;
79-
ActiveDurabilityMonitor* monitor;
8087

8188
const std::string active = "active";
82-
const std::string replica = "replica";
89+
const std::string replica1 = "replica1";
90+
const std::string replica2 = "replica2";
91+
const std::string replica3 = "replica3";
8392
};

engines/ep/tests/module_tests/vbucket_test.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
#include <platform/cb_malloc.h>
3232

3333
VBucketTestBase::VBucketTestBase(item_eviction_policy_t eviction_policy) {
34+
// Used for mem-checks at Replica VBuckets. Default=0 prevents any
35+
// processSet, returns NoMem. I set a production-like value.
36+
global_stats.replicationThrottleThreshold = 0.9;
3437
vbucket.reset(new EPVBucket(Vbid(0),
3538
vbucket_state_active,
3639
global_stats,

0 commit comments

Comments
 (0)