Commit ee1baad

paolococchi authored and daverigby committed
MB-33860 [SR]: Implement High Prepared Seqno (HPS) logic in PassiveDM
The HPS represents the last locally-satisfied Prepare on a node. As per
In-Order Ack/Commit, the HPS is updated within the "durability-fence"
constraint. See DesignDoc for details.

Practically, an HPS update is triggered in response to one of the
following events:
1) A new Prepare is queued into the PassiveDM. The new Prepare may be
   locally-satisfied immediately.
2) The Flusher has persisted some Prepare. That may "move" the
   durability-fence onward and satisfy a number of pending Prepares.

In follow-up patches, the HPS will be the quantity that Replica nodes
ack back to the Active.

Change-Id: I95b31c1cb15f929d20fcc2c67e23debd9755e7f8
Reviewed-on: http://review.couchbase.org/108027
Reviewed-by: Dave Rigby <[email protected]>
Tested-by: Dave Rigby <[email protected]>
1 parent bc7499c commit ee1baad

File tree

7 files changed: +409 −100 lines


engines/ep/src/durability/active_durability_monitor.h

Lines changed: 1 addition & 7 deletions
@@ -94,13 +94,7 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
      */
     void processTimeout(std::chrono::steady_clock::time_point asOf);
 
-    /**
-     * Advances the local disk-tracking to the last persisted seqno for VBucket.
-     * Expected to be called by the Flusher.
-     *
-     * @throw std::logic_error if the replication-chain is not set
-     */
-    void notifyLocalPersistence();
+    void notifyLocalPersistence() override;
 
     /**
      * Output DurabiltyMonitor stats.

engines/ep/src/durability/durability_monitor.h

Lines changed: 39 additions & 0 deletions
@@ -17,6 +17,7 @@
 #pragma once
 
 #include "memcached/engine_common.h"
+#include "monotonic.h"
 #include <list>
 
 /*
@@ -43,6 +44,13 @@ class DurabilityMonitor {
      */
     virtual size_t getNumTracked() const = 0;
 
+    /**
+     * Inform the DurabilityMonitor that the Flusher has run.
+     * Expected to be called by the Flusher after a flush-batch (that contains
+     * pending Prepares) has been committed to the storage.
+     */
+    virtual void notifyLocalPersistence() = 0;
+
 protected:
     class SyncWrite;
     struct ReplicationChain;
@@ -57,3 +65,34 @@ class DurabilityMonitor {
 
     friend std::ostream& operator<<(std::ostream&, const SyncWrite&);
 };
+
+/**
+ * Represents the tracked state of a node in topology.
+ * Note that the lifetime of a Position is determined by the logic in
+ * DurabilityMonitor.
+ *
+ * - it: Iterator that points to a position in the Container of tracked
+ *     SyncWrites. This is an optimization: logically it always points
+ *     to the last SyncWrite acknowledged by the tracked node, so that we can
+ *     avoid any O(N) scan when updating the node state at seqno-ack received.
+ *     It may point to Container::end (e.g., when the pointed SyncWrite is the
+ *     last element in Container and it is removed).
+ *
+ * - lastWriteSeqno: Always stores the seqno of the last SyncWrite
+ *     acknowledged by the tracked node, even when Position::it points
+ *     to Container::end. Used for validation at seqno-ack received and stats.
+ *
+ * - lastAckSeqno: Always stores the last seqno acknowledged by the tracked
+ *     node. Used for validation at seqno-ack received and stats.
+ */
+struct DurabilityMonitor::Position {
+    Position() = default;
+    Position(const Container::iterator& it) : it(it) {
+    }
+    Container::iterator it;
+    // @todo: Consider using (strictly) Monotonic here. Weakly monotonic was
+    // necessary when we tracked both memory and disk seqnos.
+    // Now a Replica is not supposed to ack the same seqno twice.
+    WeaklyMonotonic<int64_t, ThrowExceptionPolicy> lastWriteSeqno{0};
+    WeaklyMonotonic<int64_t, ThrowExceptionPolicy> lastAckSeqno{0};
+};

engines/ep/src/durability/durability_monitor_impl.h

Lines changed: 0 additions & 35 deletions
@@ -24,7 +24,6 @@
 
 #include "durability_monitor.h"
 #include "item.h"
-#include "monotonic.h"
 
 #include <chrono>
 #include <unordered_map>
@@ -113,40 +112,6 @@ class DurabilityMonitor::SyncWrite {
     friend std::ostream& operator<<(std::ostream&, const SyncWrite&);
 };
 
