Skip to content

Commit 4e8acd0

Browse files
committed
MB-31869: Remove the ReplicationThrottle class
A lot of unnecessary boilerplate for making just a simple check on the bucket memory state. This is in preparation for further changes in the PassiveStream code. Change-Id: Ib8a94621bef329cb6f8768b4fda61ec91340a56b Reviewed-on: https://review.couchbase.org/c/kv_engine/+/202686 Tested-by: Paolo Cocchi <[email protected]> Reviewed-by: Vesko Karaganev <[email protected]>
1 parent 2b05aaa commit 4e8acd0

17 files changed

+76
-208
lines changed

engines/ep/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,6 @@ ADD_LIBRARY(ep_objs OBJECT
374374
src/probabilistic_counter.cc
375375
src/progress_tracker.cc
376376
src/quota_sharing_item_pager.cc
377-
src/replicationthrottle.cc
378377
src/linked_list.cc
379378
src/range_lock_manager.cc
380379
src/range_scans/range_scan.cc

engines/ep/src/dcp/consumer.cc

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,9 @@
2323
#include "failover-table.h"
2424
#include "kv_bucket.h"
2525
#include "objectregistry.h"
26-
#include "replicationthrottle.h"
2726
#include "vbucket.h"
28-
#include <fmt/chrono.h>
2927
#include <executor/executorpool.h>
28+
#include <fmt/chrono.h>
3029
#include <phosphor/phosphor.h>
3130
#include <xattr/utils.h>
3231

@@ -1314,13 +1313,13 @@ process_items_error_t DcpConsumer::drainStreamsBufferedItems(
13141313
uint32_t bytesProcessed = 0;
13151314
size_t iterations = 0;
13161315
do {
1317-
switch (engine_.getReplicationThrottle().getStatus()) {
1318-
case ReplicationThrottle::Status::Pause:
1316+
switch (engine_.getKVBucket()->getReplicationThrottleStatus()) {
1317+
case KVBucket::ReplicationThrottleStatus::Pause:
13191318
backoffs++;
13201319
vbReady.pushUnique(stream->getVBucket());
13211320
return cannot_process;
13221321

1323-
case ReplicationThrottle::Status::Disconnect:
1322+
case KVBucket::ReplicationThrottleStatus::Disconnect:
13241323
backoffs++;
13251324
vbReady.pushUnique(stream->getVBucket());
13261325
logger->warn(
@@ -1329,7 +1328,7 @@ process_items_error_t DcpConsumer::drainStreamsBufferedItems(
13291328
stream->getVBucket());
13301329
return stop_processing;
13311330

1332-
case ReplicationThrottle::Status::Process:
1331+
case KVBucket::ReplicationThrottleStatus::Process:
13331332
bytesProcessed = 0;
13341333
rval = stream->processBufferedMessages(
13351334
bytesProcessed, processBufferedMessagesBatchSize);

engines/ep/src/dcp/passive_stream.cc

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
#include "ep_engine.h"
2222
#include "failover-table.h"
2323
#include "kv_bucket.h"
24-
#include "replicationthrottle.h"
2524
#include "vbucket.h"
2625

2726
#include <gsl/gsl-lite.hpp>
@@ -350,20 +349,21 @@ cb::engine_errc PassiveStream::messageReceived(
350349
}
351350
}
352351

353-
switch (engine->getReplicationThrottle().getStatus()) {
354-
case ReplicationThrottle::Status::Disconnect:
352+
const auto& bucket = *engine->getKVBucket();
353+
switch (bucket.getReplicationThrottleStatus()) {
354+
case KVBucket::ReplicationThrottleStatus::Disconnect:
355355
log(spdlog::level::level_enum::warn,
356356
"{} Disconnecting the connection as there is "
357357
"no memory to complete replication",
358358
vb_);
359359
return cb::engine_errc::disconnect;
360-
case ReplicationThrottle::Status::Process:
360+
case KVBucket::ReplicationThrottleStatus::Process:
361361
if (buffer.empty() && !alwaysBufferOperations) {
362362
// Memory available and no message buffered -> process the response
363363
const auto ret = processMessage(dcpResponse.get());
364364
const auto err = ret.getError();
365365
if (err == cb::engine_errc::no_memory) {
366-
if (engine->getReplicationThrottle().doDisconnectOnNoMem()) {
366+
if (bucket.disconnectReplicationAtOOM()) {
367367
log(spdlog::level::level_enum::warn,
368368
"{} Disconnecting the connection as there is no "
369369
"memory to complete replication; process dcp "
@@ -381,7 +381,7 @@ cb::engine_errc PassiveStream::messageReceived(
381381
}
382382
}
383383
break;
384-
case ReplicationThrottle::Status::Pause:
384+
case KVBucket::ReplicationThrottleStatus::Pause:
385385
/* Do nothing specific here, we buffer item for this case and
386386
other cases below */
387387
break;
@@ -476,7 +476,7 @@ process_items_error_t PassiveStream::processBufferedMessages(
476476
processed_bytes = total_bytes_processed;
477477

478478
if (failed) {
479-
if (noMem && engine->getReplicationThrottle().doDisconnectOnNoMem()) {
479+
if (noMem && engine->getKVBucket()->disconnectReplicationAtOOM()) {
480480
log(spdlog::level::level_enum::warn,
481481
"{} Processor task indicating disconnection as "
482482
"there is no memory to complete replication; process dcp "

engines/ep/src/ep_bucket.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
#include "kvstore/persistence_callback.h"
3030
#include "kvstore/rollback_callback.h"
3131
#include "range_scans/range_scan_callbacks.h"
32-
#include "replicationthrottle.h"
3332
#include "rollback_result.h"
3433
#include "tasks.h"
3534
#include "vb_commit.h"
@@ -254,7 +253,6 @@ EPBucket::EPBucket(EventuallyPersistentEngine& engine)
254253
} else {
255254
eviction_policy = EvictionPolicy::Full;
256255
}
257-
replicationThrottle = std::make_unique<ReplicationThrottleEP>(engine);
258256

259257
// Pre 7.0.0 Flushers were a part of KVShard so keep the same default
260258
// scaling.
@@ -2715,3 +2713,7 @@ cb::engine_errc EPBucket::prepareForResume() {
27152713

27162714
return cb::engine_errc::success;
27172715
}
2716+
2717+
bool EPBucket::disconnectReplicationAtOOM() const {
2718+
return false;
2719+
}

engines/ep/src/ep_bucket.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,8 @@ class EPBucket : public KVBucket {
360360
*/
361361
std::shared_ptr<RangeScan> takeNextRangeScan(size_t taskId);
362362

363+
bool disconnectReplicationAtOOM() const override;
364+
363365
/// Hook that gets called from prepareForPause. Phase of prepareForPause()
364366
/// specified by the single string_view arg
365367
TestingHook<std::string_view> prepareForPauseTestingHook;

engines/ep/src/ep_engine.cc

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
#include "kvstore/kvstore.h"
4242
#include "quota_sharing_item_pager.h"
4343
#include "range_scans/range_scan_callbacks.h"
44-
#include "replicationthrottle.h"
4544
#include "stats-info.h"
4645
#include "string_utils.h"
4746
#include "trace_helpers.h"
@@ -7036,10 +7035,6 @@ EventuallyPersistentEngine::~EventuallyPersistentEngine() {
70367035
/* Unique_ptr(s) are deleted in the reverse order of the initialization */
70377036
}
70387037

7039-
ReplicationThrottle& EventuallyPersistentEngine::getReplicationThrottle() {
7040-
return getKVBucket()->getReplicationThrottle();
7041-
}
7042-
70437038
const std::string& EpEngineTaskable::getName() const {
70447039
return myEngine->getName();
70457040
}

engines/ep/src/ep_engine.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ class VBucketCountVisitor;
5656

5757
// Forward decl
5858
class EventuallyPersistentEngine;
59-
class ReplicationThrottle;
6059

6160
namespace cb::prometheus {
6261
enum class MetricGroup;
@@ -730,13 +729,6 @@ class EventuallyPersistentEngine : public EngineIface, public DcpIface {
730729
return *dcpFlowControlManager;
731730
}
732731

733-
/**
734-
* Returns the replication throttle instance
735-
*
736-
* @return Ref to replication throttle
737-
*/
738-
ReplicationThrottle& getReplicationThrottle();
739-
740732
CheckpointConfig& getCheckpointConfig() {
741733
return *checkpointConfig;
742734
}

engines/ep/src/ephemeral_bucket.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include "ephemeral_vb.h"
2121
#include "ephemeral_vb_count_visitor.h"
2222
#include "failover-table.h"
23-
#include "replicationthrottle.h"
2423
#include "rollback_result.h"
2524
#include "seqno_persistence_notify_task.h"
2625
#include <executor/executorpool.h>
@@ -103,8 +102,6 @@ EphemeralBucket::EphemeralBucket(EventuallyPersistentEngine& engine)
103102
// in initialize().
104103
tombstonePurgerTask =
105104
std::make_shared<EphTombstoneHTCleaner>(engine, *this);
106-
107-
replicationThrottle = std::make_unique<ReplicationThrottleEphe>(engine);
108105
}
109106

110107
EphemeralBucket::~EphemeralBucket() = default;
@@ -462,3 +459,8 @@ cb::engine_errc EphemeralBucket::getImplementationStats(
462459
collector.addStat(Key::ep_pending_compactions, 0);
463460
return cb::engine_errc::success;
464461
}
462+
463+
bool EphemeralBucket::disconnectReplicationAtOOM() const {
464+
return engine.getConfiguration().getEphemeralFullPolicy() ==
465+
"fail_new_data";
466+
}

engines/ep/src/ephemeral_bucket.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ class EphemeralBucket : public KVBucket {
181181
return true;
182182
}
183183

184+
bool disconnectReplicationAtOOM() const override;
185+
184186
protected:
185187
std::unique_ptr<VBucketCountVisitor> makeVBCountVisitor(
186188
vbucket_state_t state) override;

engines/ep/src/kv_bucket.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
#include "kvshard.h"
4242
#include "kvstore/kvstore.h"
4343
#include "range_scans/range_scan_callbacks.h"
44-
#include "replicationthrottle.h"
4544
#include "rollback_result.h"
4645
#include "seqno_persistence_notify_task.h"
4746
#include "tasks.h"
@@ -64,6 +63,7 @@
6463
#include <platform/timeutils.h>
6564
#include <statistics/collector.h>
6665
#include <statistics/labelled_collector.h>
66+
#include <utilities/math_utilities.h>
6767

6868
#include <chrono>
6969
#include <cstring>
@@ -3368,3 +3368,15 @@ bool KVBucket::isCompactionExpiryFetchInline() const {
33683368
size_t KVBucket::getNumCheckpointDestroyers() const {
33693369
return ckptDestroyerTasks.rlock()->size();
33703370
}
3371+
3372+
KVBucket::ReplicationThrottleStatus KVBucket::getReplicationThrottleStatus()
3373+
const {
3374+
const auto& stats = engine.getEpStats();
3375+
const auto memoryUsed = stats.getEstimatedTotalMemoryUsed();
3376+
const auto bucketQuota = stats.getMaxDataSize();
3377+
if (memoryUsed <= cb::fractionOf(bucketQuota, mutationMemRatio)) {
3378+
return ReplicationThrottleStatus::Process;
3379+
}
3380+
return disconnectReplicationAtOOM() ? ReplicationThrottleStatus::Disconnect
3381+
: ReplicationThrottleStatus::Pause;
3382+
}

0 commit comments

Comments
 (0)