Commit bbc1efd

rdemellow authored and daverigby committed
[BP] MB-46439: Don't remove resolved sync-writes when getting cookies
Currently ActiveDurabilityMonitor::getCookiesForInFlightSyncWrites() isn't idempotent with respect to the sync-writes stored in resolvedQueue: it removes each ActiveSyncWrite while getting hold of its cookie so that we can give an ambiguous status to the client.

This is problematic because, before this patch, the DurabilityCompletionTask and DurabilityTimeoutTask could run after getCookiesForInFlightSyncWrites() is called. Because the sync-writes that were in resolvedQueue had been removed before they had been processed by the DurabilityCompletionTask, we could end up completing and persisting sync-writes to disk out of order.

To fix this we've modified getCookiesForInFlightSyncWrites() to leave the resolvedQueue in a valid state, with all the sync-writes that it had before still there and in the same order, just with their cookies removed.

To do this we take a write lock on the ADM state and also the resolvedQueue consumer lock. This should give us exclusive access to both the resolvedQueue and trackedWrites. We then go through the resolvedQueue, removing the ActiveSyncWrites and caching them in a vector. To restore the state of the resolvedQueue we reset the queue once it's empty and re-push all the ActiveSyncWrites.

Change-Id: If4529c6e4074ef2e332e196a728919a26ba65c98
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/154128
Tested-by: Build Bot <[email protected]>
Well-Formed: Build Bot <[email protected]>
Reviewed-by: Richard de Mellow <[email protected]>
1 parent 8893be9 commit bbc1efd
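The drain, cache, and re-push approach described in the commit message can be illustrated with a minimal single-threaded sketch. It is not the kv_engine code: `SketchSyncWrite`, `SketchResolvedQueue`, and `collectCookies` are hypothetical names, a `std::deque` stands in for the real resolvedQueue, and locking is left out entirely.

```cpp
#include <cassert>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical stand-in for ActiveSyncWrite: a seqno plus the client cookie.
struct SketchSyncWrite {
    long seqno;
    const void* cookie;
};

// Hypothetical stand-in for the resolved queue (no locking in this sketch).
using SketchResolvedQueue = std::deque<SketchSyncWrite>;

// Collect the cookies of all queued writes without losing the writes:
// drain the queue into a vector, clear each cookie, then re-push in order.
std::vector<const void*> collectCookies(SketchResolvedQueue& queue) {
    std::vector<const void*> cookies;
    std::vector<SketchSyncWrite> toRePush;
    while (!queue.empty()) {
        SketchSyncWrite write = queue.front();
        queue.pop_front();
        if (write.cookie) {
            cookies.push_back(write.cookie);
            write.cookie = nullptr; // the client is answered only once
        }
        toRePush.push_back(write);
    }
    // The queue must be empty before the cached writes are restored.
    assert(queue.empty());
    for (auto& write : toRePush) {
        queue.push_back(std::move(write));
    }
    return cookies;
}
```

Calling `collectCookies` a second time on the same queue returns no cookies and leaves the same writes in the same order, which is the idempotence property the patch is after.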

2 files changed: +78 -10 lines changed

engines/ep/src/durability/active_durability_monitor.cc

Lines changed: 21 additions & 10 deletions
@@ -894,19 +894,30 @@ ActiveDurabilityMonitor::prepareTransitionAwayFromActive() {
 std::vector<const void*>
 ActiveDurabilityMonitor::getCookiesForInFlightSyncWrites() {
     auto vec = std::vector<const void*>();
-    {
-        std::lock_guard<ResolvedQueue::ConsumerLock> lock(
-                resolvedQueue->getConsumerLock());
-        while (auto write = resolvedQueue->try_dequeue(lock)) {
-            auto* cookie = write->getCookie();
-            if (cookie) {
-                vec.push_back(cookie);
-                write->clearCookie();
-            }
+
+    std::lock_guard<ResolvedQueue::ConsumerLock> lock(
+            resolvedQueue->getConsumerLock());
+    // Take a write lock on the state now as we don't want trackedWrites being
+    // completed and being placed on the resolvedQueue while we pop all the
+    // SyncWrites from the resolvedQueue and then re-push them.
+    auto s = state.wlock();
+    std::vector<ActiveSyncWrite> resolvedSwToBeRePushed;
+    while (auto write = resolvedQueue->try_dequeue(lock)) {
+        auto* cookie = write->getCookie();
+        if (cookie) {
+            vec.push_back(cookie);
+            write->clearCookie();
         }
+        resolvedSwToBeRePushed.push_back(*write);
+    }
+    // "reset" the queue, this just sets the value of highEnqueuedSeqno back
+    // to 0 and ensures that the queue is empty. No memory management is
+    // performed by the method.
+    resolvedQueue->reset(lock);
+    for (auto& sw : resolvedSwToBeRePushed) {
+        resolvedQueue->enqueue(*s, std::move(sw));
     }
 
-    auto s = state.wlock();
     for (auto& write : s->trackedWrites) {
         auto* cookie = write.getCookie();
         if (cookie) {
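The lock scope in the hunk above can be sketched with standard-library primitives. This is only an illustration under stated assumptions: `LockedSketch` and its members are hypothetical, `std::mutex` stands in for the resolvedQueue consumer lock, `std::shared_mutex` stands in for the ADM state lock, and plain seqnos replace ActiveSyncWrite objects.

```cpp
#include <deque>
#include <mutex>
#include <shared_mutex>
#include <vector>

// Hypothetical model of the two pieces of state guarded in the patch.
struct LockedSketch {
    std::mutex consumerLock;     // stands in for the queue's consumer lock
    std::shared_mutex stateLock; // stands in for the ADM state lock
    std::deque<long> tracked;    // seqnos still in flight
    std::deque<long> resolved;   // seqnos waiting to be completed

    // Producer side: move the oldest tracked write onto the resolved queue.
    // It needs exclusive state access, so it blocks while drainAndRePush()
    // holds the state lock.
    bool resolveOne() {
        std::unique_lock<std::shared_mutex> s(stateLock);
        if (tracked.empty()) {
            return false;
        }
        resolved.push_back(tracked.front());
        tracked.pop_front();
        return true;
    }

    // Drain and restore the resolved queue while holding both locks, taken
    // in the same order as in the patch: consumer lock first, then state.
    std::vector<long> drainAndRePush() {
        std::lock_guard<std::mutex> consumer(consumerLock);
        std::unique_lock<std::shared_mutex> s(stateLock);
        std::vector<long> seen(resolved.begin(), resolved.end());
        resolved.clear();
        for (long seqno : seen) {
            resolved.push_back(seqno);
        }
        return seen;
    }
};
```

Because both locks are held from the first dequeue until the last re-push, no newly resolved write can be enqueued between the drain and the restore, so the queue keeps its original order.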

engines/ep/tests/module_tests/evp_store_durability_test.cc

Lines changed: 57 additions & 0 deletions
@@ -4404,6 +4404,63 @@ TEST_P(DurabilityBucketTest, PrepareDoesNotExpire) {
     EXPECT_EQ(DeleteSource::TTL, res.storedValue->getDeletionSource());
 }
 
+TEST_P(DurabilityBucketTest, MB_46272) {
+    using namespace cb::durability;
+    setVBucketToActiveWithValidTopology();
+
+    auto vb = store->getVBucket(vbid);
+    auto& adm = VBucketTestIntrospector::public_getActiveDM(*vb);
+
+    // 1. Two writes, one to abort after 5 seconds, one to abort after 10
+    auto keyA = makeStoredDocKey("keyA");
+    ASSERT_EQ(
+            ENGINE_SYNC_WRITE_PENDING,
+            store->set(*makePendingItem(
+                               keyA, "value", {Level::Majority, Timeout(4000)}),
+                       cookie));
+    auto keyB = makeStoredDocKey("keyB");
+    ASSERT_EQ(
+            ENGINE_SYNC_WRITE_PENDING,
+            store->set(*makePendingItem(
+                               keyB, "value", {Level::Majority, Timeout(9000)}),
+                       cookie));
+    // 2. Flush the two prepares
+    flushVBucketToDiskIfPersistent(vbid, 2);
+
+    // 3. timeout keyA which should generate an abort
+    adm.processTimeout(std::chrono::steady_clock::now() +
+                       std::chrono::seconds(5));
+    // 4. cancel the inflight sync-writes returning to the client as ambiguous
+    engine->cancel_all_operations_in_ewb_state();
+    // Now check that logically we would be in a consistent state if the
+    // durability timeout or completion tasks were to run
+
+    // 5. Timeout keyB which should generate an abort
+    adm.processTimeout(std::chrono::steady_clock::now() +
+                       std::chrono::seconds(10));
+    // 6. Process resolved sync-writes this should cause us to add two aborts
+    // to the checkpoint manager first for keyA then keyB
+    vb->processResolvedSyncWrites();
+    const auto& ckptList =
+            CheckpointManagerTestIntrospector::public_getCheckpointList(
+                    *vb->checkpointManager);
+    const auto& ckpt = *ckptList.back();
+    auto it = ckpt.begin();
+    it++; // skip the empty-item
+    it++; // skip checkpoint_start
+    // 7. Check keyA's abort
+    EXPECT_EQ(keyA, (*it)->getKey());
+    EXPECT_EQ(queue_op::abort_sync_write, (*it)->getOperation());
+    EXPECT_EQ(3, (*it)->getBySeqno());
+    it++; // move to keyB's abort
+    // 8. Check keyB's abort
+    EXPECT_EQ(keyB, (*it)->getKey());
+    EXPECT_EQ(4, (*it)->getBySeqno());
+    EXPECT_EQ(queue_op::abort_sync_write, (*it)->getOperation());
+    // 9. Flush the two aborts to disk
+    flushVBucketToDiskIfPersistent(vbid, 2);
+}
+
 // Test cases which run against couchstore
 INSTANTIATE_TEST_CASE_P(AllBackends,
                         DurabilityCouchstoreBucketTest,