-// @todo: remove?
-// friend std::ostream& operator<<(std::ostream&, const
-// DurabilityMonitor::SyncWrite&);
-
-/**
- * Represents the tracked state of a node in topology.
- * Note that the lifetime of a Position is determined by the logic in
- * DurabilityMonitor.
- *
- * - it: Iterator that points to a position in the Container of tracked
- *     SyncWrites. This is an optimization: logically it points always
- *     to the last SyncWrite acknowledged by the tracked node, so that we can
- *     avoid any O(N) scan when updating the node state at seqno-ack received.
- *     It may point to Container::end (e.g, when the pointed SyncWrite is the
- *     last element in Container and it is removed).
- *
- * - lastWriteSeqno: Stores always the seqno of the last SyncWrite
- *     acknowledged by the tracked node, even when Position::it points
- *     to Container::end. Used for validation at seqno-ack received and stats.
- *
- * - lastAckSeqno: Stores always the last seqno acknowledged by the tracked
- *     node. Used for validation at seqno-ack received and stats.
- */
-struct DurabilityMonitor::Position {
-    Position(const Container::iterator& it) : it(it) {
-    }
-    Container::iterator it;
-    // @todo: Consider using (strictly) Monotonic here. Weakly monotonic was
-    // necessary when we tracked both memory and disk seqnos.
-    // Now a Replica is not supposed to ack the same seqno twice.
-    WeaklyMonotonic<int64_t, ThrowExceptionPolicy> lastWriteSeqno{0};
-    WeaklyMonotonic<int64_t, ThrowExceptionPolicy> lastAckSeqno{0};
-};
-
 /**
  * Represents a VBucket Replication Chain in the ns_server meaning,
  * i.e. a list of active/replica nodes where the VBucket resides.

engines/ep/src/durability/passive_durability_monitor.cc

Lines changed: 70 additions & 8 deletions
@@ -27,7 +27,11 @@
 #include <gsl.h>
 #include <unordered_map>
 
-PassiveDurabilityMonitor::PassiveDurabilityMonitor(VBucket& vb) : vb(vb) {
+PassiveDurabilityMonitor::PassiveDurabilityMonitor(VBucket& vb)
+    : vb(vb), state(*this) {
+    // By design, instances of Container::Position can never be invalid
+    auto s = state.wlock();
+    s->highPreparedSeqno = Position(s->trackedWrites.end());
 }
 
 PassiveDurabilityMonitor::~PassiveDurabilityMonitor() = default;
@@ -53,8 +57,7 @@ void PassiveDurabilityMonitor::addStats(const AddStatFn& addStat,
 }
 
 int64_t PassiveDurabilityMonitor::getHighPreparedSeqno() const {
-    // @todo-durability: return a correct value for this.
-    return 0;
+    return state.rlock()->highPreparedSeqno.lastWriteSeqno;
 }
 
 void PassiveDurabilityMonitor::addSyncWrite(queued_item item) {
@@ -65,20 +68,79 @@ void PassiveDurabilityMonitor::addSyncWrite(queued_item item) {
                 "PassiveDurabilityMonitor::addSyncWrite: Level::None");
     }
 
-    state.wlock()->trackedWrites.emplace_back(
+    auto s = state.wlock();
+    s->trackedWrites.emplace_back(
             nullptr /*cookie*/, std::move(item), nullptr /*firstChain*/);
 
-    /*
-     * @todo: Missing step - we may be able to increase the high_prepared_seqno
-     * and ack back back to the Active.
-     */
+    // Maybe the new tracked Prepare is already satisfied and could be ack'ed
+    // back to the Active.
+    s->updateHighPreparedSeqno();
+
+    // @todo: Send SeqnoAck
 }
 
 size_t PassiveDurabilityMonitor::getNumTracked() const {
     return state.rlock()->trackedWrites.size();
 }
 
+void PassiveDurabilityMonitor::notifyLocalPersistence() {
+    state.wlock()->updateHighPreparedSeqno();
+
+    // @todo: Send SeqnoAck
+}
+
 void PassiveDurabilityMonitor::toOStream(std::ostream& os) const {
     os << "PassiveDurabilityMonitor[" << this << "]"
        << " high_prepared_seqno:" << getHighPreparedSeqno();
 }
