Skip to content

Commit a489e4a

Browse files
paolococchi authored and daverigby committed
MB-33804 [SR]: Track pending Prepares in PassiveDurabilityMonitor
When a DCP_PREPARE is received, the PDM must track the new pending Prepare. That is necessary for:
- implementing the in-order ACK logic
- takeover at Replica->Active vbstate transition

Change-Id: I99f892b1328ef2fb6c5ad0d4f1f296c2a6045bf0
Reviewed-on: http://review.couchbase.org/107900
Tested-by: Build Bot <[email protected]>
Reviewed-by: Dave Rigby <[email protected]>
1 parent ca40cdb commit a489e4a

File tree

10 files changed

+222
-106
lines changed

10 files changed

+222
-106
lines changed

engines/ep/src/durability/active_durability_monitor.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ void ActiveDurabilityMonitor::addSyncWrite(const void* cookie,
113113
"ActiveDurabilityMonitor::addSyncWrite: Impossible");
114114
}
115115

116-
state.wlock()->addSyncWrite(cookie, item);
116+
state.wlock()->addSyncWrite(cookie, std::move(item));
117117

118118
// @todo: Missing step - check for satisfied SyncWrite, we may need to
119119
// commit immediately in the no-replica scenario. Consider to do that in
@@ -486,9 +486,10 @@ void ActiveDurabilityMonitor::State::setReplicationTopology(
486486
}
487487

488488
void ActiveDurabilityMonitor::State::addSyncWrite(const void* cookie,
489-
const queued_item& item) {
490-
trackedWrites.emplace_back(cookie, item, *firstChain);
491-
lastTrackedSeqno = item->getBySeqno();
489+
queued_item item) {
490+
const auto seqno = item->getBySeqno();
491+
trackedWrites.emplace_back(cookie, std::move(item), firstChain.get());
492+
lastTrackedSeqno = seqno;
492493
}
493494

