Skip to content

Commit bcc9126

Browse files
authored
Merge pull request #14056 from panyx0718/fix
Fix threadpool
2 parents 961baea + 70effdd commit bcc9126

File tree

3 files changed

+19
-52
lines changed

3 files changed

+19
-52
lines changed

paddle/fluid/framework/threadpool.cc

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ DEFINE_int32(dist_threadpool_size, 0,
2525

2626
namespace paddle {
2727
namespace framework {
28-
2928
std::unique_ptr<ThreadPool> ThreadPool::threadpool_(nullptr);
3029
std::once_flag ThreadPool::init_flag_;
3130

@@ -47,8 +46,7 @@ void ThreadPool::Init() {
4746
}
4847
}
4948

50-
ThreadPool::ThreadPool(int num_threads)
51-
: total_threads_(num_threads), idle_threads_(num_threads), running_(true) {
49+
ThreadPool::ThreadPool(int num_threads) : running_(true) {
5250
threads_.resize(num_threads);
5351
for (auto& thread : threads_) {
5452
// TODO(Yancey1989): binding the thread on the specify CPU number
@@ -59,6 +57,7 @@ ThreadPool::ThreadPool(int num_threads)
5957
ThreadPool::~ThreadPool() {
6058
{
6159
// notify all threads to stop running
60+
std::lock_guard<std::mutex> l(mutex_);
6261
running_ = false;
6362
scheduled_.notify_all();
6463
}
@@ -69,36 +68,24 @@ ThreadPool::~ThreadPool() {
6968
}
7069
}
7170

72-
void ThreadPool::Wait() {
73-
std::unique_lock<std::mutex> lock(mutex_);
74-
completed_.wait(lock, [=] { return Done() == true; });
75-
}
76-
7771
void ThreadPool::TaskLoop() {
78-
while (running_) {
72+
while (true) {
7973
std::unique_lock<std::mutex> lock(mutex_);
80-
scheduled_.wait(lock, [=] { return !tasks_.empty() || !running_; });
8174

82-
if (!running_) {
83-
break;
75+
scheduled_.wait(
76+
lock, [this] { return !this->tasks_.empty() || !this->running_; });
77+
78+
if (!running_ || tasks_.empty()) {
79+
return;
8480
}
81+
8582
// pop a task from the task queue
8683
auto task = std::move(tasks_.front());
8784
tasks_.pop();
88-
89-
--idle_threads_;
9085
lock.unlock();
9186

9287
// run the task
9388
task();
94-
95-
{
96-
std::unique_lock<std::mutex> lock(mutex_);
97-
++idle_threads_;
98-
if (Done()) {
99-
completed_.notify_all();
100-
}
101-
}
10289
}
10390
}
10491

paddle/fluid/framework/threadpool.h

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,6 @@ class ThreadPool {
5757

5858
~ThreadPool();
5959

60-
// Returns the number of threads created by the constructor.
61-
size_t Threads() const { return total_threads_; }
62-
63-
// Returns the number of currently idle threads.
64-
size_t IdleThreads() {
65-
std::unique_lock<std::mutex> lock(mutex_);
66-
return idle_threads_;
67-
}
68-
6960
// Run pushes a function to the task queue and returns a std::future
7061
// object. To wait for the completion of the task, call
7162
// std::future::wait().
@@ -94,25 +85,13 @@ class ThreadPool {
9485
});
9586
std::future<std::unique_ptr<platform::EnforceNotMet>> f = task.get_future();
9687
tasks_.push(std::move(task));
97-
lock.unlock();
9888
scheduled_.notify_one();
9989
return f;
10090
}
10191

102-
// Wait until all the tasks are completed.
103-
void Wait();
104-
10592
private:
10693
DISABLE_COPY_AND_ASSIGN(ThreadPool);
10794

108-
// If the task queue is empty and avaialbe is equal to the number of
109-
// threads, means that all tasks are completed. Note: this function
110-
// is not thread-safe. Returns true if all tasks are completed.
111-
// Note: don't delete the data member total_threads_ and use
112-
// threads_.size() instead; because you'd need to lock the mutex
113-
// before accessing threads_.
114-
bool Done() { return tasks_.empty() && idle_threads_ == total_threads_; }
115-
11695
// The constructor starts threads to run TaskLoop, which retrieves
11796
// and runs tasks from the queue.
11897
void TaskLoop();
@@ -125,14 +104,11 @@ class ThreadPool {
125104
static std::once_flag init_flag_;
126105

127106
std::vector<std::unique_ptr<std::thread>> threads_;
128-
const size_t total_threads_;
129-
size_t idle_threads_;
130107

131108
std::queue<Task> tasks_;
132109
std::mutex mutex_;
133110
bool running_;
134111
std::condition_variable scheduled_;
135-
std::condition_variable completed_;
136112
};
137113

138114
class ThreadPoolIO : ThreadPool {

paddle/fluid/framework/threadpool_test.cc

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ limitations under the License. */
1919

2020
namespace framework = paddle::framework;
2121

22-
void do_sum(framework::ThreadPool* pool, std::atomic<int>* sum, int cnt) {
23-
std::vector<std::future<void>> fs;
22+
void do_sum(std::vector<std::future<void>>* fs, std::mutex* mu,
23+
std::atomic<int>* sum, int cnt) {
2424
for (int i = 0; i < cnt; ++i) {
25-
fs.push_back(framework::Async([sum]() { sum->fetch_add(1); }));
25+
std::lock_guard<std::mutex> l(*mu);
26+
fs->push_back(framework::Async([sum]() { sum->fetch_add(1); }));
2627
}
2728
}
2829

@@ -40,18 +41,21 @@ TEST(ThreadPool, ConcurrentInit) {
4041
}
4142

4243
TEST(ThreadPool, ConcurrentRun) {
43-
framework::ThreadPool* pool = framework::ThreadPool::GetInstance();
4444
std::atomic<int> sum(0);
4545
std::vector<std::thread> threads;
46+
std::vector<std::future<void>> fs;
47+
std::mutex fs_mu;
4648
int n = 50;
4749
// sum = (n * (n + 1)) / 2
4850
for (int i = 1; i <= n; ++i) {
49-
std::thread t(do_sum, pool, &sum, i);
51+
std::thread t(do_sum, &fs, &fs_mu, &sum, i);
5052
threads.push_back(std::move(t));
5153
}
5254
for (auto& t : threads) {
5355
t.join();
5456
}
55-
pool->Wait();
57+
for (auto& t : fs) {
58+
t.wait();
59+
}
5660
EXPECT_EQ(sum, ((n + 1) * n) / 2);
5761
}

0 commit comments

Comments
 (0)