Skip to content

Commit 44b3932

Browse files
committed
[BP] MB-59817 [1/2]: Warmup vbuckets in the order specified
KVBucket::pauseResumeVisit previously only allowed visiting vbuckets in increasing vbid order, which could cause warmup to load only a few active vbuckets. Change-Id: I6b6a6c09b0f2ef15b60bd5da5c7281140623fb0a Reviewed-on: https://review.couchbase.org/c/kv_engine/+/201849 Well-Formed: Restriction Checker Tested-by: Pavlos Georgiou <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 04480b9 commit 44b3932

File tree

4 files changed

+98
-8
lines changed

4 files changed

+98
-8
lines changed

engines/ep/src/kv_bucket.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2448,6 +2448,21 @@ KVBucket::Position KVBucket::pauseResumeVisit(PauseResumeVBVisitor& visitor,
24482448
return KVBucket::Position(vbid);
24492449
}
24502450

2451+
/**
 * Visit the vbuckets named in vbsToVisit, in the order given, starting at
 * index currentPosition.
 *
 * A vbid for which getVBucket() yields no VBucket is skipped. If the visitor
 * returns false for a vbucket, iteration stops and the index of that vbucket
 * is returned, so a later call resuming from the returned position visits it
 * again.
 *
 * @param visitor visitor invoked for each vbucket found
 * @param currentPosition index into vbsToVisit at which to start
 * @param vbsToVisit vbids to visit, in order
 * @return the index at which the visitor paused, or vbsToVisit.size() once
 *         every entry from currentPosition onwards has been handled
 */
