
Commit 5e14acc

alextaskov authored and evergreen committed
SERVER-42914 Implement random chunk selection policy for balancer for use in concurrency_*_with_balancer workloads
1 parent b2741ca commit 5e14acc

File tree

src/mongo/db/s/balancer/balancer.cpp
src/mongo/db/s/balancer/balancer.h
src/mongo/db/s/balancer/balancer_policy.cpp

3 files changed: +96 -7 lines changed

src/mongo/db/s/balancer/balancer.cpp

Lines changed: 19 additions & 5 deletions
@@ -54,6 +54,7 @@
 #include "mongo/s/shard_util.h"
 #include "mongo/util/concurrency/idle_thread_block.h"
 #include "mongo/util/exit.h"
+#include "mongo/util/fail_point_service.h"
 #include "mongo/util/log.h"
 #include "mongo/util/timer.h"
 #include "mongo/util/version.h"
@@ -66,6 +67,8 @@ using std::vector;
 
 namespace {
 
+MONGO_FAIL_POINT_DEFINE(overrideBalanceRoundInterval);
+
 const Seconds kBalanceRoundDefaultInterval(10);
 
 // Sleep between balancer rounds in the case where the last round found some chunks which needed to
@@ -391,9 +394,20 @@ void Balancer::_mainThread() {
             LOG(1) << "*** End of balancing round";
         }
 
-        _endRound(opCtx.get(),
-                  _balancedLastTime ? kShortBalanceRoundInterval
-                                    : kBalanceRoundDefaultInterval);
+        auto balancerInterval = [&]() -> Milliseconds {
+            MONGO_FAIL_POINT_BLOCK(overrideBalanceRoundInterval, data) {
+                int interval = data.getData()["intervalMs"].numberInt();
+                log() << "overrideBalanceRoundInterval: using shorter balancing interval: "
+                      << interval << "ms";
+
+                return Milliseconds(interval);
+            }
+
+            return _balancedLastTime ? kShortBalanceRoundInterval
+                                     : kBalanceRoundDefaultInterval;
+        }();
+
+        _endRound(opCtx.get(), balancerInterval);
     } catch (const DBException& e) {
         log() << "caught exception while doing balance: " << e.what();
 
@@ -441,7 +455,7 @@ void Balancer::_beginRound(OperationContext* opCtx) {
     _condVar.notify_all();
 }
 
-void Balancer::_endRound(OperationContext* opCtx, Seconds waitTimeout) {
+void Balancer::_endRound(OperationContext* opCtx, Milliseconds waitTimeout) {
     {
         stdx::lock_guard<stdx::mutex> lock(_mutex);
         _inBalancerRound = false;
@@ -453,7 +467,7 @@ void Balancer::_endRound(OperationContext* opCtx, Seconds waitTimeout) {
     _sleepFor(opCtx, waitTimeout);
 }
 
-void Balancer::_sleepFor(OperationContext* opCtx, Seconds waitTimeout) {
+void Balancer::_sleepFor(OperationContext* opCtx, Milliseconds waitTimeout) {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    _condVar.wait_for(lock, waitTimeout.toSystemDuration(), [&] { return _state != kRunning; });
 }
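A note on the pattern above: the wait interval is computed with an immediately invoked lambda so that the failpoint block can return an override value, while the usual short/default choice remains the fall-through. Below is a minimal standalone sketch of the same shape; the environment-variable override and the constants are purely illustrative and are not part of the server code.

#include <chrono>
#include <cstdlib>
#include <iostream>

using Milliseconds = std::chrono::milliseconds;

int main() {
    bool balancedLastTime = false;
    const Milliseconds kShortInterval{1000};
    const Milliseconds kDefaultInterval{10000};

    // Immediately invoked lambda: the override branch can simply `return`,
    // leaving the normal short/default selection as the fall-through case.
    auto interval = [&]() -> Milliseconds {
        if (const char* overrideMs = std::getenv("OVERRIDE_INTERVAL_MS")) {
            return Milliseconds{std::atoi(overrideMs)};
        }
        return balancedLastTime ? kShortInterval : kDefaultInterval;
    }();

    std::cout << "sleeping for " << interval.count() << "ms\n";
    return 0;
}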

src/mongo/db/s/balancer/balancer.h

Lines changed: 2 additions & 2 deletions
@@ -172,13 +172,13 @@ class Balancer {
      * Signals the beginning and end of a balancing round.
      */
     void _beginRound(OperationContext* opCtx);
-    void _endRound(OperationContext* opCtx, Seconds waitTimeout);
+    void _endRound(OperationContext* opCtx, Milliseconds waitTimeout);
 
     /**
      * Blocks the caller for the specified timeout or until the balancer condition variable is
      * signaled, whichever comes first.
      */
-    void _sleepFor(OperationContext* opCtx, Seconds waitTimeout);
+    void _sleepFor(OperationContext* opCtx, Milliseconds waitTimeout);
 
     /**
      * Returns true if all the servers listed in configdb as being shards are reachable and are
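The header change is a pure widening of the parameter type from Seconds to Milliseconds. A coarser duration converts implicitly to a finer one, which is why the lambda in balancer.cpp can still return the Seconds constants kShortBalanceRoundInterval and kBalanceRoundDefaultInterval unchanged. A small illustration of that widening conversion, using the standard <chrono> types rather than the server's own duration classes:

#include <chrono>
#include <iostream>

// A seconds-valued argument binds to a milliseconds parameter because the
// conversion to the finer unit is lossless and therefore implicit.
void endRound(std::chrono::milliseconds waitTimeout) {
    std::cout << "waiting " << waitTimeout.count() << "ms\n";
}

int main() {
    const std::chrono::seconds kDefaultInterval{10};
    endRound(kDefaultInterval);                // prints "waiting 10000ms"
    endRound(std::chrono::milliseconds{100});  // prints "waiting 100ms"
    return 0;
}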

src/mongo/db/s/balancer/balancer_policy.cpp

Lines changed: 75 additions & 0 deletions
@@ -33,14 +33,19 @@
 
 #include "mongo/db/s/balancer/balancer_policy.h"
 
+#include <random>
+
 #include "mongo/db/s/balancer/type_migration.h"
 #include "mongo/s/catalog/type_shard.h"
 #include "mongo/s/catalog/type_tags.h"
+#include "mongo/util/fail_point_service.h"
 #include "mongo/util/log.h"
 #include "mongo/util/str.h"
 
 namespace mongo {
 
+MONGO_FAIL_POINT_DEFINE(balancerShouldReturnRandomMigrations);
+
 using std::map;
 using std::numeric_limits;
 using std::set;
@@ -287,11 +292,81 @@ ShardId BalancerPolicy::_getMostOverloadedShard(const ShardStatisticsVector& sha
     return worst;
 }
 
+// Returns a random integer in [0, max) using a uniform random distribution.
+int getRandomIndex(int max) {
+    std::default_random_engine gen(time(nullptr));
+    std::uniform_int_distribution<int> dist(0, max - 1);
+
+    return dist(gen);
+}
+
+// Iterates through the shardStats vector starting from index until it finds an element that has > 0
+// chunks. It will wrap around at the end and stop at the starting index. If no shards have chunks,
+// it will return the original index value.
+int getNextShardWithChunks(const ShardStatisticsVector& shardStats,
+                           const DistributionStatus& distribution,
+                           int index) {
+    int retIndex = index;
+
+    while (distribution.numberOfChunksInShard(shardStats[retIndex].shardId) == 0) {
+        retIndex = (retIndex + 1) % shardStats.size();
+
+        if (retIndex == index)
+            return index;
+    }
+
+    return retIndex;
+}
+
+// Returns a randomly chosen pair of source -> destination shards for testing.
+// The random pair is chosen by the following algorithm:
+//  - create an array of indices with values [0, n)
+//  - select a random index from this set
+//  - advance the chosen index until we encounter a shard with chunks to move
+//  - remove the chosen index from the set by swapping it with the last element
+//  - select the destination index from the remaining indices
+MigrateInfo chooseRandomMigration(const ShardStatisticsVector& shardStats,
+                                  const DistributionStatus& distribution) {
+    std::vector<int> indices(shardStats.size());
+
+    int i = 0;
+    std::generate(indices.begin(), indices.end(), [&i] { return i++; });
+
+    int choice = getRandomIndex(indices.size());
+
+    const int sourceIndex = getNextShardWithChunks(shardStats, distribution, indices[choice]);
+    const auto& sourceShardId = shardStats[sourceIndex].shardId;
+    std::swap(indices[sourceIndex], indices[indices.size() - 1]);
+
+    choice = getRandomIndex(indices.size() - 1);
+    const int destIndex = indices[choice];
+    const auto& destShardId = shardStats[destIndex].shardId;
+
+    LOG(1) << "balancerShouldReturnRandomMigrations: source: " << sourceShardId
+           << " dest: " << destShardId;
+
+    const auto& chunks = distribution.getChunks(sourceShardId);
+
+    return {destShardId, chunks[getRandomIndex(chunks.size())]};
+}
+
 vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardStats,
                                             const DistributionStatus& distribution,
                                             std::set<ShardId>* usedShards) {
     vector<MigrateInfo> migrations;
 
+    if (MONGO_FAIL_POINT(balancerShouldReturnRandomMigrations) &&
+        !distribution.nss().isConfigDB()) {
+        LOG(1) << "balancerShouldReturnRandomMigrations failpoint is set";
+
+        if (shardStats.size() < 2)
+            return migrations;
+
+        migrations.push_back(chooseRandomMigration(shardStats, distribution));
+
+        return migrations;
+    }
+
     // 1) Check for shards, which are in draining mode
     {
         for (const auto& stat : shardStats) {