Skip to content

Commit 3b2a325

Browse files
authored
Merge pull request ClickHouse#76847 from ClickHouse/stoprep
Fix race in DatabaseReplicated between stopReplication and startupDatabaseAsync
2 parents 5345879 + 79c1bb2 commit 3b2a325

File tree

12 files changed: 151 additions (+151) and 50 deletions (-50).

programs/server/Server.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2398,7 +2398,7 @@ try
23982398
LOG_INFO(log, "Stopping AsyncLoader.");
23992399

24002400
// Waits for all currently running jobs to finish and does not run any other pending jobs.
2401-
global_context->getAsyncLoader().stop();
2401+
global_context->getAsyncLoader().shutdown();
24022402
);
24032403

24042404
try

src/Common/AsyncLoader.cpp

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -241,14 +241,7 @@ AsyncLoader::~AsyncLoader()
241241
// When all jobs are done we could still have finalizing workers.
242242
// These workers could call updateCurrentPriorityAndSpawn() that scans all pools.
243243
// We need to stop all of them before destructing any of them.
244-
stop();
245-
}
246-
247-
void AsyncLoader::start()
248-
{
249-
std::unique_lock lock{mutex};
250-
is_running = true;
251-
updateCurrentPriorityAndSpawn(lock);
244+
shutdown();
252245
}
253246

254247
void AsyncLoader::wait()
@@ -277,7 +270,27 @@ void AsyncLoader::wait()
277270
}
278271
}
279272

