Commit 89fa2bf
MB-39815: Add event-driven SyncWrite timeout handling
+Summary+
Adds a new event-driven mode for aborting SyncWrites which have exceeded their durability timeout. This has a much lower idle overhead than the current polling method. The default mode remains "polling"; a subsequent patch will change the default to "event-driven".

+Background+
When SyncWrites were introduced in 6.5.0, each SyncWrite request had a timeout associated with it: if the SyncWrite cannot be completed (Committed or Aborted) within that time, it is aborted and the client informed that it was not successful. This was implemented via simple (naive?) polling: a per-Bucket NonIO task is scheduled to run every 25ms (by default), and when it runs it checks every vBucket for any pending SyncWrites which have now exceeded their timeout.

Functionally this works fine, however it is relatively expensive: every 25ms we must iterate across every vBucket on every Bucket, and call into the DurabilityMonitor to check for SyncWrites which should be timed out. This is the case irrespective of whether any SyncWrites are overdue, or even whether any SyncWrites exist at all. For example, an idle node with 10 Buckets shows 35% CPU utilization, the vast majority of which is NonIO threads running the DurabilityTimeoutTask. This is obviously undesirable, and the issue scales with even larger bucket counts.

+Solution+
To reduce the idle CPU usage, change from a polling to an event-driven model: a per-vBucket task which is scheduled to run only when the next SyncWrite for that vBucket is due to time out. We only need one task per vBucket (and not one per SyncWrite) because SyncWrites within a vBucket must always complete in-order; therefore we only need to consider the timeout of the oldest SyncWrite in the ActiveDurabilityMonitor for a given vBucket.

This task is only executed _if_ the next SyncWrite isn't otherwise Committed before the timeout. When the SyncWrite is Committed, the task is re-scheduled to run when the _next_ SyncWrite is due, or cancelled if there are no more SyncWrites in progress for the vBucket. As such, the CPU cost for SyncWrite timeout handling when the Bucket is idle drops to zero; nothing is executed.

There are some additional costs with the event-driven approach:
1. Additional CPU cost whenever the ActiveDM::trackedWrites container changes (specifically when the head changes), as we must reschedule or cancel the per-vBucket task. However that is less than 1 microsecond with the default FollyExecutorPool, so is likely dwarfed by the other activity around adding / Committing SyncWrites.
2. Additional memory footprint for 1024 tasks instead of 1 (per Bucket). Note that this is relatively insignificant: each ExpiredSWCallback task is 96 bytes, so each Bucket grows by at most 96KB (only active vBuckets have an ExpiredSWCallback).

Change-Id: Ia70a68f4d1551a3407c8bdbb56e91eb5f5f995e2
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/130419
Reviewed-by: Ben Huddleston <[email protected]>
Reviewed-by: Paolo Cocchi <[email protected]>
Tested-by: Build Bot <[email protected]>
1 parent 5cf6c45 commit 89fa2bf

31 files changed (+421, -75 lines)

engines/ep/benchmarks/defragmenter_bench.cc

