diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5d886f6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +tags +output diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..8f03cd2 --- /dev/null +++ b/Makefile @@ -0,0 +1,22 @@ +.PHONY: all test clean + +CXX := g++ +CXXFLAG := -g -Wall +LDFLAGS := -lpthread +DEPS_INCLUDE_PATH=-I deps/simple_log/output/include/ +DEPS_LIB_PATH=deps/simple_log/output/lib/libsimplelog.a + +all: + echo "make all" + mkdir -p output/include + mkdir -p output/lib + mkdir -p output/bin + $(CXX) $(CXXFLAG) $(DEPS_INCLUDE_PATH) -c src/threadpool.cpp -o src/threadpool.o + ar -rcs libthreadpool.a src/*.o + rm -rf src/*.o + + cp src/*.h output/include/ + mv libthreadpool.a output/lib/ + +test: test/threadpool_test.cpp + $(CXX) $(CXXFLAG) $(LDFLAGS) -I output/include $(DEPS_INCLUDE_PATH) test/threadpool_test.cpp output/lib/libthreadpool.a $(DEPS_LIB_PATH) -o output/bin/threadpool_test diff --git a/deps/simple_log/Makefile b/deps/simple_log/Makefile new file mode 100644 index 0000000..49c395f --- /dev/null +++ b/deps/simple_log/Makefile @@ -0,0 +1,22 @@ +.PHONY: all test clean + +CXX := g++ +CXXFLAG := -g -Wall +LDFLAGS := -lpthread + +all: + echo "make all" + mkdir -p output/include + mkdir -p output/lib + mkdir -p output/bin + $(CXX) $(CXXFLAG)-c src/simple_config.cpp -o src/simple_config.o + $(CXX) $(CXXFLAG)-c src/simple_log.cpp -o src/simple_log.o + ar -rcs libsimplelog.a src/*.o + rm -rf src/*.o + + cp src/*.h output/include/ + mv libsimplelog.a output/lib/ + +test: src/simple_log.cpp test/simple_log_test.cpp + $(CXX) $(CXXFLAG) $(LDFLAGS) -I output/include test/simple_log_test.cpp output/lib/libsimplelog.a -o output/bin/simple_log_test + mkdir -p log diff --git a/deps/simple_log/src/simple_config.cpp b/deps/simple_log/src/simple_config.cpp new file mode 100644 index 0000000..9454ffe --- /dev/null +++ b/deps/simple_log/src/simple_config.cpp @@ -0,0 +1,34 @@ +/* + * simple_config.cpp + * + * Created on: Dec 27, 2014 + * Author: liao + */ +#include +#include +#include "simple_config.h" + +int get_config_map(const char *config_file, std::map &configs) { + std::ifstream fs(config_file); + if(!fs.is_open()) { + return -1; + } + + while(fs.good()) { + std::string line; + std::getline(fs, line); + + if (line[0] == '#') { + continue; + } + std::stringstream ss; + ss << line; + std::string key, value; + std::getline(ss, key, '='); + std::getline(ss, value, '='); + + configs[key] = value; + } + fs.close(); + return 0; +} diff --git a/deps/simple_log/src/simple_config.h b/deps/simple_log/src/simple_config.h new file mode 100644 index 0000000..46b80b8 --- /dev/null +++ b/deps/simple_log/src/simple_config.h @@ -0,0 +1,16 @@ +/* + * simple_config.h + * + * Created on: Dec 26, 2014 + * Author: liao + */ + +#ifndef SIMPLE_CONFIG_H_ +#define SIMPLE_CONFIG_H_ + +#include +#include + +int get_config_map(const char *config_file, std::map &configs); + +#endif /* SIMPLE_CONFIG_H_ */ diff --git a/deps/simple_log/src/simple_log.cpp b/deps/simple_log/src/simple_log.cpp new file mode 100644 index 0000000..615303d --- /dev/null +++ b/deps/simple_log/src/simple_log.cpp @@ -0,0 +1,269 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "stdarg.h" +#include +#include + +#include "simple_config.h" +#include "simple_log.h" + +// log context +const int MAX_SINGLE_LOG_SIZE = 2048; +const int ONE_DAY_SECONDS = 86400; + +int log_level = DEBUG_LEVEL; +std::string g_dir; +std::string g_config_file; +bool use_file_appender = false; +FileAppender g_file_appender; + +FileAppender::FileAppender() { + _is_inited = false; + _retain_day = -1; +} + +FileAppender::~FileAppender() { + if (_fs.is_open()) { + _fs.close(); + } +} + +int FileAppender::init(std::string dir, std::string log_file) { + if (!dir.empty()) { + int ret = mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + if (ret != 0 && errno != EEXIST) { + printf("mkdir error which dir:%s err:%s\n", dir.c_str(), strerror(errno)); + _is_inited = true; + return -1; + } + } else { + dir = "."; // current dir + } + _log_dir = dir; + _log_file = log_file; + _log_file_path = dir + "/" + log_file; + _fs.open(_log_file_path.c_str(), std::fstream::out | std::fstream::app); + _is_inited = true; + pthread_rwlock_init(&rwlock, NULL); + return 0; +} + +int FileAppender::write_log(char *log, const char *format, va_list ap) { + pthread_rwlock_rdlock(&rwlock); + if (_fs.is_open()) { + vsnprintf(log, MAX_SINGLE_LOG_SIZE - 1, format, ap); + _fs << log << "\n"; + _fs.flush(); + } + pthread_rwlock_unlock(&rwlock); + return 0; +} + +int FileAppender::shift_file_if_need(struct timeval tv, struct timezone tz) { + if (_last_sec == 0) { + _last_sec = tv.tv_sec; + return 0; + } + long fix_now_sec = tv.tv_sec - tz.tz_minuteswest * 60; + long fix_last_sec = _last_sec - tz.tz_minuteswest * 60; + if (fix_now_sec / ONE_DAY_SECONDS - fix_last_sec / ONE_DAY_SECONDS) { + pthread_rwlock_wrlock(&rwlock); + + struct tm *tm; + time_t y_sec = tv.tv_sec - ONE_DAY_SECONDS; + tm = localtime(&y_sec); //yesterday + char new_file[100]; + bzero(new_file, 100); + sprintf(new_file, "%s.%04d-%02d-%02d", + _log_file.c_str(), + tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday); + std::string new_file_path = _log_dir + "/" + new_file; + if (access(new_file_path.c_str(), F_OK) != 0) { + rename(_log_file_path.c_str(), new_file_path.c_str()); + // reopen new log file + _fs.close(); + _fs.open(_log_file_path.c_str(), std::fstream::out | std::fstream::app); + } + + pthread_rwlock_unlock(&rwlock); + + delete_old_log(tv); + } + + _last_sec = tv.tv_sec; + return 0; +} + +bool FileAppender::is_inited() { + return _is_inited; +} + +void FileAppender::set_retain_day(int rd) { + _retain_day = rd; +} + +int FileAppender::delete_old_log(timeval tv) { + if (_retain_day <= 0) { + return 0; + } + struct timeval old_tv; + old_tv.tv_sec = tv.tv_sec - _retain_day * 3600 * 24; + old_tv.tv_usec = tv.tv_usec; + char old_file[100]; + memset(old_file, 0, 100); + struct tm *tm; + tm = localtime(&old_tv.tv_sec); + sprintf(old_file, "%s.%04d-%02d-%02d", + _log_file.c_str(), tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday - 1); + std::string old_file_path = _log_dir + "/" + old_file; + return remove(old_file_path.c_str()); +} + +int _check_config_file() { + std::map configs; + std::string log_config_file = g_dir + "/" + g_config_file; + get_config_map(log_config_file.c_str(), configs); + if (configs.empty()) { + return 0; + } + // read log level + std::string log_level_str = configs["log_level"]; + set_log_level(log_level_str.c_str()); + + std::string rd = configs["retain_day"]; + if (!rd.empty()) { + g_file_appender.set_retain_day(atoi(rd.c_str())); + } + // read log file + std::string dir = configs["log_dir"]; + std::string log_file = configs["log_file"]; + int ret = 0; + if (!log_file.empty()) { + use_file_appender = true; + if (!g_file_appender.is_inited()) { + ret = g_file_appender.init(dir, log_file); + } + } + return ret; +} + +void sigreload(int sig) { + //printf("receive sig:%d \n", sig); + _check_config_file(); +} + +std::string _get_show_time(timeval tv) { + char show_time[40]; + memset(show_time, 0, 40); + + struct tm *tm; + tm = localtime(&tv.tv_sec); + + sprintf(show_time, "%04d-%02d-%02d %02d:%02d:%02d.%03d", + tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, + tm->tm_hour, tm->tm_min, tm->tm_sec, (int)(tv.tv_usec/1000)); + return std::string(show_time); +} + +int _get_log_level(const char *level_str) { + if(strcasecmp(level_str, "ERROR") == 0) { + return ERROR_LEVEL; + } + if(strcasecmp(level_str, "WARN") == 0) { + return WARN_LEVEL; + } + if(strcasecmp(level_str, "INFO") == 0) { + return INFO_LEVEL; + } + if(strcasecmp(level_str, "DEBUG") == 0) { + return DEBUG_LEVEL; + } + return DEBUG_LEVEL; +} + +void set_log_level(const char *level) { + log_level = _get_log_level(level); +} + +void _log(const char *format, va_list ap) { + if (!use_file_appender) { // if no config, send log to stdout + vprintf(format, ap); + printf("\n"); + return; + } + struct timeval now; + struct timezone tz; + gettimeofday(&now, &tz); + std::string fin_format = _get_show_time(now) + " " + format; + + g_file_appender.shift_file_if_need(now, tz); + char single_log[MAX_SINGLE_LOG_SIZE]; + bzero(single_log, MAX_SINGLE_LOG_SIZE); + g_file_appender.write_log(single_log, fin_format.c_str(), ap); +} + +int log_init(std::string dir, std::string file) { + g_dir = dir; + g_config_file = file; + signal(SIGUSR1, sigreload); + return _check_config_file(); +} + +void log_error(const char *format, ...) { + if (log_level < ERROR_LEVEL) { + return; + } + + va_list ap; + va_start(ap, format); + + _log(format, ap); + + va_end(ap); +} + +void log_warn(const char *format, ...) { + if (log_level < WARN_LEVEL) { + return; + } + + va_list ap; + va_start(ap, format); + + _log(format, ap); + + va_end(ap); +} + +void log_info(const char *format, ...) { + if (log_level < INFO_LEVEL) { + return; + } + + va_list ap; + va_start(ap, format); + + _log(format, ap); + + va_end(ap); +} + +void log_debug(const char *format, ...) { + if (log_level < DEBUG_LEVEL) { + return; + } + + va_list ap; + va_start(ap, format); + + _log(format, ap); + + va_end(ap); +} diff --git a/deps/simple_log/src/simple_log.h b/deps/simple_log/src/simple_log.h new file mode 100644 index 0000000..9a09fa7 --- /dev/null +++ b/deps/simple_log/src/simple_log.h @@ -0,0 +1,69 @@ +#ifndef SIMPLE_LOG_H +#define SIMPLE_LOG_H + +#include +#include +#include +#include "sys/time.h" +#include + +const int ERROR_LEVEL = 1; +const int WARN_LEVEL = 2; +const int INFO_LEVEL = 3; +const int DEBUG_LEVEL = 4; + +// log config +extern int log_level; + +#define LOG_ERROR(format, args...) \ + if(log_level >= ERROR_LEVEL) { \ + log_error("%s %s(%d): " format, "ERROR", __FILE__, __LINE__, ##args); \ + } + +#define LOG_WARN(format, args...) \ + if(log_level >= WARN_LEVEL) { \ + log_warn("%s %s(%d): " format, "WARN", __FILE__, __LINE__, ##args); \ + } + +#define LOG_INFO(format, args...) \ + if(log_level >= INFO_LEVEL) { \ + log_info("%s %s(%d): " format, "INFO", __FILE__, __LINE__, ##args); \ + } + +#define LOG_DEBUG(format, args...) \ + if(log_level >= DEBUG_LEVEL) { \ + log_debug("%s %s(%d): " format, "DEBUG", __FILE__, __LINE__, ##args); \ + } + + +std::string _get_show_time(); + +int log_init(std::string dir, std::string file); +void log_error(const char *format, ...); +void log_warn(const char *format, ...); +void log_info(const char *format, ...); +void log_debug(const char *format, ...); +void set_log_level(const char *level); + +class FileAppender { + public: + FileAppender(); + ~FileAppender(); + int init(std::string dir, std::string file); + bool is_inited(); + int write_log(char *log, const char *format, va_list ap); + int shift_file_if_need(struct timeval tv, struct timezone tz); + int delete_old_log(timeval tv); + void set_retain_day(int rd); + private: + std::fstream _fs; + std::string _log_file; + std::string _log_dir; + std::string _log_file_path; + long _last_sec; + bool _is_inited; + int _retain_day; + pthread_rwlock_t rwlock; +}; + +#endif diff --git a/threadpool.cpp b/src/threadpool.cpp similarity index 65% rename from threadpool.cpp rename to src/threadpool.cpp index bf42450..8d0f5f8 100644 --- a/threadpool.cpp +++ b/src/threadpool.cpp @@ -2,6 +2,7 @@ #include #include +#include "simple_log.h" Task::Task(void (*fn_ptr)(void*), void* arg) : m_fn_ptr(fn_ptr), m_arg(arg) { @@ -11,28 +12,12 @@ Task::~Task() { } -void Task::operator()() -{ - (*m_fn_ptr)(m_arg); - if (m_arg != NULL) { - delete m_arg; - } -} - void Task::run() { (*m_fn_ptr)(m_arg); } -ThreadPool::ThreadPool() : m_pool_size(DEFAULT_POOL_SIZE) -{ - cout << "Constructed ThreadPool of size " << m_pool_size << endl; -} - -ThreadPool::ThreadPool(int pool_size) : m_pool_size(pool_size) -{ - cout << "Constructed ThreadPool of size " << m_pool_size << endl; -} +ThreadPool::ThreadPool() {} ThreadPool::~ThreadPool() { @@ -53,23 +38,23 @@ void* start_thread(void* arg) return NULL; } -int ThreadPool::initialize_threadpool() +int ThreadPool::init(int pool_size) { - // TODO: COnsider lazy loading threads instead of creating all at once - m_pool_state = STARTED; - int ret = -1; - for (int i = 0; i < m_pool_size; i++) { - pthread_t tid; - ret = pthread_create(&tid, NULL, start_thread, (void*) this); - if (ret != 0) { - cerr << "pthread_create() failed: " << ret << endl; - return -1; + m_pool_size = pool_size; + m_pool_state = STARTED; + int ret = -1; + for (int i = 0; i < m_pool_size; i++) { + pthread_t tid; + ret = pthread_create(&tid, NULL, start_thread, (void*) this); + if (ret != 0) { + LOG_ERROR("pthread_create() failed: %d", ret); + return -1; + } + m_threads.push_back(tid); } - m_threads.push_back(tid); - } - cout << m_pool_size << " threads created by the thread pool" << endl; + LOG_INFO("%d threads created by the thread pool", m_pool_size); - return 0; + return 0; } int ThreadPool::destroy_threadpool() @@ -81,27 +66,27 @@ int ThreadPool::destroy_threadpool() m_task_mutex.lock(); m_pool_state = STOPPED; m_task_mutex.unlock(); - cout << "Broadcasting STOP signal to all threads..." << endl; + LOG_INFO("Broadcasting STOP signal to all threads..."); m_task_cond_var.broadcast(); // notify all threads we are shttung down int ret = -1; for (int i = 0; i < m_pool_size; i++) { void* result; ret = pthread_join(m_threads[i], &result); - cout << "pthread_join() returned " << ret << ": " << strerror(errno) << endl; + LOG_DEBUG("pthread_join() returned %d:%s", ret, strerror(errno)); m_task_cond_var.broadcast(); // try waking up a bunch of threads that are still waiting } - cout << m_pool_size << " threads exited from the thread pool" << endl; + LOG_INFO("%d threads exited from the thread pool", m_pool_size); return 0; } void* ThreadPool::execute_thread() { Task* task = NULL; - cout << "Starting thread " << pthread_self() << endl; + LOG_DEBUG("Starting thread :%u", pthread_self()); while(true) { // Try to pick a task - cout << "Locking: " << pthread_self() << endl; + LOG_DEBUG("Locking: %u", pthread_self()); m_task_mutex.lock(); // We need to put pthread_cond_wait in a loop for two reasons: @@ -113,26 +98,26 @@ void* ThreadPool::execute_thread() while ((m_pool_state != STOPPED) && (m_tasks.empty())) { // Wait until there is a task in the queue // Unlock mutex while wait, then lock it back when signaled - cout << "Unlocking and waiting: " << pthread_self() << endl; + LOG_DEBUG("Unlocking and waiting: %u", pthread_self()); m_task_cond_var.wait(m_task_mutex.get_mutex_ptr()); - cout << "Signaled and locking: " << pthread_self() << endl; + LOG_DEBUG("Signaled and locking: %u", pthread_self()); } // If the thread was woken up to notify process shutdown, return from here if (m_pool_state == STOPPED) { - cout << "Unlocking and exiting: " << pthread_self() << endl; + LOG_DEBUG("Unlocking and exiting: %u", pthread_self()); m_task_mutex.unlock(); pthread_exit(NULL); } task = m_tasks.front(); m_tasks.pop_front(); - cout << "Unlocking: " << pthread_self() << endl; + LOG_DEBUG("Unlocking: %u", pthread_self()); m_task_mutex.unlock(); //cout << "Executing thread " << pthread_self() << endl; // execute the task - (*task)(); // could also do task->run(arg); + task->run(); // //cout << "Done executing thread " << pthread_self() << endl; delete task; } diff --git a/threadpool.h b/src/threadpool.h similarity index 91% rename from threadpool.h rename to src/threadpool.h index b5dedc7..2a26d59 100644 --- a/threadpool.h +++ b/src/threadpool.h @@ -2,14 +2,9 @@ #define _H_THREADPOOL #include - #include -#include #include -using namespace std; - -const int DEFAULT_POOL_SIZE = 10; const int STARTED = 0; const int STOPPED = 1; @@ -65,7 +60,6 @@ class Task // Task(TCLass::* obj_fn_ptr); // pass an object method pointer Task(void (*fn_ptr)(void*), void* arg); // pass a free function pointer ~Task(); - void operator()(); void run(); private: // TClass* _obj_fn_ptr; @@ -77,9 +71,8 @@ class ThreadPool { public: ThreadPool(); - ThreadPool(int pool_size); ~ThreadPool(); - int initialize_threadpool(); + int init(int pool_size); int destroy_threadpool(); void* execute_thread(); int add_task(Task* task); diff --git a/test/threadpool_test.cpp b/test/threadpool_test.cpp new file mode 100644 index 0000000..99185c7 --- /dev/null +++ b/test/threadpool_test.cpp @@ -0,0 +1,36 @@ +#include "threadpool.h" +#include "simple_log.h" + +const int MAX_TASKS = 4; + +void hello(void* arg) +{ + int* x = (int*) arg; + LOG_INFO("Hello %d", *x); + delete (int *)arg; +} + +int main(int argc, char* argv[]) +{ + ThreadPool tp; + int ret = tp.init(2); + if (ret == -1) { + LOG_ERROR("Failed to initialize thread pool!"); + return 0; + } + + for (int i = 0; i < MAX_TASKS; i++) { + int* x = new int(); + *x = i+1; + Task* t = new Task(&hello, (void*) x); + tp.add_task(t); + } + + sleep(2); + + tp.destroy_threadpool(); + + LOG_INFO("Exiting app..."); + + return 0; +} diff --git a/threadpool_test.cpp b/threadpool_test.cpp deleted file mode 100644 index b377ec4..0000000 --- a/threadpool_test.cpp +++ /dev/null @@ -1,43 +0,0 @@ -#include "threadpool.h" - -#include - -using namespace std; - -const int MAX_TASKS = 4; - -void hello(void* arg) -{ - int* x = (int*) arg; - cout << "Hello " << *x << endl; -// cout << "\n"; -} - -int main(int argc, char* argv[]) -{ - ThreadPool tp(2); - int ret = tp.initialize_threadpool(); - if (ret == -1) { - cerr << "Failed to initialize thread pool!" << endl; - return 0; - } - - for (int i = 0; i < MAX_TASKS; i++) { - int* x = new int(); - *x = i+1; - Task* t = new Task(&hello, (void*) x); -// cout << "Adding to pool, task " << i+1 << endl; - tp.add_task(t); -// cout << "Added to pool, task " << i+1 << endl; - } - - sleep(2); - - tp.destroy_threadpool(); - - // TODO: delete worker objects - - cout << "Exiting app..." << endl; - - return 0; -}