diff --git a/src/include/threadpool/mono_queue_pool.h b/src/include/threadpool/mono_queue_pool.h index fbee1985f22..f1d9fc0a87b 100644 --- a/src/include/threadpool/mono_queue_pool.h +++ b/src/include/threadpool/mono_queue_pool.h @@ -83,7 +83,11 @@ inline void MonoQueuePool::SubmitTask(const F &func) { if (!is_running_) { Startup(); } - task_queue_.Enqueue(std::move(func)); + { + std::lock_guard lock(worker_pool_.cv_lock); + task_queue_.Enqueue(std::move(func)); + } + worker_pool_.not_empty.notify_one(); } inline MonoQueuePool &MonoQueuePool::GetInstance() { diff --git a/src/include/threadpool/worker_pool.h b/src/include/threadpool/worker_pool.h index 2c9ee30ef96..797f1cf2c5b 100644 --- a/src/include/threadpool/worker_pool.h +++ b/src/include/threadpool/worker_pool.h @@ -13,7 +13,9 @@ #pragma once #include +#include #include +#include #include #include #include @@ -52,6 +54,9 @@ class WorkerPool { */ uint32_t NumWorkers() const { return num_workers_; } + std::mutex cv_lock; + std::condition_variable not_empty; + private: // The name of this pool std::string pool_name_; diff --git a/src/threadpool/worker_pool.cpp b/src/threadpool/worker_pool.cpp index 740e5190dd4..e5696552670 100644 --- a/src/threadpool/worker_pool.cpp +++ b/src/threadpool/worker_pool.cpp @@ -10,6 +10,8 @@ // //===----------------------------------------------------------------------===// +#include + #include "threadpool/worker_pool.h" #include "common/logger.h" @@ -20,22 +22,18 @@ namespace threadpool { namespace { void WorkerFunc(std::string thread_name, std::atomic_bool *is_running, - TaskQueue *task_queue) { - constexpr auto kMinPauseTime = std::chrono::microseconds(1); - constexpr auto kMaxPauseTime = std::chrono::microseconds(1000); - + TaskQueue *task_queue, std::mutex *cv_lock, + std::condition_variable *not_empty) { LOG_INFO("Thread %s starting ...", thread_name.c_str()); - auto pause_time = kMinPauseTime; while (is_running->load() || !task_queue->IsEmpty()) { + std::unique_lock lock(*cv_lock); + not_empty->wait_for(lock, std::chrono::seconds(1), + [&] { return !task_queue->IsEmpty(); }); std::function task; - if (!task_queue->Dequeue(task)) { - // Polling with exponential back-off - std::this_thread::sleep_for(pause_time); - pause_time = std::min(pause_time * 2, kMaxPauseTime); - } else { + if (task_queue->Dequeue(task)) { + lock.unlock(); task(); - pause_time = kMinPauseTime; } } @@ -56,7 +54,8 @@ void WorkerPool::Startup() { if (is_running_.compare_exchange_strong(running, true)) { for (size_t i = 0; i < num_workers_; i++) { std::string name = pool_name_ + "-worker-" + std::to_string(i); - workers_.emplace_back(WorkerFunc, name, &is_running_, &task_queue_); + workers_.emplace_back(WorkerFunc, name, &is_running_, &task_queue_, + &cv_lock, ¬_empty); } } }