Skip to content

Commit 6ed1fa6

Browse files
authored
Merge branch 'customizations/24.3.11' into 24.3.5_aarch64-integration-tests
2 parents 3987ff9 + b8492db commit 6ed1fa6

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

50 files changed

+439
-357
lines changed

docker/test/stress/run.sh

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -196,6 +196,7 @@ stop_server
196196
export USE_S3_STORAGE_FOR_MERGE_TREE=1
197197
export RANDOMIZE_OBJECT_KEY_TYPE=1
198198
export ZOOKEEPER_FAULT_INJECTION=1
199+
export THREAD_POOL_FAULT_INJECTION=1
199200
configure
200201

201202
# But we still need default disk because some tables loaded only into it

programs/server/Server.cpp

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1580,6 +1580,9 @@ try
15801580
new_server_settings.http_connections_store_limit,
15811581
});
15821582

1583+
if (global_context->isServerCompletelyStarted())
1584+
CannotAllocateThreadFaultInjector::setFaultProbability(new_server_settings.cannot_allocate_thread_fault_injection_probability);
1585+
15831586
ProfileEvents::increment(ProfileEvents::MainConfigLoads);
15841587

15851588
/// Must be the last.
@@ -2069,6 +2072,8 @@ try
20692072
startup_watch.stop();
20702073
ProfileEvents::increment(ProfileEvents::ServerStartupMilliseconds, startup_watch.elapsedMilliseconds());
20712074

2075+
CannotAllocateThreadFaultInjector::setFaultProbability(server_settings.cannot_allocate_thread_fault_injection_probability);
2076+
20722077
try
20732078
{
20742079
global_context->startClusterDiscovery();

src/Backups/BackupEntriesCollector.cpp

Lines changed: 5 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -786,20 +786,15 @@ void BackupEntriesCollector::makeBackupEntriesForTablesData()
786786
if (backup_settings.structure_only)
787787
return;
788788

789-
std::vector<std::future<void>> futures;
789+
ThreadPoolCallbackRunnerLocal<void> runner(threadpool, "BackupCollect");
790790
for (const auto & table_name : table_infos | boost::adaptors::map_keys)
791791
{
792-
futures.push_back(scheduleFromThreadPool<void>([&]()
792+
runner([&]()
793793
{
794794
makeBackupEntriesForTableData(table_name);
795-
}, threadpool, "BackupCollect"));
796-
}
797-
/// Wait for all tasks.
798-
for (auto & future : futures)
799-
future.wait();
800-
/// Make sure there is no exception.
801-
for (auto & future : futures)
802-
future.get();
795+
});
796+
}
797+
runner.waitForAllToFinishAndRethrowFirstError();
803798
}
804799

805800
void BackupEntriesCollector::makeBackupEntriesForTableData(const QualifiedTableName & table_name)

src/Backups/BackupFileInfo.cpp

Lines changed: 11 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -210,70 +210,39 @@ BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entr
210210
BackupFileInfos infos;
211211
infos.resize(backup_entries.size());
212212

213-
size_t num_active_jobs = 0;
214-
std::mutex mutex;
215-
std::condition_variable event;
216-
std::exception_ptr exception;
213+
std::atomic_bool failed = false;
217214

218-
auto thread_group = CurrentThread::getGroup();
219215
LoggerPtr log = getLogger("FileInfosFromBackupEntries");
220216

