Skip to content

Commit 69730e6

Browse files
daverigby, jameseh96
authored and committed
MB-41804: Use 'Pagable' mem_used & watermark
Ephemeral buckets cannot evict from replica vbuckets directly, but memory used in replicas is still included in calculations determining when the item pager should run to delete items. Because of this, ephemeral buckets can, in certain situations (see MB), evict all active documents from a given node, as the remaining memory is occupied by replicas. In this situation, with enough nodes, the bucket memory usage can eventually reach 99% of the quota from _just_ replicas. At this point, the node will back off on replication. The node will then never recover from this state - it cannot delete items from replicas, and can also no longer stream any deletions from other nodes for the replicas. Change-Id: I96d7be615bca3b53bf31597a93aada0bcf202ea9 Reviewed-on: http://review.couchbase.org/c/kv_engine/+/136495 Well-Formed: Build Bot <[email protected]> Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 44d02c3 commit 69730e6

19 files changed

+666
-49
lines changed

engines/ep/src/checkpoint.cc

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,13 @@ std::ostream& operator<<(std::ostream& os, const CheckpointCursor& c) {
8989
return os;
9090
}
9191

92-
Checkpoint::Checkpoint(EPStats& st,
93-
uint64_t id,
94-
uint64_t snapStart,
95-
uint64_t snapEnd,
96-
uint16_t vbid)
92+
Checkpoint::Checkpoint(
93+
EPStats& st,
94+
uint64_t id,
95+
uint64_t snapStart,
96+
uint64_t snapEnd,
97+
uint16_t vbid,
98+
const std::function<void(int64_t delta)>& memOverheadChangedCallback)
9799
: stats(st),
98100
checkpointId(id),
99101
snapStartSeqno(snapStart),
@@ -104,15 +106,18 @@ Checkpoint::Checkpoint(EPStats& st,
104106
numItems(0),
105107
numMetaItems(0),
106108
memOverhead(0),
109+
memOverheadChangedCallback(memOverheadChangedCallback),
107110
effectiveMemUsage(0) {
108111
stats.coreLocal.get()->memOverhead.fetch_add(memorySize());
112+
memOverheadChangedCallback(memorySize());
109113
}
110114

111115
Checkpoint::~Checkpoint() {
112116
LOG(EXTENSION_LOG_INFO,
113117
"Checkpoint %" PRIu64 " for vbucket %d is purged from memory",
114118
checkpointId, vbucketId);
115119
stats.coreLocal.get()->memOverhead.fetch_sub(memorySize());
120+
memOverheadChangedCallback(-ssize_t(memorySize()));
116121
}
117122

118123
size_t Checkpoint::getNumMetaItems() const {
@@ -246,7 +251,7 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
246251
if (rv == queue_dirty_t::NEW_ITEM) {
247252
size_t newEntrySize = qi->getKey().size() +
248253
sizeof(index_entry) + sizeof(queued_item);
249-
memOverhead += newEntrySize;
254+
increaseMemoryOverhead(newEntrySize);
250255
stats.coreLocal.get()->memOverhead.fetch_add(newEntrySize);
251256
}
252257
}
@@ -365,7 +370,7 @@ size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
365370
*/
366371
setSnapshotStartSeqno(getLowSeqno());
367372

368-
memOverhead += newEntryMemOverhead;
373+
increaseMemoryOverhead(newEntryMemOverhead);
369374
stats.coreLocal.get()->memOverhead.fetch_add(newEntryMemOverhead);
370375

371376
return numNewItems;
@@ -518,8 +523,12 @@ bool CheckpointManager::addNewCheckpoint_UNLOCKED(uint64_t id,
518523
id, vbucketId, snapStartSeqno);
519524

520525
bool was_empty = checkpointList.empty() ? true : false;
521-
auto checkpoint = std::make_unique<Checkpoint>(stats, id, snapStartSeqno,
522-
snapEndSeqno, vbucketId);
526+
auto checkpoint = std::make_unique<Checkpoint>(stats,
527+
id,
528+
snapStartSeqno,
529+
snapEndSeqno,
530+
vbucketId,
531+
overheadChangedCallback);
523532
// Add a dummy item into the new checkpoint, so that any cursor referring
524533
// to the actual first
525534
// item in this new checkpoint can be safely shifted left by 1 if the
@@ -1544,6 +1553,20 @@ size_t CheckpointManager::getNumOfMetaItemsFromCursor(const CheckpointCursor &cu
15441553
return result;
15451554
}
15461555

1556+
void CheckpointManager::updateStatsForStateChange(vbucket_state_t from,
1557+
vbucket_state_t to) {
1558+
LockHolder lh(queueLock);
1559+
if (from == vbucket_state_replica && to != vbucket_state_replica) {
1560+
// vbucket is changing state away from replica; its memory usage
1561+
// should no longer be accounted for as a replica.
1562+
stats.replicaCheckpointOverhead -= getMemoryOverhead_UNLOCKED();
1563+
} else if (from != vbucket_state_replica && to == vbucket_state_replica) {
1564+
// vbucket is changing state to _become_ a replica; its memory usage
1565+
// _should_ be accounted for as a replica.
1566+
stats.replicaCheckpointOverhead += getMemoryOverhead_UNLOCKED();
1567+
}
1568+
}
1569+
15471570
void CheckpointManager::decrCursorFromCheckpointEnd(const std::string &name) {
15481571
LockHolder lh(queueLock);
15491572
cursor_index::iterator it = connCursors.find(name);
@@ -1896,9 +1919,9 @@ queued_item CheckpointManager::createCheckpointItem(uint64_t id, uint16_t vbid,
18961919
return qi;
18971920
}
18981921

1899-
uint64_t CheckpointManager::createNewCheckpoint() {
1922+
uint64_t CheckpointManager::createNewCheckpoint(bool force) {
19001923
LockHolder lh(queueLock);
1901-
if (checkpointList.back()->getNumItems() > 0) {
1924+
if (force || checkpointList.back()->getNumItems() > 0) {
19021925
uint64_t chk_id = checkpointList.back()->getId();
19031926
addNewCheckpoint_UNLOCKED(chk_id + 1);
19041927
}

engines/ep/src/checkpoint.h

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -346,14 +346,21 @@ std::string to_string(queue_dirty_t value);
346346
* and system events - for meta-items like checkpoint start/end they share the
347347
* same sequence number as the associated op - for checkpoint_start that is the
348348
* ID of the following op, for checkpoint_end the ID of the preceding op.
349+
*
350+
* Checkpoints call the provided memOverheadChangedCallback on any action that
351+
* changes the memory overhead of the checkpoint - that is, the memory required
352+
* _beyond_ that of the Items the Checkpoint holds. This occurs at
353+
* creation/destruction or when queuing new items.
349354
*/
350355
class Checkpoint {
351356
public:
352357
Checkpoint(EPStats& st,
353358
uint64_t id,
354359
uint64_t snapStart,
355360
uint64_t snapEnd,
356-
uint16_t vbid);
361+
uint16_t vbid,
362+
const std::function<void(int64_t delta)>&
363+
memOverheadChangedCallback);
357364

358365
~Checkpoint();
359366

@@ -587,6 +594,14 @@ class Checkpoint {
587594
static const StoredDocKey SetVBucketStateKey;
588595

589596
private:
597+
/**
598+
* Increase the tracked overhead of the checkpoint. See getMemoryOverhead
599+
*/
600+
void increaseMemoryOverhead(size_t delta) {
601+
memOverhead += delta;
602+
memOverheadChangedCallback(delta);
603+
}
604+
590605
EPStats &stats;
591606
uint64_t checkpointId;
592607
uint64_t snapStartSeqno;
@@ -605,6 +620,9 @@ class Checkpoint {
605620
checkpoint_index metaKeyIndex;
606621
size_t memOverhead;
607622

623+
// Reference to callback owned by checkpoint manager for stat tracking
624+
const std::function<void(int64_t delta)>& memOverheadChangedCallback;
625+
608626
// The following stat is to contain the memory consumption of all
609627
// the queued items in the given checkpoint.
610628
size_t effectiveMemUsage;
@@ -847,9 +865,12 @@ class CheckpointManager {
847865

848866
/**
849867
* Create a new open checkpoint by force.
868+
*
869+
* @param force create a new checkpoint even if the existing one
870+
* contains no non-meta items
850871
* @return the new open checkpoint id
851872
*/
852-
uint64_t createNewCheckpoint();
873+
uint64_t createNewCheckpoint(bool force = false);
853874

854875
void resetCursors(checkpointCursorInfoList &cursors);
855876

@@ -946,6 +967,27 @@ class CheckpointManager {
946967
return ++lastBySeqno;
947968
}
948969

970+
/**
971+
* Sets the callback to be invoked whenever memory usage changes due to a
972+
* new queued item or checkpoint removal (or checkpoint expelling, in
973+
* versions this is implemented in). This allows changes in checkpoint
974+
* memory usage to be monitored.
975+
*/
976+
void setOverheadChangedCallback(
977+
std::function<void(int64_t delta)> callback) {
978+
LockHolder lh(queueLock);
979+
overheadChangedCallback = std::move(callback);
980+
981+
size_t initialOverhead = 0;
982+
for (const auto& checkpoint : checkpointList) {
983+
initialOverhead += checkpoint->memorySize();
984+
}
985+
986+
overheadChangedCallback(initialOverhead);
987+
}
988+
989+
void updateStatsForStateChange(vbucket_state_t from, vbucket_state_t to);
990+
949991
void dump() const;
950992

951993
static const std::string pCursorName;
@@ -972,6 +1014,16 @@ class CheckpointManager {
9721014
void updateDiskQueueStats(VBucket& vbucket, size_t curr_remains,
9731015
size_t new_remains);
9741016

1017+
/**
1018+
* function to invoke whenever memory usage changes due to a new
1019+
* queued item or checkpoint removal (or checkpoint expelling, in versions
1020+
* this is implemented in).
1021+
* Must be declared before checkpointList to ensure it still exists
1022+
* when any Checkpoints within the list are destroyed during destruction
1023+
* of this CheckpointManager.
1024+
*/
1025+
std::function<void(int64_t delta)> overheadChangedCallback{[](int64_t) {}};
1026+
9751027
CheckpointList checkpointList;
9761028

9771029
private:

engines/ep/src/ep_bucket.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -885,6 +885,20 @@ ENGINE_ERROR_CODE EPBucket::getPerVBucketDiskStats(const void* cookie,
885885
return ENGINE_SUCCESS;
886886
}
887887

888+
size_t EPBucket::getPageableMemCurrent() const {
889+
// EP Buckets can (theoretically) page out all memory, active(+pending) or
890+
// replica.
891+
return stats.getEstimatedTotalMemoryUsed();
892+
}
893+
894+
size_t EPBucket::getPageableMemHighWatermark() const {
895+
return stats.mem_high_wat;
896+
}
897+
898+
size_t EPBucket::getPageableMemLowWatermark() const {
899+
return stats.mem_low_wat;
900+
}
901+
888902
VBucketPtr EPBucket::makeVBucket(VBucket::id_type id,
889903
vbucket_state_t state,
890904
KVShard* shard,

engines/ep/src/ep_bucket.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ class EPBucket : public KVBucket {
100100

101101
ENGINE_ERROR_CODE getPerVBucketDiskStats(const void* cookie,
102102
ADD_STAT add_stat) override;
103+
104+
size_t getPageableMemCurrent() const override;
105+
size_t getPageableMemHighWatermark() const override;
106+
size_t getPageableMemLowWatermark() const override;
107+
103108
/**
104109
* Creates a VBucket object.
105110
*/

engines/ep/src/ep_engine.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3142,6 +3142,14 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doMemoryStats(const void *cookie,
31423142
add_stat,
31433143
cookie);
31443144

3145+
add_casted_stat(
3146+
"ht_mem_used_replica", stats.replicaHTMemory, add_stat, cookie);
3147+
3148+
add_casted_stat("replica_checkpoint_memory_overhead",
3149+
stats.replicaCheckpointOverhead,
3150+
add_stat,
3151+
cookie);
3152+
31453153
add_casted_stat("ep_kv_size", stats.getCurrentSize(), add_stat, cookie);
31463154
add_casted_stat(
31473155
"ep_value_size", stats.getTotalValueSize(), add_stat, cookie);

0 commit comments

Comments
 (0)