Skip to content

Commit c7e524a

Browse files
paolococchidaverigby
authored andcommitted
MB-34017 [SR]: Include the prepared-seqno in Commit queued items
When we complete a Prepare, we queue a Commit or Abort item into the CheckpointManager. At persistence of Commit/Abort, we don't know what is the Prepare associated with that Commit/Abort. While that information has never been necessary so far, now it is for persisting the High Completed Seqno to disk. This patch ensures that both the Commit and Abort queued items in CM carry the seqno of the associated Prepare (ie, the prepared-seqno). In follow-up patches, the prepared-seqno will be used at persistence for computing the HCS and writing it to disk. Change-Id: Idb1473c6b89dcc3b7e401c616cc2d785331b4d78 Reviewed-on: http://review.couchbase.org/109718 Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 57dc021 commit c7e524a

File tree

4 files changed

+63
-19
lines changed

4 files changed

+63
-19
lines changed

engines/ep/src/item.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -515,10 +515,10 @@ class Item : public RCValue {
515515
std::atomic<int64_t> bySeqno;
516516

517517
// @todo: Try to avoid this, as it increases mem_usage of 8 bytes per Item.
518-
// This is added for Durability items Commit and Abort, which both carry
519-
// 2 seqnos:
520-
// 1) the seqno of this (Commit/Abort) Item (encoded in bySeqno)
521-
// 2) the seqno of the Committed/Aborted Prepare
518+
// This is added for Durability items Commit and Abort, for which need the
519+
// associated Prepare's seqno at persistence for writing the correct High
520+
// Completed Seqno (HCS) to disk. Note that the HCS is the seqno of the
521+
// last Committed or Aborted Prepare.
522522
cb::uint48_t prepareSeqno;
523523

524524
uint32_t queuedTime;

engines/ep/src/vbucket.cc

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,9 @@ ENGINE_ERROR_CODE VBucket::commit(
798798
// it may have been set explicitly.
799799
queueItmCtx.genCas = GenerateCas::No;
800800

801+
queueItmCtx.durability =
802+
DurabilityItemCtx{res.pending->getBySeqno(), nullptr /*cookie*/};
803+
801804
auto notify = commitStoredValue(res, queueItmCtx, commitSeqno);
802805

803806
notifyNewSeqno(notify);
@@ -1145,17 +1148,26 @@ VBNotifyCtx VBucket::queueDirty(const HashTable::HashBucketLock& hbl,
11451148
setMaxCasAndTrackDrift(v.getCas());
11461149
}
11471150

1148-
// If we are queuing a SyncWrite StoredValue; extract the durability
1151+
// If we are queueing a SyncWrite StoredValue; extract the durability
11491152
// requirements to use to create the Item.
11501153
boost::optional<cb::durability::Requirements> durabilityReqs;
1151-
if (ctx.durability) {
1152-
durabilityReqs = ctx.durability->requirements;
1154+
if (v.isPending()) {
1155+
Expects(ctx.durability.is_initialized());
1156+
durabilityReqs = boost::get<cb::durability::Requirements>(
1157+
ctx.durability->requirementsOrPreparedSeqno);
11531158
}
1159+
11541160
queued_item qi(v.toItem(getId(),
11551161
StoredValue::HideLockedCas::No,
11561162
StoredValue::IncludeValue::Yes,
11571163
durabilityReqs));
11581164

1165+
if (qi->isCommitSyncWrite()) {
1166+
Expects(ctx.durability.is_initialized());
1167+
qi->setPrepareSeqno(boost::get<int64_t>(
1168+
ctx.durability->requirementsOrPreparedSeqno));
1169+
}
1170+
11591171
// MB-27457: Timestamp deletes only when they don't already have a timestamp
11601172
// assigned. This is here to ensure all deleted items have a timestamp which
11611173
// our tombstone purger can use to determine which tombstones to purge. A

engines/ep/src/vbucket.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
#include "vbucket_bgfetch_item.h"
2929
#include "vbucket_fwd.h"
3030

31+
#include "boost/variant.hpp"
32+
3133
#include <folly/Synchronized.h>
3234
#include <memcached/engine.h>
3335
#include <nlohmann/json.hpp>
@@ -78,8 +80,13 @@ struct VBNotifyCtx {
7880
* has Durability requirements.
7981
*/
8082
struct DurabilityItemCtx {
81-
/// The durability requirements for this item.
82-
cb::durability::Requirements requirements;
83+
/**
84+
* Stores:
85+
* - the Durability Requirements, if we queue a Prepare
86+
* - the prepared-seqno, if we queue a Commit
87+
*/
88+
boost::variant<cb::durability::Requirements, int64_t>
89+
requirementsOrPreparedSeqno;
8390
/**
8491
* The client cookie associated with the Durability operation. If non-null
8592
* then notifyIOComplete will be called on it when operation is committed.
@@ -1795,6 +1802,7 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
17951802
*
17961803
* @param hbl The lock to the HashTable-bucket containing the StoredValue
17971804
* @param v The StoredValue of the Pending being aborted.
1805+
* @param prepareSeqno The seqno of the Prepare being aborted
17981806
* @param ctx The VBQueueItemCtx. Holds info needed to enqueue the item,
17991807
* look at the structure for details.
18001808
* @return the notification context used for notifying the Flusher and

engines/ep/tests/module_tests/vbucket_durability_test.cc

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ TEST_P(VBucketDurabilityTest, Active_Commit_MultipleReplicas) {
491491
CheckpointManagerTestIntrospector::public_getCheckpointList(
492492
*ckptMgr);
493493

494-
auto checkPending = [this, &key, &ckptList]() -> void {
494+
auto checkPending = [this, &key, &ckptList, preparedSeqno]() -> void {
495495
EXPECT_EQ(nullptr, ht->findForRead(key).storedValue);
496496
const auto sv = ht->findForWrite(key).storedValue;
497497
ASSERT_NE(nullptr, sv);
@@ -501,11 +501,13 @@ TEST_P(VBucketDurabilityTest, Active_Commit_MultipleReplicas) {
501501
for (const auto& qi : *ckptList.front()) {
502502
if (!qi->isCheckPointMetaItem()) {
503503
EXPECT_EQ(queue_op::pending_sync_write, qi->getOperation());
504+
EXPECT_EQ(preparedSeqno, qi->getBySeqno());
505+
EXPECT_EQ("value", qi->getValue()->to_s());
504506
}
505507
}
506508
};
507509

508-
auto checkCommitted = [this, &key, &ckptList]() -> void {
510+
auto checkCommitted = [this, &key, &ckptList, preparedSeqno]() -> void {
509511
const auto sv = ht->findForRead(key).storedValue;
510512
ASSERT_NE(nullptr, sv);
511513
EXPECT_NE(nullptr, ht->findForWrite(key).storedValue);
@@ -515,6 +517,9 @@ TEST_P(VBucketDurabilityTest, Active_Commit_MultipleReplicas) {
515517
for (const auto& qi : *ckptList.front()) {
516518
if (!qi->isCheckPointMetaItem()) {
517519
EXPECT_EQ(queue_op::commit_sync_write, qi->getOperation());
520+
EXPECT_GT(qi->getBySeqno() /*commitSeqno*/, preparedSeqno);
521+
EXPECT_EQ(preparedSeqno, qi->getPrepareSeqno());
522+
EXPECT_EQ("value", qi->getValue()->to_s());
518523
}
519524
}
520525
};
@@ -679,7 +684,8 @@ TEST_P(VBucketDurabilityTest, NonPendingKeyAtAbort) {
679684
* 3) the abort_sync_write is not added to the DurabilityMonitor
680685
*/
681686
TEST_P(VBucketDurabilityTest, Active_AbortSyncWrite) {
682-
storeSyncWrites({1} /*seqno*/);
687+
const int64_t preparedSeqno = 1;
688+
storeSyncWrites({preparedSeqno});
683689
ASSERT_EQ(1,
684690
VBucketTestIntrospector::public_getActiveDM(*vbucket)
685691
.getNumTracked());
@@ -695,9 +701,9 @@ TEST_P(VBucketDurabilityTest, Active_AbortSyncWrite) {
695701
// Visible at write
696702
auto storedItem = ht->findForWrite(key);
697703
ASSERT_TRUE(storedItem.storedValue);
698-
// item pending
699704
EXPECT_EQ(CommittedState::Pending,
700705
storedItem.storedValue->getCommitted());
706+
EXPECT_EQ(preparedSeqno, storedItem.storedValue->getBySeqno());
701707
}
702708

703709
const auto& ckptList =
@@ -719,6 +725,7 @@ TEST_P(VBucketDurabilityTest, Active_AbortSyncWrite) {
719725
it++;
720726
ASSERT_EQ(1, ckpt->getNumItems());
721727
EXPECT_EQ(queue_op::pending_sync_write, (*it)->getOperation());
728+
EXPECT_EQ(preparedSeqno, (*it)->getBySeqno());
722729
EXPECT_EQ("value", (*it)->getValue()->to_s());
723730

724731
// The Pending is tracked by the DurabilityMonitor
@@ -765,6 +772,8 @@ TEST_P(VBucketDurabilityTest, Active_AbortSyncWrite) {
765772
EXPECT_EQ(queue_op::abort_sync_write, (*it)->getOperation());
766773
EXPECT_TRUE((*it)->isDeleted());
767774
EXPECT_FALSE((*it)->getValue());
775+
EXPECT_GT((*it)->getBySeqno(), preparedSeqno);
776+
EXPECT_EQ(preparedSeqno, (*it)->getPrepareSeqno());
768777

769778
// The Aborted item is not added for tracking.
770779
// Note: The Pending has not been removed as we are testing at VBucket
@@ -1026,8 +1035,9 @@ void VBucketDurabilityTest::testHTSyncDeleteCommit() {
10261035
auto* writeView = ht->findForWrite(key).storedValue;
10271036
ASSERT_TRUE(writeView);
10281037
VBQueueItemCtx ctx;
1029-
ctx.durability =
1030-
DurabilityItemCtx{{cb::durability::Level::Majority, {}}, cookie};
1038+
ctx.durability = DurabilityItemCtx{
1039+
cb::durability::Requirements{cb::durability::Level::Majority, {}},
1040+
cookie};
10311041
ASSERT_EQ(MutationStatus::WasDirty,
10321042
public_processSoftDelete(key, ctx).first);
10331043

@@ -1462,14 +1472,28 @@ void VBucketDurabilityTest::testCompleteSWInPassiveDM(vbucket_state_t state,
14621472
const auto& ckptList =
14631473
CheckpointManagerTestIntrospector::public_getCheckpointList(
14641474
*ckptMgr);
1475+
// 1 checkpoint
14651476
ASSERT_EQ(1, ckptList.size());
1466-
EXPECT_EQ(writes.size(), ckptList.front()->getNumItems());
1477+
// empty-item
1478+
const auto& ckpt = *ckptList.front();
1479+
auto it = ckpt.begin();
1480+
ASSERT_EQ(queue_op::empty, (*it)->getOperation());
1481+
// 1 metaitem (checkpoint-start)
1482+
it++;
1483+
ASSERT_EQ(1, ckpt.getNumMetaItems());
1484+
EXPECT_EQ(queue_op::checkpoint_start, (*it)->getOperation());
1485+
// 3 non-metaitem are Committed or Aborted
1486+
ASSERT_EQ(writes.size(), ckpt.getNumItems());
14671487
const auto expectedOp =
14681488
(res == Resolution::Commit ? queue_op::commit_sync_write
14691489
: queue_op::abort_sync_write);
1470-
for (const auto& qi : *ckptList.front()) {
1471-
if (!qi->isCheckPointMetaItem()) {
1472-
EXPECT_EQ(expectedOp, qi->getOperation());
1490+
for (const auto& prepare : writes) {
1491+
it++;
1492+
EXPECT_EQ(expectedOp, (*it)->getOperation());
1493+
EXPECT_GT((*it)->getBySeqno() /*commitSeqno*/, prepare.seqno);
1494+
EXPECT_EQ(prepare.seqno, (*it)->getPrepareSeqno());
1495+
if (expectedOp == queue_op::commit_sync_write) {
1496+
EXPECT_EQ("value", (*it)->getValue()->to_s());
14731497
}
14741498
}
14751499

0 commit comments

Comments
 (0)