Skip to content

Commit 8c055ee

Browse files
jameseh96 authored and daverigby committed
MB-35252: Simplify allowed duplicate prepare logic
The allowedDuplicatePrepareSeqnos set is used to determine whether an existing prepare found in the hashtable may be replaced by a new prepare. This is a special case, only permitted after a disconnect/reconnect when the commit for the existing prepare may have been deduplicated. Otherwise, a second prepare for the same key without an intervening commit/abort is rejected as "sync write in progress". This set was previously initialised with all seqnos between the highCompletedSeqno (HCS) and the highSeqno of the VB. Prepares before the HCS have been completed and should not be in the hashtable. Prepares after the highSeqno have not yet been received, and do not need to be considered. The number of seqnos in the set when generated in this manner can be very large; if no SyncWrites are seen, the lower bound of the window (the highCompletedSeqno) would be zero. This leads to a very large set being allocated, costing time and memory. Solution: Instead, store the current highest trackedWrite seqno. Any pending SyncWrite in the hashtable with a seqno less than or equal to this value *may* be replaced by another prepare, because the expected Commit/Abort may have been deduplicated. If such a prepare in the hashtable is then completed (indicating the commit/abort was *not* deduped), any further prepare for the same key after the completion is treated like any other new SyncWrite - it does not need special treatment. Change-Id: Ib07bc45e8e54a1256c988edea8a4bb17d6187507 Reviewed-on: http://review.couchbase.org/112596 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Dave Rigby <[email protected]>
1 parent 7634a82 commit 8c055ee

File tree

4 files changed

+37
-45
lines changed

4 files changed

+37
-45
lines changed

engines/ep/src/durability/passive_durability_monitor.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,15 @@ void PassiveDurabilityMonitor::postProcessRollback(
389389
s->highPreparedSeqno.lastWriteSeqno.reset(rollbackResult.highPreparedSeqno);
390390
}
391391