280-
void AsyncLoader::stop()
273+
void AsyncLoader::shutdown()
274+
{
275+
LoadJobSet jobs;
276+
277+
{
278+
std::unique_lock lock{mutex};
279+
shutdown_requested = true;
280+
is_running = false;
281+
282+
for (const auto & [job, _] : scheduled_jobs)
283+
jobs.insert(job);
284+
}
285+
286+
// Cancel scheduled jobs, wait for currently running jobs to finish.
287+
remove(jobs);
288+
289+
for (auto & p : pools)
290+
p.thread_pool->wait();
291+
}
292+
293+
void AsyncLoader::pause()
281294
{
282295
{
283296
std::unique_lock lock{mutex};
@@ -289,6 +302,13 @@ void AsyncLoader::stop()
289302
p.thread_pool->wait();
290303
}
291304

305+
void AsyncLoader::unpause()
306+
{
307+
std::unique_lock lock{mutex};
308+
is_running = true;
309+
updateCurrentPriorityAndSpawn(lock);
310+
}
311+
292312
void AsyncLoader::schedule(LoadTask & task)
293313
{
294314
chassert(this == &task.loader);
@@ -331,6 +351,12 @@ void AsyncLoader::schedule(const LoadJobSet & jobs_to_schedule)
331351
for (const auto & job : jobs_to_schedule)
332352
gatherNotScheduled(job, jobs, lock);
333353

354+
if (jobs.empty())
355+
return;
356+
357+
if (shutdown_requested)
358+
throw Exception(ErrorCodes::ASYNC_LOAD_CANCELED, "AsyncLoader was shut down");
359+
334360
// Ensure scheduled_jobs graph will have no cycles. The only way to get a cycle is to add a cycle, assuming old jobs cannot reference new ones.
335361
checkCycle(jobs, lock);
336362

src/Common/AsyncLoader.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -395,17 +395,20 @@ class AsyncLoader : private boost::noncopyable
395395
// WARNING: all tasks instances should be destructed before associated AsyncLoader.
396396
~AsyncLoader();
397397

398-
// Start workers to execute scheduled load jobs. Note that AsyncLoader is constructed as already started.
399-
void start();
400-
401398
// Wait for all load jobs to finish, including all new jobs, so take care to stop adding new jobs first.
402399
void wait();
403400

401+
// Wait for currently executing jobs to finish, cancel pending jobs with an exception,
402+
// prevent scheduling of new jobs.
403+
void shutdown();
404+
404405
// Wait for currently executing jobs to finish, but do not run any other pending jobs.
405406
// Not finished jobs are left in pending state:
406-
// - they can be executed by calling start() again;
407+
// - they can be executed by calling unpause() again;
407408
// - or canceled using ~Task() or remove() later.
408-
void stop();
409+
// Currently only used in tests.
410+
void pause();
411+
void unpause();
409412

410413
// Schedule all jobs of given `task` and their dependencies (even if they are not in task).
411414
// All dependencies of a scheduled job inherit its pool if it has higher priority. This way higher priority job
@@ -484,6 +487,7 @@ class AsyncLoader : private boost::noncopyable
484487

485488
mutable std::mutex mutex; // Guards all the fields below.
486489
bool is_running = true;
490+
bool shutdown_requested = false;
487491
std::optional<Priority> current_priority; // highest priority among active pools
488492
UInt64 last_ready_seqno = 0; // Increasing counter for ready queue keys.
489493
UInt64 last_job_id = 0; // Increasing counter for job IDs

src/Common/FailPoint.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ static struct InitFiu
8787
REGULAR(zero_copy_unlock_zk_fail_after_op) \
8888
REGULAR(plain_rewritable_object_storage_azure_not_found_on_init) \
8989
PAUSEABLE(storage_merge_tree_background_clear_old_parts_pause) \
90+
PAUSEABLE(database_replicated_startup_pause) \
9091

9192

9293
namespace FailPoints

src/Common/tests/gtest_async_loader.cpp

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ struct AsyncLoaderTest
5353
explicit AsyncLoaderTest(std::vector<Initializer> initializers)
5454
: loader(getPoolInitializers(initializers), /* log_failures = */ false, /* log_progress = */ false, /* log_events = */ false)
5555
{
56-
loader.stop(); // All tests call `start()` manually to better control ordering
56+
loader.pause(); // All tests call `unpause()` manually to better control ordering
5757
}
5858

5959
explicit AsyncLoaderTest(size_t max_threads = 1)
@@ -181,7 +181,7 @@ TEST(AsyncLoader, Smoke)
181181

182182
std::thread waiter_thread([&t, job5] { t.loader.wait(job5); });
183183

184-
t.loader.start();
184+
t.loader.unpause();
185185

186186
t.loader.wait(job3);
187187
t.loader.wait();
@@ -196,7 +196,7 @@ TEST(AsyncLoader, Smoke)
196196
ASSERT_EQ(jobs_done, 5);
197197
ASSERT_EQ(low_priority_jobs_done, 1);
198198

199-
t.loader.stop();
199+
t.loader.pause();
200200
}
201201

202202
TEST(AsyncLoader, CycleDetection)
@@ -348,7 +348,7 @@ TEST(AsyncLoader, CancelPendingDependency)
348348
TEST(AsyncLoader, CancelExecutingJob)
349349
{
350350
AsyncLoaderTest t;
351-
t.loader.start();
351+
t.loader.unpause();
352352

353353
std::barrier sync(2);
354354

@@ -380,7 +380,7 @@ TEST(AsyncLoader, CancelExecutingJob)
380380
TEST(AsyncLoader, CancelExecutingTask)
381381
{
382382
AsyncLoaderTest t(16);
383-
t.loader.start();
383+
t.loader.unpause();
384384
std::barrier sync(2);
385385

386386
auto blocker_job_func = [&] (AsyncLoader &, const LoadJobPtr &)
@@ -436,7 +436,7 @@ TEST(AsyncLoader, CancelExecutingTask)
436436
TEST(AsyncLoader, JobFailure)
437437
{
438438
AsyncLoaderTest t;
439-
t.loader.start();
439+
t.loader.unpause();
440440

441441
std::string error_message = "test job failure";
442442

@@ -466,7 +466,7 @@ TEST(AsyncLoader, JobFailure)
466466
TEST(AsyncLoader, ScheduleJobWithFailedDependencies)
467467
{
468468
AsyncLoaderTest t;
469-
t.loader.start();
469+
t.loader.unpause();
470470

471471
std::string_view error_message = "test job failure";
472472

@@ -522,7 +522,7 @@ TEST(AsyncLoader, ScheduleJobWithCanceledDependencies)
522522
auto canceled_task = t.schedule({ canceled_job });
523523
canceled_task->remove();
524524

525-
t.loader.start();
525+
t.loader.unpause();
526526

527527
auto job_func = [&] (AsyncLoader &, const LoadJobPtr &) {};
528528
auto job1 = makeLoadJob({ canceled_job }, "job1", job_func);
@@ -559,7 +559,7 @@ TEST(AsyncLoader, IgnoreDependencyFailure)
559559
{
560560
AsyncLoaderTest t;
561561
std::atomic<bool> success{false};
562-
t.loader.start();
562+
t.loader.unpause();
563563

564564
std::string_view error_message = "test job failure";
565565

@@ -588,7 +588,7 @@ TEST(AsyncLoader, CustomDependencyFailure)
588588
int error_count = 0;
589589
std::atomic<size_t> good_count{0};
590590
std::barrier canceled_sync(4);
591-
t.loader.start();
591+
t.loader.unpause();
592592

593593
std::string_view error_message = "test job failure";
594594

@@ -675,7 +675,7 @@ TEST(AsyncLoader, WaitersLimit)
675675
};
676676

677677
std::barrier sync(2);
678-
t.loader.start();
678+
t.loader.unpause();
679679

680680
auto job_func = [&] (AsyncLoader &, const LoadJobPtr &) {
681681
sync.arrive_and_wait(); // (A)
@@ -723,7 +723,7 @@ TEST(AsyncLoader, WaitersLimit)
723723
TEST(AsyncLoader, TestConcurrency)
724724
{
725725
AsyncLoaderTest t(10);
726-
t.loader.start();
726+
t.loader.unpause();
727727

728728
for (int concurrency = 1; concurrency <= 10; concurrency++)
729729
{
@@ -750,7 +750,7 @@ TEST(AsyncLoader, TestConcurrency)
750750
TEST(AsyncLoader, TestOverload)
751751
{
752752
AsyncLoaderTest t(3);
753-
t.loader.start();
753+
t.loader.unpause();
754754

755755
size_t max_threads = t.loader.getMaxThreads(/* pool = */ 0);
756756
std::atomic<int> executing{0};
@@ -765,12 +765,12 @@ TEST(AsyncLoader, TestOverload)
765765
executing--;
766766
};
767767

768-
t.loader.stop();
768+
t.loader.pause();
769769
std::vector<LoadTaskPtr> tasks;
770770
tasks.reserve(concurrency);
771771
for (int i = 0; i < concurrency; i++)
772772
tasks.push_back(t.schedule(t.chainJobSet(5, job_func)));
773-
t.loader.start();
773+
t.loader.unpause();
774774
t.loader.wait();
775775
ASSERT_EQ(executing, 0);
776776
}
@@ -818,7 +818,7 @@ TEST(AsyncLoader, StaticPriorities)
818818
jobs.push_back(makeLoadJob({ jobs[6] }, 9, "H", job_func)); // 7
819819
auto task = t.schedule({ jobs.begin(), jobs.end() });
820820

821-
t.loader.start();
821+
t.loader.unpause();
822822
t.loader.wait();
823823
ASSERT_TRUE(schedule == "A9E9D9F9G9H9C4B3" || schedule == "A9D9E9F9G9H9C4B3");
824824
}
@@ -831,7 +831,7 @@ TEST(AsyncLoader, SimplePrioritization)
831831
{.max_threads = 1, .priority{-2}},
832832
});
833833

834-
t.loader.start();
834+
t.loader.unpause();
835835

836836
std::atomic<int> executed{0}; // Number of previously executed jobs (to test execution order)
837837
LoadJobPtr job_to_prioritize;
@@ -951,9 +951,9 @@ TEST(AsyncLoader, DynamicPriorities)
951951

952952
job_to_prioritize = jobs[6]; // G
953953

954-
t.loader.start();
954+
t.loader.unpause();
955955
t.loader.wait();
956-
t.loader.stop();
956+
t.loader.pause();
957957

958958
if (prioritize)
959959
{
@@ -1000,7 +1000,7 @@ TEST(AsyncLoader, JobPrioritizedWhileWaited)
10001000

10011001
job_to_wait = jobs[1];
10021002

1003-
t.loader.start();
1003+
t.loader.unpause();
10041004

10051005
while (job_to_wait->waitersCount() == 0)
10061006
std::this_thread::yield();
@@ -1011,15 +1011,15 @@ TEST(AsyncLoader, JobPrioritizedWhileWaited)
10111011
sync.arrive_and_wait();
10121012

10131013
t.loader.wait();
1014-
t.loader.stop();
1014+
t.loader.pause();
10151015
ASSERT_EQ(t.loader.suspendedWorkersCount(1), 0);
10161016
ASSERT_EQ(t.loader.suspendedWorkersCount(0), 0);
10171017
}
10181018

10191019
TEST(AsyncLoader, RandomIndependentTasks)
10201020
{
10211021
AsyncLoaderTest t(16);
1022-
t.loader.start();
1022+
t.loader.unpause();
10231023

10241024
auto job_func = [&] (AsyncLoader &, const LoadJobPtr & self)
10251025
{
@@ -1041,7 +1041,7 @@ TEST(AsyncLoader, RandomIndependentTasks)
10411041
TEST(AsyncLoader, RandomDependentTasks)
10421042
{
10431043
AsyncLoaderTest t(16);
1044-
t.loader.start();
1044+
t.loader.unpause();
10451045

10461046
std::mutex mutex;
10471047
std::condition_variable cv;
@@ -1108,7 +1108,7 @@ TEST(AsyncLoader, SetMaxThreads)
11081108
for (int i = 0; i < 1000; i++)
11091109
tasks.push_back(t.schedule({makeLoadJob({}, "job", job_func)}));
11101110

1111-
t.loader.start();
1111+
t.loader.unpause();
11121112
while (sync_index < syncs.size())
11131113
{
11141114
// Wait for `max_threads` jobs to start executing
@@ -1132,7 +1132,7 @@ TEST(AsyncLoader, SetMaxThreads)
11321132
TEST(AsyncLoader, SubJobs)
11331133
{
11341134
AsyncLoaderTest t(1);
1135-
t.loader.start();
1135+
t.loader.unpause();
11361136

11371137
// An example of component with an asynchronous loading interface
11381138
class MyComponent : boost::noncopyable {
@@ -1195,7 +1195,7 @@ TEST(AsyncLoader, SubJobs)
11951195
TEST(AsyncLoader, RecursiveJob)
11961196
{
11971197
AsyncLoaderTest t(1);
1198-
t.loader.start();
1198+
t.loader.unpause();
11991199

12001200
// An example of component with an asynchronous loading interface (a complicated one)
12011201
class MyComponent : boost::noncopyable {

src/Databases/DatabaseReplicated.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include <base/defines.h>
4141
#include <base/getFQDNOrHostName.h>
4242
#include <Common/Exception.h>
43+
#include <Common/FailPoint.h>
4344
#include <Common/Macros.h>
4445
#include <Common/OpenTelemetryTraceContext.h>
4546
#include <Common/PoolId.h>
@@ -95,6 +96,12 @@ namespace ErrorCodes
9596
extern const int CANNOT_RESTORE_TABLE;
9697
extern const int QUERY_IS_PROHIBITED;
9798
extern const int SUPPORT_IS_DISABLED;
99+
extern const int ASYNC_LOAD_CANCELED;
100+
}
101+
102+
namespace FailPoints
103+
{
104+
extern const char database_replicated_startup_pause[];
98105
}
99106

100107
static constexpr const char * REPLICATED_DATABASE_MARK = "DatabaseReplicated";
@@ -767,6 +774,8 @@ LoadTaskPtr DatabaseReplicated::startupDatabaseAsync(AsyncLoader & async_loader,
767774
if (is_probably_dropped)
768775
return;
769776

777+
FailPointInjection::pauseFailPoint(FailPoints::database_replicated_startup_pause);
778+
770779
{
771780
std::lock_guard lock{ddl_worker_mutex};
772781
ddl_worker = std::make_unique<DatabaseReplicatedDDLWorker>(this, getContext());
@@ -1695,6 +1704,17 @@ void DatabaseReplicated::renameDatabase(ContextPtr query_context, const String &
16951704

16961705
void DatabaseReplicated::stopReplication()
16971706
{
1707+
try
1708+
{
1709+
/// Make sure startupDatabaseAsync doesn't start ddl_worker after stopReplication().
1710+
waitDatabaseStarted();
1711+
}
1712+
catch (Exception & e)
1713+
{
1714+
if (e.code() != ErrorCodes::ASYNC_LOAD_CANCELED)
1715+
tryLogCurrentException("DatabaseReplicated", "Async loading failed", LogsLevel::warning);
1716+
}
1717+
16981718
std::lock_guard lock{ddl_worker_mutex};
16991719
if (ddl_worker)
17001720
ddl_worker->shutdown();

0 commit comments

Comments
 (0)