Skip to content

Commit 1d5e99c

Browse files
BenHuddlestondaverigby
authored andcommitted
MB-39745: Add BgFetchers to EPBucket
Move the BgFetchers to EPBucket in preparation for decoupling them from KVShards. The behaviour of BgFetchers in this patch should be identical to their behaviour before. Change-Id: I40f57029b07b27180390e48d9ab460476faa5fae Reviewed-on: http://review.couchbase.org/c/kv_engine/+/136377 Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent ba5d4f2 commit 1d5e99c

File tree

10 files changed

+60
-40
lines changed

10 files changed

+60
-40
lines changed

engines/ep/src/ep_bucket.cc

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,9 @@ EPBucket::EPBucket(EventuallyPersistentEngine& theEngine)
266266
engine.getConfiguration(), stats);
267267

268268
vbMap.enablePersistence(*this);
269+
for (size_t i = 0; i < vbMap.getNumShards(); i++) {
270+
bgFetchers.emplace_back(std::make_unique<BgFetcher>(*this));
271+
}
269272

270273
setFlusherBatchSplitTrigger(config.getFlusherTotalBatchLimit());
271274
config.addValueChangedListener(
@@ -948,21 +951,16 @@ void EPBucket::wakeUpFlusher() {
948951
}
949952

950953
bool EPBucket::startBgFetcher() {
951-
for (const auto& shard : vbMap.shards) {
952-
BgFetcher* bgfetcher = shard->getBgFetcher();
953-
if (bgfetcher == NULL) {
954-
EP_LOG_WARN("Failed to start bg fetcher for shard {}",
955-
shard->getId());
956-
return false;
957-
}
958-
bgfetcher->start();
954+
for (const auto& bgFetcher : bgFetchers) {
955+
bgFetcher->start();
959956
}
960957
return true;
961958
}
962959

963960
void EPBucket::stopBgFetcher() {
961+
EP_LOG_INFO("Stopping bg fetchers");
962+
964963
for (const auto& shard : vbMap.shards) {
965-
BgFetcher* bgfetcher = shard->getBgFetcher();
966964
for (const auto vbid : shard->getVBuckets()) {
967965
VBucketPtr vb = shard->getBucket(vbid);
968966
if (vb && vb->hasPendingBGFetchItems()) {
@@ -974,9 +972,10 @@ void EPBucket::stopBgFetcher() {
974972
break;
975973
}
976974
}
975+
}
977976

978-
EP_LOG_INFO("Stopping bg fetcher for shard:{}", shard->getId());
979-
bgfetcher->stop();
977+
for (const auto& bgFetcher : bgFetchers) {
978+
bgFetcher->stop();
980979
}
981980
}
982981

@@ -1353,6 +1352,7 @@ VBucketPtr EPBucket::makeVBucket(
13531352
engine.getConfiguration(),
13541353
eviction_policy,
13551354
std::move(manifest),
1355+
this,
13561356
initState,
13571357
purgeSeqno,
13581358
maxCas,
@@ -2041,3 +2041,10 @@ std::ostream& operator<<(std::ostream& os, const EPBucket::FlushResult& res) {
20412041
<< (res.wakeupCkptRemover == EPBucket::WakeCkptRemover::Yes) << "}";
20422042
return os;
20432043
}
2044+
2045+
BgFetcher& EPBucket::getBgFetcher(Vbid vbid) {
2046+
// For now we just use modulo, same as we do/would for shards to pick out
2047+
// the associated BgFetcher
2048+
auto id = vbid.get() % bgFetchers.size();
2049+
return *bgFetchers.at(id);
2050+
}

engines/ep/src/ep_bucket.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "kv_bucket.h"
2121

22+
class BgFetcher;
2223
namespace Collections::VB {
2324
class Flush;
2425
}
@@ -236,6 +237,8 @@ class EPBucket : public KVBucket {
236237
const void* cookie,
237238
std::unique_ptr<Collections::Manifest>& newManifest) override;
238239

240+
BgFetcher& getBgFetcher(Vbid vbid);
241+
239242
protected:
240243
// During the warmup phase we might want to enable external traffic
241244
// at a given point in time.. The LoadStorageKvPairCallback will be
@@ -333,6 +336,8 @@ class EPBucket : public KVBucket {
333336
cb::RelaxedAtomic<bool> retainErroneousTombstones;
334337

335338
std::unique_ptr<Warmup> warmupTask;
339+
340+
std::vector<std::unique_ptr<BgFetcher>> bgFetchers;
336341
};
337342

338343
std::ostream& operator<<(std::ostream& os, const EPBucket::FlushResult& res);

engines/ep/src/ep_vb.cc

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "dcp/backfill_by_seqno_disk.h"
2626
#include "durability/active_durability_monitor.h"
2727
#include "durability/passive_durability_monitor.h"
28+
#include "ep_bucket.h"
2829
#include "ep_engine.h"
2930
#include "ep_time.h"
3031
#include "executorpool.h"
@@ -56,6 +57,7 @@ EPVBucket::EPVBucket(Vbid i,
5657
Configuration& config,
5758
EvictionPolicy evictionPolicy,
5859
std::unique_ptr<Collections::VB::Manifest> manifest,
60+
EPBucket* bucket,
5961
vbucket_state_t initState,
6062
uint64_t purgeSeqno,
6163
uint64_t maxCas,
@@ -87,7 +89,8 @@ EPVBucket::EPVBucket(Vbid i,
8789
mightContainXattrs,
8890
replicationTopology,
8991
maxVisibleSeqno),
90-
shard(kvshard) {
92+
shard(kvshard),
93+
epBucket(bucket) {
9194
}
9295

9396
EPVBucket::~EPVBucket() {
@@ -596,7 +599,7 @@ size_t EPVBucket::getPageableMemUsage() {
596599

597600
size_t EPVBucket::queueBGFetchItem(const DocKey& key,
598601
std::unique_ptr<BGFetchItem> fetch,
599-
BgFetcher* bgFetcher) {
602+
BgFetcher& bgFetcher) {
600603
// While a DiskDocKey supports both the committed and prepared namespaces,
601604
// ep-engine doesn't support evicting prepared SyncWrites and as such
602605
// we don't allow bgfetching from Prepared namespace - so just construct
@@ -616,7 +619,7 @@ size_t EPVBucket::queueBGFetchItem(const DocKey& key,
616619
fetch->value = &bgfetch_itm_ctx.value;
617620
bgfetch_itm_ctx.bgfetched_list.push_back(std::move(fetch));
618621

619-
bgFetcher->addPendingVB(getId());
622+
bgFetcher.addPendingVB(getId());
620623
return pendingBGFetches.size();
621624
}
622625

@@ -747,12 +750,14 @@ void EPVBucket::bgFetch(const DocKey& key,
747750
const void* cookie,
748751
EventuallyPersistentEngine& engine,
749752
const bool isMeta) {
753+
// @TODO could the BgFetcher ever not be there? It should probably be a
754+
// reference if that's the case
750755
// schedule to the current batch of background fetch of the given
751756
// vbucket
752757
size_t bgfetch_size = queueBGFetchItem(
753758
key,
754759
std::make_unique<FrontEndBGFetchItem>(cookie, isMeta),
755-
getShard()->getBgFetcher());
760+
getBgFetcher());
756761
EP_LOG_DEBUG("Queued a background fetch, now at {}",
757762
uint64_t(bgfetch_size));
758763
}
@@ -780,12 +785,9 @@ void EPVBucket::bgFetchForCompactionExpiry(const DocKey& key,
780785
const Item& item) {
781786
// schedule to the current batch of background fetch of the given
782787
// vbucket
783-
auto shard = getShard();
784-
Expects(shard);
785-
auto bgFetchSize =
786-
queueBGFetchItem(key,
787-
std::make_unique<CompactionBGFetchItem>(item),
788-
shard->getBgFetcher());
788+
auto& bgFetcher = getBgFetcher();
789+
auto bgFetchSize = queueBGFetchItem(
790+
key, std::make_unique<CompactionBGFetchItem>(item), bgFetcher);
789791
EP_LOG_DEBUG("Queue a background fetch for compaction expiry, now at {}",
790792
bgFetchSize);
791793
}
@@ -1019,3 +1021,7 @@ void EPVBucket::processImplicitlyCompletedPrepare(
10191021
// used after.
10201022
ht.unlocked_del(v.getHBL(), v.release());
10211023
}
1024+
1025+
BgFetcher& EPVBucket::getBgFetcher() {
1026+
return epBucket->getBgFetcher(getId());
1027+
}

engines/ep/src/ep_vb.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@
2020
#include "vbucket.h"
2121
#include "vbucket_bgfetch_item.h"
2222

23-
struct vbucket_state;
2423
class BgFetcher;
24+
class EPBucket;
25+
struct vbucket_state;
2526

2627
/**
2728
* Eventually Peristent VBucket (EPVBucket) is a child class of VBucket.
@@ -46,6 +47,7 @@ class EPVBucket : public VBucket {
4647
Configuration& config,
4748
EvictionPolicy evictionPolicy,
4849
std::unique_ptr<Collections::VB::Manifest> manifest,
50+
EPBucket* bucket = nullptr,
4951
vbucket_state_t initState = vbucket_state_dead,
5052
uint64_t purgeSeqno = 0,
5153
uint64_t maxCas = 0,
@@ -128,6 +130,8 @@ class EPVBucket : public VBucket {
128130
return shard;
129131
}
130132

133+
BgFetcher& getBgFetcher();
134+
131135
UniqueDCPBackfillPtr createDCPBackfill(EventuallyPersistentEngine& e,
132136
std::shared_ptr<ActiveStream> stream,
133137
uint64_t startSeqno,
@@ -243,7 +247,7 @@ class EPVBucket : public VBucket {
243247
*/
244248
size_t queueBGFetchItem(const DocKey& key,
245249
std::unique_ptr<BGFetchItem> fetch,
246-
BgFetcher* bgFetcher);
250+
BgFetcher& bgFetcher);
247251

248252
private:
249253
std::tuple<StoredValue*, MutationStatus, VBNotifyCtx> updateStoredValue(
@@ -343,5 +347,10 @@ class EPVBucket : public VBucket {
343347
*/
344348
std::atomic<uint64_t> deferredDeletionFileRevision;
345349

350+
/**
351+
* Should replace KVShard
352+
*/
353+
EPBucket* epBucket;
354+
346355
friend class EPVBucketTest;
347356
};

engines/ep/src/kvshard.cc

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
#include <functional>
1919
#include <memory>
2020

21-
#include "bgfetcher.h"
2221
#include "couch-kvstore/couch-kvstore-config.h"
2322
#include "ep_bucket.h"
2423
#include "ep_engine.h"
@@ -84,7 +83,6 @@ KVShard::KVShard(EventuallyPersistentEngine& engine, id_type id)
8483

8584
void KVShard::enablePersistence(EPBucket& ep) {
8685
flusher = std::make_unique<Flusher>(&ep, this);
87-
bgFetcher = std::make_unique<BgFetcher>(ep);
8886
}
8987

9088
// Non-inline destructor so we can destruct
@@ -95,10 +93,6 @@ Flusher *KVShard::getFlusher() {
9593
return flusher.get();
9694
}
9795

98-
BgFetcher *KVShard::getBgFetcher() {
99-
return bgFetcher.get();
100-
}
101-
10296
VBucketPtr KVShard::getBucket(Vbid id) const {
10397
if (id.get() < kvConfig->getMaxVBuckets()) {
10498
auto element = getElement(id);

engines/ep/src/kvshard.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
* | vbuckets: VBucket[] (partitions)|----> [(VBucket),(VBucket)..]
4646
* | |
4747
* | flusher: Flusher |
48-
* | BGFetcher: bgFetcher |
4948
* | |
5049
* | rwUnderlying: KVStore (write) |----> (CouchKVStore)
5150
* | roUnderlying: KVStore (read) |----> (CouchKVStore)
@@ -67,7 +66,6 @@
6766
* 1022 2
6867
* 1023 3
6968
*/
70-
class BgFetcher;
7169
class Configuration;
7270
class EPBucket;
7371
class EventuallyPersistentEngine;
@@ -82,7 +80,7 @@ class KVShard {
8280
KVShard(EventuallyPersistentEngine& engine, KVShard::id_type id);
8381
~KVShard();
8482

85-
/// Enable persistence for this KVShard; setting up flusher and BGFetcher.
83+
/// Enable persistence for this KVShard; setting up the flusher.
8684
void enablePersistence(EPBucket& epBucket);
8785

8886
KVStore* getRWUnderlying() {
@@ -115,7 +113,6 @@ class KVShard {
115113
}
116114

117115
Flusher *getFlusher();
118-
BgFetcher *getBgFetcher();
119116

120117
VBucketPtr getBucket(Vbid id) const;
121118
void setBucket(VBucketPtr vb);
@@ -251,7 +248,6 @@ class KVShard {
251248
std::unique_ptr<KVStore> roStore;
252249

253250
std::unique_ptr<Flusher> flusher;
254-
std::unique_ptr<BgFetcher> bgFetcher;
255251

256252
public:
257253
std::atomic<size_t> highPriorityCount;

engines/ep/tests/module_tests/evp_store_with_meta.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1387,8 +1387,7 @@ TEST_P(DelWithMetaTest, setting_zero_deleteTime) {
13871387
metadata,
13881388
deleted,
13891389
datatype));
1390-
MockGlobalTask mockTask(engine->getTaskable(), TaskId::MultiBGFetcherTask);
1391-
store->getVBucket(vbid)->getShard()->getBgFetcher()->run(&mockTask);
1390+
runBGFetcherTask();
13921391
EXPECT_EQ(ENGINE_SUCCESS,
13931392
store->getMetaData({"mykey", DocKeyEncodesCollectionId::No},
13941393
vbid,

engines/ep/tests/module_tests/evp_vbucket_test.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ void EPVBucketTest::TearDown() {
3737
size_t EPVBucketTest::public_queueBGFetchItem(
3838
const DocKey& key,
3939
std::unique_ptr<BGFetchItem> fetchItem,
40-
BgFetcher* bgFetcher) {
40+
BgFetcher& bgFetcher) {
4141
return dynamic_cast<EPVBucket&>(*vbucket).queueBGFetchItem(
4242
key, std::move(fetchItem), bgFetcher);
4343
}
@@ -63,7 +63,7 @@ TEST_P(EPVBucketTest, GetBGFetchItemsPerformance) {
6363
/*isMeta*/ false);
6464
this->public_queueBGFetchItem(makeStoredDocKey(std::to_string(ii)),
6565
std::move(fetchItem),
66-
&bgFetcher);
66+
bgFetcher);
6767
}
6868
auto items = this->vbucket->getBGFetchItems();
6969
}

engines/ep/tests/module_tests/evp_vbucket_test.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,5 @@ class EPVBucketTest : public ::testing::WithParamInterface<EvictionPolicy>,
3838
void TearDown() override;
3939
size_t public_queueBGFetchItem(const DocKey& key,
4040
std::unique_ptr<BGFetchItem> fetchItem,
41-
BgFetcher* bgFetcher);
41+
BgFetcher& bgFetcher);
4242
};

engines/ep/tests/module_tests/kv_bucket_test.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "ep_bucket.h"
3535
#include "ep_engine.h"
3636
#include "ep_time.h"
37+
#include "ep_vb.h"
3738
#include "evp_store_single_threaded_test.h"
3839
#include "failover-table.h"
3940
#include "fakes/fake_executorpool.h"
@@ -302,7 +303,10 @@ bool KVBucketTest::isItemFreqDecayerTaskSnoozed() const {
302303
void KVBucketTest::runBGFetcherTask() {
303304
MockGlobalTask mockTask(engine->getTaskable(),
304305
TaskId::MultiBGFetcherTask);
305-
store->getVBucket(vbid)->getShard()->getBgFetcher()->run(&mockTask);
306+
auto vb = store->getVBucket(vbid);
307+
ASSERT_TRUE(vb);
308+
auto* epVb = dynamic_cast<EPVBucket*>(vb.get());
309+
epVb->getBgFetcher().run(&mockTask);
306310
}
307311

308312
/**

0 commit comments

Comments
 (0)