Skip to content

Commit 9f74182

Browse files
committed
MB-41804: Merge branch 'couchbase/alice' into mad-hatter
Note: Due to changes in the CheckpointManager memory tracking between alice and mad-hatter, checks in ReplicaMemoryTrackingStateChange have been temporarily weakened, and will be restored in a later patch. * commit '69730e6f4': MB-41804: Use 'Pagable' mem_used & watermark Change-Id: Ieec3dee7137a733b7d5aa3161410ac7286c7fa82
2 parents c6ce6ae + 69730e6 commit 9f74182

22 files changed

+573
-43
lines changed

engines/ep/src/checkpoint.cc

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,8 @@ Checkpoint::Checkpoint(EPStats& st,
181181
uint64_t visibleSnapEnd,
182182
boost::optional<uint64_t> highCompletedSeqno,
183183
Vbid vbid,
184-
CheckpointType checkpointType)
184+
CheckpointType checkpointType,
185+
const std::function<void(int64_t delta)>& memOverheadChangedCallback)
185186
: stats(st),
186187
checkpointId(id),
187188
snapStartSeqno(snapStart),
@@ -199,8 +200,10 @@ Checkpoint::Checkpoint(EPStats& st,
199200
keyIndexMemUsage(0),
200201
queuedItemsMemUsage(0),
201202
checkpointType(checkpointType),
202-
highCompletedSeqno(highCompletedSeqno) {
203+
highCompletedSeqno(highCompletedSeqno),
204+
memOverheadChangedCallback(memOverheadChangedCallback) {
203205
stats.coreLocal.get()->memOverhead.fetch_add(sizeof(Checkpoint));
206+
memOverheadChangedCallback(sizeof(Checkpoint));
204207
}
205208

206209
Checkpoint::~Checkpoint() {
@@ -213,8 +216,9 @@ Checkpoint::~Checkpoint() {
213216
* of queued_items in the checkpoint.
214217
*/
215218
auto queueMemOverhead = sizeof(queued_item) * toWrite.size();
216-
stats.coreLocal.get()->memOverhead.fetch_sub(
217-
sizeof(Checkpoint) + keyIndexMemUsage + queueMemOverhead);
219+
auto overhead = sizeof(Checkpoint) + keyIndexMemUsage + queueMemOverhead;
220+
stats.coreLocal.get()->memOverhead.fetch_sub(overhead);
221+
memOverheadChangedCallback(-ssize_t(overhead));
218222
}
219223

220224
QueueDirtyStatus Checkpoint::queueDirty(const queued_item& qi,
@@ -384,8 +388,9 @@ QueueDirtyStatus Checkpoint::queueDirty(const queued_item& qi,
384388
* item to the queue (toWrite). This is approximated to the
385389
* addition to metaKeyIndex / keyIndex plus sizeof(queued_item).
386390
*/
387-
stats.coreLocal.get()->memOverhead.fetch_add(indexKeyUsage +
388-
sizeof(queued_item));
391+
auto overhead = indexKeyUsage + sizeof(queued_item);
392+
stats.coreLocal.get()->memOverhead.fetch_add(overhead);
393+
memOverheadChangedCallback(overhead);
389394
/**
390395
* Update the total metaKeyIndex / keyIndex memory usage which is
391396
* used when the checkpoint is destructed to manually account

engines/ep/src/checkpoint.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,10 @@ std::string to_string(QueueDirtyStatus value);
396396
* if the cursor is pointing to a meta-item the position to expel from is moved
397397
* backwards until either a mutation item or the dummy item is reached.
398398
*
399+
* Checkpoints call the provided memOverheadChangedCallback on any action that
400+
* changes the memory overhead of the checkpoint - that is, the memory required
401+
* _beyond_ that of the Items the Checkpoint holds. This occurs at
402+
* creation/destruction or when queuing new items.
399403
*/
400404
class Checkpoint {
401405
public:
@@ -406,7 +410,9 @@ class Checkpoint {
406410
uint64_t visibleSnapEnd,
407411
boost::optional<uint64_t> highCompletedSeqno,
408412
Vbid vbid,
409-
CheckpointType checkpointType);
413+
CheckpointType checkpointType,
414+
const std::function<void(int64_t delta)>&
415+
memOverheadChangedCallback);
410416

411417
~Checkpoint();
412418

@@ -770,5 +776,8 @@ class Checkpoint {
770776
// de-duplication.
771777
boost::optional<uint64_t> maxDeletedRevSeqno;
772778

779+
// Reference to callback owned by checkpoint manager for stat tracking
780+
const std::function<void(int64_t delta)>& memOverheadChangedCallback;
781+
773782
friend std::ostream& operator <<(std::ostream& os, const Checkpoint& m);
774783
};

engines/ep/src/checkpoint_manager.cc

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,8 @@ void CheckpointManager::addOpenCheckpoint(
250250
visibleSnapEnd,
251251
highCompletedSeqno,
252252
vbucketId,
253-
checkpointType);
253+
checkpointType,
254+
overheadChangedCallback);
254255
// Add an empty-item into the new checkpoint.
255256
// We need this because every CheckpointCursor will point to this empty-item
256257
// at creation. So, the cursor will point at the first actual non-meta item
@@ -1451,12 +1452,12 @@ queued_item CheckpointManager::createCheckpointItem(uint64_t id,
14511452
return qi;
14521453
}
14531454

1454-
uint64_t CheckpointManager::createNewCheckpoint() {
1455+
uint64_t CheckpointManager::createNewCheckpoint(bool force) {
14551456
LockHolder lh(queueLock);
14561457

14571458
const auto& openCkpt = getOpenCheckpoint_UNLOCKED(lh);
14581459

1459-
if (openCkpt.getNumItems() == 0) {
1460+
if (openCkpt.getNumItems() == 0 && !force) {
14601461
return openCkpt.getId();
14611462
}
14621463

@@ -1631,6 +1632,34 @@ bool CheckpointManager::isOpenCheckpointDisk() {
16311632
return checkpointList.back()->isDiskCheckpoint();
16321633
}
16331634

1635+
void CheckpointManager::updateStatsForStateChange(vbucket_state_t from,
1636+
vbucket_state_t to) {
1637+
LockHolder lh(queueLock);
1638+
if (from == vbucket_state_replica && to != vbucket_state_replica) {
1639+
// vbucket is changing state away from replica, it's memory usage
1640+
// should no longer be accounted for as a replica.
1641+
stats.replicaCheckpointOverhead -= getMemoryOverhead_UNLOCKED();
1642+
} else if (from != vbucket_state_replica && to == vbucket_state_replica) {
1643+
// vbucket is changing state to _become_ a replica, it's memory usage
1644+
// _should_ be accounted for as a replica.
1645+
stats.replicaCheckpointOverhead += getMemoryOverhead_UNLOCKED();
1646+
}
1647+
}
1648+
1649+
void CheckpointManager::setOverheadChangedCallback(
1650+
std::function<void(int64_t delta)> callback) {
1651+
LockHolder lh(queueLock);
1652+
overheadChangedCallback = std::move(callback);
1653+
1654+
overheadChangedCallback(getMemoryOverhead_UNLOCKED());
1655+
}
1656+
1657+
std::function<void(int64_t delta)>
1658+
CheckpointManager::getOverheadChangedCallback() const {
1659+
LockHolder lh(queueLock);
1660+
return overheadChangedCallback;
1661+
}
1662+
16341663
std::ostream& operator <<(std::ostream& os, const CheckpointManager& m) {
16351664
os << "CheckpointManager[" << &m << "] with numItems:"
16361665
<< m.getNumItems() << " checkpoints:" << m.checkpointList.size()

engines/ep/src/checkpoint_manager.h

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,9 +372,12 @@ class CheckpointManager {
372372

373373
/**
374374
* Create a new open checkpoint by force.
375+
*
376+
* @param force create a new checkpoint even if the existing one
377+
* contains no non-meta items
375378
* @return the new open checkpoint id
376379
*/
377-
uint64_t createNewCheckpoint();
380+
uint64_t createNewCheckpoint(bool force = false);
378381

379382
/**
380383
* Get id of the previous checkpoint that is followed by the checkpoint
@@ -484,6 +487,24 @@ class CheckpointManager {
484487
/// @return true if the current open checkpoint is a DiskCheckpoint
485488
bool isOpenCheckpointDisk();
486489

490+
void updateStatsForStateChange(vbucket_state_t from, vbucket_state_t to);
491+
492+
/**
493+
* Sets the callback to be invoked whenever memory usage changes due to a
494+
* new queued item or checkpoint removal (or checkpoint expelling, in
495+
* versions this is implemented in). This allows changes in checkpoint
496+
* memory usage to be monitored.
497+
*/
498+
void setOverheadChangedCallback(
499+
std::function<void(int64_t delta)> callback);
500+
501+
/**
502+
* Gets the callback to be invoked whenever memory usage changes due to a
503+
* new queued item or checkpoint removal (or checkpoint expelling, in
504+
* versions this is implemented in).
505+
*/
506+
std::function<void(int64_t delta)> getOverheadChangedCallback() const;
507+
487508
/**
488509
* Member std::function variable, to allow us to inject code into
489510
* removeCursor_UNLOCKED() for unit MB36146
@@ -519,6 +540,16 @@ class CheckpointManager {
519540
VBucket& vb,
520541
const queued_item& qi);
521542

543+
/**
544+
* function to invoke whenever memory usage changes due to a new
545+
* queued item or checkpoint removal (or checkpoint expelling, in versions
546+
* this is implemented in).
547+
* Must be declared before checkpointList to ensure it still exists
548+
* when any Checkpoints within the list are destroyed during destruction
549+
* of this CheckpointManager.
550+
*/
551+
std::function<void(int64_t delta)> overheadChangedCallback{[](int64_t) {}};
552+
522553
CheckpointList checkpointList;
523554

524555
// Pair of {sequence number, cursor at checkpoint start} used when

engines/ep/src/ep_bucket.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,6 +1276,20 @@ ENGINE_ERROR_CODE EPBucket::getPerVBucketDiskStats(const void* cookie,
12761276
return ENGINE_SUCCESS;
12771277
}
12781278

1279+
size_t EPBucket::getPageableMemCurrent() const {
1280+
// EP Buckets can (theoretically) page out all memory, active(+pending) or
1281+
// replica.
1282+
return stats.getEstimatedTotalMemoryUsed();
1283+
}
1284+
1285+
size_t EPBucket::getPageableMemHighWatermark() const {
1286+
return stats.mem_high_wat;
1287+
}
1288+
1289+
size_t EPBucket::getPageableMemLowWatermark() const {
1290+
return stats.mem_low_wat;
1291+
}
1292+
12791293
VBucketPtr EPBucket::makeVBucket(
12801294
Vbid id,
12811295
vbucket_state_t state,

engines/ep/src/ep_bucket.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,11 @@ class EPBucket : public KVBucket {
129129

130130
ENGINE_ERROR_CODE getPerVBucketDiskStats(
131131
const void* cookie, const AddStatFn& add_stat) override;
132+
133+
size_t getPageableMemCurrent() const override;
134+
size_t getPageableMemHighWatermark() const override;
135+
size_t getPageableMemLowWatermark() const override;
136+
132137
/**
133138
* Creates a VBucket object from warmup (can set collection state)
134139
*/

engines/ep/src/ep_engine.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3215,6 +3215,14 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doMemoryStats(
32153215
add_stat,
32163216
cookie);
32173217

3218+
add_casted_stat(
3219+
"ht_mem_used_replica", stats.replicaHTMemory, add_stat, cookie);
3220+
3221+
add_casted_stat("replica_checkpoint_memory_overhead",
3222+
stats.replicaCheckpointOverhead,
3223+
add_stat,
3224+
cookie);
3225+
32183226
add_casted_stat("ep_kv_size", stats.getCurrentSize(), add_stat, cookie);
32193227
add_casted_stat(
32203228
"ep_value_size", stats.getTotalValueSize(), add_stat, cookie);

engines/ep/src/ephemeral_bucket.cc

Lines changed: 101 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@
2828
#include "replicationthrottle.h"
2929
#include "rollback_result.h"
3030
#include "statwriter.h"
31+
#include <checkpoint_manager.h>
3132

3233
#include <platform/sized_buffer.h>
3334

35+
#include <algorithm>
36+
3437
/**
3538
* A configuration value changed listener that responds to Ephemeral bucket
3639
* parameter changes.
@@ -148,6 +151,70 @@ bool EphemeralBucket::initialize() {
148151
return true;
149152
}
150153

154+
size_t EphemeralBucket::getPageableMemCurrent() const {
155+
// Ephemeral buckets differ from persistent in terms of how memory can
156+
// be freed - only active items can (potentially) be directly deleted
157+
// (auto-delete or items which have expired) - given that the replica
158+
// must be an exact copy of the active.
159+
// As such, 'pageable' memory is non-replica memory.
160+
161+
// We don't directly track "active_mem_used", but we can roughtly estimate
162+
// it from mem_used - replica_ht_mem - replica_checkpoint_mem
163+
const auto estimatedActiveMemory =
164+
int64_t(stats.getEstimatedTotalMemoryUsed()) -
165+
stats.replicaHTMemory - stats.replicaCheckpointOverhead;
166+
return std::max(estimatedActiveMemory, int64_t(0));
167+
}
168+
169+
size_t EphemeralBucket::getPageableMemHighWatermark() const {
170+
// Ephemeral buckets can only page out non-replica memory (see comments
171+
// in getPageableMemCurrent). As such, set pageable high watermark to
172+
// a fraction of the overall high watermark based on what fraction of
173+
// vBuckets are active. Memory used by any dead vbs should be reclaimed
174+
// soon, so ignore them here; they don't need to be allocated a portion
175+
// of the quota.
176+
const double activeVBCount = vbMap.getVBStateCount(vbucket_state_active);
177+
const double pendingVBCount = vbMap.getVBStateCount(vbucket_state_pending);
178+
const double replicaVBCount = vbMap.getVBStateCount(vbucket_state_replica);
179+
const double totalVBCount = activeVBCount + pendingVBCount + replicaVBCount;
180+
181+
if (totalVBCount <= 0) {
182+
// not an expected situation, bail out and return the full high
183+
// watermark
184+
return stats.mem_high_wat.load();
185+
}
186+
187+
const double activePendingHighWat =
188+
(stats.mem_high_wat.load() / totalVBCount) *
189+
(activeVBCount + pendingVBCount);
190+
191+
return activePendingHighWat;
192+
}
193+
194+
size_t EphemeralBucket::getPageableMemLowWatermark() const {
195+
// Ephemeral buckets can only page out non-replica memory (see comments
196+
// in getPageableMemCurrent). As such, set pageable low watermark to
197+
// a fraction of the overall low watermark based on what fraction of
198+
// vBuckets are active. Memory used by any dead vbs should be reclaimed
199+
// soon, so ignore them here; they don't need to be allocated a portion
200+
// of the quota.
201+
const double activeVBCount = vbMap.getVBStateCount(vbucket_state_active);
202+
const double pendingVBCount = vbMap.getVBStateCount(vbucket_state_pending);
203+
const double replicaVBCount = vbMap.getVBStateCount(vbucket_state_replica);
204+
const double totalVBCount = activeVBCount + pendingVBCount + replicaVBCount;
205+
206+
if (totalVBCount <= 0) {
207+
// not an expected situation, bail out and return the full low
208+
// watermark
209+
return stats.mem_low_wat.load();
210+
}
211+
212+
const double activeLowWat = (stats.mem_low_wat.load() / totalVBCount) *
213+
(activeVBCount + pendingVBCount);
214+
215+
return activeLowWat;
216+
}
217+
151218
void EphemeralBucket::attemptToFreeMemory() {
152219
// Call down to the base class; do whatever it can to free memory.
153220
KVBucket::attemptToFreeMemory();
@@ -200,28 +267,40 @@ VBucketPtr EphemeralBucket::makeVBucket(
200267
// 1. make_shared doesn't accept a Deleter
201268
// 2. allocate_shared has inconsistencies between platforms in calling
202269
// alloc.destroy (libc++ doesn't call it)
203-
return VBucketPtr(new EphemeralVBucket(id,
204-
state,
205-
stats,
206-
engine.getCheckpointConfig(),
207-
shard,
208-
lastSeqno,
209-
lastSnapStart,
210-
lastSnapEnd,
211-
std::move(table),
212-
std::move(newSeqnoCb),
213-
makeSyncWriteResolvedCB(),
214-
makeSyncWriteCompleteCB(),
215-
makeSeqnoAckCB(),
216-
engine.getConfiguration(),
217-
eviction_policy,
218-
std::move(manifest),
219-
initState,
220-
purgeSeqno,
221-
maxCas,
222-
mightContainXattrs,
223-
replicationTopology),
224-
VBucket::DeferredDeleter(engine));
270+
auto* vb = new EphemeralVBucket(id,
271+
state,
272+
stats,
273+
engine.getCheckpointConfig(),
274+
shard,
275+
lastSeqno,
276+
lastSnapStart,
277+
lastSnapEnd,
278+
std::move(table),
279+
std::move(newSeqnoCb),
280+
makeSyncWriteResolvedCB(),
281+
makeSyncWriteCompleteCB(),
282+
makeSeqnoAckCB(),
283+
engine.getConfiguration(),
284+
eviction_policy,
285+
std::move(manifest),
286+
initState,
287+
purgeSeqno,
288+
maxCas,
289+
mightContainXattrs,
290+
replicationTopology);
291+
292+
vb->ht.setMemChangedCallback([this, vb](int64_t delta) {
293+
if (vb->getState() == vbucket_state_replica) {
294+
this->stats.replicaHTMemory += delta;
295+
}
296+
});
297+
vb->checkpointManager->setOverheadChangedCallback(
298+
[this, vb](int64_t delta) {
299+
if (vb->getState() == vbucket_state_replica) {
300+
this->stats.replicaCheckpointOverhead += delta;
301+
}
302+
});
303+
return VBucketPtr(vb, VBucket::DeferredDeleter(engine));
225304
}
226305

227306
void EphemeralBucket::completeStatsVKey(const void* cookie,

engines/ep/src/ephemeral_bucket.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ class EphemeralBucket : public KVBucket {
6262
return ENGINE_KEY_ENOENT;
6363
}
6464

65+
size_t getPageableMemCurrent() const override;
66+
size_t getPageableMemHighWatermark() const override;
67+
size_t getPageableMemLowWatermark() const override;
68+
6569
void attemptToFreeMemory() override;
6670

6771
/**

0 commit comments

Comments
 (0)