Skip to content

Commit 5e5aa4c

Browse files
committed
MB-58402: Limit concurrency of bucket deletion
Add a new task type that both limits concurrency using a semaphore and allows the function being run to return a sleep time after which it wishes to run again. Change-Id: Idf7c499cf5ae7d67290872fa3f594d51a0db28a7 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/196661 Reviewed-by: Trond Norbye <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 4f3034f commit 5e5aa4c

File tree

5 files changed

+204
-49
lines changed

5 files changed

+204
-49
lines changed

daemon/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ add_library(memcached_daemon STATIC
109109
tls_configuration.h
110110
tracing.cc
111111
tracing.h
112+
yielding_limited_concurrency_task.cc
112113
yielding_task.cc
113114
)
114115

daemon/protocol/mcbp/bucket_management_command_context.cc

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
#include <daemon/one_shot_limited_concurrency_task.h>
1717
#include <daemon/session_cas.h>
1818
#include <daemon/settings.h>
19-
#include <daemon/yielding_task.h>
19+
#include <daemon/yielding_limited_concurrency_task.h>
2020
#include <executor/executorpool.h>
2121
#include <logger/logger.h>
2222
#include <memcached/config_parser.h>
@@ -157,35 +157,36 @@ cb::engine_errc BucketManagementCommandContext::remove() {
157157

158158
std::string taskname{"Delete bucket [" + name + "]"};
159159
using namespace std::chrono_literals;
160-
ExecutorPool::get()->schedule(std::make_shared<YieldingTask>(
161-
TaskId::Core_DeleteBucketTask,
162-
taskname,
163-
[destroyer = std::move(*optDestroyer),
164-
client = &cookie,
165-
nm = std::move(name)]() mutable
160+
auto task = [destroyer = std::move(*optDestroyer),
161+
client = &cookie,
162+
nm = std::move(name)]() mutable
166163
-> std::optional<std::chrono::duration<double>> {
167-
auto& connection = client->getConnection();
168-
cb::engine_errc status;
169-
try {
170-
status = destroyer.drive();
171-
} catch (const std::runtime_error& error) {
172-
LOG_WARNING(
173-
"{}: An error occurred while deleting bucket [{}]: "
174-
"{}",
175-
connection.getId(),
176-
nm,
177-
error.what());
178-
status = cb::engine_errc::failed;
179-
}
180-
if (status == cb::engine_errc::would_block) {
181-
// destroyer hasn't completed yet (waiting for connections
182-
// or operations), try again in 10ms
183-
return {10ms};
184-
}
185-
client->notifyIoComplete(status);
186-
return {};
187-
},
188-
100ms));
164+
auto& connection = client->getConnection();
165+
cb::engine_errc status;
166+
try {
167+
status = destroyer.drive();
168+
} catch (const std::runtime_error& error) {
169+
LOG_WARNING("{}: An error occurred while deleting bucket [{}]: {}",
170+
connection.getId(),
171+
nm,
172+
error.what());
173+
status = cb::engine_errc::failed;
174+
}
175+
if (status == cb::engine_errc::would_block) {
176+
// destroyer hasn't completed yet (waiting for connections
177+
// or operations), try again in 10ms
178+
return {10ms};
179+
}
180+
client->notifyIoComplete(status);
181+
return {};
182+
};
183+
ExecutorPool::get()->schedule(
184+
std::make_shared<YieldingLimitedConcurrencyTask>(
185+
TaskId::Core_DeleteBucketTask,
186+
std::move(taskname),
187+
std::move(task),
188+
ConcurrencySemaphores::instance().bucket_management,
189+
100ms));
189190

190191
state = State::Done;
191192
return cb::engine_errc::would_block;

daemon/tasks_test.cc

Lines changed: 58 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <folly/portability/GMock.h>
1212
#include <folly/portability/GTest.h>
1313

14+
#include <daemon/yielding_limited_concurrency_task.h>
1415
#include <daemon/yielding_task.h>
1516
#include <executor/executorpool.h>
1617
#include <executor/fake_executorpool.h>
@@ -37,19 +38,17 @@ class TasksTest : public ::testing::Test {
3738
}
3839
};
3940

