Skip to content

Commit 1d75037

Browse files
committed
MB-24817: During takeover, hold stream lock until vb is set to dead
During DCP takeover, when we decide to set the vbucket to the dead state on the producer side, we also transition the stream state from STREAM_TAKEOVER_WAIT to STREAM_TAKEOVER_SEND to send out any remaining items that were received before the vbucket was set to the dead state. The stream lock must be held until the vbucket is set to the dead state, so that we do not prematurely finish sending out the last items while the vbucket is not yet dead (that is, if the vbucket is not dead, it could still receive more items). This commit addresses the issue and also handles the ordering of the locks involved at the stream level, ep store level and vbucket level in this scenario. Change-Id: I89bb42edec4f3765c8a9c67e6e89e9680eb40875 Reviewed-on: http://review.couchbase.org/79532 Well-Formed: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 43b0f32 commit 1d75037

File tree

5 files changed

+146
-56
lines changed

5 files changed

+146
-56
lines changed

src/dcp/stream.cc

Lines changed: 57 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -422,42 +422,68 @@ void ActiveStream::snapshotMarkerAckReceived() {
422422
}
423423

424424
void ActiveStream::setVBucketStateAckRecieved() {
425-
LockHolder lh(streamMutex);
426-
if (state_ == STREAM_TAKEOVER_WAIT) {
427-
if (takeoverState == vbucket_state_pending) {
428-
producer->getLogger().log(EXTENSION_LOG_INFO,
429-
"(vb %" PRIu16 ") Receive ack for set vbucket state to pending "
430-
"message", vb_);
431-
432-
takeoverState = vbucket_state_active;
433-
transitionState(STREAM_TAKEOVER_SEND);
434-
lh.unlock();
425+
RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
426+
if (!vbucket) {
427+
producer->getLogger().log(EXTENSION_LOG_WARNING,
428+
"(vb %" PRIu16
429+
") not present during ack for set vbucket "
430+
"during takeover",
431+
vb_);
432+
return;
433+
}
435434

436-
engine->getEpStore()->setVBucketState(vb_, vbucket_state_dead,
437-
false, false);
438-
RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
439-
producer->getLogger().log(EXTENSION_LOG_NOTICE,
440-
"(vb %" PRIu16 ") Vbucket marked as dead, last sent seqno: %"
441-
PRIu64 ", high seqno: %" PRIu64,
442-
vb_, lastSentSeqno.load(), vbucket->getHighSeqno());
435+
{
436+
/* The 3 locks below must be acquired in this order (vbsetMutex, then the vbucket stateLock, then streamMutex) to avoid
437+
any potential lock inversion problems */
438+
LockHolder epVbSetLh(engine->getEpStore()->getVbSetMutexLock());
439+
WriterLockHolder vbStateLh(vbucket->getStateLock());
440+
LockHolder lh(streamMutex);
441+
if (state_ == STREAM_TAKEOVER_WAIT) {
442+
if (takeoverState == vbucket_state_pending) {
443+
producer->getLogger().log(EXTENSION_LOG_INFO,
444+
"(vb %" PRIu16 ") Receive ack for set vbucket state to "
445+
"pending message",
446+
vb_);
447+
448+
takeoverState = vbucket_state_active;
449+
transitionState(STREAM_TAKEOVER_SEND);
450+
451+
engine->getEpStore()->setVBucketState_UNLOCKED(
452+
vb_,
453+
vbucket_state_dead,
454+
false /* transfer */,
455+
false /* notify_dcp */,
456+
epVbSetLh,
457+
&vbStateLh);
458+
459+
producer->getLogger().log(EXTENSION_LOG_NOTICE,
460+
"(vb %" PRIu16 ") Vbucket marked as dead, last sent "
461+
"seqno: %" PRIu64 ", high seqno: %" PRIu64,
462+
vb_,
463+
lastSentSeqno.load(),
464+
vbucket->getHighSeqno());
465+
} else {
466+
producer->getLogger().log(EXTENSION_LOG_NOTICE,
467+
"(vb %" PRIu16 ") Receive ack for set vbucket state to "
468+
"active message",
469+
vb_);
470+
endStream(END_STREAM_OK);
471+
}
443472
} else {
444-
producer->getLogger().log(EXTENSION_LOG_INFO,
445-
"(vb %" PRIu16 ") Receive ack for set vbucket state to active "
446-
"message", vb_);
447-
endStream(END_STREAM_OK);
448-
lh.unlock();
449-
}
450-
451-
bool inverse = false;
452-
if (itemsReady.compare_exchange_strong(inverse, true)) {
453-
producer->notifyStreamReady(vb_);
473+
producer->getLogger().log(EXTENSION_LOG_WARNING,
474+
"(vb %" PRIu16 ") Unexpected ack for set vbucket op on "
475+
"stream '%s' state '%s'",
476+
vb_,
477+
name_.c_str(),
478+
stateName(state_));
479+
return;
454480
}
455-
} else {
456-
producer->getLogger().log(EXTENSION_LOG_WARNING,
457-
"(vb %" PRIu16 ") Unexpected ack for set vbucket op on stream '%s' "
458-
"state '%s'", vb_, name_.c_str(), stateName(state_));
459481
}
460482

483+
bool inverse = false;
484+
if (itemsReady.compare_exchange_strong(inverse, true)) {
485+
producer->notifyStreamReady(vb_);
486+
}
461487
}
462488

463489
DcpResponse* ActiveStream::backfillPhase() {

src/ep.cc

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,11 +1115,13 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
11151115
return setVBucketState_UNLOCKED(vbid, to, transfer, notify_dcp, lh);
11161116
}
11171117

1118-
ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState_UNLOCKED(uint16_t vbid,
1119-
vbucket_state_t to,
1120-
bool transfer,
1121-
bool notify_dcp,
1122-
LockHolder& vbset) {
1118+
ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState_UNLOCKED(
1119+
uint16_t vbid,
1120+
vbucket_state_t to,
1121+
bool transfer,
1122+
bool notify_dcp,
1123+
LockHolder& vbset,
1124+
WriterLockHolder* vbStateLock) {
11231125
RCPtr<VBucket> vb = vbMap.getBucket(vbid);
11241126
if (vb && to == vb->getState()) {
11251127
return ENGINE_SUCCESS;
@@ -1128,7 +1130,11 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState_UNLOCKED(uint16_t v
11281130
if (vb) {
11291131
vbucket_state_t oldstate = vb->getState();
11301132

1131-
vb->setState(to);
1133+
if (vbStateLock) {
1134+
vb->setState_UNLOCKED(to, *vbStateLock);
1135+
} else {
1136+
vb->setState(to);
1137+
}
11321138

11331139
if (oldstate != to && notify_dcp) {
11341140
bool closeInboundStreams = false;

src/ep.h

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -515,11 +515,53 @@ class EventuallyPersistentStore {
515515
return vbMap.getPersistenceSeqno(vb);
516516
}
517517

518-
/* transfer should be set to true *only* if this vbucket is becoming master
519-
* as the result of the previous master cleanly handing off control. */
518+
/**
519+
* Sets the state of an existing vbucket, or creates a vbucket with the desired state
520+
*
521+
* @param vbid vbucket id
522+
* @param state desired state of the vbucket
523+
* @param transfer indicates that the vbucket is transferred to the active
524+
* post a failover and/or rebalance
525+
* @param notify_dcp indicates whether we must consider closing DCP streams
526+
* associated with the vbucket
527+
*
528+
* @return status of the operation
529+
*/
520530
ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t state,
521531
bool transfer, bool notify_dcp = true);
522532

533+
/**
534+
* Sets the state of an existing vbucket, or creates a vbucket with the desired state
535+
*
536+
* @param vbid vbucket id
537+
* @param state desired state of the vbucket
538+
* @param transfer indicates that the vbucket is transferred to the active
539+
* post a failover and/or rebalance
540+
* @param notify_dcp indicates whether we must consider closing DCP streams
541+
* associated with the vbucket
542+
* @param vbset LockHolder acquiring the 'vbsetMutex' lock in the
543+
* EventuallyPersistentStore class
544+
* @param vbStateLock ptr to WriterLockHolder of 'stateLock' in the vbucket
545+
* class. If passed as null, the function acquires the
546+
* vbucket 'stateLock'
547+
*
548+
* @return status of the operation
549+
*/
550+
ENGINE_ERROR_CODE setVBucketState_UNLOCKED(
551+
uint16_t vbid,
552+
vbucket_state_t state,
553+
bool transfer,
554+
bool notify_dcp,
555+
LockHolder& vbset,
556+
WriterLockHolder* vbStateLock = nullptr);
557+
558+
/**
559+
* Returns the 'vbsetMutex'
560+
*/
561+
Mutex& getVbSetMutexLock() {
562+
return vbsetMutex;
563+
}
564+
523565
/**
524566
* Physically deletes a VBucket from disk. This function should only
525567
* be called on a VBucket that has already been logically deleted.
@@ -1022,10 +1064,6 @@ class EventuallyPersistentStore {
10221064

10231065
bool resetVBucket_UNLOCKED(uint16_t vbid, LockHolder& vbset);
10241066

1025-
ENGINE_ERROR_CODE setVBucketState_UNLOCKED(uint16_t vbid, vbucket_state_t state,
1026-
bool transfer, bool notify_dcp,
1027-
LockHolder& vbset);
1028-
10291067
friend class Warmup;
10301068
friend class Flusher;
10311069
friend class BGFetchCallback;

src/vbucket.cc

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -185,22 +185,26 @@ void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
185185
}
186186

187187
void VBucket::setState(vbucket_state_t to) {
188-
vbucket_state_t oldstate;
189-
{
190-
WriterLockHolder wlh(stateLock);
191-
oldstate = state;
192-
193-
if (to == vbucket_state_active &&
194-
checkpointManager.getOpenCheckpointId() < 2) {
195-
checkpointManager.setOpenCheckpointId(2);
196-
}
188+
WriterLockHolder wlh(stateLock);
189+
setState_UNLOCKED(to, wlh);
190+
}
197191

198-
LOG(EXTENSION_LOG_NOTICE,
199-
"VBucket::setState: transitioning vbucket:%" PRIu16 " from:%s to:%s",
200-
id, VBucket::toString(oldstate), VBucket::toString(to));
192+
void VBucket::setState_UNLOCKED(vbucket_state_t to,
193+
WriterLockHolder &vbStateLock) {
194+
vbucket_state_t oldstate = state;
201195

202-
state = to;
196+
if (to == vbucket_state_active &&
197+
checkpointManager.getOpenCheckpointId() < 2) {
198+
checkpointManager.setOpenCheckpointId(2);
203199
}
200+
201+
LOG(EXTENSION_LOG_NOTICE,
202+
"VBucket::setState: transitioning vbucket:%" PRIu16 " from:%s to:%s",
203+
id,
204+
VBucket::toString(oldstate),
205+
VBucket::toString(to));
206+
207+
state = to;
204208
}
205209

206210
vbucket_state VBucket::getVBucketState() const {

src/vbucket.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,23 @@ class VBucket : public RCValue {
278278

279279
id_type getId() const { return id; }
280280
vbucket_state_t getState(void) const { return state.load(); }
281+
282+
/**
283+
* Sets the vbucket state to a desired state
284+
*
285+
* @param to desired vbucket state
286+
*/
281287
void setState(vbucket_state_t to);
288+
289+
/**
290+
* Sets the vbucket state to a desired state with the 'stateLock' already
291+
* acquired
292+
*
293+
* @param to desired vbucket state
294+
* @param vbStateLock write lock holder on 'stateLock'
295+
*/
296+
void setState_UNLOCKED(vbucket_state_t to, WriterLockHolder& vbStateLock);
297+
282298
RWLock& getStateLock() {return stateLock;}
283299

284300
vbucket_state_t getInitialState(void) { return initialState; }

0 commit comments

Comments
 (0)