217+
ThreadPoolCallbackRunnerLocal<void> runner(thread_pool, "BackupWorker");
221218
for (size_t i = 0; i != backup_entries.size(); ++i)
222219
{
223-
{
224-
std::lock_guard lock{mutex};
225-
if (exception)
226-
break;
227-
++num_active_jobs;
228-
}
220+
if (failed)
221+
break;
229222

230-
auto job = [&mutex, &num_active_jobs, &event, &exception, &infos, &backup_entries, &read_settings, &base_backup, &thread_group, &process_list_element, i, log]()
223+
runner([&infos, &backup_entries, &read_settings, &base_backup, &process_list_element, i, log, &failed]()
231224
{
232-
SCOPE_EXIT_SAFE({
233-
std::lock_guard lock{mutex};
234-
if (!--num_active_jobs)
235-
event.notify_all();
236-
CurrentThread::detachFromGroupIfNotDetached();
237-
});
238-
225+
if (failed)
226+
return;
239227
try
240228
{
241229
const auto & name = backup_entries[i].first;
242230
const auto & entry = backup_entries[i].second;
243231

244-
if (thread_group)
245-
CurrentThread::attachToGroup(thread_group);
246-
247-
setThreadName("BackupWorker");
248-
249-
{
250-
std::lock_guard lock{mutex};
251-
if (exception)
252-
return;
253-
}
254-
255232
if (process_list_element)
256233
process_list_element->checkTimeLimit();
257234

258235
infos[i] = buildFileInfoForBackupEntry(name, entry, base_backup, read_settings, log);
259236
}
260237
catch (...)
261238
{
262-
std::lock_guard lock{mutex};
263-
if (!exception)
264-
exception = std::current_exception();
239+
failed = true;
240+
throw;
265241
}
266-
};
267-
268-
thread_pool.scheduleOrThrowOnError(job);
242+
});
269243
}
270244

271-
{
272-
std::unique_lock lock{mutex};
273-
event.wait(lock, [&] { return !num_active_jobs; });
274-
if (exception)
275-
std::rethrow_exception(exception);
276-
}
245+
runner.waitForAllToFinishAndRethrowFirstError();
277246

278247
return infos;
279248
}

src/Backups/BackupIO_AzureBlobStorage.cpp

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -121,7 +121,7 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
121121
/* dest_path */ blob_path[0],
122122
settings,
123123
read_settings,
124-
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupRDAzure"),
124+
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupRDAzure"),
125125
/* for_disk_azure_blob_storage= */ true);
126126

127127
return file_size;
@@ -178,7 +178,7 @@ void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backu
178178
fs::path(configuration.blob_path) / path_in_backup,
179179
settings,
180180
read_settings,
181-
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
181+
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
182182
return; /// copied!
183183
}
184184
}
@@ -201,14 +201,14 @@ void BackupWriterAzureBlobStorage::copyFile(const String & destination, const St
201201
/* dest_path */ destination,
202202
settings,
203203
read_settings,
204-
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"),
204+
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"),
205205
/* for_disk_azure_blob_storage= */ true);
206206
}
207207

208208
void BackupWriterAzureBlobStorage::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
209209
{
210210
copyDataToAzureBlobStorageFile(create_read_buffer, start_pos, length, client, configuration.container, path_in_backup, settings,
211-
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
211+
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
212212
}
213213

214214
BackupWriterAzureBlobStorage::~BackupWriterAzureBlobStorage() = default;

src/Backups/BackupIO_S3.cpp

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -194,7 +194,7 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
194194
read_settings,
195195
blob_storage_log,
196196
object_attributes,
197-
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupReaderS3"),
197+
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupReaderS3"),
198198
/* for_disk_s3= */ true);
199199

200200
return file_size;
@@ -263,7 +263,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
263263
read_settings,
264264
blob_storage_log,
265265
{},
266-
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
266+
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
267267
return; /// copied!
268268
}
269269
}
@@ -288,14 +288,14 @@ void BackupWriterS3::copyFile(const String & destination, const String & source,
288288
read_settings,
289289
blob_storage_log,
290290
{},
291-
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
291+
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
292292
}
293293

294294
void BackupWriterS3::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
295295
{
296296
copyDataToS3File(create_read_buffer, start_pos, length, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup,
297297
s3_settings.request_settings, blob_storage_log, {},
298-
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
298+
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
299299
}
300300

301301
BackupWriterS3::~BackupWriterS3() = default;
@@ -330,7 +330,7 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
330330
s3_settings.request_settings,
331331
blob_storage_log,
332332
std::nullopt,
333-
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"),
333+
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"),
334334
write_settings);
335335
}
336336