392+
int64_t PassiveDurabilityMonitor::getHighestTrackedSeqno() const {
393+
auto s = state.rlock();
394+
if (!s->trackedWrites.empty()) {
395+
return s->trackedWrites.back().getBySeqno();
396+
} else {
397+
return 0;
398+
}
399+
}
400+
392401
void PassiveDurabilityMonitor::toOStream(std::ostream& os) const {
393402
os << "PassiveDurabilityMonitor[" << this << "]"
394403
<< " high_prepared_seqno:" << getHighPreparedSeqno()

engines/ep/src/durability/passive_durability_monitor.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,13 @@ class PassiveDurabilityMonitor : public DurabilityMonitor {
152152
*/
153153
void postProcessRollback(const RollbackResult& rollbackResult);
154154

155+
/**
156+
* Get the highest seqno for which there is a SyncWrite in trackedWrites.
157+
* Returns 0 if trackedWrites is empty.
158+
*
159+
*/
160+
int64_t getHighestTrackedSeqno() const;
161+
155162
protected:
156163
void toOStream(std::ostream& os) const override;
157164
/**

engines/ep/src/vbucket.cc

Lines changed: 17 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -835,9 +835,6 @@ ENGINE_ERROR_CODE VBucket::commit(
835835

836836
Expects(prepareSeqno);
837837

838-
// Remove from the allowed duplicate prepare set (if it exists)
839-
allowedDuplicatePrepareSeqnos.erase(res.pending->getBySeqno());
840-
841838
// Value for Pending must never be ejected
842839
Expects(res.pending->isResident());
843840

@@ -940,9 +937,6 @@ ENGINE_ERROR_CODE VBucket::abort(
940937
return ENGINE_EINVAL;
941938
}
942939

943-
// Remove from the allowed duplicate prepare set (if it exists)
944-
allowedDuplicatePrepareSeqnos.erase(htRes.storedValue->getBySeqno());
945-
946940
// If prepare seqno is not the same as our stored seqno then we should be
947941
// a replica and have missed a completion and a prepare due to de-dupe.
948942
if (prepareSeqno !=
@@ -1834,9 +1828,7 @@ ENGINE_ERROR_CODE VBucket::prepare(
18341828
nullptr /* No pre link step needed */,
18351829
{} /*overwritingPrepareSeqno*/};
18361830

1837-
auto itr = allowedDuplicatePrepareSeqnos.end();
1838-
if (v && (itr = allowedDuplicatePrepareSeqnos.find(v->getBySeqno())) !=
1839-
allowedDuplicatePrepareSeqnos.end()) {
1831+
if (v && v->getBySeqno() <= allowedDuplicatePrepareThreshold) {
18401832
// Valid duplicate prepare - call processSetInner and skip the
18411833
// SyncWrite checks.
18421834
queueItmCtx.overwritingPrepareSeqno = v->getBySeqno();
@@ -1849,23 +1841,6 @@ ENGINE_ERROR_CODE VBucket::prepare(
18491841
queueItmCtx,
18501842
{/*no predicate*/},
18511843
maybeKeyExists);
1852-
switch (status) {
1853-
case MutationStatus::WasClean:
1854-
case MutationStatus::WasDirty:
1855-
// We should not see this seqno again so remove from the set.
1856-
allowedDuplicatePrepareSeqnos.erase(itr);
1857-
break;
1858-
case MutationStatus::NotFound:
1859-
case MutationStatus::InvalidCas:
1860-
case MutationStatus::IsLocked:
1861-
case MutationStatus::NoMem:
1862-
case MutationStatus::NeedBgFetch:
1863-
case MutationStatus::IsPendingSyncWrite:
1864-
// The old prepare is still in the hashtable, keep tracking
1865-
// it in the PDM. Also. if we (e.g.) retry after NoMem we want
1866-
// to treat the duplicate as "allowed" still.
1867-
break;
1868-
}
18691844
} else {
18701845
// Not a valid duplicate prepare, call processSet and hit the SyncWrite
18711846
// checks.
@@ -3906,22 +3881,22 @@ void VBucket::removeQueuedAckFromDM(const std::string& node) {
39063881
}
39073882

39083883
void VBucket::setDuplicateSyncWriteWindow(uint64_t highSeqno) {
3909-
setUpAllowedDuplicatePrepareWindow();
3910-
}
3911-
3912-
void VBucket::setUpAllowedDuplicatePrepareWindow() {
3913-
auto& dm = getDurabilityMonitor();
3914-
auto hcs = dm.getHighCompletedSeqno();
3915-
auto highSeqno = getHighSeqno();
3916-
Expects(hcs <= highSeqno);
3917-
3918-
int64_t newDuplicateCount = highSeqno - hcs;
3919-
allowedDuplicatePrepareSeqnos.reserve(allowedDuplicatePrepareSeqnos.size() +
3920-
newDuplicateCount);
3921-
3922-
for (int64_t dupSeqno = hcs + 1; dupSeqno <= highSeqno; dupSeqno++) {
3923-
allowedDuplicatePrepareSeqnos.insert(dupSeqno);
3924-
}
3884+
setUpAllowedDuplicatePrepareThreshold();
3885+
}
3886+
3887+
void VBucket::setUpAllowedDuplicatePrepareThreshold() {
3888+
const auto& pdm = getPassiveDM();
3889+
// We should only see duplicates for prepares currently in trackedWrites
3890+
// prepares which are not in trackedWrites are either
3891+
// - Completed: A new prepare would be valid anyway, as the item is
3892+
// completed
3893+
// - New: Added after this setup, and should not be followed by another
3894+
// prepare without an intervening Commit/Abort
3895+
// As a sanity check, store the current highTrackedSeqno and assert that
3896+
// any prepares attempting to replace an existing prepare have a seqno
3897+
// less than or equal to this value.
3898+
// If no SyncWrites are being tracked, nothing can be duplicated
3899+
allowedDuplicatePrepareThreshold = pdm.getHighestTrackedSeqno();
39253900
}
39263901

39273902
void VBucket::addSyncWriteForRollback(const Item& item) {

engines/ep/src/vbucket.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1711,7 +1711,7 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
17111711
* Set the allowed duplicate prepared seqnos to the range
17121712
* (HighCompletedSeqno and HighPreparedSeqno].
17131713
*/
1714-
void setUpAllowedDuplicatePrepareWindow();
1714+
void setUpAllowedDuplicatePrepareThreshold();
17151715

17161716
std::queue<queued_item> rejectQueue;
17171717
std::unique_ptr<FailoverTable> failovers;
@@ -2481,8 +2481,9 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
24812481

24822482
static double mutationMemThreshold;
24832483

2484-
// The set of prepare seqnos that we may have to overwrite.
2485-
std::unordered_set<int64_t> allowedDuplicatePrepareSeqnos;
2484+
// The seqno threshold below which we may replace a prepare with another
2485+
// prepare (if the associated Commit/Abort may have been deduped)
2486+
int64_t allowedDuplicatePrepareThreshold = 0;
24862487

24872488
friend class DurabilityMonitorTest;
24882489
friend class SingleThreadedActiveStreamTest;

0 commit comments

Comments
 (0)