Lines changed: 1 addition & 0 deletions
@@ -55,6 +55,7 @@ class DefragmentBench : public benchmark::Fixture {
                 /*newSeqnoCb*/ nullptr,
                 [](Vbid) { return; },
                 NoopSyncWriteCompleteCb,
+                NoopSyncWriteTimeoutFactory,
                 NoopSeqnoAckCb,
                 ImmediateCkptDisposer,
                 config,

engines/ep/benchmarks/item_compressor_bench.cc

Lines changed: 1 addition & 0 deletions
@@ -55,6 +55,7 @@ class ItemCompressorBench : public benchmark::Fixture {
                 /*newSeqnoCb*/ nullptr,
                 [](Vbid) { return; },
                 NoopSyncWriteCompleteCb,
+                NoopSyncWriteTimeoutFactory,
                 NoopSeqnoAckCb,
                 ImmediateCkptDisposer,
                 config,

engines/ep/configuration.json

Lines changed: 17 additions & 1 deletion
@@ -467,11 +467,27 @@
             "dynamic": true,
             "type": "size_t"
         },
+        "durability_timeout_mode": {
+            "default": "polling",
+            "descr": "How should durability timeouts be scheduled? polling=periodic task running every 'durability_timeout_task_interval'; event-driven=per-VBucket tasks scheduled based on when next SyncWrite will time out.",
+            "dynamic": false,
+            "type": "std::string",
+            "validator": {
+                "enum": [
+                    "polling",
+                    "event-driven"
+                ]
+            }
+        },
        "durability_timeout_task_interval": {
             "default": "25",
             "descr": "Interval (in ms) between subsequent runs of the DurabilityTimeoutTask",
             "dynamic": true,
-            "type": "size_t"
+            "type": "size_t",
+            "requires": {
+                "durability_timeout_mode": "polling"
+            }
         },
         "durability_min_level": {
             "default": "none",

engines/ep/src/durability/active_durability_monitor.cc

Lines changed: 59 additions & 10 deletions
@@ -143,19 +143,23 @@ class ActiveDurabilityMonitor::ResolvedQueue {
     }
 };
 
-ActiveDurabilityMonitor::ActiveDurabilityMonitor(EPStats& stats, VBucket& vb)
+ActiveDurabilityMonitor::ActiveDurabilityMonitor(
+        EPStats& stats,
+        VBucket& vb,
+        std::unique_ptr<EventDrivenDurabilityTimeoutIface> nextExpiryChanged)
     : stats(stats),
       vb(vb),
-      state(std::make_unique<State>(*this)),
+      state(std::make_unique<State>(*this, std::move(nextExpiryChanged))),
       resolvedQueue(std::make_unique<ResolvedQueue>(vb.getId())) {
 }
 
 ActiveDurabilityMonitor::ActiveDurabilityMonitor(
         EPStats& stats,
         VBucket& vb,
         const vbucket_state& vbs,
+        std::unique_ptr<EventDrivenDurabilityTimeoutIface> nextExpiryChanged,
         std::vector<queued_item>&& outstandingPrepares)
-    : ActiveDurabilityMonitor(stats, vb) {
+    : ActiveDurabilityMonitor(stats, vb, std::move(nextExpiryChanged)) {
     if (!vbs.transition.replicationTopology.is_null()) {
         setReplicationTopology(vbs.transition.replicationTopology);
     }
@@ -165,6 +169,8 @@ ActiveDurabilityMonitor::ActiveDurabilityMonitor(
         // Any outstanding prepares "grandfathered" into the DM from warmup
         // should have an infinite timeout (we cannot abort them as they
         // may already have been Committed before we restarted).
+        // (This also means there's no need to consider scheduling the
+        // timeout callback).
         Expects(prepare->getDurabilityReqs().getTimeout().isInfinite());
         s->trackedWrites.emplace_back(nullptr,
                                       std::move(prepare),
@@ -183,9 +189,11 @@ ActiveDurabilityMonitor::ActiveDurabilityMonitor(
     s->highCompletedSeqno.reset(vbs.persistedCompletedSeqno);
 }
 
-ActiveDurabilityMonitor::ActiveDurabilityMonitor(EPStats& stats,
-                                                 PassiveDurabilityMonitor&& pdm)
-    : ActiveDurabilityMonitor(stats, pdm.vb) {
+ActiveDurabilityMonitor::ActiveDurabilityMonitor(
+        EPStats& stats,
+        PassiveDurabilityMonitor&& pdm,
+        std::unique_ptr<EventDrivenDurabilityTimeoutIface> nextExpiryChanged)
+    : ActiveDurabilityMonitor(stats, pdm.vb, std::move(nextExpiryChanged)) {
     EP_LOG_INFO(
             "ActiveDurabilityMonitor::ctor(PDM&&): {} Transitioning from "
             "PDM: HPS:{}, HCS:{}, numTracked:{}, highestTracked:{}",
@@ -197,7 +205,13 @@ ActiveDurabilityMonitor::ActiveDurabilityMonitor(EPStats& stats,
 
     auto s = state.wlock();
     for (auto& write : pdm.state.wlock()->trackedWrites) {
+        // Any prepares converted from the PDM into the ADM have an infinite
+        // timeout set (we cannot abort them as they may already have been
+        // Committed when we were non-active).
+        // This also means there's no need to consider scheduling the timeout
+        // callback here.
         s->trackedWrites.emplace_back(std::move(write));
+        Expects(!s->trackedWrites.back().getExpiryTime());
     }
 
     if (!s->trackedWrites.empty()) {
@@ -344,7 +358,7 @@ void ActiveDurabilityMonitor::processTimeout(
     }
 
     // Identify SyncWrites which can be timed out as of this time point
-    // and should be aborted, transferring them into the completedQeuue (under
+    // and should be aborted, transferring them into the completedQueue (under
     // the correct locks).
     state.wlock()->removeExpired(asOf, *resolvedQueue);
 
@@ -559,8 +573,10 @@ void ActiveDurabilityMonitor::removedQueuedAck(const std::string& node) {
     state.wlock()->queuedSeqnoAcks.erase(node);
 }
 
-ActiveDurabilityMonitor::State::State(const ActiveDurabilityMonitor& adm)
-    : adm(adm) {
+ActiveDurabilityMonitor::State::State(
+        ActiveDurabilityMonitor& adm,
+        std::unique_ptr<EventDrivenDurabilityTimeoutIface> nextExpiryChanged)
+    : adm(adm), nextExpiryChanged(std::move(nextExpiryChanged)) {
     const auto prefix = "ActiveDM(" + adm.vb.getId().to_string() + ")::State::";
     lastTrackedSeqno.setLabel(prefix + "lastTrackedSeqno");
     lastCommittedSeqno.setLabel(prefix + "lastCommittedSeqno");
@@ -804,11 +820,15 @@ ActiveDurabilityMonitor::State::removeSyncWrite(Container::iterator it,
     // the SyncWrite from trackedWrites.
     it->resetChains();
 
+    // If we are removing the first element then (a) the "previous" item is
+    // different, and (b) we need to re-schedule the SyncWrite timeout task.
+    const bool removingFirstElement = it == trackedWrites.begin();
+
     Container::iterator prev;
     // Note: iterators in trackedWrites are never singular, Container::end
     // is used as placeholder element for when an iterator cannot point to
     // any valid element in Container
-    if (it == trackedWrites.begin()) {
+    if (removingFirstElement) {
         prev = trackedWrites.end();
     } else {
         prev = std::prev(it);
@@ -839,6 +859,14 @@ ActiveDurabilityMonitor::State::removeSyncWrite(Container::iterator it,
 
     Container removed;
     removed.splice(removed.end(), trackedWrites, it);
+
+    if (removingFirstElement) {
+        // If first element was removed, then a new SyncWrite (or possibly none
+        // at all) is at the head of trackedWrites and hence now the next
+        // SyncWrite to be timed out - reschedule the timeout callback.
+        scheduleTimeoutCallback();
+    }
+
     return std::move(removed.front());
 }
 
@@ -1426,11 +1454,20 @@ void ActiveDurabilityMonitor::State::addSyncWrite(const CookieIface* cookie,
                                                   queued_item item) {
     Expects(firstChain.get());
     const auto seqno = item->getBySeqno();
+    const auto wasEmpty = trackedWrites.empty();
     trackedWrites.emplace_back(cookie,
                                std::move(item),
                                defaultTimeout,
                                firstChain.get(),
                                secondChain.get());
+
+    if (wasEmpty) {
+        // trackedWrites transitioned from empty to non-empty; so the front
+        // item has changed (we now have one) and hence the next timeout
+        // callback should be scheduled.
+        scheduleTimeoutCallback();
+    }
+
     lastTrackedSeqno = seqno;
     totalAccepted++;
 }
@@ -1457,6 +1494,18 @@ void ActiveDurabilityMonitor::State::removeExpired(
     }
 }
 
+void ActiveDurabilityMonitor::State::scheduleTimeoutCallback() {
+    if (!trackedWrites.empty()) {
+        const auto nextExpiry = trackedWrites.front().getExpiryTime();
+        if (nextExpiry) {
+            nextExpiryChanged->updateNextExpiryTime(*nextExpiry);
+            return;
+        }
+    }
+    // No SyncWrites exist, or no expiry set - cancel expiry task.
+    nextExpiryChanged->cancelNextExpiryTime();
+}
+
 void ActiveDurabilityMonitor::State::updateHighPreparedSeqno(
         ResolvedQueue& completed) {
     // Note: All the logic below relies on the fact that HPS for Active is

engines/ep/src/durability/active_durability_monitor.h

Lines changed: 17 additions & 4 deletions
@@ -103,9 +103,16 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
     // Container type used for State::trackedWrites
     using Container = std::list<DurabilityMonitor::ActiveSyncWrite>;
 
-    // Note: constructor and destructor implementation in the .cc file to allow
-    // the forward declaration of ReplicationChain in the header
-    ActiveDurabilityMonitor(EPStats& stats, VBucket& vb);
+    /**
+     * Construct an ActiveDM for the given vBucket.
+     * @param stats EPStats object for the associated Bucket.
+     * @param vb VBucket which owns this Durability Monitor.
+     * @param nextExpiryChanged Object to use for timing out SyncWrites.
+     */
+    ActiveDurabilityMonitor(EPStats& stats,
+                            VBucket& vb,
+                            std::unique_ptr<EventDrivenDurabilityTimeoutIface>
+                                    nextExpiryChanged);
 
     /**
      * Construct an ActiveDM for the given vBucket, with the specified
@@ -114,6 +121,7 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
      * @param stats EPStats object for the associated Bucket.
      * @param vb VBucket which owns this Durability Monitor.
      * @param vbs reference to the vbucket_state found at warmup
+     * @param nextExpiryChanged Object to use for timing out SyncWrites.
      * @param outstandingPrepares In-flight prepares which the DM should take
      *        responsibility for.
      *        These must be ordered by ascending seqno, otherwise
@@ -122,6 +130,8 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
     ActiveDurabilityMonitor(EPStats& stats,
                             VBucket& vb,
                             const vbucket_state& vbs,
+                            std::unique_ptr<EventDrivenDurabilityTimeoutIface>
+                                    nextExpiryChanged,
                             std::vector<queued_item>&& outstandingPrepares);
 
     /**
@@ -131,7 +141,10 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
      * @param stats EPStats object for the associated Bucket.
      * @param pdm The PassiveDM to be converted
      */
-    ActiveDurabilityMonitor(EPStats& stats, PassiveDurabilityMonitor&& pdm);
+    ActiveDurabilityMonitor(EPStats& stats,
+                            PassiveDurabilityMonitor&& pdm,
+                            std::unique_ptr<EventDrivenDurabilityTimeoutIface>
+                                    nextExpiryChanged);
 
     ~ActiveDurabilityMonitor() override;
 

engines/ep/src/durability/durability_monitor_impl.cc

Lines changed: 5 additions & 0 deletions
@@ -123,6 +123,11 @@ DurabilityMonitor::ActiveSyncWrite::getStartTime() const {
     return startTime;
 }
 
+std::optional<std::chrono::steady_clock::time_point>
+DurabilityMonitor::ActiveSyncWrite::getExpiryTime() const {
+    return expiryTime;
+}
+
 void DurabilityMonitor::ActiveSyncWrite::ack(const std::string& node) {
     if (!firstChain) {
         throw std::logic_error(

engines/ep/src/durability/durability_monitor_impl.h

Lines changed: 22 additions & 1 deletion
@@ -165,6 +165,12 @@ class DurabilityMonitor::ActiveSyncWrite : public DurabilityMonitor::SyncWrite {
 
     std::chrono::steady_clock::time_point getStartTime() const;
 
+    /**
+     * @returns The time point this SyncWrite will expire at. Will return
+     * an empty optional for SyncWrites which have no expiry time set.
+     */
+    std::optional<std::chrono::steady_clock::time_point> getExpiryTime() const;
+
     /**
      * Notify this SyncWrite that it has been ack'ed by node.
      *
@@ -406,7 +412,9 @@ struct ActiveDurabilityMonitor::State {
     /**
     * @param adm The owning ActiveDurabilityMonitor
      */
-    explicit State(const ActiveDurabilityMonitor& adm);
+    explicit State(ActiveDurabilityMonitor& adm,
+                   std::unique_ptr<EventDrivenDurabilityTimeoutIface>
+                           nextExpiryChanged);
 
     /**
      * Create a replication chain. Not static as we require an iterator from
@@ -499,6 +507,14 @@ struct ActiveDurabilityMonitor::State {
     void removeExpired(std::chrono::steady_clock::time_point asOf,
                        ResolvedQueue& expired);
 
+    /**
+     * Schedule the timeout callback based on the state of trackedWrites.
+     * If trackedWrites is non-empty then schedule timeout callback to run
+     * when trackedWrites.front() is due to expire; otherwise cancel the
+     * timeout callback.
+     */
+    void scheduleTimeoutCallback();
+
     /// @returns the name of the active node. Assumes the first chain is valid.
     const std::string& getActive() const;
 
@@ -736,6 +752,11 @@ struct ActiveDurabilityMonitor::State {
     std::unordered_map<std::string, Monotonic<int64_t, ThrowExceptionPolicy>>
             queuedSeqnoAcks;
 
+    /// Interface to the VBucket's SyncWriteExpiry task, used to schedule when
+    /// the task should run to cancel (abort) any SyncWrites which have
+    /// exceeded their durability timeout.
+    std::unique_ptr<EventDrivenDurabilityTimeoutIface> nextExpiryChanged;
+
     friend std::ostream& operator<<(std::ostream& os, const State& s) {
         os << "#trackedWrites:" << s.trackedWrites.size()
            << " highPreparedSeqno:" << s.highPreparedSeqno