src/Backups/BackupsWorker.cpp

Lines changed: 14 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -703,51 +703,27 @@ void BackupsWorker::writeBackupEntries(
703703
backup_entries.size());
704704
}
705705

706-
size_t num_active_jobs = 0;
707-
std::mutex mutex;
708-
std::condition_variable event;
709-
std::exception_ptr exception;
706+
707+
std::atomic_bool failed = false;
710708

711709
bool always_single_threaded = !backup->supportsWritingInMultipleThreads();
712710
auto & thread_pool = getThreadPool(ThreadPoolId::BACKUP_COPY_FILES);
713-
auto thread_group = CurrentThread::getGroup();
714711

712+
ThreadPoolCallbackRunnerLocal<void> runner(thread_pool, "BackupWorker");
715713
for (size_t i = 0; i != backup_entries.size(); ++i)
716714
{
715+
if (failed)
716+
break;
717+
717718
auto & entry = backup_entries[i].second;
718719
const auto & file_info = file_infos[i];
719720

721+
auto job = [&]()
720722
{
721-
std::unique_lock lock{mutex};
722-
if (exception)
723-
break;
724-
++num_active_jobs;
725-
}
726-
727-
auto job = [&](bool async)
728-
{
729-
SCOPE_EXIT_SAFE(
730-
std::lock_guard lock{mutex};
731-
if (!--num_active_jobs)
732-
event.notify_all();
733-
if (async)
734-
CurrentThread::detachFromGroupIfNotDetached();
735-
);
736-
723+
if (failed)
724+
return;
737725
try
738726
{
739-
if (async && thread_group)
740-
CurrentThread::attachToGroup(thread_group);
741-
742-
if (async)
743-
setThreadName("BackupWorker");
744-
745-
{
746-
std::lock_guard lock{mutex};
747-
if (exception)
748-
return;
749-
}
750-
751727
if (process_list_element)
752728
process_list_element->checkTimeLimit();
753729

@@ -770,27 +746,21 @@ void BackupsWorker::writeBackupEntries(
770746
}
771747
catch (...)
772748
{
773-
std::lock_guard lock{mutex};
774-
if (!exception)
775-
exception = std::current_exception();
749+
failed = true;
750+
throw;
776751
}
777752
};
778753

779754
if (always_single_threaded)
780755
{
781-
job(false);
756+
job();
782757
continue;
783758
}
784759

785-
thread_pool.scheduleOrThrowOnError([job] { job(true); });
760+
runner(std::move(job));
786761
}
787762

788-
{
789-
std::unique_lock lock{mutex};
790-
event.wait(lock, [&] { return !num_active_jobs; });
791-
if (exception)
792-
std::rethrow_exception(exception);
793-
}
763+
runner.waitForAllToFinishAndRethrowFirstError();
794764
}
795765

796766

src/Backups/RestorerFromBackup.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -231,7 +231,7 @@ void RestorerFromBackup::schedule(std::function<void()> && task_, const char * t
231231

232232
checkIsQueryCancelled();
233233

234-
auto future = scheduleFromThreadPool<void>(
234+
auto future = scheduleFromThreadPoolUnsafe<void>(
235235
[this, task = std::move(task_)]() mutable
236236
{
237237
if (exception_caught)

src/Common/AsyncLoader.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -873,6 +873,7 @@ void AsyncLoader::spawn(Pool & pool, std::unique_lock<std::mutex> & lock)
873873
ALLOW_ALLOCATIONS_IN_SCOPE;
874874
if (log_events)
875875
LOG_DEBUG(log, "Spawn loader worker #{} in {}", pool.workers, pool.name);
876+
auto blocker = CannotAllocateThreadFaultInjector::blockFaultInjections();
876877
pool.thread_pool->scheduleOrThrowOnError([this, &pool] { worker(pool); });
877878
});
878879
}

src/Common/ThreadPool.cpp

Lines changed: 42 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -215,6 +215,9 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
215215
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
216216
watch.elapsedMicroseconds());
217217

