Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Global.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#pragma once

// pre processor symbols
//#define VERBOSE

const int DEFAULT_POOL_SIZE = 10;
const int STARTED = 0;
const int STOPPED = 1;
17 changes: 10 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
OBJPATH=bin/obj
EXAMPLEPATH=bin/example
BINPATH=./bin
OBJPATH=./obj

all:
g++ CondVar.cpp -lpthread -c -o $(OBJPATH)/CondVar.o
g++ Mutex.cpp -lpthread -c -o $(OBJPATH)/Mutex.o
g++ Task.cpp -lpthread -c -o $(OBJPATH)/Task.o
g++ ThreadPool.cpp -lpthread -c -o $(OBJPATH)/ThreadPool.o
g++ $(OBJPATH)/CondVar.o $(OBJPATH)/Mutex.o $(OBJPATH)/Task.o $(OBJPATH)/ThreadPool.o threadpool_test.cpp -lpthread -o $(EXAMPLEPATH)threadpool_test
g++ CondVar.cpp -lpthread -fpic -c -o $(OBJPATH)/CondVar.o
g++ Mutex.cpp -lpthread -fpic -c -o $(OBJPATH)/Mutex.o
g++ Task.cpp -lpthread -fpic -c -o $(OBJPATH)/Task.o
g++ ThreadPool.cpp -lpthread -fpic -c -o $(OBJPATH)/ThreadPool.o
g++ -fPIC -shared -lpthread $(OBJPATH)/CondVar.o $(OBJPATH)/Mutex.o $(OBJPATH)/Task.o $(OBJPATH)/ThreadPool.o -o $(BINPATH)/libthreadpool.so
# g++ threadpool_test.cpp -L$(BINPATH) -lthreadpool -lpthread -fpic -o $(BINPATH)/threadpool_test

g++ $(OBJPATH)/CondVar.o $(OBJPATH)/Mutex.o $(OBJPATH)/Task.o $(OBJPATH)/ThreadPool.o threadpool_test.cpp -lpthread -o $(BINPATH)/threadpool_test

#all:
# g++ threadpool.cpp -lpthread -fpic -c -o bin/obj/threadpool.o
Expand Down
3 changes: 2 additions & 1 deletion Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ Task::~Task() {
}

void Task::operator()() {
(*m_fn_ptr)(m_arg);
// (*m_fn_ptr)(m_arg);
run(); // we can reuse run() here instead of copying the code from run
if (m_arg != NULL) {
delete m_arg;
}
Expand Down
26 changes: 24 additions & 2 deletions ThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

ThreadPool::ThreadPool() : m_pool_size(DEFAULT_POOL_SIZE)
{
#ifdef VERBOSE
cout << "Constructed ThreadPool of size " << m_pool_size << endl;
#endif
}

ThreadPool::ThreadPool(int pool_size) : m_pool_size(pool_size)
{
#ifdef VERBOSE
cout << "Constructed ThreadPool of size " << m_pool_size << endl;
#endif
}

ThreadPool::~ThreadPool()
Expand Down Expand Up @@ -43,7 +47,9 @@ int ThreadPool::initialize_threadpool()
}
m_threads.push_back(tid);
}
#ifdef VERBOSE
cout << m_pool_size << " threads created by the thread pool" << endl;
#endif

return 0;
}
Expand All @@ -57,27 +63,37 @@ int ThreadPool::destroy_threadpool()
m_task_mutex.lock();
m_pool_state = STOPPED;
m_task_mutex.unlock();
#ifdef VERBOSE
cout << "Broadcasting STOP signal to all threads..." << endl;
#endif
m_task_cond_var.broadcast(); // notify all threads we are shttung down

int ret = -1;
for (int i = 0; i < m_pool_size; i++) {
void* result;
ret = pthread_join(m_threads[i], &result);
#ifdef VERBOSE
cout << "pthread_join() returned " << ret << ": " << strerror(errno) << endl;
#endif
m_task_cond_var.broadcast(); // try waking up a bunch of threads that are still waiting
}
#ifdef VERBOSE
cout << m_pool_size << " threads exited from the thread pool" << endl;
#endif
return 0;
}

void* ThreadPool::execute_thread()
{
Task* task = NULL;
#ifdef VERBOSE
cout << "Starting thread " << pthread_self() << endl;
#endif
while(true) {
// Try to pick a task
#ifdef VERBOSE
cout << "Locking: " << pthread_self() << endl;
#endif
m_task_mutex.lock();

// We need to put pthread_cond_wait in a loop for two reasons:
Expand All @@ -89,27 +105,33 @@ void* ThreadPool::execute_thread()
while ((m_pool_state != STOPPED) && (m_tasks.empty())) {
// Wait until there is a task in the queue
// Unlock mutex while wait, then lock it back when signaled
#ifdef VERBOSE
cout << "Unlocking and waiting: " << pthread_self() << endl;
#endif
m_task_cond_var.wait(m_task_mutex.get_mutex_ptr());
#ifdef VERBOSE
cout << "Signaled and locking: " << pthread_self() << endl;
#endif
}

// If the thread was woken up to notify process shutdown, return from here
if (m_pool_state == STOPPED) {
#ifdef VERBOSE
cout << "Unlocking and exiting: " << pthread_self() << endl;
#endif
m_task_mutex.unlock();
pthread_exit(NULL);
}

task = m_tasks.front();
m_tasks.pop_front();
#ifdef VERBOSE
cout << "Unlocking: " << pthread_self() << endl;
#endif
m_task_mutex.unlock();

//cout << "Executing thread " << pthread_self() << endl;
// execute the task
(*task)(); // could also do task->run(arg);
//cout << "Done executing thread " << pthread_self() << endl;
delete task;
}
return NULL;
Expand Down
Binary file removed bin/example/threadpool_test
Binary file not shown.
Binary file removed bin/examplethreadpool_test
Binary file not shown.
1 change: 0 additions & 1 deletion threadpool_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ void hello(void* arg)
{
int* x = (int*) arg;
cout << "Hello " << *x << endl;
// cout << "\n";
}

int main(int argc, char* argv[])
Expand Down