Skip to content

Commit 8c4e237

Browse files
BenHuddlestondaverigby
authored andcommitted
MB-34946: Use HashTable::FindCommitResult in processSet
In a following change, we need to make use of the FindCommitResult in VBucket::processSet as it contains both the pending and committed StoredValues. To keep a single processSet interface, update all callers to pass a FindCommitResult instead of a single StoredValue*. Change-Id: I43fc9eb5c58cdde9710bffbe5bed35d17ed816e7 Reviewed-on: http://review.couchbase.org/111854 Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 702c0b0 commit 8c4e237

File tree

7 files changed

+114
-48
lines changed

7 files changed

+114
-48
lines changed

engines/ep/src/hash_table.cc

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,42 @@ HashTable::StoredValueProxy::StoredValueProxy(HashBucketLock&& hbl,
9696
}
9797

9898
HashTable::StoredValueProxy::~StoredValueProxy() {
99-
valueStats.get().epilogue(pre, value);
99+
if (value) {
100+
valueStats.get().epilogue(pre, value);
101+
}
100102
}
101103

102104
void HashTable::StoredValueProxy::setCommitted(CommittedState state) {
103105
value->setCommitted(state);
104106
value->setCompletedOrDeletedTime(ep_current_time());
105107
}
106108

109+
StoredValue* HashTable::StoredValueProxy::release() {
110+
auto* tmp = value;
111+
value = nullptr;
112+
return tmp;
113+
}
114+
115+
StoredValue* HashTable::FindCommitResult::selectSVToModify(bool durability) {
116+
if (durability) {
117+
if (pending) {
118+
return pending.getSV();
119+
} else {
120+
return committed;
121+
}
122+
} else {
123+
if (pending && !pending->isCompleted()) {
124+
return pending.getSV();
125+
} else {
126+
return committed;
127+
}
128+
}
129+
}
130+
131+
StoredValue* HashTable::FindCommitResult::selectSVToModify(const Item& itm) {
132+
return selectSVToModify(itm.isPending());
133+
}
134+
107135
HashTable::HashTable(EPStats& st,
108136
std::unique_ptr<AbstractStoredValueFactory> svFactory,
109137
size_t initialSize,

engines/ep/src/hash_table.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,15 @@ class HashTable {
723723

724724
void setCommitted(CommittedState state);
725725

726+
/**
727+
* Release this handle to the StoredValue* to the caller. This will
728+
* allow us to do things such as call a HashTable delete function using
729+
* the StoredValueProxy without worrying about double stats calls.
730+
*
731+
* @return The StoredValue* for value
732+
*/
733+
StoredValue* release();
734+
726735
private:
727736
HashBucketLock lock;
728737
StoredValue* value;
@@ -741,6 +750,28 @@ class HashTable {
741750
* returned (in the pending StoredValueProxy).
742751
*/
743752
struct FindCommitResult {
753+
/**
754+
* Return the StoredValue that should generally be used based on the
755+
* item. Callers of the findForWrite and findForSyncWrite functions
756+
* typically choose one or the other based on the item being updated
757+
* (itm.isPending() ? findForSyncWrite(...) : findForWrite(...)). This
758+
* function allows the caller to do the same with the FindCommitResult.
759+
*
760+
* @param bool is the current operation a durable one
761+
* @return The prepare if the item is pending and the prepare exists,
762+
* otherwise the commit. The prepare if the item is not pending
763+
* and the prepare is not completed, otherwise the commit. If
764+
* neither exist, nullptr is returned.
765+
*/
766+
StoredValue* selectSVToModify(bool durability);
767+
768+
/// Overload of above selectSVToModify that takes an Item.
769+
StoredValue* selectSVToModify(const Item& itm);
770+
771+
HashBucketLock& getHBL() {
772+
return pending.getHBL();
773+
}
774+
744775
StoredValueProxy pending;
745776
StoredValue* committed;
746777
};

engines/ep/src/vbucket.cc

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1493,10 +1493,9 @@ ENGINE_ERROR_CODE VBucket::set(
14931493
bool cas_op = (itm.getCas() != 0);
14941494

14951495
{ // HashBucketLock scope
1496-
auto htRes = itm.isPending() ? ht.findForSyncWrite(itm.getKey())
1497-
: ht.findForWrite(itm.getKey());
1498-
auto* v = htRes.storedValue;
1499-
auto& hbl = htRes.lock;
1496+
auto htRes = ht.findForCommit(itm.getKey());
1497+
auto* v = htRes.selectSVToModify(itm);
1498+
auto& hbl = htRes.getHBL();
15001499

15011500
cb::StoreIfStatus storeIfStatus = cb::StoreIfStatus::Continue;
15021501
if (predicate && (storeIfStatus = callPredicate(predicate, v)) ==
@@ -1533,7 +1532,7 @@ ENGINE_ERROR_CODE VBucket::set(
15331532
queueItmCtx.preLinkDocumentContext = &preLinkDocumentContext;
15341533
MutationStatus status;
15351534
boost::optional<VBNotifyCtx> notifyCtx;
1536-
std::tie(status, notifyCtx) = processSet(hbl,
1535+
std::tie(status, notifyCtx) = processSet(htRes,
15371536
v,
15381537
itm,
15391538
itm.getCas(),
@@ -1608,10 +1607,13 @@ ENGINE_ERROR_CODE VBucket::replace(
16081607
}
16091608

16101609
{ // HashBucketLock scope
1611-
auto htRes = itm.isPending() ? ht.findForSyncReplace(itm.getKey())
1612-
: ht.findForWrite(itm.getKey());
1613-
auto* v = htRes.storedValue;
1614-
auto& hbl = htRes.lock;
1610+
auto htRes = ht.findForCommit(itm.getKey());
1611+
auto* v = htRes.selectSVToModify(itm);
1612+
auto& hbl = htRes.getHBL();
1613+
1614+
if (v && v->isCompleted()) {
1615+
v = nullptr;
1616+
}
16151617

16161618
cb::StoreIfStatus storeIfStatus = cb::StoreIfStatus::Continue;
16171619
if (predicate && (storeIfStatus = callPredicate(predicate, v)) ==
@@ -1638,7 +1640,7 @@ ENGINE_ERROR_CODE VBucket::replace(
16381640
queueItmCtx.durability =
16391641
DurabilityItemCtx{itm.getDurabilityReqs(), cookie};
16401642
}
1641-
std::tie(mtype, notifyCtx) = processSet(hbl,
1643+
std::tie(mtype, notifyCtx) = processSet(htRes,
16421644
v,
16431645
itm,
16441646
0,
@@ -1706,9 +1708,9 @@ ENGINE_ERROR_CODE VBucket::replace(
17061708
ENGINE_ERROR_CODE VBucket::addBackfillItem(
17071709
Item& itm,
17081710
const Collections::VB::Manifest::CachingReadHandle& cHandle) {
1709-
auto htRes = ht.findForWrite(itm.getKey());
1710-
auto* v = htRes.storedValue;
1711-
auto& hbl = htRes.lock;
1711+
auto htRes = ht.findForCommit(itm.getKey());
1712+
auto* v = htRes.selectSVToModify(itm);
1713+
auto& hbl = htRes.getHBL();
17121714

17131715
// Note that this function is only called on replica or pending vbuckets.
17141716
if (v && v->isLocked(ep_current_time())) {
@@ -1725,7 +1727,7 @@ ENGINE_ERROR_CODE VBucket::addBackfillItem(
17251727
nullptr /* No pre link should happen */};
17261728
MutationStatus status;
17271729
boost::optional<VBNotifyCtx> notifyCtx;
1728-
std::tie(status, notifyCtx) = processSet(hbl,
1730+
std::tie(status, notifyCtx) = processSet(htRes,
17291731
v,
17301732
itm,
17311733
0,
@@ -1798,9 +1800,9 @@ ENGINE_ERROR_CODE VBucket::prepare(
17981800
GenerateBySeqno genBySeqno,
17991801
GenerateCas genCas,
18001802
const Collections::VB::Manifest::CachingReadHandle& cHandle) {
1801-
auto htRes = ht.findOnlyPrepared(itm.getKey());
1802-
auto* v = htRes.storedValue;
1803-
auto& hbl = htRes.lock;
1803+
auto htRes = ht.findForCommit(itm.getKey());
1804+
auto* v = htRes.pending.getSV();
1805+
auto& hbl = htRes.getHBL();
18041806
bool maybeKeyExists = true;
18051807
MutationStatus status;
18061808
boost::optional<VBNotifyCtx> notifyCtx;
@@ -1853,7 +1855,7 @@ ENGINE_ERROR_CODE VBucket::prepare(
18531855
} else {
18541856
// Not a valid duplicate prepare, call processSet and hit the SyncWrite
18551857
// checks.
1856-
std::tie(status, notifyCtx) = processSet(hbl,
1858+
std::tie(status, notifyCtx) = processSet(htRes,
18571859
v,
18581860
itm,
18591861
cas,
@@ -1924,10 +1926,9 @@ ENGINE_ERROR_CODE VBucket::setWithMeta(
19241926
GenerateBySeqno genBySeqno,
19251927
GenerateCas genCas,
19261928
const Collections::VB::Manifest::CachingReadHandle& cHandle) {
1927-
auto htRes = itm.isPending() ? ht.findForSyncWrite(itm.getKey())
1928-
: ht.findForWrite(itm.getKey());
1929-
auto* v = htRes.storedValue;
1930-
auto& hbl = htRes.lock;
1929+
auto htRes = ht.findForCommit(itm.getKey());
1930+
auto* v = htRes.selectSVToModify(itm);
1931+
auto& hbl = htRes.getHBL();
19311932
bool maybeKeyExists = true;
19321933

19331934
// Effectively ignore logically deleted keys, they cannot stop the op
@@ -1994,7 +1995,7 @@ ENGINE_ERROR_CODE VBucket::setWithMeta(
19941995

19951996
MutationStatus status;
19961997
boost::optional<VBNotifyCtx> notifyCtx;
1997-
std::tie(status, notifyCtx) = processSet(hbl,
1998+
std::tie(status, notifyCtx) = processSet(htRes,
19981999
v,
19992000
itm,
20002001
cas,
@@ -3124,7 +3125,7 @@ void VBucket::decrDirtyQueuePendingWrites(size_t decrementBy)
31243125
}
31253126

31263127
std::pair<MutationStatus, boost::optional<VBNotifyCtx>> VBucket::processSet(
3127-
const HashTable::HashBucketLock& hbl,
3128+
HashTable::FindCommitResult& htRes,
31283129
StoredValue*& v,
31293130
Item& itm,
31303131
uint64_t cas,
@@ -3151,6 +3152,12 @@ std::pair<MutationStatus, boost::optional<VBNotifyCtx>> VBucket::processSet(
31513152
Expects(itm.isCommitted());
31523153
getPassiveDM().completeSyncWrite(
31533154
itm.getKey(), PassiveDurabilityMonitor::Resolution::Commit);
3155+
3156+
// @TODO we must remove the prepare and overwrite the mutation if we
3157+
// are replacing a prepare with a mutation
3158+
// Release the pending SV from the SVP that is holding it to prevent
3159+
// a double stat update that would cause a stat underflow exception.
3160+
htRes.pending.release();
31543161
}
31553162

31563163
// This is a new SyncWrite, we just want to add a new prepare unless we
@@ -3161,12 +3168,12 @@ std::pair<MutationStatus, boost::optional<VBNotifyCtx>> VBucket::processSet(
31613168
// We have to modify the StoredValue pointer passed in or we do not
31623169
// return the correct cas to the client.
31633170
std::tie(v, notifyCtx) = addNewStoredValue(
3164-
hbl, itm, queueItmCtx, GenerateRevSeqno::No);
3171+
htRes.getHBL(), itm, queueItmCtx, GenerateRevSeqno::No);
31653172
return {MutationStatus::WasClean, notifyCtx};
31663173
}
31673174
}
31683175

3169-
return processSetInner(hbl,
3176+
return processSetInner(htRes.getHBL(),
31703177
v,
31713178
itm,
31723179
cas,

engines/ep/src/vbucket.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1766,10 +1766,10 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
17661766
* Prevents operations on in-flight SyncWrites.
17671767
* Redirects the addition of new prepares to addNewStoredValue.
17681768
*
1769-
* @param hbl Hash table bucket lock that must be held
1770-
* @param v Reference to the ptr of StoredValue. This can be changed if a
1771-
* new StoredValue is added or just its contents is changed if the
1772-
* exisiting StoredValue is updated
1769+
* @param htRes Committed and Pending StoredValues, include HBL.
1770+
* @param v Reference to the ptr of StoredValue to modify. This can be
1771+
* changed if a new StoredValue is added or just its contents is
1772+
* changes if the existing StoredValue is updated.
17731773
* @param itm Item to be added/updated. On success, its revSeqno is updated
17741774
* @param cas value to match
17751775
* @param allowExisting set to false if you want set to fail if the
@@ -1785,7 +1785,7 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
17851785
* info (if operation was successful).
17861786
*/
17871787
std::pair<MutationStatus, boost::optional<VBNotifyCtx>> processSet(
1788-
const HashTable::HashBucketLock& hbl,
1788+
HashTable::FindCommitResult& htRes,
17891789
StoredValue*& v,
17901790
Item& itm,
17911791
uint64_t cas,

engines/ep/tests/module_tests/dcp_stream_test.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1401,11 +1401,11 @@ void SingleThreadedActiveStreamTest::setupProducer(
14011401

14021402
MutationStatus SingleThreadedActiveStreamTest::public_processSet(
14031403
VBucket& vb, Item& item, const VBQueueItemCtx& ctx) {
1404-
auto htRes = ctx.durability ? vb.ht.findForSyncWrite(item.getKey())
1405-
: vb.ht.findForWrite(item.getKey());
1404+
auto htRes = vb.ht.findForCommit(item.getKey());
1405+
auto* v = htRes.selectSVToModify(item);
14061406
return vb
1407-
.processSet(htRes.lock,
1408-
htRes.storedValue,
1407+
.processSet(htRes,
1408+
v,
14091409
item,
14101410
0 /*cas*/,
14111411
true /*allowExisting*/,

engines/ep/tests/module_tests/durability_monitor_test.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,17 @@ void DurabilityMonitorTest::addSyncWrites(const std::vector<int64_t>& seqnos,
153153
}
154154

155155
MutationStatus DurabilityMonitorTest::processSet(Item& item) {
156-
auto htRes = vb->ht.findForWrite(item.getKey());
157156
VBQueueItemCtx ctx;
158157
ctx.genBySeqno = GenerateBySeqno::No;
159158
ctx.durability =
160159
DurabilityItemCtx{item.getDurabilityReqs(), /*cookie*/ nullptr};
160+
161+
auto htRes = vb->ht.findForCommit(item.getKey());
162+
auto* v = htRes.selectSVToModify(item);
163+
161164
return vb
162-
->processSet(htRes.lock,
163-
htRes.storedValue,
165+
->processSet(htRes,
166+
v,
164167
item,
165168
item.getCas(),
166169
true /*allow_existing*/,

engines/ep/tests/module_tests/vbucket_test.cc

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -199,16 +199,13 @@ MutationStatus VBucketTestBase::public_processSet(Item& itm,
199199
const VBQueueItemCtx& ctx) {
200200
// Need to take the collections read handle before the hbl
201201
auto cHandle = vbucket->lockCollections(itm.getKey());
202-
auto hbl_sv = lockAndFind(itm.getKey(), ctx);
202+
203+
auto htRes = vbucket->ht.findForCommit(itm.getKey());
204+
auto* v = htRes.selectSVToModify(itm);
205+
203206
return vbucket
204-
->processSet(hbl_sv.first,
205-
hbl_sv.second,
206-
itm,
207-
cas,
208-
true,
209-
false,
210-
ctx,
211-
{/*no predicate*/})
207+
->processSet(
208+
htRes, v, itm, cas, true, false, ctx, {/*no predicate*/})
212209
.first;
213210
}
214211

0 commit comments

Comments
 (0)