Skip to content

Commit 8b4d53a

Browse files
committed
MB-41782: Create one task for scheduleCompaction
Currently multiple calls to scheduleCompaction result in new tasks being created and scheduled, they will all eventually run and compact the vbucket many times. This change aims to improve this by trying to maintain just one CompactTask per vbucket. The aim is that if multiple schedculeCompaction calls occur * The first call creates a task with the given CompactionConfig * Subsequent calls update the task with the most recent CompactionConfig The task itself is changed so that there is a window/scope inside the run function where it copies the configuration and starts compacting. Once this has happened the compaction will complete, but any schedule calls happening whilst compaction is running will update the tasks config and set a flag so that when the task completes it reschedules ready to run again with the new config. The main purpose of this change is to improve what happens when a scope is dropped. We have seen from logs that a scope drop may trigger many compactions as each collection that is dropped calls scheduleCompaction. As part of this improvement the scheduleCompaction function now takes a delay parameter which when 0 means no-delay (schedule now) otherwise schedule but run in 'delay' seconds. The collections use of scheduleCompaction sets a short delay and once all collection drops have been flushed one compaction will run a short time later. Change-Id: I998b5fe6833b2b7e1681cac64933d58da7b9560f Reviewed-on: http://review.couchbase.org/c/kv_engine/+/140364 Tested-by: Build Bot <[email protected]> Reviewed-by: James Harrison <[email protected]>
1 parent d30054c commit 8b4d53a

20 files changed

+457
-89
lines changed