494495
void ActiveDurabilityMonitor::State::removeExpired(

engines/ep/src/durability/active_durability_monitor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
191191
struct State {
192192
void setReplicationTopology(const nlohmann::json& topology);
193193

194-
void addSyncWrite(const void* cookie, const queued_item& item);
194+
void addSyncWrite(const void* cookie, queued_item item);
195195

196196
/**
197197
* Returns the next position for a node iterator.

engines/ep/src/durability/durability_monitor_impl.cc

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,27 @@
2020

2121
DurabilityMonitor::SyncWrite::SyncWrite(const void* cookie,
2222
queued_item item,
23-
const ReplicationChain& chain)
23+
const ReplicationChain* chain)
2424
: cookie(cookie),
2525
item(item),
26-
majority(chain.majority),
2726
expiryTime(
2827
item->getDurabilityReqs().getTimeout()
2928
? std::chrono::steady_clock::now() +
3029
std::chrono::milliseconds(
3130
item->getDurabilityReqs().getTimeout())
3231
: boost::optional<
33-
std::chrono::steady_clock::time_point>{}),
34-
active(chain.active) {
35-
// We are making a SyncWrite for tracking, we must have already ensured
36-
// that the Durability Requirements can be met at this point.
37-
Expects(chain.size() >= majority);
38-
for (const auto& entry : chain.positions) {
39-
acks[entry.first] = false;
32+
std::chrono::steady_clock::time_point>{}) {
33+
if (chain) {
34+
majority = chain->majority;
35+
active = chain->active;
36+
37+
// We are making a SyncWrite for tracking, we must have already ensured
38+
// that the Durability Requirements can be met at this point.
39+
Expects(chain->size() >= majority);
40+
41+
for (const auto& entry : chain->positions) {
42+
acks[entry.first] = false;
43+
}
4044
}
4145
}
4246

engines/ep/src/durability/durability_monitor_impl.h

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,21 @@
3434
static const std::string UndefinedNode{};
3535

3636
/**
37-
* Represents a tracked SyncWrite.
37+
* Represents a tracked SyncWrite. It is mainly a wrapper around a pending
38+
* Prepare item.
3839
*/
3940
class DurabilityMonitor::SyncWrite {
4041
public:
42+
/**
43+
* @param (optional) cookie The cookie representing the client connection.
44+
* Necessary at Active for notifying the client at SyncWrite completion.
45+
* @param item The pending Prepare being wrapped
46+
* @param (optional) chain The repl-chain that the write is tracked against.
47+
* Necessary at Active for verifying the SW Durability Requirements.
48+
*/
4149
SyncWrite(const void* cookie,
4250
queued_item item,
43-
const ReplicationChain& chain);
51+
const ReplicationChain* chain);
4452

4553
const StoredDocKey& getKey() const;
4654

@@ -89,19 +97,19 @@ class DurabilityMonitor::SyncWrite {
8997

9098
// This optimization eliminates the need of scanning the ACK map for
9199
// verifying Durability Requirements
92-
Monotonic<uint8_t> ackCount;
100+
Monotonic<uint8_t> ackCount{0};
93101

94102
// Majority in the arithmetic definition: num-nodes / 2 + 1
95-
const uint8_t majority;
103+
uint8_t majority{0};
104+
105+
// Name of the active node in replication-chain. Used at Durability
106+
// Requirements verification.
107+
std::string active{UndefinedNode};
96108

97109
// Used for enforcing the Durability Requirements Timeout. It is set
98110
// when this SyncWrite is added for tracking into the DurabilityMonitor.
99111
const boost::optional<std::chrono::steady_clock::time_point> expiryTime;
100112

101-
// Name of the active node in replication-chain. Used at Durability
102-
// Requirements verification.
103-
const std::string active;
104-
105113
friend std::ostream& operator<<(std::ostream&, const SyncWrite&);
106114
};
107115

engines/ep/src/durability/passive_durability_monitor.cc

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717

1818
#include "passive_durability_monitor.h"
19+
#include "durability_monitor_impl.h"
1920

2021
#include "bucket_logger.h"
2122
#include "item.h"
@@ -56,9 +57,25 @@ int64_t PassiveDurabilityMonitor::getHighPreparedSeqno() const {
5657
return 0;
5758
}
5859

60+
void PassiveDurabilityMonitor::addSyncWrite(queued_item item) {
61+
auto durReq = item->getDurabilityReqs();
62+
63+
if (durReq.getLevel() == cb::durability::Level::None) {
64+
throw std::invalid_argument(
65+
"PassiveDurabilityMonitor::addSyncWrite: Level::None");
66+
}
67+
68+
state.wlock()->trackedWrites.emplace_back(
69+
nullptr /*cookie*/, std::move(item), nullptr /*firstChain*/);
70+
71+
/*
72+
* @todo: Missing step - we may be able to increase the high_prepared_seqno
73+
* and ack back to the Active.
74+
*/
75+
}
76+
5977
size_t PassiveDurabilityMonitor::getNumTracked() const {
60-
// @todo-durability: return a correct value for this.
61-
return 0;
78+
return state.rlock()->trackedWrites.size();
6279
}
6380

6481
void PassiveDurabilityMonitor::toOStream(std::ostream& os) const {

engines/ep/src/durability/passive_durability_monitor.h

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
#pragma once
1818

1919
#include "durability_monitor.h"
20+
#include "item.h"
21+
22+
#include <folly/Synchronized.h>
2023

2124
class VBucket;
2225

@@ -38,11 +41,34 @@ class PassiveDurabilityMonitor : public DurabilityMonitor {
3841

3942
int64_t getHighPreparedSeqno() const override;
4043

41-
protected:
44+
/**
45+
* Add a pending Prepare for tracking into the PDM.
46+
*
47+
* @param item the queued_item
48+
*/
49+
void addSyncWrite(queued_item item);
50+
4251
size_t getNumTracked() const override;
4352

53+
protected:
4454
void toOStream(std::ostream& os) const override;
4555

56+
/*
57+
* This class embeds the state of a PDM. It has been designed for being
58+
* wrapped by a folly::Synchronized<T>, which manages the read/write
59+
* concurrent access to the T instance.
60+
* Note: all members are public as accessed directly only by PDM, this is
61+
* a protected struct. Avoiding direct access by PDM would require
62+
* re-implementing most of the PDM functions into PDM::State and exposing
63+
* them on the PDM::State public interface.
64+
*/
65+
struct State {
66+
/// The container of pending Prepares.
67+
Container trackedWrites;
68+
};
69+
4670
// The VBucket owning this DurabilityMonitor instance
4771
VBucket& vb;
72+
73+
folly::Synchronized<State> state;
4874
};

engines/ep/src/kv_bucket.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -897,10 +897,9 @@ ENGINE_ERROR_CODE KVBucket::setVBucketState_UNLOCKED(
897897
// For now necessary at least for tests.
898898
// Durability: Re-set vb-state for applying the ReplicationChain
899899
// encoded in 'meta'. This is for supporting the case where
900-
// ns_server issues a single set-vb-state call for creating an
901-
// Active VBucket.
900+
// ns_server issues a single set-vb-state call for creating a VB.
902901
// Note: Must be done /after/ the new VBucket has been added to vbMap.
903-
if (to == vbucket_state_active) {
902+
if (to == vbucket_state_active || to == vbucket_state_replica) {
904903
vbMap.setState(newvb, to, meta, vbStateLock);
905904
}
906905

engines/ep/src/vbucket.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2185,10 +2185,12 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
21852185

21862186
static double mutationMemThreshold;
21872187

2188-
friend class VBucketTestBase;
2189-
friend class VBucketDurabilityTest;
2188+
friend class ActiveDurabilityMonitorTest;
21902189
friend class DurabilityMonitorTest;
2190+
friend class PassiveDurabilityMonitorTest;
21912191
friend class SingleThreadedActiveStreamTest;
2192+
friend class VBucketTestBase;
2193+
friend class VBucketDurabilityTest;
21922194

21932195
DISALLOW_COPY_AND_ASSIGN(VBucket);
21942196
};

0 commit comments

Comments
 (0)