Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit fdc60ec

Browse files
committed
merge the two QueuePools together
1 parent ed97cf6 commit fdc60ec

File tree

5 files changed

+18
-87
lines changed

5 files changed

+18
-87
lines changed

src/common/init.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
#include "gc/gc_manager_factory.h"
2525
#include "settings/settings_manager.h"
2626
#include "threadpool/mono_queue_pool.h"
27-
#include "threadpool/brain_thread_pool.h"
2827

2928
namespace peloton {
3029

@@ -41,11 +40,11 @@ void PelotonInit::Initialize() {
4140
thread_pool.Initialize(0, std::thread::hardware_concurrency() + 3);
4241

4342
// start worker pool
44-
threadpool::MonoQueuePool::GetInstance().Startup();
43+
threadpool::MonoQueuePool::GetInstance(32, 4).Startup();
4544

4645
// start brain thread pool
4746
if (settings::SettingsManager::GetBool(settings::SettingId::brain)) {
48-
threadpool::BrainThreadPool::GetInstance().Startup();
47+
threadpool::MonoQueuePool::GetBrainInstance(32, 1).Startup();
4948
}
5049

5150
int parallelism = (std::thread::hardware_concurrency() + 3) / 4;
@@ -111,11 +110,11 @@ void PelotonInit::Shutdown() {
111110
concurrency::EpochManagerFactory::GetInstance().StopEpoch();
112111

113112
// stop worker pool
114-
threadpool::MonoQueuePool::GetInstance().Shutdown();
113+
threadpool::MonoQueuePool::GetInstance(32, 4).Shutdown();
115114

116115
// stop brain thread pool
117116
if (settings::SettingsManager::GetBool(settings::SettingId::brain)) {
118-
threadpool::BrainThreadPool::GetInstance().Shutdown();
117+
threadpool::MonoQueuePool::GetBrainInstance(32, 1).Shutdown();
119118
}
120119

121120
thread_pool.Shutdown();

src/gc/transaction_level_gc_manager.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
#include "storage/tile_group.h"
2323
#include "storage/tuple.h"
2424
#include "storage/storage_manager.h"
25-
#include "threadpool/brain_thread_pool.h"
25+
#include "threadpool/mono_queue_pool.h"
2626

2727

2828
namespace peloton {
@@ -136,7 +136,7 @@ int TransactionLevelGCManager::Unlink(const int &thread_id,
136136
std::vector<std::string> query_strings = txn_ctx->GetQueryStrings();
137137
if(query_strings.size() != 0) {
138138
uint64_t timestamp = txn_ctx->GetTimestamp();
139-
auto &pool = threadpool::BrainThreadPool::GetInstance();
139+
auto &pool = threadpool::MonoQueuePool::GetBrainInstance(32, 1);
140140
for(auto query_string: query_strings) {
141141
pool.SubmitTask([this, query_string, timestamp] {
142142
brain::QueryLogger::LogQuery(query_string, timestamp);

src/include/threadpool/brain_thread_pool.h

Lines changed: 0 additions & 70 deletions
This file was deleted.

src/include/threadpool/mono_queue_pool.h

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,15 @@
1717
namespace peloton {
1818
namespace threadpool {
1919

20-
// TODO: tune these variables
21-
constexpr static size_t kDefaultTaskQueueSize = 32;
22-
constexpr static size_t kDefaultWorkerPoolSize = 4;
23-
2420
/**
2521
* @brief Wrapper class for single queue and single pool.
2622
* One should use this if possible.
2723
*/
2824
class MonoQueuePool {
2925
public:
30-
MonoQueuePool()
31-
: task_queue_(kDefaultTaskQueueSize),
32-
worker_pool_(kDefaultWorkerPoolSize, &task_queue_),
26+
MonoQueuePool(int task_queue_size = 32, int worker_pool_size = 4)
27+
: task_queue_(task_queue_size),
28+
worker_pool_(worker_pool_size, &task_queue_),
3329
is_running_(false) {}
3430

3531
~MonoQueuePool() {
@@ -55,11 +51,17 @@ class MonoQueuePool {
5551
task_queue_.Enqueue(std::move(func));
5652
}
5753

58-
static MonoQueuePool &GetInstance() {
59-
static MonoQueuePool mono_queue_pool;
54+
static MonoQueuePool &GetInstance(int task_queue_size, int worker_pool_size) {
55+
static MonoQueuePool mono_queue_pool(task_queue_size, worker_pool_size);
6056
return mono_queue_pool;
6157
}
6258

59+
static MonoQueuePool &GetBrainInstance(int task_queue_size,
60+
int worker_pool_size) {
61+
static MonoQueuePool brain_queue_pool(task_queue_size, worker_pool_size);
62+
return brain_queue_pool;
63+
}
64+
6365
private:
6466
TaskQueue task_queue_;
6567
WorkerPool worker_pool_;

src/traffic_cop/traffic_cop.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ executor::ExecutionResult TrafficCop::ExecuteHelper(
187187
task_callback_(task_callback_arg_);
188188
};
189189

190-
auto &pool = threadpool::MonoQueuePool::GetInstance();
190+
auto &pool = threadpool::MonoQueuePool::GetInstance(32, 4);
191191
pool.SubmitTask([plan, txn, &params, &result, &result_format, on_complete] {
192192
executor::PlanExecutor::ExecutePlan(plan, txn, params, result_format,
193193
on_complete);

0 commit comments

Comments
 (0)