size_t KVBucket::pauseResumeVisit(PauseResumeVBVisitor& visitor,
                                  size_t currentPosition,
                                  gsl::span<Vbid> vbsToVisit) {
    while (currentPosition < vbsToVisit.size()) {
        if (VBucketPtr vb = getVBucket(vbsToVisit[currentPosition])) {
            const bool keepGoing = visitor.visit(*vb);
            if (!keepGoing) {
                // Paused: report this index unchanged so the caller can
                // resume from the same vbucket.
                return currentPosition;
            }
        }
        ++currentPosition;
    }
    return currentPosition;
}
2465+
24512466
KVBucket::Position KVBucket::startPosition() const
24522467
{
24532468
return KVBucket::Position(Vbid(0));

engines/ep/src/kv_bucket.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,24 @@ class KVBucket : public KVBucketIface {
382382
Position& start_pos,
383383
VBucketFilter* filter = nullptr) override;
384384

385+
/**
386+
* Visit vbuckets in the order specified,
387+
* allowing the visitor to indicate a pause.
388+
*
389+
* @param visitor object with a <code>virtual bool visit(VBucket& vb)</code>
390+
* method which returns true if the vb has been processed
391+
* and false if iteration should be paused
392+
* @param currentPosition the vbsToVisit index at which iteration should
393+
* proceed
394+
* @param vbsToVisit span of vbids to visit in order (vbids for which no VBucket is found are skipped)
395+
* @return the next position to visit
396+
* (the vbsToVisit index at which the visitor indicated a pause)
397+
* or <code>vbsToVisit.size()</code> at completion
398+
*/
399+
[[nodiscard]] size_t pauseResumeVisit(PauseResumeVBVisitor& visitor,
400+
size_t currentPosition,
401+
gsl::span<Vbid> vbsToVisit);
402+
385403
Position startPosition() const override;
386404

387405
Position endPosition() const override;

engines/ep/src/warmup.cc

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -429,9 +429,8 @@ class WarmupBackfillTask : public GlobalTask {
429429
120) /
430430
100),
431431
currentNumBackfillTasks(threadTaskCount),
432-
filter(warmup.shardVbIds[shardId]),
433432
visitor(bucket, *this),
434-
epStorePosition(bucket.startPosition()) {
433+
vbsToVisit(warmup.shardVbIds[shardId]) {
435434
warmup.addToTaskSet(uid);
436435
}
437436

@@ -446,7 +445,7 @@ class WarmupBackfillTask : public GlobalTask {
446445
bool run() override {
447446
TRACE_EVENT1(
448447
"ep-engine/task", "WarmupBackfillTask", "shard", getShardId());
449-
if (filter.empty() || engine->getEpStats().isShutdown) {
448+
if (vbsToVisit.empty() || engine->getEpStats().isShutdown) {
450449
// Technically "isShutdown" being true doesn't equate to a
451450
// successful task finish, however if we are shutting down we want
452451
// warmup to advance and be considered "done".
@@ -457,8 +456,7 @@ class WarmupBackfillTask : public GlobalTask {
457456
visitor.begin();
458457
auto& kvBucket = *engine->getKVBucket();
459458
try {
460-
epStorePosition = kvBucket.pauseResumeVisit(
461-
visitor, epStorePosition, &filter);
459+
visitPos = kvBucket.pauseResumeVisit(visitor, visitPos, vbsToVisit);
462460
} catch (std::exception& e) {
463461
EP_LOG_CRITICAL(
464462
"WarmupBackfillTask::run(): caught exception while running "
@@ -467,7 +465,7 @@ class WarmupBackfillTask : public GlobalTask {
467465
finishTask(false);
468466
return false;
469467
}
470-
if (epStorePosition == kvBucket.endPosition()) {
468+
if (visitPos >= vbsToVisit.size()) {
471469
finishTask(true);
472470
return false;
473471
}
@@ -517,9 +515,9 @@ class WarmupBackfillTask : public GlobalTask {
517515
/// After how long should this task yield, allowing other tasks to run?
518516
const std::chrono::milliseconds maxExpectedRuntime;
519517
std::atomic<size_t>& currentNumBackfillTasks;
520-
VBucketFilter filter;
521518
WarmupVbucketVisitor visitor;
522-
KVBucketIface::Position epStorePosition;
519+
size_t visitPos = 0;
520+
std::vector<Vbid> vbsToVisit;
523521
};
524522

525523
bool WarmupVbucketVisitor::visit(VBucket& vb) {

engines/ep/tests/module_tests/evp_store_warmup_test.cc

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "programs/engine_testapp/mock_cookie.h"
3232
#include "programs/engine_testapp/mock_server.h"
3333
#include "test_helpers.h"
34+
#include "vb_visitors.h"
3435
#include "vbucket.h"
3536
#include "vbucket_state.h"
3637
#include "warmup.h"
@@ -718,6 +719,64 @@ TEST_F(WarmupTest, MB_58135_CorruptAccessLog) {
718719
EXPECT_EQ(WarmupState::State::Done, warmup->getWarmupState());
719720
}
720721

722+
class WarmupOrderTest : public SingleThreadedKVBucketTest {
723+
void SetUp() override {
724+
config_string = "max_vbuckets=32";
725+
SingleThreadedKVBucketTest::SetUp();
726+
for (size_t ii = 0; ii < store->getVBMapSize(); ++ii) {
727+
setVBucketStateAndRunPersistTask(
728+
Vbid(ii),
729+
(ii & 4) ? vbucket_state_active : vbucket_state_replica);
730+
}
731+
}
732+
};
733+
734+
/// Test that KVBucket::pauseResumeVisit visits vbuckets in the order specified
735+
TEST_F(WarmupOrderTest, pauseResumeVisit) {
736+
class Visitor final : public PauseResumeVBVisitor {
737+
public:
738+
bool visit(VBucket& vb) override {
739+
auto vbid = vb.getId();
740+
if (pause && *pause == vbid) {
741+
return false;
742+
}
743+
visited.push_back(vbid);
744+
return true;
745+
}
746+
747+
std::optional<Vbid> pause;
748+
std::vector<Vbid> visited;
749+
} visitor;
750+
751+
std::vector<Vbid> toVisit;
752+
for (int ii = store->getVBMapSize() - 1; ii >= 0; ii -= 3) {
753+
toVisit.emplace_back(ii);
754+
}
755+
EXPECT_EQ(11, toVisit.size());
756+
757+
// Visit all
758+
size_t position = 0;
759+
position = store->pauseResumeVisit(visitor, position, toVisit);
760+
EXPECT_EQ(toVisit.size(), position);
761+
EXPECT_EQ(toVisit, visitor.visited);
762+
763+
// Visit until the middle and pause
764+
const auto pausePosition = toVisit.size() / 2;
765+
visitor.pause = toVisit.at(pausePosition);
766+
visitor.visited.clear();
767+
position = 0;
768+
position = store->pauseResumeVisit(visitor, position, toVisit);
769+
EXPECT_EQ(pausePosition, position);
770+
EXPECT_EQ(gsl::span(toVisit).subspan(0, pausePosition),
771+
gsl::span(visitor.visited));
772+
773+
// Continue from middle
774+
visitor.pause.reset();
775+
position = store->pauseResumeVisit(visitor, position, toVisit);
776+
EXPECT_EQ(position, toVisit.size());
777+
EXPECT_EQ(visitor.visited, toVisit);
778+
}
779+
721780
// Test fixture for Durability-related Warmup tests.
722781
class DurabilityWarmupTest : public DurabilityKVBucketTest {
723782
protected:

0 commit comments

Comments
 (0)