Skip to content

Commit 1bd47cb

Browse files
authored
feat(native): Add ability to detach worker when overloaded for too long (#26544)
## Description We noticed that some bugs can lead to worker use most of the memory and not let it go for a long time. This results in memory manager mem-killing most of the queries as it always on the verge of global OOM. This makes the whole cluster pretty much non-operational, even worse, failing big share of queries. We add the logic similar to stuck drivers - when we are continuously overloaded for some long time (controlled by the new config parameter) we detach the worker to keep the cluster otherwise healthy. The default config threshold is zero, which effectively disables the feature. ``` == NO RELEASE NOTE == ```
1 parent 05ac6c5 commit 1bd47cb

File tree

7 files changed

+64
-5
lines changed

7 files changed

+64
-5
lines changed

presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -540,16 +540,42 @@ void PeriodicTaskManager::addWatchdogTask() {
540540
}
541541
RECORD_METRIC_VALUE(kCounterNumStuckDrivers, stuckOpCalls.size());
542542

543+
const char* detachReason = nullptr;
544+
543545
// Detach worker from the cluster if more than a certain number of
544546
// driver threads are blocked by stuck operators (one unique operator
545547
// can only get stuck on one unique thread).
546548
const auto numStuckOperatorsToDetachWorker = std::min(
547549
SystemConfig::instance()->driverNumStuckOperatorsToDetachWorker(),
548550
numDriverThreads_);
549551
if (stuckOpCalls.size() >= numStuckOperatorsToDetachWorker) {
550-
detachWorker("detected stuck operators");
552+
detachReason = "detected stuck operators";
551553
} else if (!deadlockTasks.empty()) {
552-
detachWorker("starving or deadlocked task");
554+
detachReason = "starving or deadlocked task";
555+
}
556+
557+
// Detach worker from the cluster if it has been overloaded for too
558+
// long.
559+
const auto now = velox::getCurrentTimeSec();
560+
const auto lastNotOverloadedTime =
561+
taskManager_->lastNotOverloadedTimeInSecs();
562+
const auto overloadedDurationSec =
563+
taskManager_->isServerOverloaded() && (now > lastNotOverloadedTime)
564+
? now - lastNotOverloadedTime
565+
: 0UL;
566+
RECORD_METRIC_VALUE(
567+
kCounterOverloadedDurationSec, overloadedDurationSec);
568+
if (detachReason == nullptr) {
569+
const uint64_t secondsThreshold =
570+
SystemConfig::instance()->workerOverloadedSecondsToDetachWorker();
571+
if (secondsThreshold > 0 &&
572+
overloadedDurationSec > secondsThreshold) {
573+
detachReason = "worker has been overloaded for too long";
574+
}
575+
}
576+
577+
if (detachReason != nullptr) {
578+
detachWorker(detachReason);
553579
} else {
554580
maybeAttachWorker();
555581
}

presto-native-execution/presto_cpp/main/TaskManager.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,8 @@ TaskManager::TaskManager(
360360
driverExecutor,
361361
spillerExecutor)),
362362
bufferManager_(velox::exec::OutputBufferManager::getInstanceRef()),
363-
httpSrvCpuExecutor_(httpSrvCpuExecutor) {
363+
httpSrvCpuExecutor_(httpSrvCpuExecutor),
364+
lastNotOverloadedTimeInSecs_(velox::getCurrentTimeSec()) {
364365
VELOX_CHECK_NOT_NULL(bufferManager_, "invalid OutputBufferManager");
365366
}
366367

@@ -471,6 +472,13 @@ TaskManager::buildTaskSpillDirectoryPath(
471472
std::move(taskSpillDirPath), std::move(dateSpillDirPath));
472473
}
473474