218+
if (CannotAllocateThreadFaultInjector::injectFault())
219+
return on_error("fault injected");
220+
218221
auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };
219222

220223
if (wait_microseconds) /// Check for optional. Condition is true if the optional is set. Even if the value is zero.
@@ -621,3 +624,42 @@ void GlobalThreadPool::shutdown()
621624
the_instance->finalize();
622625
}
623626
}
627+
628+
/// Returns the single shared injector object.
/// The object is a function-local static, so it is constructed on the first call
/// and the same reference is handed out on every later call.
CannotAllocateThreadFaultInjector & CannotAllocateThreadFaultInjector::instance()
629+
{
630+
static CannotAllocateThreadFaultInjector ins;
631+
return ins;
632+
}
633+
634+
/// Sets the probability with which injectFault() reports a fault.
///
/// @param probability  Injection is enabled only for values in the range (0, 1].
///                     Any other value (including 0 and negative values) disables
///                     injection and clears the stored distribution.
///
/// The whole update is done while holding the injector's mutex.
void CannotAllocateThreadFaultInjector::setFaultProbability(double probability)
635+
{
636+
auto & ins = instance();
637+
std::lock_guard lock(ins.mutex);
638+
/// Both comparisons are false for NaN, so NaN also disables injection.
ins.enabled = 0 < probability && probability <= 1;
639+
if (ins.enabled)
640+
/// (Re)creates the distribution with the new probability.
/// NOTE(review): `random` is presumably an optional Bernoulli-style distribution; its declaration is not visible here.
ins.random.emplace(probability);
641+
else
642+
ins.random.reset();
643+
}
644+
645+
/// Decides whether the caller should simulate a failure.
///
/// @return false when injection is disabled or when the calling thread has
///         block_fault_injections set; otherwise the result of drawing from
///         the stored distribution with the injector's generator.
bool CannotAllocateThreadFaultInjector::injectFault()
646+
{
647+
auto & ins = instance();
648+
/// Cheap early exit: the enabled flag is read with a relaxed atomic load, without taking the mutex.
if (!ins.enabled.load(std::memory_order_relaxed))
649+
return false;
650+
651+
/// Thread-local opt-out, see blockFaultInjections().
if (ins.block_fault_injections)
652+
return false;
653+
654+
/// The distribution and generator are only touched under the mutex.
/// `random` is re-checked here because setFaultProbability() may have reset it
/// between the relaxed load above and acquiring the lock.
std::lock_guard lock(ins.mutex);
655+
return ins.random && (*ins.random)(ins.rndgen);
656+
}
657+
658+
/// Per-thread flag, initially false. While it is true on a thread, injectFault() returns false on that thread.
thread_local bool CannotAllocateThreadFaultInjector::block_fault_injections = false;
659+
660+
/// Turns off fault injection for the calling thread by setting block_fault_injections to true.
///
/// @return a scope_guard wrapping a callable that sets the flag back to false
///         (presumably run when the guard is destroyed; scope_guard is declared elsewhere).
///
/// NOTE(review): the callable resets the flag to false unconditionally instead of
/// restoring the previous value, so if blockers are ever nested on one thread the
/// inner guard would re-enable injection while the outer one is still alive.
/// Confirm callers never nest.
scope_guard CannotAllocateThreadFaultInjector::blockFaultInjections()
661+
{
662+
auto & ins = instance();
663+
ins.block_fault_injections = true;
664+
return [&ins](){ ins.block_fault_injections = false; };
665+
}

0 commit comments

Comments
 (0)