40-
TEST_F(TasksTest, YieldingTaskCalledAgain) {
41-
// test that a YieldingTask is called again if it returns a snooze time,
42-
// and is not called again when returning a nullopt to indicate it is
43-
// "done"
41+
using MockTaskFunc = testing::StrictMock<
42+
testing::MockFunction<std::optional<std::chrono::duration<double>>()>>;
43+
44+
/**
45+
* Test that a task is called again if it returns a snooze time,
46+
* and is not called again when returning a nullopt to indicate it is "done"
47+
*/
48+
static void testTaskCalledAgain(ExTask task, MockTaskFunc& mockTaskFunc) {
4449
using namespace testing;
45-
StrictMock<MockFunction<std::optional<std::chrono::duration<double>>()>>
46-
mockTaskFunc;
4750

48-
ExecutorPool::get()->schedule(
49-
std::make_shared<YieldingTask>(TaskId::Core_DeleteBucketTask,
50-
"foobar",
51-
mockTaskFunc.AsStdFunction(),
52-
std::chrono::seconds(30)));
51+
ExecutorPool::get()->schedule(task);
5352

5453
InSequence s;
5554

@@ -67,17 +66,34 @@ TEST_F(TasksTest, YieldingTaskCalledAgain) {
6766
std::logic_error);
6867
}
6968

70-
TEST_F(TasksTest, YieldingTaskSnoozes) {
71-
// test that a YieldingTask is correctly snooze()'ed
69+
TEST_F(TasksTest, YieldingTaskCalledAgain) {
70+
MockTaskFunc func;
71+
testTaskCalledAgain(
72+
std::make_shared<YieldingTask>(TaskId::Core_DeleteBucketTask,
73+
"foobar",
74+
func.AsStdFunction(),
75+
std::chrono::seconds(30)),
76+
func);
77+
}
7278

79+
TEST_F(TasksTest, YieldingLimitedConcurrencyTaskCalledAgain) {
80+
MockTaskFunc func;
81+
cb::AwaitableSemaphore semaphore{4};
82+
testTaskCalledAgain(std::make_shared<YieldingLimitedConcurrencyTask>(
83+
TaskId::Core_DeleteBucketTask,
84+
"foobar",
85+
func.AsStdFunction(),
86+
semaphore,
87+
std::chrono::seconds(30)),
88+
func);
89+
}
90+
91+
/**
92+
* Test that a task is correctly snooze()'ed
93+
*/
94+
static void testTaskSnoozes(ExTask task, MockTaskFunc& mockTaskFunc) {
7395
using namespace testing;
74-
StrictMock<MockFunction<std::optional<std::chrono::duration<double>>()>>
75-
mockTaskFunc;
7696

77-
auto task = std::make_shared<YieldingTask>(TaskId::Core_DeleteBucketTask,
78-
"foobar",
79-
mockTaskFunc.AsStdFunction(),
80-
std::chrono::seconds(30));
8197
ExecutorPool::get()->schedule(task);
8298

8399
using namespace std::chrono;
@@ -95,4 +111,26 @@ TEST_F(TasksTest, YieldingTaskSnoozes) {
95111

96112
EXPECT_EQ(TASK_SNOOZED, task->getState());
97113
EXPECT_GE(task->getWaketime(), beforeTime + 10min);
98-
}
114+
}
115+
116+
TEST_F(TasksTest, YieldingTaskSnoozes) {
117+
MockTaskFunc func;
118+
testTaskSnoozes(
119+
std::make_shared<YieldingTask>(TaskId::Core_DeleteBucketTask,
120+
"foobar",
121+
func.AsStdFunction(),
122+
std::chrono::seconds(30)),
123+
func);
124+
}
125+
126+
TEST_F(TasksTest, YieldingLimitedConcurrencyTaskSnoozes) {
127+
MockTaskFunc func;
128+
cb::AwaitableSemaphore semaphore{4};
129+
testTaskSnoozes(std::make_shared<YieldingLimitedConcurrencyTask>(
130+
TaskId::Core_DeleteBucketTask,
131+
"foobar",
132+
func.AsStdFunction(),
133+
semaphore,
134+
std::chrono::seconds(30)),
135+
func);
136+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2023-Present Couchbase, Inc.
3+
*
4+
* Use of this software is governed by the Business Source License included
5+
* in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
6+
* in that file, in accordance with the Business Source License, use of this
7+
* software will be governed by the Apache License, Version 2.0, included in
8+
* the file licenses/APL2.txt.
9+
*/
10+
11+
#include "yielding_limited_concurrency_task.h"
12+
#include <logger/logger.h>
13+
14+
YieldingLimitedConcurrencyTask::YieldingLimitedConcurrencyTask(
15+
TaskId id,
16+
std::string name,
17+
YieldingFunc function,
18+
cb::AwaitableSemaphore& semaphore,
19+
std::chrono::microseconds expectedRuntime)
20+
: LimitedConcurrencyTask(NoBucketTaskable::instance(), id, semaphore, true),
21+
name(std::move(name)),
22+
function(std::move(function)),
23+
expectedRuntime(std::move(expectedRuntime)) {
24+
}
25+
26+
bool YieldingLimitedConcurrencyTask::runInner() {
27+
auto guard = acquireOrWait();
28+
if (!guard) {
29+
// Could not acquire a token, queued for notification.
30+
// Already snooze()-ed forever, just return true to
31+
// reschedule.
32+
return true;
33+
}
34+
35+
// Do concurrency-limited work
36+
try {
37+
auto optSleepTime = function();
38+
if (optSleepTime) {
39+
snooze(optSleepTime->count());
40+
// Task _does_ wish to run again
41+
return true;
42+
}
43+
} catch (const std::exception& e) {
44+
LOG_CRITICAL(
45+
"YieldingLimitedConcurrencyTask::runInner(\"{}\"):"
46+
" received exception: {}",
47+
name,
48+
e.what());
49+
}
50+
51+
// Not running again
52+
return false;
53+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2023-Present Couchbase, Inc.
3+
*
4+
* Use of this software is governed by the Business Source License included
5+
* in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
6+
* in that file, in accordance with the Business Source License, use of this
7+
* software will be governed by the Apache License, Version 2.0, included in
8+
* the file licenses/APL2.txt.
9+
*/
10+
#pragma once
11+
12+
#include "nobucket_taskable.h"
13+
#include <executor/limited_concurrency_task.h>
14+
#include <functional>
15+
#include <optional>
16+
17+
/**
18+
* YieldingLimitedConcurrencyTask accepts a "generic" task which runs the
19+
* provided function until completed to avoid having to duplicate all of the
20+
* boilerplate code for a simple task.
21+
*
22+
* A semaphore is used to limit the number of tasks run concurrently.
23+
* The function may return a sleep time after which it wishes to run again
24+
* (0s may be used to yield the thread and run again asap), or a
25+
* nullopt to indicate it is complete.
26+
*/
27+
class YieldingLimitedConcurrencyTask : public LimitedConcurrencyTask {
28+
public:
29+
using YieldingFunc =
30+
std::function<std::optional<std::chrono::duration<double>>()>;
31+
32+
/**
33+
* Create a new instance of the YieldingLimitedConcurrencyTask
34+
*
35+
* @param id The task identifier for the task
36+
* @param name The name for the task
37+
* @param function The function to run on the executor
38+
* @param semaphore The semaphore to use for concurrency limiting
39+
* @param expectedRuntime Maximum duration `function` is expected to run
40+
*/
41+
YieldingLimitedConcurrencyTask(TaskId id,
42+
std::string name,
43+
YieldingFunc function,
44+
cb::AwaitableSemaphore& semaphore,
45+
std::chrono::microseconds expectedRuntime =
46+
std::chrono::milliseconds(100));
47+
48+
std::string getDescription() const override {
49+
return name;
50+
}
51+
52+
std::chrono::microseconds maxExpectedDuration() const override {
53+
return expectedRuntime;
54+
}
55+
56+
protected:
57+
bool runInner() override;
58+
59+
const std::string name;
60+
const YieldingFunc function;
61+
const std::chrono::microseconds expectedRuntime;
62+
};

0 commit comments

Comments
 (0)