Commit 7fdb98a

MB-35458 [SR]: Move SyncWrite completion to bg DurabilityCompletionTask
Change how SyncWrites which are Resolved and awaiting Completion are handled, by moving the final VBucket::commit() / abort() into a background task - DurabilityCompletionTask.

+Background+

There are two reasons for making this change:

a) Performance - specifically latency of front-end worker threads.

By moving completion into a background task, we reduce the amount of work done on the thread which actually detected the SyncWrite was resolved - typically the front-end DCP threads when a DCP_SEQNO_ACK is processed. Given that we SEQNO_ACK at the end of Snapshot, a single SEQNO_ACK could result in committing multiple SyncWrites. Committing one SyncWrite is similar to a normal front-end Set operation, so there is potentially a non-trivial amount of work needed to be done when completing SyncWrites, which could tie up the front-end thread (causing other Connections to have to wait) for a noticeable amount of time.

b) Simplification of lock management.

Doing completion in a background task simplifies lock management, for example we avoid lock inversions with earlier locks acquired during dcpSeqnoAck when attempting to later call notifySeqnoAvailable when this was done on the original thread.

+Problem+

While (a) was the first reason identified for making this change (see MB-33092), (b) is the reason this change is being made now. During testing the following lock-order-inversion was seen:

    WARNING: ThreadSanitizer: lock-order-inversion (potential deadlock)
    Cycle in lock order graph:
    Stream::streamMutex => StreamContainer::rwlock => Stream::streamMutex

The crux of the issue is the processing of DCP_SEQNO_ACKNOWLEDGED messages by the DcpProducer - this acquires the Stream::streamMutex before calling VBucket::seqnoAcknowledged(), however that function currently results in VBucket::commit() being called to synchronously complete the SyncWrite; which in turn must notify all connected replicas that a new seqno is available, requiring StreamContainer::rwlock to be acquired:

    Mutex StreamContainer::rwlock acquired here while holding mutex Stream::streamMutex in thread T15:
    ...
    #6 StreamContainer<std::shared_ptr<Stream> >::rlock()
    #7 DcpProducer::notifySeqnoAvailable(Vbid, unsigned long)
    ...
    #13 VBucket::commit(...)
    #14 ActiveDurabilityMonitor::commit(...)
    #15 ActiveDurabilityMonitor::processCompletedSyncWriteQueue()
    #16 ActiveDurabilityMonitor::seqnoAckReceived(...)
    #17 VBucket::seqnoAcknowledged(...)
    #18 ActiveStream::seqnoAck(...)
    #19 DcpProducer::seqno_acknowledged(...)
    ...

    Mutex Stream::streamMutex previously acquired by the same thread here:
    ...
    #3 std::lock_guard<std::mutex>::lock_guard(std::mutex&)
    #4 ActiveStream::seqnoAck(...)
    #5 DcpProducer::seqno_acknowledged(...)
    ...

This conflicts with the ordering seen when sending items out on the DCP connection - inside DcpProducer::step() where the StreamContainer::rwlock is acquired first, then ActiveStream::mutex acquired later:

    Mutex Stream::streamMutex acquired here while holding mutex StreamContainer::rwlock in thread T15:
    ...
    #3 std::lock_guard<std::mutex>::lock_guard(std::mutex&)
    #4 ActiveStream::next()
    #5 DcpProducer::getNextItem()
    #6 DcpProducer::step(dcp_message_producers*)
    ...

    Mutex StreamContainer::rwlock previously acquired by the same thread here:
    #0 pthread_rwlock_rdlock <null> (libtsan.so.0+0x00000002c98b)
    ...
    #4 std::shared_lock<cb::RWLock>::shared_lock(cb::RWLock&)
    #5 StreamContainer<>::ResumableIterationHandle::ResumableIterationHandle()
    #6 StreamContainer<>::startResumable()
    #7 DcpProducer::getNextItem()
    #8 DcpProducer::step(dcp_message_producers*)
    ...

+Solution+

The processing of resolved SyncWrites is moved into a new background task. Instead of immediately processing them within ActiveDM::seqnoAckReceived(), that function notifies the new NonIO DurabilityCompletionTask that there are SyncWrites waiting for completion.

DurabilityCompletionTask maintains a bool per vBucket indicating if there are SyncWrites for that vBucket pending completion. When the task is run, for each flag which is true it calls VBucket::processResolvedSyncWrites() for the associated VBucket.

+Implementation Notes+

Currently there is just a single DurabilityCompletionTask (per Bucket); this was chosen as 1 task per vBucket (i.e. 1024 per Bucket) would be inefficient for our current background task scheduler (both in terms of latency to schedule each task for only one vBucket's worth of work, and in terms of managing that many tasks in the future queue).

However, that does _potentially_ mean there are fewer resources (threads) available to complete SyncWrites on - previously that work could be done concurrently on all frontend threads (~O(num_cpus)). Now the same work only has 1 thread available to run on (there's only a single DurabilityCompletionTask).

_If_ this becomes a bottleneck we could look at increasing the number of DurabilityCompletionTasks - e.g. sharding all vBuckets across multiple tasks like flusher / bgfetcher.

Change-Id: I87897af1e3fd0a57e5abc2cb1ba9f795a9d3c63e
Reviewed-on: http://review.couchbase.org/113141
Tested-by: Build Bot <[email protected]>
Reviewed-by: Ben Huddleston <[email protected]>
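The lock-ordering argument in the commit message can be illustrated with a small sketch. This is not the kv_engine code: `MiniProducer`, its members, and the use of a plain `std::mutex` in place of the reader/writer lock are all assumptions made for illustration. The ack path only records that work is pending while it holds the stream mutex, and the completion path later takes the container lock on its own, so the only place both locks are held together is `step()`, which uses the container-then-stream order.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

// Hypothetical miniature of the two locks named in the TSan report.
struct MiniProducer {
    std::mutex streamMutex;   // stands in for Stream::streamMutex
    std::mutex containerLock; // stands in for StreamContainer::rwlock
    std::atomic<bool> completionPending{false};
    int seqnoNotifications = 0;

    // Front-end path: runs under streamMutex and only records that
    // completion work is pending. It never touches containerLock.
    void onSeqnoAck() {
        std::lock_guard<std::mutex> guard(streamMutex);
        completionPending.store(true);
    }

    // Background path: takes containerLock without holding streamMutex.
    // Returns true if there was pending work to complete.
    bool runCompletion() {
        if (!completionPending.exchange(false)) {
            return false;
        }
        std::lock_guard<std::mutex> guard(containerLock);
        ++seqnoNotifications;
        return true;
    }

    // Send path: the only place both locks are held, always in the
    // order containerLock -> streamMutex.
    void step() {
        std::lock_guard<std::mutex> container(containerLock);
        std::lock_guard<std::mutex> stream(streamMutex);
    }
};
```

In this sketch several acks arriving before the background path runs collapse into a single completion pass, which mirrors the "one flag per vBucket" idea described under +Solution+.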
1 parent 96ed3eb commit 7fdb98a

35 files changed, 455 additions and 75 deletions

engines/ep/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
@@ -227,6 +227,7 @@ ADD_LIBRARY(ep_objs OBJECT
     src/defragmenter_visitor.cc
     src/diskdockey.cc
     src/durability/active_durability_monitor.cc
+    src/durability/durability_completion_task.cc
     src/durability/durability_monitor.cc
     src/durability/durability_monitor_impl.cc
     src/durability/passive_durability_monitor.cc

engines/ep/benchmarks/defragmenter_bench.cc

Lines changed: 1 addition & 0 deletions
@@ -56,6 +56,7 @@ class DefragmentBench : public benchmark::Fixture {
         /*table*/ nullptr,
         std::make_shared<DummyCB>(),
         /*newSeqnoCb*/ nullptr,
+        [](Vbid) { return; },
         NoopSyncWriteCompleteCb,
         NoopSeqnoAckCb,
         config,

engines/ep/benchmarks/item_compressor_bench.cc

Lines changed: 1 addition & 0 deletions
@@ -55,6 +55,7 @@ class ItemCompressorBench : public benchmark::Fixture {
         /*table*/ nullptr,
         std::make_shared<DummyCB>(),
         /*newSeqnoCb*/ nullptr,
+        [](Vbid) { return; },
         NoopSyncWriteCompleteCb,
         NoopSeqnoAckCb,
         config,

engines/ep/src/durability/active_durability_monitor.cc

Lines changed: 13 additions & 9 deletions
@@ -544,7 +544,7 @@ void ActiveDurabilityMonitor::setReplicationTopology(
         s->setReplicationTopology(topology, *resolvedQueue);
     }
 
-    processCompletedSyncWriteQueue();
+    checkForResolvedSyncWrites();
 }
 
 int64_t ActiveDurabilityMonitor::getHighPreparedSeqno() const {
@@ -619,8 +619,9 @@ ENGINE_ERROR_CODE ActiveDurabilityMonitor::seqnoAckReceived(
         seqnoAckReceivedPostProcessHook();
     }
 
-    // Process the Completed Queue, committing all items and removing them.
-    processCompletedSyncWriteQueue();
+    // Check if there are now any resolved SyncWrites which should be
+    // completed.
+    checkForResolvedSyncWrites();
 
     return ENGINE_SUCCESS;
 }
@@ -639,7 +640,7 @@ void ActiveDurabilityMonitor::processTimeout(
     // the correct locks).
     state.wlock()->removeExpired(asOf, *resolvedQueue);
 
-    processCompletedSyncWriteQueue();
+    checkForResolvedSyncWrites();
 }
 
 void ActiveDurabilityMonitor::notifyLocalPersistence() {
@@ -728,6 +729,13 @@ void ActiveDurabilityMonitor::addStatsForChain(
     }
 }
 
+void ActiveDurabilityMonitor::checkForResolvedSyncWrites() {
+    if (resolvedQueue->empty()) {
+        return;
+    }
+    vb.notifySyncWritesPendingCompletion();
+}
+
 void ActiveDurabilityMonitor::processCompletedSyncWriteQueue() {
     std::lock_guard<ResolvedQueue::ConsumerLock> lock(
             resolvedQueue->getConsumerLock());
@@ -1645,11 +1653,7 @@ void ActiveDurabilityMonitor::checkForCommit() {
     // the resolvedQueue (under the correct locks).
     state.wlock()->updateHighPreparedSeqno(*resolvedQueue);
 
-    // @todo: Consider to commit in a dedicated function for minimizing
-    // contention on front-end threads, as this function is supposed to
-    // execute under VBucket-level lock.
-
-    processCompletedSyncWriteQueue();
+    checkForResolvedSyncWrites();
 }
 
 template <class exception>

engines/ep/src/durability/active_durability_monitor.h

Lines changed: 9 additions & 3 deletions
@@ -282,6 +282,12 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
      */
     void removedQueuedAck(const std::string& node);
 
+    /**
+     * For all items in the completedSWQueue, call VBucket::commit /
+     * VBucket::abort as appropriate, then remove the item from the queue.
+     */
+    void processCompletedSyncWriteQueue();
+
     /**
      * @return all of the currently tracked writes
      */
@@ -363,10 +369,10 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
             const ReplicationChain& chain) const;
 
     /**
-     * For all items in the completedSWQueue, call VBucket::commit /
-     * VBucket::abort as appropriate, then remove the item from the queue.
+     * Checks if the resolvedQueue contains any SyncWrites awaiting completion,
+     * and if so notifies the VBucket.
      */
-    void processCompletedSyncWriteQueue();
+    void checkForResolvedSyncWrites();
 
     // The stats object for the owning Bucket
     EPStats& stats;
engines/ep/src/durability/durability_completion_task.cc

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2019 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#include "durability_completion_task.h"
+
+#include "ep_engine.h"
+#include "executorpool.h"
+#include "vbucket.h"
+
+#include <climits>
+
+using namespace std::chrono_literals;
+
+DurabilityCompletionTask::DurabilityCompletionTask(
+        EventuallyPersistentEngine& engine)
+    : GlobalTask(&engine, TaskId::DurabilityCompletionTask),
+      pendingVBs(engine.getConfiguration().getMaxVbuckets()),
+      lastVb(pendingVBs.size() - 1) {
+    for (auto& vb : pendingVBs) {
+        vb.store(false);
+    }
+}
+
+bool DurabilityCompletionTask::run() {
+    if (engine->getEpStats().isShutdown) {
+        return false;
+    }
+
+    // Start by putting ourselves back to sleep once run() completes.
+    // If a new VB is notified (or a VB is re-notified after it is processed in
+    // the loop below) then that will cause the task to be re-awoken.
+    snooze(INT_MAX);
+    // Clear the wakeUpScheduled flag - that allows notifySyncWritesToComplete()
+    // to wake up (re-schedule) this task if new vBuckets have SyncWrites which
+    // need completing.
+    wakeUpScheduled.store(false);
+
+    const auto startTime = std::chrono::steady_clock::now();
+
+    // Loop for each vBucket, starting from where we previously left off.
+    int vbid = (lastVb + 1) % pendingVBs.size();
+    // For each vbucket, if the pending flag is set then clear it and process
+    // its resolved SyncWrites.
+    for (; vbid != lastVb; vbid = (vbid + 1) % pendingVBs.size()) {
+        if (pendingVBs[vbid].exchange(false)) {
+            engine->getVBucket(Vbid(vbid))->processResolvedSyncWrites();
+        }
+        // Yield back to scheduler if we have exceeded the maximum runtime
+        // for a single execution.
+        auto runtime = std::chrono::steady_clock::now() - startTime;
+        if (runtime > maxChunkDuration) {
+            wakeUp();
+            break;
+        }
+    }
+    lastVb = vbid;
+
+    return true;
+}
+
+void DurabilityCompletionTask::notifySyncWritesToComplete(Vbid vbid) {
+    bool expected = false;
+    // compare_exchange_strong() returns true only when the flag actually
+    // changed from false to true, which is the case the wake-up is for.
+    if (pendingVBs[vbid.get()].compare_exchange_strong(expected, true)) {
+        // This VBucket transitioned from false -> true - wake ourselves up so
+        // we can start to process the SyncWrites.
+        expected = false;
+
+        // Performance: Only wake up the task once (and don't repeatedly try to
+        // wake if it's already scheduled to wake) - ExecutorPool::wake() isn't
+        // super cheap so avoid it if already pending.
+        if (wakeUpScheduled.compare_exchange_strong(expected, true)) {
+            ExecutorPool::get()->wake(getId());
+        }
+    }
+}
+
+const std::chrono::steady_clock::duration
+        DurabilityCompletionTask::maxChunkDuration = 25ms;
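The comments in notifySyncWritesToComplete() describe a two-level guard: a per-vBucket flag that records pending work, and a single wakeUpScheduled flag that limits how often the (not super cheap) wake call is made. The following is a minimal standalone sketch of that intent, with no ExecutorPool involved. The class `PendingVBuckets` and its method names are hypothetical; `notify()` simply returns whether the caller would need to wake the task.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the task's notification state.
class PendingVBuckets {
public:
    explicit PendingVBuckets(std::size_t numVBuckets) : pending(numVBuckets) {
        for (auto& flag : pending) {
            flag.store(false);
        }
    }

    // Marks vbid as pending. Returns true only if the caller should wake
    // the task: the flag went false -> true AND no wake is already scheduled.
    bool notify(std::size_t vbid) {
        bool expected = false;
        if (!pending[vbid].compare_exchange_strong(expected, true)) {
            return false; // already pending, nothing more to do
        }
        expected = false;
        return wakeUpScheduled.compare_exchange_strong(expected, true);
    }

    // Called at the start of a run, so later notifications can wake again.
    void beginRun() {
        wakeUpScheduled.store(false);
    }

    // Atomically clears the flag; returns true if vbid had pending work.
    bool take(std::size_t vbid) {
        return pending[vbid].exchange(false);
    }

private:
    std::vector<std::atomic<bool>> pending;
    std::atomic<bool> wakeUpScheduled{false};
};
```

Using `exchange(false)` in `take()` means a notification that arrives while a vBucket is being processed sets the flag again rather than being lost, at the cost of an occasional run that finds nothing to do.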
durability_completion_task.h

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2019 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+#pragma once
+
+#include "globaltask.h"
+#include <memcached/vbucket.h>
+
+/*
+ * This task is used to complete (commit or abort) all SyncWrites which have
+ * been resolved by each vbucket's ActiveDM.
+ *
+ * This is done in a separate task to reduce the amount of work done on
+ * the thread which actually detected the SyncWrite was resolved - typically
+ * the front-end DCP threads when a DCP_SEQNO_ACK is processed.
+ * Given that we SEQNO_ACK at the end of Snapshot, a single SEQNO_ACK could
+ * result in committing multiple SyncWrites, and committing one SyncWrite is
+ * similar to a normal front-end Set operation, we want to move this to a
+ * background task.
+ *
+ * Additionally, by doing this in a background task it simplifies lock
+ * management, for example we avoid lock inversions with earlier locks acquired
+ * during dcpSeqnoAck when attempting to later call notifySeqnoAvailable when
+ * this was done on the original thread.
+ */
+class DurabilityCompletionTask : public GlobalTask {
+public:
+    DurabilityCompletionTask(EventuallyPersistentEngine& engine);
+
+    bool run() override;
+
+    std::string getDescription() override {
+        return "DurabilityCompletionTask";
+    }
+
+    std::chrono::microseconds maxExpectedDuration() override {
+        // Task shouldn't run much longer than maxChunkDuration; given we yield
+        // after that duration - however _could_ exceed a bit given we check
+        // the duration on each vBucket. As such add a 2x margin of error.
+        return std::chrono::duration_cast<std::chrono::microseconds>(
+                2 * maxChunkDuration);
+    }
+
+    /**
+     * Notifies the task that the given vBucket has SyncWrite(s) ready to
+     * be completed.
+     * If the given vBucket isn't already pending, then will wake up the task
+     * for it to run.
+     */
+    void notifySyncWritesToComplete(Vbid vbid);
+
+private:
+    /**
+     * A flag for each (possible) Vbid, set to true if there are SyncWrites
+     * which need to be completed.
+     */
+    std::vector<std::atomic_bool> pendingVBs;
+
+    /// The last vBucket which SyncWrites were completed for, and the vBucket
+    /// the next run() method will continue from.
+    int lastVb;
+
+    /**
+     * Flag which is used to check if a wakeup has already been scheduled for
+     * this task.
+     */
+    std::atomic<bool> wakeUpScheduled{false};
+
+    /// Maximum duration this task should execute for before yielding back to
+    /// the ExecutorPool (to allow other tasks to run).
+    static const std::chrono::steady_clock::duration maxChunkDuration;
+};
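The header documents a task that resumes from where the previous run stopped and yields once maxChunkDuration is exceeded. Below is a rough sketch of that chunked round-robin idea. It is a variation on the committed loop, not a copy of it: it bounds each run by a visit count so every slot is examined at most once per run, and it takes an injected `budgetExceeded` predicate in place of reading `std::chrono::steady_clock`, purely so that the behaviour is deterministic. `runChunk` and `ChunkResult` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

struct ChunkResult {
    std::size_t nextStart; // slot to resume from on the next run
    bool yielded;          // true if the budget ran out before a full pass
};

// Visits each slot at most once, starting at `start` and wrapping around.
// Pending slots are cleared and handed to `process`. After each slot the
// budget predicate is consulted; if it fires, the function returns early
// with the position to resume from.
ChunkResult runChunk(std::vector<bool>& pending,
                     std::size_t start,
                     const std::function<void(std::size_t)>& process,
                     const std::function<bool()>& budgetExceeded) {
    const std::size_t n = pending.size();
    if (n == 0) {
        return {0, false};
    }
    std::size_t idx = start % n;
    for (std::size_t visited = 0; visited < n; ++visited) {
        if (pending[idx]) {
            pending[idx] = false;
            process(idx);
        }
        idx = (idx + 1) % n;
        if (budgetExceeded()) {
            // Only report a yield if some slots were left unvisited.
            return {idx, visited + 1 < n};
        }
    }
    return {idx, false};
}
```

A caller would feed `nextStart` back in as `start` on the next run, and reschedule itself immediately when `yielded` is true, which corresponds to the wakeUp() call in the task's run() method.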

engines/ep/src/ep_bucket.cc

Lines changed: 1 addition & 0 deletions
@@ -1128,6 +1128,7 @@ VBucketPtr EPBucket::makeVBucket(
             std::move(table),
             flusherCb,
             std::move(newSeqnoCb),
+            makeSyncWriteResolvedCB(),
             makeSyncWriteCompleteCB(),
             makeSeqnoAckCB(),
             engine.getConfiguration(),

engines/ep/src/ep_vb.cc

Lines changed: 2 additions & 0 deletions
@@ -47,6 +47,7 @@ EPVBucket::EPVBucket(Vbid i,
         std::unique_ptr<FailoverTable> table,
         std::shared_ptr<Callback<Vbid>> flusherCb,
         NewSeqnoCallback newSeqnoCb,
+        SyncWriteResolvedCallback syncWriteResolvedCb,
         SyncWriteCompleteCallback syncWriteCb,
         SeqnoAckCallback seqnoAckCb,
         Configuration& config,
@@ -69,6 +70,7 @@ EPVBucket::EPVBucket(Vbid i,
         flusherCb,
         std::make_unique<StoredValueFactory>(st),
         std::move(newSeqnoCb),
+        syncWriteResolvedCb,
         syncWriteCb,
         seqnoAckCb,
         config,

engines/ep/src/ep_vb.h

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@ class EPVBucket : public VBucket {
         std::unique_ptr<FailoverTable> table,
         std::shared_ptr<Callback<Vbid>> flusherCb,
         NewSeqnoCallback newSeqnoCb,
+        SyncWriteResolvedCallback syncWriteResolvedCb,
         SyncWriteCompleteCallback syncWriteCb,
         SeqnoAckCallback seqnoAckCb,
         Configuration& config,
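The hunks above thread a new SyncWriteResolvedCallback through the VBucket constructors, and the benchmarks pass a no-op `[](Vbid) { return; }` for it. The body of makeSyncWriteResolvedCB() is not part of this excerpt, so the sketch below only assumes that it forwards the vBucket id to the task's notifySyncWritesToComplete(). Every type here (`VbidSketch`, `TaskSketch`, `VBucketSketch`, `makeResolvedCallback`) is a hypothetical stand-in rather than a kv_engine type.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

using VbidSketch = std::uint16_t; // hypothetical stand-in for Vbid
using ResolvedCallbackSketch = std::function<void(VbidSketch)>;

// Hypothetical stand-in for the completion task: records notifications.
struct TaskSketch {
    std::vector<VbidSketch> notified;
    void notifySyncWritesToComplete(VbidSketch vbid) {
        notified.push_back(vbid);
    }
};

// Hypothetical stand-in for a VBucket that owns the injected callback.
class VBucketSketch {
public:
    VBucketSketch(VbidSketch id, ResolvedCallbackSketch cb)
        : id(id), resolvedCb(std::move(cb)) {
    }

    // Mirrors the role of notifySyncWritesPendingCompletion(): hand our
    // own id to whatever callback was injected at construction.
    void notifySyncWritesPendingCompletion() {
        if (resolvedCb) {
            resolvedCb(id);
        }
    }

private:
    VbidSketch id;
    ResolvedCallbackSketch resolvedCb;
};

// Assumed shape of the factory: a lambda that forwards to the task.
// The task must outlive every VBucketSketch holding this callback.
ResolvedCallbackSketch makeResolvedCallback(TaskSketch& task) {
    return [&task](VbidSketch vbid) { task.notifySyncWritesToComplete(vbid); };
}
```

Injecting the callback keeps the VBucket unaware of the task type, which is why the benchmarks can substitute a no-op lambda without constructing a task at all.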
