Skip to content

Commit 73d8447

Browse files
committed
[BP] MB-25337: Handle the vbucket state change gracefully during rollback
Rollback is performed asynchronously in kv-engine, so by the time a scheduled rollback task runs, the vbucket state might already have changed to non-replica. In that case the rollback task must simply finish running (as a no-op) rather than throw an exception.

Change-Id: I459768be3727ca19e141a917e38892f91b5e43f9
Reviewed-on: http://review.couchbase.org/81238
Well-Formed: Build Bot <[email protected]>
Reviewed-by: Daniel Owen <[email protected]>
Tested-by: Build Bot <[email protected]>
1 parent 1d75037 commit 73d8447

File tree

5 files changed

+90
-45
lines changed

5 files changed

+90
-45
lines changed

src/dcp/consumer.cc

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -696,35 +696,37 @@ ENGINE_ERROR_CODE DcpConsumer::handleResponse(
696696
return ENGINE_DISCONNECT;
697697
}
698698

699-
bool DcpConsumer::doRollback(uint32_t opaque, uint16_t vbid,
699+
bool DcpConsumer::doRollback(uint32_t opaque,
700+
uint16_t vbid,
700701
uint64_t rollbackSeqno) {
701-
ENGINE_ERROR_CODE err = engine_.getEpStore()->rollback(vbid, rollbackSeqno);
702+
TaskStatus status = engine_.getEpStore()->rollback(vbid, rollbackSeqno);
702703

703-
switch (err) {
704-
case ENGINE_NOT_MY_VBUCKET:
705-
LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Rollback failed because the "
706-
"vbucket was not found", logHeader(), vbid);
707-
return false;
708-
709-
case ENGINE_TMPFAIL:
704+
switch (status) {
705+
case TaskStatus::Reschedule:
710706
return true; // Reschedule the rollback.
711-
712-
case ENGINE_SUCCESS:
713-
// expected
707+
case TaskStatus::Abort:
708+
logger.log(EXTENSION_LOG_WARNING,
709+
"vb:%" PRIu16 " Rollback failed on the vbucket",
710+
vbid);
711+
break;
712+
case TaskStatus::Complete: {
713+
RCPtr<VBucket> vb = engine_.getVBucket(vbid);
714+
if (!vb) {
715+
logger.log(EXTENSION_LOG_WARNING,
716+
"vb:%" PRIu16
717+
" Aborting rollback task as the vbucket "
718+
"was deleted after rollback",
719+
vbid);
720+
break;
721+
}
722+
passive_stream_t stream = streams[vbid];
723+
if (stream) {
724+
stream->reconnectStream(vb, opaque, vb->getHighSeqno());
725+
}
714726
break;
715-
716-
default:
717-
throw std::logic_error("DcpConsumer::doRollback: Unexpected error "
718-
"code from EpStore::rollback: " + std::to_string(err));
719727
}
720-
721-
RCPtr<VBucket> vb = engine_.getVBucket(vbid);
722-
passive_stream_t stream = streams[vbid];
723-
if (stream) {
724-
stream->reconnectStream(vb, opaque, vb->getHighSeqno());
725728
}
726-
727-
return false;
729+
return false; // Do not reschedule the rollback
728730
}
729731

730732
void DcpConsumer::addStats(ADD_STAT add_stat, const void *c) {

src/ep.cc

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3982,46 +3982,62 @@ void EventuallyPersistentStore::rollbackCheckpoint(RCPtr<VBucket> &vb,
39823982
}
39833983
}
39843984

3985-
ENGINE_ERROR_CODE
3986-
EventuallyPersistentStore::rollback(uint16_t vbid,
3987-
uint64_t rollbackSeqno) {
3985+
TaskStatus EventuallyPersistentStore::rollback(uint16_t vbid,
3986+
uint64_t rollbackSeqno) {
39883987
LockHolder vbset(vbsetMutex);
39893988

39903989
LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
39913990

39923991
if (!lh.islocked()) {
3993-
return ENGINE_TMPFAIL; // Reschedule a vbucket rollback task.
3992+
return TaskStatus::Reschedule; // Reschedule a vbucket rollback task.
39943993
}
39953994

39963995
RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3996+
if (!vb) {
3997+
LOG(EXTENSION_LOG_WARNING,
3998+
"vb:%" PRIu16 " Aborting rollback as the vbucket was not found",
3999+
vbid);
4000+
return TaskStatus::Abort;
4001+
}
4002+
39974003
ReaderLockHolder rlh(vb->getStateLock());
39984004
if (vb->getState() == vbucket_state_replica) {
3999-
uint64_t prevHighSeqno = static_cast<uint64_t>
4000-
(vb->checkpointManager.getHighSeqno());
4005+
uint64_t prevHighSeqno =
4006+
static_cast<uint64_t>(vb->checkpointManager.getHighSeqno());
40014007
if (rollbackSeqno != 0) {
40024008
std::shared_ptr<Rollback> cb(new Rollback(engine));
4003-
KVStore* rwUnderlying = vbMap.getShardByVbId(vbid)->getRWUnderlying();
4004-
RollbackResult result = rwUnderlying->rollback(vbid, rollbackSeqno, cb);
4009+
KVStore* rwUnderlying =
4010+
vbMap.getShardByVbId(vbid)->getRWUnderlying();
4011+
RollbackResult result =
4012+
rwUnderlying->rollback(vbid, rollbackSeqno, cb);
40054013

40064014
if (result.success) {
40074015
rollbackCheckpoint(vb, result.highSeqno);
40084016
vb->failovers->pruneEntries(result.highSeqno);
40094017
vb->checkpointManager.clear(vb, result.highSeqno);
4010-
vb->setPersistedSnapshot(result.snapStartSeqno, result.snapEndSeqno);
4018+
vb->setPersistedSnapshot(result.snapStartSeqno,
4019+
result.snapEndSeqno);
40114020
vb->incrRollbackItemCount(prevHighSeqno - result.highSeqno);
40124021
vb->checkpointManager.setOpenCheckpointId(1);
4013-
return ENGINE_SUCCESS;
4022+
return TaskStatus::Complete;
40144023
}
40154024
}
40164025

40174026
if (resetVBucket_UNLOCKED(vbid, vbset)) {
40184027
RCPtr<VBucket> newVb = vbMap.getBucket(vbid);
40194028
newVb->incrRollbackItemCount(prevHighSeqno);
4020-
return ENGINE_SUCCESS;
4029+
return TaskStatus::Complete;
40214030
}
4022-
return ENGINE_NOT_MY_VBUCKET;
4031+
LOG(EXTENSION_LOG_WARNING,
4032+
"vb:%" PRIu16 " Aborting rollback as reset of the vbucket failed",
4033+
vbid);
4034+
return TaskStatus::Abort;
40234035
} else {
4024-
return ENGINE_EINVAL;
4036+
LOG(EXTENSION_LOG_WARNING,
4037+
"vb:%" PRIu16 " Rollback not supported on the vbucket state %s",
4038+
vbid,
4039+
VBucket::toString(vb->getState()));
4040+
return TaskStatus::Abort;
40254041
}
40264042
}
40274043

src/ep.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -881,10 +881,14 @@ class EventuallyPersistentStore {
881881
*
882882
* @param vbid The vbucket to rollback
883883
* @rollbackSeqno The seqno to rollback to.
884-
* @return ENGINE_EINVAL if VB is not replica, ENGINE_NOT_MY_VBUCKET if vbid
885-
* is not managed by this instance or ENGINE_SUCCESS.
884+
*
885+
* @return TaskStatus::Complete upon successful rollback
886+
* TaskStatus::Abort if vbucket is not replica or
887+
* if vbucket is not valid
888+
* if vbucket reset and rollback fails
889+
* TaskStatus::Reschedule if you cannot get a lock on the vbucket
886890
*/
887-
ENGINE_ERROR_CODE rollback(uint16_t vbid, uint64_t rollbackSeqno);
891+
TaskStatus rollback(uint16_t vbid, uint64_t rollbackSeqno);
888892

889893
void wakeUpItemPager() {
890894
if (itmpTask->getState() == TASK_SNOOZED) {

src/ep_types.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,9 @@ static inline std::string to_string(GenerateCas generateCas) {
5757
return "";
5858
}
5959
}
60+
61+
enum class TaskStatus {
62+
Reschedule, /* Reschedule for later */
63+
Complete, /* Complete in this run */
64+
Abort /* Abort task immediately */
65+
};

tests/module_tests/evp_store_rollback_test.cc

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ class RollbackTest : public EventuallyPersistentStoreTest,
8787
// Test - rollback to seqno of item_v1 and verify that the previous value
8888
// of the item has been restored.
8989
store->setVBucketState(vbid, vbucket_state_replica, false);
90-
ASSERT_EQ(ENGINE_SUCCESS, store->rollback(vbid, item_v1.getBySeqno()));
90+
ASSERT_EQ(TaskStatus::Complete,
91+
store->rollback(vbid, item_v1.getBySeqno()));
9192
auto result = store->public_getInternal(a, vbid, /*cookie*/nullptr,
9293
vbucket_state_replica, {});
9394
ASSERT_EQ(ENGINE_SUCCESS, result.getStatus());
@@ -122,7 +123,8 @@ class RollbackTest : public EventuallyPersistentStoreTest,
122123
// Test - rollback to seqno of item_v1 and verify that the previous value
123124
// of the item has been restored.
124125
store->setVBucketState(vbid, vbucket_state_replica, false);
125-
ASSERT_EQ(ENGINE_SUCCESS, store->rollback(vbid, item_v1.getBySeqno()));
126+
ASSERT_EQ(TaskStatus::Complete,
127+
store->rollback(vbid, item_v1.getBySeqno()));
126128
ASSERT_EQ(item_v1.getBySeqno(), store->getVBucket(vbid)->getHighSeqno());
127129

128130
// a should have the value of 'old'
@@ -181,7 +183,7 @@ class RollbackTest : public EventuallyPersistentStoreTest,
181183

182184
// Rollback should succeed, but rollback to 0
183185
store->setVBucketState(vbid, vbucket_state_replica, false);
184-
EXPECT_EQ(ENGINE_SUCCESS, store->rollback(vbid, rollback));
186+
EXPECT_EQ(TaskStatus::Complete, store->rollback(vbid, rollback));
185187

186188
// These keys should be gone after the rollback
187189
for (int i = 0; i < 3; i++) {
@@ -258,7 +260,7 @@ TEST_P(RollbackTest, RollbackToMiddleOfAnUnPersistedSnapshot) {
258260

259261
/* do rollback */
260262
store->setVBucketState(vbid, vbucket_state_replica, false);
261-
EXPECT_EQ(ENGINE_SUCCESS, store->rollback(vbid, rollbackReqSeqno));
263+
EXPECT_EQ(TaskStatus::Complete, store->rollback(vbid, rollbackReqSeqno));
262264

263265
/* confirm that we have rolled back to the disk snapshot */
264266
EXPECT_EQ(rollback_item.getBySeqno(),
@@ -284,8 +286,8 @@ TEST_P(RollbackTest, MB21784) {
284286
// Make the vbucket a replica
285287
store->setVBucketState(vbid, vbucket_state_replica, false);
286288
// Perform a rollback
287-
EXPECT_EQ(ENGINE_SUCCESS, store->rollback(vbid, initial_seqno))
288-
<< "rollback did not return ENGINE_SUCCESS";
289+
EXPECT_EQ(TaskStatus::Complete, store->rollback(vbid, initial_seqno))
290+
<< "rollback did not return success";
289291

290292
// Assert the checkpointmanager clear function (called during rollback)
291293
// has set the opencheckpointid to one
@@ -316,6 +318,21 @@ TEST_P(RollbackTest, MB21784) {
316318
engine->handleDisconnect(cookie);
317319
}
318320

321+
TEST_P(RollbackTest, RollbackOnActive) {
322+
/* Store 3 items */
323+
const int numItems = 3;
324+
for (int i = 0; i < numItems; i++) {
325+
std::string key = "key_" + std::to_string(i);
326+
store_item(vbid, key.c_str(), "not rolled back");
327+
}
328+
329+
/* Try to rollback on active (default state) vbucket */
330+
EXPECT_EQ(TaskStatus::Abort,
331+
store->rollback(vbid, numItems - 1 /*rollbackReqSeqno*/));
332+
333+
EXPECT_EQ(TaskStatus::Abort, store->rollback(vbid, 0 /*rollbackReqSeqno*/));
334+
}
335+
319336
// Test cases which run in both Full and Value eviction
320337
INSTANTIATE_TEST_CASE_P(FullAndValueEviction,
321338
RollbackTest,

0 commit comments

Comments
 (0)