From 265f85ce0984ccdf3874ada1428ca9b1acaf0559 Mon Sep 17 00:00:00 2001 From: mbutrovich Date: Tue, 19 Jun 2018 15:32:21 -0400 Subject: [PATCH 1/3] Modify WorkerPool threads to not sleep for fixed amount of time, but rather wake up when something goes into the task_queue for them to work on. --- src/include/threadpool/mono_queue_pool.h | 6 +++++- src/include/threadpool/worker_pool.h | 5 +++++ src/threadpool/worker_pool.cpp | 21 ++++++++++----------- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/include/threadpool/mono_queue_pool.h b/src/include/threadpool/mono_queue_pool.h index fbee1985f22..f1d9fc0a87b 100644 --- a/src/include/threadpool/mono_queue_pool.h +++ b/src/include/threadpool/mono_queue_pool.h @@ -83,7 +83,11 @@ inline void MonoQueuePool::SubmitTask(const F &func) { if (!is_running_) { Startup(); } - task_queue_.Enqueue(std::move(func)); + { + std::lock_guard lock(worker_pool_.cv_lock); + task_queue_.Enqueue(std::move(func)); + } + worker_pool_.not_empty.notify_one(); } inline MonoQueuePool &MonoQueuePool::GetInstance() { diff --git a/src/include/threadpool/worker_pool.h b/src/include/threadpool/worker_pool.h index 2c9ee30ef96..797f1cf2c5b 100644 --- a/src/include/threadpool/worker_pool.h +++ b/src/include/threadpool/worker_pool.h @@ -13,7 +13,9 @@ #pragma once #include +#include #include +#include #include #include #include @@ -52,6 +54,9 @@ class WorkerPool { */ uint32_t NumWorkers() const { return num_workers_; } + std::mutex cv_lock; + std::condition_variable not_empty; + private: // The name of this pool std::string pool_name_; diff --git a/src/threadpool/worker_pool.cpp b/src/threadpool/worker_pool.cpp index 740e5190dd4..69cdc21ebc3 100644 --- a/src/threadpool/worker_pool.cpp +++ b/src/threadpool/worker_pool.cpp @@ -10,32 +10,31 @@ // //===----------------------------------------------------------------------===// +#include + #include "threadpool/worker_pool.h" #include "common/logger.h" + namespace peloton { namespace threadpool { namespace { void WorkerFunc(std::string thread_name, std::atomic_bool *is_running, - TaskQueue *task_queue) { - constexpr auto kMinPauseTime = std::chrono::microseconds(1); - constexpr auto kMaxPauseTime = std::chrono::microseconds(1000); + TaskQueue *task_queue, std::mutex *cv_lock, + std::condition_variable *not_empty) { LOG_INFO("Thread %s starting ...", thread_name.c_str()); - auto pause_time = kMinPauseTime; while (is_running->load() || !task_queue->IsEmpty()) { + std::unique_lock lock(*cv_lock); + not_empty->wait_for(lock, std::chrono::milliseconds(1), [&]{return !task_queue->IsEmpty();}); std::function task; - if (!task_queue->Dequeue(task)) { - // Polling with exponential back-off - std::this_thread::sleep_for(pause_time); - pause_time = std::min(pause_time * 2, kMaxPauseTime); - } else { + if (task_queue->Dequeue(task)) { + lock.unlock(); task(); - pause_time = kMinPauseTime; } } @@ -56,7 +55,7 @@ void WorkerPool::Startup() { if (is_running_.compare_exchange_strong(running, true)) { for (size_t i = 0; i < num_workers_; i++) { std::string name = pool_name_ + "-worker-" + std::to_string(i); - workers_.emplace_back(WorkerFunc, name, &is_running_, &task_queue_); + workers_.emplace_back(WorkerFunc, name, &is_running_, &task_queue_, &cv_lock, ¬_empty); } } } From e500ccf94c8025d6c18f1d0570802736b8409de1 Mon Sep 17 00:00:00 2001 From: mbutrovich Date: Tue, 19 Jun 2018 15:33:42 -0400 Subject: [PATCH 2/3] clang-format. --- src/threadpool/worker_pool.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/threadpool/worker_pool.cpp b/src/threadpool/worker_pool.cpp index 69cdc21ebc3..1e798940011 100644 --- a/src/threadpool/worker_pool.cpp +++ b/src/threadpool/worker_pool.cpp @@ -16,7 +16,6 @@ #include "common/logger.h" - namespace peloton { namespace threadpool { @@ -25,12 +24,12 @@ namespace { void WorkerFunc(std::string thread_name, std::atomic_bool *is_running, TaskQueue *task_queue, std::mutex *cv_lock, std::condition_variable *not_empty) { - LOG_INFO("Thread %s starting ...", thread_name.c_str()); while (is_running->load() || !task_queue->IsEmpty()) { std::unique_lock lock(*cv_lock); - not_empty->wait_for(lock, std::chrono::milliseconds(1), [&]{return !task_queue->IsEmpty();}); + not_empty->wait_for(lock, std::chrono::milliseconds(1), + [&] { return !task_queue->IsEmpty(); }); std::function task; if (task_queue->Dequeue(task)) { lock.unlock(); @@ -55,7 +54,8 @@ void WorkerPool::Startup() { if (is_running_.compare_exchange_strong(running, true)) { for (size_t i = 0; i < num_workers_; i++) { std::string name = pool_name_ + "-worker-" + std::to_string(i); - workers_.emplace_back(WorkerFunc, name, &is_running_, &task_queue_, &cv_lock, ¬_empty); + workers_.emplace_back(WorkerFunc, name, &is_running_, &task_queue_, + &cv_lock, ¬_empty); } } } From 64567a5fcf9afe679c29927f05091e42b9ad429c Mon Sep 17 00:00:00 2001 From: mbutrovich Date: Wed, 20 Jun 2018 10:40:56 -0400 Subject: [PATCH 3/3] Relax timeout for thread waiting to not be so aggressive. --- src/threadpool/worker_pool.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/threadpool/worker_pool.cpp b/src/threadpool/worker_pool.cpp index 1e798940011..e5696552670 100644 --- a/src/threadpool/worker_pool.cpp +++ b/src/threadpool/worker_pool.cpp @@ -28,7 +28,7 @@ void WorkerFunc(std::string thread_name, std::atomic_bool *is_running, while (is_running->load() || !task_queue->IsEmpty()) { std::unique_lock lock(*cv_lock); - not_empty->wait_for(lock, std::chrono::milliseconds(1), + not_empty->wait_for(lock, std::chrono::seconds(1), [&] { return !task_queue->IsEmpty(); }); std::function task; if (task_queue->Dequeue(task)) {