Skip to content

Commit 0ec87f9

Browse files
committed
MB-34780: Clear the pending sync writes during bucket shutdown
During bucket deletion all of the pending sync writes needs to be terminated for bucket deletion to complete. As part of bucket deletion we tear down all of the DCP streams so there is no way for the ack/nack's from the replicas to arrive and unblock the cookie and continue bucket deletion. Change-Id: I1f2801c69cb1ee35cd0cfa4622d7ab5dc847f1e4 Reviewed-on: http://review.couchbase.org/111536 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 46d8c9e commit 0ec87f9

File tree

12 files changed

+142
-7
lines changed

12 files changed

+142
-7
lines changed

daemon/memcached.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1967,6 +1967,7 @@ void DestroyBucketThread::destroy() {
19671967
LOG_INFO("{} Delete bucket [{}]. Notifying engine", connection_id, name);
19681968

19691969
all_buckets[idx].getEngine()->initiate_shutdown();
1970+
all_buckets[idx].getEngine()->cancel_all_operations_in_ewb_state();
19701971

19711972
LOG_INFO("{} Delete bucket [{}]. Engine ready for shutdown",
19721973
connection_id,
@@ -2030,6 +2031,7 @@ void DestroyBucketThread::destroy() {
20302031
connection.signalIfIdle();
20312032
}
20322033
});
2034+
bucket.getEngine()->cancel_all_operations_in_ewb_state();
20332035
guard.lock();
20342036
continue;
20352037
}