engines/ep/configuration.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,12 @@
197197
"dynamic": false,
198198
"type": "size_t"
199199
},
200+
"collections_drop_compaction_delay" : {
201+
"default": "5000",
202+
"descr": "How many milliseconds before compaction runs following the drop of a collection",
203+
"dynamic": false,
204+
"type": "size_t"
205+
},
200206
"collections_enabled" : {
201207
"default": "true",
202208
"descr": "Enable the collections functionality, enabling the storage of collection metadata",

engines/ep/src/collections/flush.cc

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "collections/vbucket_manifest.h"
2424
#include "collections/vbucket_manifest_handles.h"
2525
#include "ep_bucket.h"
26+
#include "ep_engine.h"
2627
#include "item.h"
2728

2829
namespace Collections::VB {
@@ -242,8 +243,16 @@ void Flush::checkAndTriggerPurge(Vbid vbid, EPBucket& bucket) const {
242243
}
243244

244245
void Flush::triggerPurge(Vbid vbid, EPBucket& bucket) {
245-
CompactionConfig config;
246-
bucket.scheduleCompaction(vbid, config, nullptr);
246+
// There's no requirement for compaction to run 'now', schedule with a delay
247+
// which allows for any other drop events in the queue to all end up
248+
// 'coalesced' into one run of compaction.
249+
bucket.scheduleCompaction(
250+
vbid,
251+
nullptr,
252+
std::chrono::milliseconds(
253+
bucket.getEPEngine()
254+
.getConfiguration()
255+
.getCollectionsDropCompactionDelay()));
247256
}
248257

249258
static std::pair<bool, std::optional<CollectionID>> getCollectionID(

engines/ep/src/ep_bucket.cc

Lines changed: 91 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,9 +1000,11 @@ void EPBucket::stopBgFetcher() {
10001000
}
10011001
}
10021002

1003-
ENGINE_ERROR_CODE EPBucket::scheduleCompaction(Vbid vbid,
1004-
const CompactionConfig& c,
1005-
const void* cookie) {
1003+
ENGINE_ERROR_CODE EPBucket::scheduleCompaction(
1004+
Vbid vbid,
1005+
std::optional<CompactionConfig> config,
1006+
const void* cookie,
1007+
std::chrono::milliseconds delay) {
10061008
ENGINE_ERROR_CODE errCode = checkForDBExistence(vbid);
10071009
if (errCode != ENGINE_SUCCESS) {
10081010
return errCode;
@@ -1014,41 +1016,66 @@ ENGINE_ERROR_CODE EPBucket::scheduleCompaction(Vbid vbid,
10141016
return ENGINE_NOT_MY_VBUCKET;
10151017
}
10161018

1017-
LockHolder lh(compactionLock);
1018-
ExTask task = std::make_shared<CompactTask>(*this, vbid, c, cookie);
1019-
compactionTasks.emplace_back(std::make_pair(vbid, task));
1020-
bool snoozed = false;
1021-
if (compactionTasks.size() > 1) {
1022-
if ((stats.diskQueueSize > compactionWriteQueueCap &&
1023-
compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
1024-
engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
1025-
// Snooze a new compaction task.
1026-
// We will wake it up when one of the existing compaction tasks is
1027-
// done.
1028-
task->snooze(60);
1029-
snoozed = true;
1019+
auto handle = compactionTasks.wlock();
1020+
1021+
// Convert delay to ExecutorPool 'double' e.g. 1500ms = 1.5 secs
1022+
std::chrono::duration<double> execDelay = delay;
1023+
1024+
// try to emplace an empty shared_ptr
1025+
auto [itr, emplaced] = handle->try_emplace(vbid, nullptr);
1026+
auto& task = itr->second;
1027+
1028+
if (!emplaced) {
1029+
// The existing task must be poked - it needs to either reschedule if
1030+
// it is currently running or run with the given config.
1031+
task->runCompactionWithConfig(config);
1032+
if (execDelay.count() > 0.0) {
1033+
ExecutorPool::get()->snooze(task->getId(), execDelay.count());
1034+
} else {
1035+
ExecutorPool::get()->wake(task->getId());
1036+
}
1037+
} else {
1038+
// Nothing in the map for this vbid now construct the task
1039+
itr->second =
1040+
std::make_shared<CompactTask>(*this, vbid, config, cookie);
1041+
if (handle->size() > 1) {
1042+
if ((stats.diskQueueSize > compactionWriteQueueCap &&
1043+
handle->size() > (vbMap.getNumShards() / 2)) ||
1044+
engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
1045+
// Snooze a new compaction task.
1046+
// We will wake it up when one of the existing compaction tasks
1047+
// is done.
1048+
execDelay = std::chrono::seconds(60);
1049+
}
1050+
}
1051+
1052+
if (execDelay.count() > 0.0) {
1053+
task->snooze(execDelay.count());
10301054
}
1055+
ExecutorPool::get()->schedule(task);
10311056
}
10321057

1033-
ExecutorPool::get()->schedule(task);
1058+
return ENGINE_EWOULDBLOCK;
1059+
}
10341060

1035-
EP_LOG_INFO(
1036-
"Compaction of {}, task:{}, purge_before_ts:{}, "
1037-
"purge_before_seq:{}, "
1038-
"drop_deletes:{}, snoozed:{}, scheduled (awaiting completion).",
1039-
vbid,
1040-
uint64_t(task->getId()),
1041-
c.purge_before_ts,
1042-
c.purge_before_seq,
1043-
c.drop_deletes,
1044-
snoozed);
1061+
ENGINE_ERROR_CODE EPBucket::scheduleCompaction(
1062+
Vbid vbid,
1063+
const CompactionConfig& config,
1064+
const void* cookie,
1065+
std::chrono::milliseconds delay) {
1066+
return scheduleCompaction(
1067+
vbid, std::optional<CompactionConfig>{config}, cookie, delay);
1068+
}
10451069

1046-
return ENGINE_EWOULDBLOCK;
1070+
ENGINE_ERROR_CODE EPBucket::scheduleCompaction(
1071+
Vbid vbid, const void* cookie, std::chrono::milliseconds delay) {
1072+
return scheduleCompaction(
1073+
vbid, std::optional<CompactionConfig>{}, cookie, delay);
10471074
}
10481075

10491076
ENGINE_ERROR_CODE EPBucket::cancelCompaction(Vbid vbid) {
1050-
LockHolder lh(compactionLock);
1051-
for (const auto& task : compactionTasks) {
1077+
auto handle = compactionTasks.wlock();
1078+
for (const auto& task : *handle) {
10521079
task.second->cancel();
10531080
}
10541081
return ENGINE_SUCCESS;
@@ -1231,35 +1258,50 @@ bool EPBucket::doCompact(Vbid vbid,
12311258
engine.decrementSessionCtr();
12321259
}
12331260

1234-
updateCompactionTasks(vbid);
1235-
12361261
if (cookie) {
12371262
engine.notifyIOComplete(cookie, err);
12381263
}
1239-
--stats.pendingCompactions;
1264+
--stats.pendingCompactions; // just size of map??
12401265
return false;
12411266
}
12421267

1243-
void EPBucket::updateCompactionTasks(Vbid db_file_id) {
1244-
LockHolder lh(compactionLock);
1245-
bool erased = false, woke = false;
1246-
auto it = compactionTasks.begin();
1247-
while (it != compactionTasks.end()) {
1248-
if ((*it).first == db_file_id) {
1249-
it = compactionTasks.erase(it);
1250-
erased = true;
1251-
} else {
1252-
ExTask& task = (*it).second;
1253-
if (task->getState() == TASK_SNOOZED) {
1254-
ExecutorPool::get()->wake(task->getId());
1255-
woke = true;
1268+
bool EPBucket::updateCompactionTasks(Vbid vbid, bool canErase) {
1269+
auto handle = compactionTasks.wlock();
1270+
// Copy the size before we may erase a task
1271+
auto size = handle->size();
1272+
bool reschedule = false;
1273+
1274+
// process the caller and then find a second task to wake
1275+
if (canErase) {
1276+
if (auto itr = handle->find(vbid); itr != handle->end()) {
1277+
const auto& task = (*itr).second;
1278+
if (task->isRescheduleRequired()) {
1279+
// Nope can't erase!
1280+
reschedule = true;
1281+
} else {
1282+
// Done, can now erase from the compactionTasks map
1283+
handle->erase(itr);
12561284
}
1257-
++it;
1285+
} else {
1286+
throw std::logic_error(
1287+
"EPBucket::updateCompactionTasks no task for vbid:" +
1288+
vbid.to_string());
12581289
}
1259-
if (erased && woke) {
1260-
break;
1290+
}
1291+
1292+
// If another task does exist, find it and wake it
1293+
if (size > 1) {
1294+
for (const auto& [key, task] : *handle) {
1295+
if (key != vbid) {
1296+
if (task->getState() == TASK_SNOOZED) {
1297+
ExecutorPool::get()->wake(task->getId());
1298+
// wake one other task
1299+
break;
1300+
}
1301+
}
12611302
}
12621303
}
1304+
return reschedule;
12631305
}
12641306

12651307
std::pair<uint64_t, bool> EPBucket::getLastPersistedCheckpointId(Vbid vb) {

engines/ep/src/ep_bucket.h

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class Commit;
2828
}
2929
enum class ValueFilter;
3030
class BucketStatCollector;
31+
class CompactTask;
3132
struct CompactionContext;
3233

3334
/**
@@ -113,9 +114,24 @@ class EPBucket : public KVBucket {
113114
/// Stops the background fetcher for each shard.
114115
void stopBgFetcher();
115116

117+
/**
118+
* Schedule compaction with a config -override of KVBucket method
119+
*/
120+
ENGINE_ERROR_CODE scheduleCompaction(
121+
Vbid vbid,
122+
const CompactionConfig& c,
123+
const void* ck,
124+
std::chrono::milliseconds delay) override;
125+
126+
/**
127+
* Schedule compaction with no config. If a CompactTask is already
128+
* scheduled then the task will still run, but with whatever config it
129+
* already has. If a task is already scheduled, the given delay parameter
130+
* takes effect.
131+
*/
116132
ENGINE_ERROR_CODE scheduleCompaction(Vbid vbid,
117-
const CompactionConfig& c,
118-
const void* ck) override;
133+
const void* cookie,
134+
std::chrono::milliseconds delay);
119135

120136
ENGINE_ERROR_CODE cancelCompaction(Vbid vbid) override;
121137

@@ -133,6 +149,15 @@ class EPBucket : public KVBucket {
133149
CompactionConfig& config,
134150
const void* cookie);
135151

152+
/**
153+
* After compaction completes the task can be removed if no further
154+
* compaction is required. If other compaction tasks exist, one of them
155+
* will be 'poked' to run. This method is called from CompactTask
156+
*
157+
* @param vbid id of vbucket that has completed compaction
158+
*/
159+
bool updateCompactionTasks(Vbid vbid, bool canErase);
160+
136161
std::pair<uint64_t, bool> getLastPersistedCheckpointId(Vbid vb) override;
137162

138163
ENGINE_ERROR_CODE getFileStats(
@@ -266,13 +291,6 @@ class EPBucket : public KVBucket {
266291
*/
267292
void compactionCompletionCallback(CompactionContext& ctx);
268293

269-
/**
270-
* Remove completed compaction tasks or wake snoozed tasks
271-
*
272-
* @param db_file_id vbucket id for couchstore
273-
*/
274-
void updateCompactionTasks(Vbid db_file_id);
275-
276294
void stopWarmup();
277295

278296
/// function which is passed down to compactor for dropping keys
@@ -321,6 +339,11 @@ class EPBucket : public KVBucket {
321339
*/
322340
void initializeShards();
323341

342+
ENGINE_ERROR_CODE scheduleCompaction(Vbid vbid,
343+
std::optional<CompactionConfig> config,
344+
const void* cookie,
345+
std::chrono::milliseconds delay);
346+
324347
/**
325348
* Max number of backill items in a single flusher batch before we split
326349
* into multiple batches. Actual batch size may be larger as we will not
@@ -340,6 +363,9 @@ class EPBucket : public KVBucket {
340363
std::unique_ptr<Warmup> warmupTask;
341364

342365
std::vector<std::unique_ptr<BgFetcher>> bgFetchers;
366+
367+
folly::Synchronized<std::unordered_map<Vbid, std::shared_ptr<CompactTask>>>
368+
compactionTasks;
343369
};
344370

345371
std::ostream& operator<<(std::ostream& os, const EPBucket::FlushResult& res);

engines/ep/src/ep_engine.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6368,7 +6368,8 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::deleteVBucket(
63686368

63696369
ENGINE_ERROR_CODE EventuallyPersistentEngine::compactDB(
63706370
Vbid vbid, const CompactionConfig& c, const void* cookie) {
6371-
return kvBucket->scheduleCompaction(vbid, c, cookie);
6371+
return kvBucket->scheduleCompaction(
6372+
vbid, c, cookie, std::chrono::seconds(0));
63726373
}
63736374

63746375
ENGINE_ERROR_CODE EventuallyPersistentEngine::getAllVBucketSequenceNumbers(

engines/ep/src/ephemeral_bucket.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,11 @@ void EphemeralBucket::attemptToFreeMemory() {
168168
}
169169
}
170170

171-
ENGINE_ERROR_CODE EphemeralBucket::scheduleCompaction(Vbid vbid,
172-
const CompactionConfig& c,
173-
const void* ck) {
171+
ENGINE_ERROR_CODE EphemeralBucket::scheduleCompaction(
172+
Vbid vbid,
173+
const CompactionConfig& c,
174+
const void* ck,
175+
std::chrono::milliseconds delay) {
174176
return ENGINE_ENOTSUP;
175177
}
176178

engines/ep/src/ephemeral_bucket.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@ class EphemeralBucket : public KVBucket {
3636

3737
bool initialize() override;
3838

39-
ENGINE_ERROR_CODE scheduleCompaction(Vbid vbid,
40-
const CompactionConfig& c,
41-
const void* ck) override;
39+
ENGINE_ERROR_CODE scheduleCompaction(
40+
Vbid vbid,
41+
const CompactionConfig& c,
42+
const void* ck,
43+
std::chrono::milliseconds delay) override;
4244

4345
ENGINE_ERROR_CODE cancelCompaction(Vbid vbid) override;
4446

engines/ep/src/kv_bucket.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,6 @@ class VBCBAdaptor : public GlobalTask {
105105
const uint16_t EP_PRIMARY_SHARD = 0;
106106
class KVShard;
107107

108-
using CompTaskEntry = std::pair<Vbid, ExTask>;
109-
110108
/**
111109
* KVBucket is the base class for concrete Key/Value bucket implementations
112110
* which use the concept of VBuckets to support replication, persistence and
@@ -848,9 +846,6 @@ class KVBucket : public KVBucketIface {
848846
std::atomic<size_t> lastTransTimePerItem;
849847
EvictionPolicy eviction_policy;
850848

851-
std::mutex compactionLock;
852-
std::list<CompTaskEntry> compactionTasks;
853-
854849
std::unique_ptr<Collections::Manager> collectionsManager;
855850

856851
/**

engines/ep/src/kv_bucket_iface.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -448,10 +448,13 @@ class KVBucketIface {
448448
* @param vbid The vbucket to compact
449449
* @param c The context for compaction of a DB file
450450
* @param ck cookie used to notify connection of operation completion
451+
* @param delay millisecond delay for the task to execute, 0 is run 'now'
451452
*/
452-
virtual ENGINE_ERROR_CODE scheduleCompaction(Vbid vbid,
453-
const CompactionConfig& c,
454-
const void* ck) = 0;
453+
virtual ENGINE_ERROR_CODE scheduleCompaction(
454+
Vbid vbid,
455+
const CompactionConfig& c,
456+
const void* ck,
457+
std::chrono::milliseconds delay) = 0;
455458

456459
/**
457460
* Cancels compaction of a database file

engines/ep/src/kvstore.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,3 +601,10 @@ IORequest::~IORequest() = default;
601601
bool IORequest::isDelete() const {
602602
return item->isDeleted() && !item->isPending();
603603
}
604+
605+
bool CompactionConfig::operator==(const CompactionConfig& other) const {
606+
return purge_before_ts == other.purge_before_ts &&
607+
purge_before_seq == other.purge_before_seq &&
608+
drop_deletes == other.drop_deletes &&
609+
retain_erroneous_tombstones == other.retain_erroneous_tombstones;
610+
}

0 commit comments

Comments
 (0)