+
+DurabilityMonitor::Container::iterator
+PassiveDurabilityMonitor::State::getIteratorNext(
+        const Container::iterator& it) {
+    // Note: Container::end could be the new position when the pointed SyncWrite
+    // is removed from Container and the iterator repositioned.
+    // In that case next=Container::begin
+    return (it == trackedWrites.end()) ? trackedWrites.begin() : std::next(it);
+}
+
+void PassiveDurabilityMonitor::State::updateHighPreparedSeqno() {
+    if (trackedWrites.empty()) {
+        return;
+    }
+
+    const auto updateHPS = [this](Container::iterator next) -> void {
+        // Note: Update last-write-seqno first to enforce monotonicity and
+        // avoid any state-change if monotonicity checks fail
+        highPreparedSeqno.lastWriteSeqno = next->getBySeqno();
+        highPreparedSeqno.it = next;
+    };
+
+    Container::iterator next;
+    // First, blindly move HPS up to the given persistedSeqno. Note that
+    // here we don't need to check any Durability Level: persistence makes
+    // locally-satisfied all the pending Prepares up to persistedSeqno.
+    while ((next = getIteratorNext(highPreparedSeqno.it)) !=
                   trackedWrites.end() &&
+           static_cast<uint64_t>(next->getBySeqno()) <=
+                   pdm.vb.getPersistenceSeqno()) {
+        updateHPS(next);
+    }
+
+    // Then, move the HPS to the last Prepare with Level != PersistToMajority.
+    // I.e., all the Majority and MajorityAndPersistToMaster Prepares that were
+    // blocked by non-satisfied PersistToMajority Prepares are implicitly
+    // satisfied now. The first non-satisfied Prepare is the first
+    // Level:PersistToMajority not covered by persistedSeqno.
+    while ((next = getIteratorNext(highPreparedSeqno.it)) !=
+           trackedWrites.end()) {
+        const auto level = next->getDurabilityReqs().getLevel();
+        Expects(level != cb::durability::Level::None);
+        // Note: We are in the PassiveDM. The first Level::PersistToMajority
+        // SyncWrite is our durability-fence.
+        if (level == cb::durability::Level::PersistToMajority) {
+            break;
+        }
+
+        updateHPS(next);
+    }
+}

engines/ep/src/durability/passive_durability_monitor.h

Lines changed: 52 additions & 0 deletions
@@ -21,6 +21,8 @@
 
 #include <folly/Synchronized.h>
 
+#include <queue>
+
 class VBucket;
 
 /*
@@ -50,6 +52,8 @@ class PassiveDurabilityMonitor : public DurabilityMonitor {
 
     size_t getNumTracked() const override;
 
+    void notifyLocalPersistence() override;
+
 protected:
     void toOStream(std::ostream& os) const override;
 
@@ -63,8 +67,56 @@ class PassiveDurabilityMonitor : public DurabilityMonitor {
      * them on the PDM::State public interface.
      */
    struct State {
+        /**
+         * @param pdm The owning PassiveDurabilityMonitor
+         */
+        State(const PassiveDurabilityMonitor& pdm) : pdm(pdm) {
+        }
+
+        /**
+         * Returns the next position for a given Container::iterator.
+         *
+         * @param it The iterator
+         * @return the next position in Container
+         */
+        Container::iterator getIteratorNext(const Container::iterator& it);
+
+        /**
+         * Logically 'moves' forward the High Prepared Seqno to the last
+         * locally-satisfied Prepare. In other terms, the function moves the
+         * HPS to before the current durability-fence.
+         *
+         * Details.
+         *
+         * In terms of Durability Requirements, Prepares at Replica can be
+         * locally-satisfied:
+         * (1) as soon as they are queued into the PDM, if Level Majority or
+         *     MajorityAndPersistOnMaster
+         * (2) when they are persisted, if Level PersistToMajority
+         *
+         * We call the first non-satisfied PersistToMajority Prepare the
+         * "durability-fence". All Prepares /before/ the durability-fence are
+         * locally-satisfied and can be ack'ed back to the Active.
+         *
+         * This function's internal logic performs (2) first by moving the
+         * HPS up to the latest persisted Prepare (i.e., the durability-fence)
+         * and then (1) by moving the HPS to the last Prepare /before/ the new
+         * durability-fence (note that after step (2) the durability-fence has
+         * implicitly moved as well).
+         */
+        void updateHighPreparedSeqno();
+
         /// The container of pending Prepares.
         Container trackedWrites;
+
+        // The seqno of the last Prepare satisfied locally. I.e.:
+        //     - the Prepare has been queued into the PDM, if Level Majority
+        //       or MajorityAndPersistToMaster
+        //     - the Prepare has been persisted locally, if Level
+        //       PersistToMajority
+        Position highPreparedSeqno;
+
+        const PassiveDurabilityMonitor& pdm;
     };
 
     // The VBucket owning this DurabilityMonitor instance
