2828#include < phosphor/phosphor.h>
2929#include < platform/make_unique.h>
3030
31+ #include < utility>
32+
3133/* *
3234 * Remove all the closed unreferenced checkpoints for each vbucket.
3335 */
@@ -90,7 +92,8 @@ class CheckpointVisitor : public VBucketVisitor {
9092 std::atomic<bool > &stateFinalizer;
9193};
9294
93- void ClosedUnrefCheckpointRemoverTask::cursorDroppingIfNeeded (void ) {
95+ std::pair<bool , size_t >
96+ ClosedUnrefCheckpointRemoverTask::isCursorDroppingNeeded () const {
9497 /* *
9598 * Cursor dropping will commence if one of the following conditions is met:
9699 * 1. if the total memory used is greater than the upper threshold which is
@@ -112,8 +115,8 @@ void ClosedUnrefCheckpointRemoverTask::cursorDroppingIfNeeded(void) {
112115 .getActiveVBucketsTotalCheckpointMemoryUsage ();
113116
114117 const auto chkptMemLimit =
115- bucketQuota *
116- (config. getCursorDroppingCheckpointMemUpperMark () / 100 ) ;
118+ ( bucketQuota * config. getCursorDroppingCheckpointMemUpperMark ()) /
119+ 100 ;
117120
118121 const bool hitCheckpointMemoryThreshold =
119122 activeVBChkptMemSize >= chkptMemLimit;
@@ -140,8 +143,9 @@ void ClosedUnrefCheckpointRemoverTask::cursorDroppingIfNeeded(void) {
140143 // memory usage threshold.
141144 amountOfMemoryToClear =
142145 stats.getEstimatedTotalMemoryUsed () -
143- (bucketQuota *
144- (config.getCursorDroppingCheckpointMemLowerMark () / 100 ));
146+ ((bucketQuota *
147+ config.getCursorDroppingCheckpointMemLowerMark ()) /
148+ 100 );
145149 LOG (EXTENSION_LOG_INFO,
146150 " Triggering cursor dropping as checkpoint_memory (%lu MB) "
147151 " exceeds cursor_dropping_checkpoint_mem_upper_mark (%lu%%, "
@@ -163,43 +167,49 @@ void ClosedUnrefCheckpointRemoverTask::cursorDroppingIfNeeded(void) {
163167 toMB (stats.cursorDroppingUThreshold .load ()),
164168 toMB (amountOfMemoryToClear));
165169 }
170+ // Cursor dropping is required.
171+ return std::make_pair (true , amountOfMemoryToClear);
172+ }
173+ // Cursor dropping is not required.
174+ return std::make_pair (false , 0 );
175+ }
166176
167- size_t memoryCleared = 0 ;
168- KVBucketIface* kvBucket = engine->getKVBucket ();
169- // Get a list of active vbuckets sorted by memory usage
170- // of their respective checkpoint managers.
171- auto vbuckets =
172- kvBucket->getVBuckets ().getActiveVBucketsSortedByChkMgrMem ();
173- for (const auto & it: vbuckets) {
174- if (memoryCleared < amountOfMemoryToClear) {
175- uint16_t vbid = it.first ;
176- VBucketPtr vb = kvBucket->getVBucket (vbid);
177- if (vb) {
178- // Get a list of cursors that can be dropped from the
179- // vbucket's checkpoint manager, so as to unreference
180- // an estimated number of checkpoints.
181- std::vector<std::string> cursors =
182- vb->checkpointManager ->getListOfCursorsToDrop ();
183- std::vector<std::string>::iterator itr = cursors.begin ();
184- for (; itr != cursors.end (); ++itr) {
185- if (memoryCleared < amountOfMemoryToClear) {
186- if (engine->getDcpConnMap ().handleSlowStream (vbid,
187- *itr))
188- {
189- auto memoryFreed =
190- vb->getChkMgrMemUsageOfUnrefCheckpoints ();
191- ++stats.cursorsDropped ;
192- stats.cursorMemoryFreed += memoryFreed;
193- memoryCleared += memoryFreed;
194- }
195- } else {
196- break ;
177+ void ClosedUnrefCheckpointRemoverTask::attemptCursorDropping (
178+ size_t amountOfMemoryToClear) {
179+ size_t memoryCleared = 0 ;
180+ KVBucketIface* kvBucket = engine->getKVBucket ();
181+ // Get a list of active vbuckets sorted by memory usage
182+ // of their respective checkpoint managers.
183+ auto vbuckets =
184+ kvBucket->getVBuckets ().getActiveVBucketsSortedByChkMgrMem ();
185+ for (const auto & it : vbuckets) {
186+ if (memoryCleared < amountOfMemoryToClear) {
187+ uint16_t vbid = it.first ;
188+ VBucketPtr vb = kvBucket->getVBucket (vbid);
189+ if (vb) {
190+ // Get a list of cursors that can be dropped from the
191+ // vbucket's checkpoint manager, so as to unreference
192+ // an estimated number of checkpoints.
193+ std::vector<std::string> cursors =
194+ vb->checkpointManager ->getListOfCursorsToDrop ();
195+ std::vector<std::string>::iterator itr = cursors.begin ();
196+ for (; itr != cursors.end (); ++itr) {
197+ if (memoryCleared < amountOfMemoryToClear) {
198+ if (engine->getDcpConnMap ().handleSlowStream (vbid,
199+ *itr)) {
200+ auto memoryFreed =
201+ vb->getChkMgrMemUsageOfUnrefCheckpoints ();
202+ ++stats.cursorsDropped ;
203+ stats.cursorMemoryFreed += memoryFreed;
204+ memoryCleared += memoryFreed;
197205 }
206+ } else {
207+ break ;
198208 }
199209 }
200- } else { // memoryCleared >= amountOfMemoryToClear
201- break ;
202210 }
211+ } else { // memoryCleared >= amountOfMemoryToClear
212+ break ;
203213 }
204214 }
205215}
@@ -208,7 +218,13 @@ bool ClosedUnrefCheckpointRemoverTask::run(void) {
208218 TRACE_EVENT0 (" ep-engine/task" , " ClosedUnrefCheckpointRemoverTask" );
209219 bool inverse = true ;
210220 if (available.compare_exchange_strong (inverse, false )) {
211- cursorDroppingIfNeeded ();
221+ bool shouldTriggerCursorDropping{false };
222+ size_t amountOMemoryToClear{0 };
223+ std::tie (shouldTriggerCursorDropping, amountOMemoryToClear) =
224+ isCursorDroppingNeeded ();
225+ if (shouldTriggerCursorDropping) {
226+ attemptCursorDropping (amountOMemoryToClear);
227+ }
212228 KVBucketIface* kvBucket = engine->getKVBucket ();
213229 auto pv =
214230 std::make_unique<CheckpointVisitor>(kvBucket, stats, available);
0 commit comments