engines/ep/src/durability/active_durability_monitor.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -972,10 +972,11 @@ std::vector<const void*>
972972
ActiveDurabilityMonitor::getCookiesForInFlightSyncWrites() {
973973
auto s = state.wlock();
974974
auto vec = std::vector<const void*>();
975-
for (auto write : s->trackedWrites) {
975+
for (auto& write : s->trackedWrites) {
976976
auto* cookie = write.getCookie();
977977
if (cookie) {
978978
vec.push_back(cookie);
979+
write.clearCookie();
979980
}
980981
}
981982
return vec;

engines/ep/src/durability/active_durability_monitor.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,11 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
125125
void processTimeout(std::chrono::steady_clock::time_point asOf);
126126

127127
/**
128-
* Get the non-null cookies for all in-flight SyncWrites.
128+
* Get (and clear) the non-null cookies for all in-flight SyncWrites.
129129
* (Null cookies - for example originating from SyncWrites loaded during
130-
* warmup - are not returned).
130+
* warmup - are not returned). The reason for clearing the cookies is
131+
* to avoid a double notification on the cookie (which is illegal),
132+
* so the caller <u>must</u> notify these cookies.
131133
*/
132134
std::vector<const void*> getCookiesForInFlightSyncWrites();
133135

engines/ep/src/durability/durability_monitor_impl.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ const void* DurabilityMonitor::SyncWrite::getCookie() const {
7373
return cookie;
7474
}
7575

76+
void DurabilityMonitor::SyncWrite::clearCookie() {
77+
cookie = nullptr;
78+
}
79+
7680
std::chrono::steady_clock::time_point
7781
DurabilityMonitor::SyncWrite::getStartTime() const {
7882
return startTime;

engines/ep/src/durability/durability_monitor_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ class DurabilityMonitor::SyncWrite {
7272

7373
const void* getCookie() const;
7474

75+
void clearCookie();
76+
7577
std::chrono::steady_clock::time_point getStartTime() const;
7678

7779
/**

engines/ep/src/ep_engine.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5821,6 +5821,11 @@ void EventuallyPersistentEngine::initiate_shutdown() {
58215821
dcpConnMap_->shutdownAllConnections();
58225822
}
58235823

5824+
void EventuallyPersistentEngine::cancel_all_operations_in_ewb_state() {
5825+
auto eng = acquireEngine(this);
5826+
kvBucket->releaseRegisteredSyncWrites();
5827+
}
5828+
58245829
cb::mcbp::Status EventuallyPersistentEngine::stopFlusher(const char** msg,
58255830
size_t* msg_size) {
58265831
(void)msg_size;

engines/ep/src/ep_engine.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,7 @@ class EventuallyPersistentEngine : public EngineIface, public DcpIface {
564564

565565
void handleDisconnect(const void *cookie);
566566
void initiate_shutdown() override;
567+
void cancel_all_operations_in_ewb_state() override;
567568

568569
cb::mcbp::Status stopFlusher(const char** msg, size_t* msg_size);
569570

engines/ep/src/kv_bucket.cc

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,10 @@ class RespondAmbiguousNotification : public GlobalTask {
228228
public:
229229
RespondAmbiguousNotification(EventuallyPersistentEngine& e,
230230
VBucketPtr& vb,
231-
std::vector<const void*> cookies)
231+
std::vector<const void*>&& cookies_)
232232
: GlobalTask(&e, TaskId::RespondAmbiguousNotification, 0, false),
233233
weakVb(vb),
234-
cookies(cookies),
234+
cookies(std::move(cookies_)),
235235
description("Notify clients of Sync Write Ambiguous " +
236236
vb->getId().to_string()) {
237237
for (const auto* cookie : cookies) {
@@ -856,6 +856,29 @@ ENGINE_ERROR_CODE KVBucket::setVBucketState(Vbid vbid,
856856
vbid, to, meta, transfer, true /*notifyDcp*/, lh);
857857
}
858858

859+
void KVBucket::releaseRegisteredSyncWrites() {
860+
for (size_t vbid = 0; vbid < vbMap.size; ++vbid) {
861+
VBucketPtr vb = vbMap.getBucket(Vbid{gsl::narrow<uint16_t>(vbid)});
862+
if (!vb) {
863+
continue;
864+
}
865+
folly::SharedMutex::ReadHolder rlh(vb->getStateLock());
866+
if (vb->getState() != vbucket_state_active) {
867+
continue;
868+
}
869+
870+
auto cookies = vb->getCookiesForInFlightSyncWrites();
871+
if (!cookies.empty()) {
872+
EP_LOG_INFO("Cancel {} blocked durability requests in {}",
873+
cookies.size(),
874+
vb);
875+
ExTask notifyTask = std::make_shared<RespondAmbiguousNotification>(
876+
engine, vb, std::move(cookies));
877+
ExecutorPool::get()->schedule(notifyTask);
878+
}
879+
}
880+
}
881+
859882
ENGINE_ERROR_CODE KVBucket::setVBucketState_UNLOCKED(
860883
Vbid vbid,
861884
vbucket_state_t to,
@@ -887,7 +910,7 @@ ENGINE_ERROR_CODE KVBucket::setVBucketState_UNLOCKED(
887910
// Get a list of cookies that we should respond to
888911
auto connectionsToRespondTo = vb->getCookiesForInFlightSyncWrites();
889912
ExTask notifyTask = std::make_shared<RespondAmbiguousNotification>(
890-
engine, vb, connectionsToRespondTo);
913+
engine, vb, std::move(connectionsToRespondTo));
891914
ExecutorPool::get()->schedule(notifyTask);
892915
}
893916

engines/ep/src/kv_bucket.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,18 @@ class KVBucket : public KVBucketIface {
283283
}
284284
}
285285

286+
/**
287+
* Release all cookies blocked for sync write
288+
*
289+
* This method is called during bucket shutdown to make sure that
290+
* all of the cookies waiting for a durable write is released so that
291+
* we can continue bucket deletion. As part of bucket deletion one of
292+
* the first things we do is to tear down the DCP streams so that
293+
* the durable writes will never be notified and would be stuck
294+
* waiting for a timeout if we don't explicitly release them.
295+
*/
296+
void releaseRegisteredSyncWrites();
297+
286298
/**
287299
* Sets the vbucket or creates a vbucket with the desired state
288300
*

engines/ep/src/vbucket.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
524524
void fireAllOps(EventuallyPersistentEngine &engine);
525525

526526
/**
527-
* Get the cookies for all in-flight SyncWrites from the ADM
527+
* Get (and clear) the cookies for all in-flight SyncWrites from the ADM
528528
*/
529529
std::vector<const void*> getCookiesForInFlightSyncWrites();
530530

0 commit comments

Comments
 (0)