475+
void TaskManager::setServerOverloaded(bool serverOverloaded) {
476+
serverOverloaded_ = serverOverloaded;
477+
if (!serverOverloaded) {
478+
lastNotOverloadedTimeInSecs_ = velox::getCurrentTimeSec();
479+
}
480+
}
481+
474482
void TaskManager::getDataForResultRequests(
475483
const std::unordered_map<int64_t, std::shared_ptr<ResultRequest>>&
476484
resultRequests) {

presto-native-execution/presto_cpp/main/TaskManager.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,14 @@ class TaskManager {
179179

180180
/// Presto Server can notify the Task Manager that the former is overloaded,
181181
/// so the Task Manager can optionally change Task admission algorithm.
182-
void setServerOverloaded(bool serverOverloaded) {
183-
serverOverloaded_ = serverOverloaded;
182+
void setServerOverloaded(bool serverOverloaded);
183+
184+
bool isServerOverloaded() const {
185+
return serverOverloaded_;
186+
}
187+
188+
uint64_t lastNotOverloadedTimeInSecs() const {
189+
return lastNotOverloadedTimeInSecs_;
184190
}
185191

186192
/// Returns last known number of queued drivers. Used in determining if the
@@ -236,6 +242,7 @@ class TaskManager {
236242
folly::Synchronized<TaskQueue> taskQueue_;
237243
folly::Executor* httpSrvCpuExecutor_;
238244
std::atomic_bool serverOverloaded_{false};
245+
std::atomic_uint64_t lastNotOverloadedTimeInSecs_;
239246
std::atomic_uint32_t numQueuedDrivers_{0};
240247
};
241248

presto-native-execution/presto_cpp/main/common/Configs.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ SystemConfig::SystemConfig() {
188188
NUM_PROP(kWorkerOverloadedThresholdCpuPct, 0),
189189
NUM_PROP(kWorkerOverloadedThresholdNumQueuedDriversHwMultiplier, 0.0),
190190
NUM_PROP(kWorkerOverloadedCooldownPeriodSec, 5),
191+
NUM_PROP(kWorkerOverloadedSecondsToDetachWorker, 0),
191192
BOOL_PROP(kWorkerOverloadedTaskQueuingEnabled, false),
192193
NUM_PROP(kMallocHeapDumpThresholdGb, 20),
193194
NUM_PROP(kMallocMemMinHeapDumpInterval, 10),
@@ -576,6 +577,11 @@ uint32_t SystemConfig::workerOverloadedCooldownPeriodSec() const {
576577
return optionalProperty<uint32_t>(kWorkerOverloadedCooldownPeriodSec).value();
577578
}
578579

580+
uint64_t SystemConfig::workerOverloadedSecondsToDetachWorker() const {
581+
return optionalProperty<uint64_t>(kWorkerOverloadedSecondsToDetachWorker)
582+
.value();
583+
}
584+
579585
bool SystemConfig::workerOverloadedTaskQueuingEnabled() const {
580586
return optionalProperty<bool>(kWorkerOverloadedTaskQueuingEnabled).value();
581587
}

presto-native-execution/presto_cpp/main/common/Configs.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,11 @@ class SystemConfig : public ConfigBase {
353353
/// This is to prevent spiky fluctuation of the overloaded status.
354354
static constexpr std::string_view kWorkerOverloadedCooldownPeriodSec{
355355
"worker-overloaded-cooldown-period-sec"};
356+
/// The number of seconds the worker needs to be continuously overloaded for
357+
/// us to detach the worker from the cluster in an attempt to keep the
358+
/// cluster operational. Ignored if set to zero. Default is zero.
359+
static constexpr std::string_view kWorkerOverloadedSecondsToDetachWorker{
360+
"worker-overloaded-seconds-to-detach-worker"};
356361
/// If true, the worker starts queuing new tasks when overloaded, and
357362
/// starts them gradually when it stops being overloaded.
358363
static constexpr std::string_view kWorkerOverloadedTaskQueuingEnabled{
@@ -948,6 +953,8 @@ class SystemConfig : public ConfigBase {
948953

949954
uint32_t workerOverloadedCooldownPeriodSec() const;
950955

956+
uint64_t workerOverloadedSecondsToDetachWorker() const;
957+
951958
bool workerOverloadedTaskQueuingEnabled() const;
952959

953960
bool mallocMemHeapDumpEnabled() const;

presto-native-execution/presto_cpp/main/common/Counters.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ void registerPrestoMetrics() {
102102
DEFINE_METRIC(kCounterOverloaded, facebook::velox::StatType::AVG);
103103
DEFINE_METRIC(kCounterNumStuckDrivers, facebook::velox::StatType::AVG);
104104
DEFINE_METRIC(kCounterTaskPlannedTimeMs, facebook::velox::StatType::AVG);
105+
DEFINE_METRIC(kCounterOverloadedDurationSec, facebook::velox::StatType::AVG);
105106
DEFINE_METRIC(
106107
kCounterTotalPartitionedOutputBuffer, facebook::velox::StatType::AVG);
107108
DEFINE_METRIC(

presto-native-execution/presto_cpp/main/common/Counters.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,10 @@ constexpr folly::StringPiece kCounterOverloaded{"presto_cpp.overloaded"};
137137
/// planned) in milliseconds.
138138
constexpr folly::StringPiece kCounterTaskPlannedTimeMs{
139139
"presto_cpp.task_planned_time_ms"};
140+
/// Exports the current overloaded duration in seconds or 0 if not currently
141+
/// overloaded.
142+
constexpr folly::StringPiece kCounterOverloadedDurationSec{
143+
"presto_cpp.overloaded_duration_sec"};
140144

141145
/// Number of total OutputBuffer managed by all
142146
/// OutputBufferManager

0 commit comments

Comments
 (0)