From 4e7cb0cf5bf61cafd34446bc04032e5d8131d596 Mon Sep 17 00:00:00 2001 From: hfcrwx Date: Fri, 29 May 2020 20:52:47 +0800 Subject: [PATCH 1/6] Add contrib/mariadbclient --- CMakeLists.txt | 5 + contrib/CMakeLists.txt | 6 + contrib/mariadbclient/CMakeLists.txt | 5 + contrib/mariadbclient/MariaDBClient.cc | 418 ++++++++++++++++++++++++ contrib/mariadbclient/MariaDBClient.h | 137 ++++++++ contrib/mariadbclient/mmariadbclient.cc | 130 ++++++++ muduo/net/Channel.cc | 48 +-- muduo/net/Channel.h | 12 +- muduo/net/poller/EPollPoller.cc | 4 +- 9 files changed, 742 insertions(+), 23 deletions(-) create mode 100644 contrib/mariadbclient/CMakeLists.txt create mode 100644 contrib/mariadbclient/MariaDBClient.cc create mode 100644 contrib/mariadbclient/MariaDBClient.h create mode 100644 contrib/mariadbclient/mmariadbclient.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 6071a4ec8..4326f2359 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -63,6 +63,8 @@ find_library(BOOSTPO_LIBRARY NAMES boost_program_options) find_library(BOOSTSYSTEM_LIBRARY NAMES boost_system) find_path(TCMALLOC_INCLUDE_DIR gperftools/heap-profiler.h) find_library(TCMALLOC_LIBRARY NAMES tcmalloc_and_profiler) +find_path(MARIADBCLIENT_INCLUDE_DIR mysql/mysql.h) +find_library(MARIADBCLIENT_LIBRARY NAMES mysqlclient) find_path(HIREDIS_INCLUDE_DIR hiredis/hiredis.h) find_library(HIREDIS_LIBRARY NAMES hiredis) find_path(GD_INCLUDE_DIR gd.h) @@ -86,6 +88,9 @@ endif() if(ZLIB_FOUND) message(STATUS "found zlib") endif() +if(MARIADBCLIENT_INCLUDE_DIR AND MARIADBCLIENT_LIBRARY) + message(STATUS "found mariadbclient") +endif() if(HIREDIS_INCLUDE_DIR AND HIREDIS_LIBRARY) message(STATUS "found hiredis") endif() diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 8e4a50ea1..adcbf9e86 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -4,6 +4,12 @@ else() add_subdirectory(hiredis EXCLUDE_FROM_ALL) endif() +if(MARIADBCLIENT_INCLUDE_DIR AND MARIADBCLIENT_LIBRARY) + add_subdirectory(mariadbclient) +else() + add_subdirectory(mariadbclient EXCLUDE_FROM_ALL) +endif() + if(THRIFT_COMPILER AND THRIFT_INCLUDE_DIR AND THRIFT_LIBRARY) add_subdirectory(thrift) else() diff --git a/contrib/mariadbclient/CMakeLists.txt b/contrib/mariadbclient/CMakeLists.txt new file mode 100644 index 000000000..ca3a0ade2 --- /dev/null +++ b/contrib/mariadbclient/CMakeLists.txt @@ -0,0 +1,5 @@ +add_library(muduo_mariadbclient MariaDBClient.cc) +target_link_libraries(muduo_mariadbclient muduo_net mysqlclient) + +add_executable(mmariadbclient mmariadbclient.cc) +target_link_libraries(mmariadbclient muduo_mariadbclient) diff --git a/contrib/mariadbclient/MariaDBClient.cc b/contrib/mariadbclient/MariaDBClient.cc new file mode 100644 index 000000000..c16836a4f --- /dev/null +++ b/contrib/mariadbclient/MariaDBClient.cc @@ -0,0 +1,418 @@ +#include "MariaDBClient.h" + +#include +#include +#include +#include + +#include + +#define NEXT_IMMEDIATE(newState) do { state = newState; goto again; } while (0) + +using namespace mariadbclient; +using namespace muduo; +using namespace muduo::net; + +static void dummy(const std::shared_ptr&) +{ +} + +MariaDBClient::MariaDBClient(EventLoop* loop, + const InetAddress& serverAddr, + const string& user, + const string& password, + const string& db) + : loop_(loop), + serverAddr_(serverAddr), + user_(user), + password_(password), + db_(db), + isConnected_(false), + res_(NULL), + row_(NULL) +{ +} + +MariaDBClient::~MariaDBClient() +{ + if (isConnected_) + { + disconnect(); + } +} + +void MariaDBClient::connect() +{ + loop_->runInLoop(std::bind(&MariaDBClient::connectInLoop, this)); +} + +void MariaDBClient::disconnect() +{ + loop_->runInLoop(std::bind(&MariaDBClient::disconnectInLoop, this)); +} + +void MariaDBClient::query(StringArg queryStr, const QueryCallback& cb) +{ + loop_->runInLoop(std::bind(&MariaDBClient::queryInLoop, this, string(queryStr.c_str()), cb)); +} + +void MariaDBClient::queryFetch(StringArg queryStr, const QueryFetchCallback& cb) +{ + loop_->runInLoop(std::bind(&MariaDBClient::queryFetchInLoop, this, string(queryStr.c_str()), cb)); +} + +void MariaDBClient::connectInLoop() +{ + loop_->assertInLoopThread(); + assert(!isConnected_); + + ::mysql_init(&mysql_); + ::mysql_options(&mysql_, MYSQL_OPT_NONBLOCK, 0); + + stateMachineHandler(kRealConnectStart); +} + +void MariaDBClient::disconnectInLoop() +{ + loop_->assertInLoopThread(); + assert(isConnected_); + + stateMachineHandler(kCloseStart); +} + +void MariaDBClient::queryInLoop(StringArg queryStr, const QueryCallback& cb) +{ + loop_->assertInLoopThread(); + assert(isConnected_); + + queries_.emplace_back(new QueryData(QueryData::kQuery, queryStr.c_str(), cb)); + + if (queries_.size() == 1) + { + stateMachineHandler(kRealQueryStart); + } +} + +void MariaDBClient::queryFetchInLoop(StringArg queryStr, const QueryFetchCallback& cb) +{ + loop_->assertInLoopThread(); + assert(isConnected_); + + queries_.emplace_back(new QueryData(QueryData::kQueryFetch, queryStr.c_str(), cb)); + + if (queries_.size() == 1) + { + stateMachineHandler(kRealQueryStart); + } +} + +void MariaDBClient::stateMachineHandler(int state, int revents, Timestamp receiveTime) +{ + loop_->assertInLoopThread(); + + int mysqlRevents = toMySQLEvents(revents); + MYSQL* ret = NULL; + int err = 0; + + again: + switch (state) + { + case kRealConnectStart: + { + int mysqlEvents = ::mysql_real_connect_start(&ret, + &mysql_, + serverAddr_.toIp().c_str(), + user_.c_str(), + password_.c_str(), + db_.c_str(), + implicit_cast(serverAddr_.toPort()), + NULL, + 0); + if (mysqlEvents != 0) + { + channel_.reset(new Channel(loop_, fd(), false)); + + channel_->setEventsCallback(std::bind(&MariaDBClient::stateMachineHandler, this, kRealConnectCont, _1, _2)); + int events = toEvents(mysqlEvents); + channel_->enableEvents(events); + } + else + { + NEXT_IMMEDIATE(kRealConnectEnd); + } + } + break; + + case kRealConnectCont: + { + int mysqlEvents = ::mysql_real_connect_cont(&ret, &mysql_, mysqlRevents); + if (mysqlEvents != 0) + { + int events = toEvents(mysqlEvents); + channel_->enableEvents(events); + } + else + { + NEXT_IMMEDIATE(kRealConnectEnd); + } + } + break; + + case kRealConnectEnd: + { + if (!ret) + { + LOG_ERROR << "Failed to mysql_real_connect(): " << errorStr(); + } + else + { + logConnection(true); + isConnected_ = true; + channel_->setEventsCallback(Channel::EventsCallback()); + channel_->disableAll(); + } + + if (connectCb_) + { + connectCb_(this); + } + } + break; + + case kRealQueryStart: + { + int mysqlEvents = + ::mysql_real_query_start(&err, + &mysql_, + queries_.front()->queryStr_.c_str(), + queries_.front()->queryStr_.size()); + if (mysqlEvents != 0) + { + channel_->setEventsCallback(std::bind(&MariaDBClient::stateMachineHandler, this, kRealQueryCont, _1, _2)); + int events = toEvents(mysqlEvents); + channel_->enableEvents(events); + } + else + { + NEXT_IMMEDIATE(kRealQueryEnd); + } + } + break; + + case kRealQueryCont: + { + int mysqlEvents = ::mysql_real_query_cont(&err, &mysql_, mysqlRevents); + if (mysqlEvents != 0) + { + int events = toEvents(mysqlEvents); + channel_->enableEvents(events); + } + else + { + NEXT_IMMEDIATE(kRealQueryEnd); + } + } + break; + + case kRealQueryEnd: + { + if (queries_.front()->type_ == QueryData::kQuery) + { + if (queries_.front()->queryCb_) + { + queries_.front()->queryCb_(this); + } + queries_.pop_front(); + + if (!queries_.empty()) + { + NEXT_IMMEDIATE(kRealQueryStart); + } + } + else + { + assert(queries_.front()->type_ == QueryData::kQueryFetch); + assert(queries_.front()->queryFetchCb_); + if (err) + { + LOG_ERROR << "mysql_real_query() returns error: " << errorStr(); + queries_.front()->queryFetchCb_(this, FetchResultPtr()); + } + else + { + res_ = ::mysql_use_result(&mysql_); + if (!res_) + { + LOG_ERROR << "mysql_use_result() returns error: " << errorStr(); + queries_.front()->queryFetchCb_(this, FetchResultPtr()); + } + else + { + result_.reset(new FetchResult); + NEXT_IMMEDIATE(kFetchRowStart); + } + } + } + } + break; + + case kFetchRowStart: + { + int mysqlEvents = ::mysql_fetch_row_start(&row_, res_); + if (mysqlEvents != 0) + { + channel_->setEventsCallback(std::bind(&MariaDBClient::stateMachineHandler, this, kFetchRowCont, _1, _2)); + + int events = toEvents(mysqlEvents); + channel_->enableEvents(events); + } + else + { + NEXT_IMMEDIATE(kFetchRowEnd); + } + } + break; + + case kFetchRowCont: + { + int mysqlEvents = ::mysql_fetch_row_cont(&row_, res_, mysqlRevents); + if (mysqlEvents != 0) + { + int events = toEvents(mysqlEvents); + channel_->enableEvents(events); + } + else + { + NEXT_IMMEDIATE(kFetchRowEnd); + } + } + break; + + case kFetchRowEnd: + { + if (row_) + { + std::vector fields; + for (size_t i = 0; i < ::mysql_num_fields(res_); ++i) + { + if (row_[i]) + { + fields.push_back(row_[i]); + } + else + { + fields.push_back(""); + } + } + result_->push_back(fields); + + NEXT_IMMEDIATE(kFetchRowStart); + } + else + { + if (errorNo() != 0) + { + LOG_ERROR << "Got error while retrieving rows: " << errorStr(); + } + ::mysql_free_result(res_); + + assert(queries_.front()->queryFetchCb_); + queries_.front()->queryFetchCb_(this, result_); + queries_.pop_front(); + result_.reset(); + + if (!queries_.empty()) + { + NEXT_IMMEDIATE(kRealQueryStart); + } + } + } + break; + + case kCloseStart: + { + logConnection(false); + int mysqlEvents = ::mysql_close_start(&mysql_); + if (mysqlEvents != 0) + { + channel_->setEventsCallback(std::bind(&MariaDBClient::stateMachineHandler, this, kCloseContinue, _1, _2)); + int events = toEvents(mysqlEvents); + channel_->enableEvents(events); + } + else + { + NEXT_IMMEDIATE(kCloseEnd); + } + } + break; + + case kCloseContinue: + { + int mysqlEvents = ::mysql_close_cont(&mysql_, mysqlRevents); + if (mysqlEvents != 0) + { + int events = toEvents(mysqlEvents); + channel_->enableEvents(events); + } + else + { + NEXT_IMMEDIATE(kCloseEnd); + } + } + break; + + case kCloseEnd: + { + if (errorNo() != 0) + { + LOG_ERROR << "Got error while close: " << errorStr(); + } + else + { + isConnected_ = false; + channel_->disableAll(); + channel_->remove(); + loop_->queueInLoop(std::bind(dummy, channel_)); + channel_.reset(); + } + + if (disconnectCb_) + { + disconnectCb_(this); + } + } + break; + + default: + { + abort(); + } + } +} + +void MariaDBClient::logConnection(bool up) const +{ + InetAddress localAddr(sockets::getLocalAddr(fd())); + InetAddress peerAddr(sockets::getPeerAddr(fd())); + + LOG_INFO << localAddr.toIpPort() << " -> " + << peerAddr.toIpPort() << " is " + << (up ? "UP" : "DOWN"); +} + +int MariaDBClient::toEvents(int mysqlEvents) +{ + int events = (mysqlEvents & MYSQL_WAIT_READ ? POLLIN : 0) + | (mysqlEvents & MYSQL_WAIT_WRITE ? POLLOUT : 0) + | (mysqlEvents & MYSQL_WAIT_EXCEPT ? POLLPRI : 0); + + return events; +} + +int MariaDBClient::toMySQLEvents(int events) +{ + int mysqlEvents = (events & POLLIN ? MYSQL_WAIT_READ : 0) + | (events & POLLPRI ? MYSQL_WAIT_EXCEPT : 0) + | (events & POLLOUT ? MYSQL_WAIT_WRITE : 0); + return mysqlEvents; +} diff --git a/contrib/mariadbclient/MariaDBClient.h b/contrib/mariadbclient/MariaDBClient.h new file mode 100644 index 000000000..ae589b972 --- /dev/null +++ b/contrib/mariadbclient/MariaDBClient.h @@ -0,0 +1,137 @@ +#ifndef MUDUO_CONTRIB_MARIADBCLIENT_MARIADBCLIENT_H +#define MUDUO_CONTRIB_MARIADBCLIENT_MARIADBCLIENT_H + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace muduo +{ +namespace net +{ +class Channel; +class EventLoop; +} +} + +namespace mariadbclient +{ +using muduo::string; + +class MariaDBClient : muduo::noncopyable +{ + public: + typedef std::function ConnectCallback; + typedef std::function DisconnectCallback; + + typedef std::function QueryCallback; + + typedef std::vector> FetchResult; + typedef std::shared_ptr FetchResultPtr; + typedef std::function QueryFetchCallback; + + enum State + { + kRealConnectStart, + kRealConnectCont, + kRealConnectEnd, + + kRealQueryStart, + kRealQueryCont, + kRealQueryEnd, + + kFetchRowStart, + kFetchRowCont, + kFetchRowEnd, + + kCloseStart, + kCloseContinue, + kCloseEnd, + }; + + MariaDBClient(muduo::net::EventLoop* loop, + const muduo::net::InetAddress& serverAddr, + const string& user, + const string& password, + const string& db); + ~MariaDBClient(); + + void setConnectCallback(const ConnectCallback& cb) { connectCb_ = cb; } + void setDisconnectCallback(const DisconnectCallback& cb) { disconnectCb_ = cb; } + + void connect(); + void disconnect(); + + void query(muduo::StringArg queryStr, const QueryCallback& cb = QueryCallback()); + void queryFetch(muduo::StringArg queryStr, const QueryFetchCallback& cb); + + uint32_t errorNo() { return ::mysql_errno(&mysql_); } + const char* errorStr() { return ::mysql_error(&mysql_); } + + private: + void connectInLoop(); + void disconnectInLoop(); + + void queryInLoop(muduo::StringArg queryStr, const QueryCallback& cb = QueryCallback()); + void queryFetchInLoop(muduo::StringArg queryStr, const QueryFetchCallback& cb); + + void stateMachineHandler(int state, int revents = -1, muduo::Timestamp receiveTime = muduo::Timestamp::invalid()); + + void logConnection(bool up) const; + int fd() const { return ::mysql_get_socket(&mysql_); } + + static int toEvents(int mysqlEvents); + static int toMySQLEvents(int events); + + muduo::net::EventLoop* loop_; + const muduo::net::InetAddress serverAddr_; + const string user_; + const string password_; + const string db_; + std::shared_ptr channel_; + bool isConnected_; + ConnectCallback connectCb_; + DisconnectCallback disconnectCb_; + + MYSQL mysql_; + + struct QueryData + { + enum QueryType + { + kQuery, + kQueryFetch + }; + + QueryData(QueryType type, const string& queryStr, const QueryCallback& cb) + : type_(type), queryStr_(queryStr), queryCb_(cb) {} + QueryData(QueryType type, const string& queryStr, const QueryFetchCallback& cb) + : type_(type), queryStr_(queryStr), queryFetchCb_(cb) {} + ~QueryData() {} + + QueryType type_; + string queryStr_; + union + { + QueryCallback queryCb_; + QueryFetchCallback queryFetchCb_; + }; + }; + + std::deque> queries_; + + MYSQL_RES* res_; + MYSQL_ROW row_; + FetchResultPtr result_; +}; + +} // namespace mariadbclient + +#endif // MUDUO_CONTRIB_MARIADBCLIENT_MARIADBCLIENT_H diff --git a/contrib/mariadbclient/mmariadbclient.cc b/contrib/mariadbclient/mmariadbclient.cc new file mode 100644 index 000000000..2c6368457 --- /dev/null +++ b/contrib/mariadbclient/mmariadbclient.cc @@ -0,0 +1,130 @@ +#include "MariaDBClient.h" + +#include +#include +#include +#include + +using namespace muduo; +using namespace muduo::net; + +static const char* g_myGroups[] = {"client", NULL}; +static uint64_t g_id = 0; + +void connectCallback(mariadbclient::MariaDBClient* c, CountDownLatch* connected) +{ + if (c->errorNo() == 0) + { + LOG_INFO << "Connected... " << c->errorNo() << '\n' << c->errorStr(); + connected->countDown(); + } + else + { + LOG_ERROR << "connectCallback Error: " << c->errorNo() << '\n' << c->errorStr(); + } +} + +void disconnectCallback(mariadbclient::MariaDBClient* c, EventLoop* loop) +{ + if (c->errorNo() == 0) + { + LOG_INFO << "Disconnected... " << c->errorNo() << '\n' << c->errorStr(); + } + else + { + LOG_ERROR << "disconnectCallback Error: " << c->errorNo() << '\n' << c->errorStr(); + } + loop->quit(); +} + +void queryCallback(mariadbclient::MariaDBClient* c, uint64_t id) +{ + LOG_INFO << "message id: " << id << "\terrorNo: " << c->errorNo() << "\terrorStr: " << c->errorStr(); +} + +void selectCallback(mariadbclient::MariaDBClient* c, + const mariadbclient::MariaDBClient::FetchResultPtr& result, + uint64_t id) +{ + LOG_INFO << "message id: " << id << "\terrorNo: " << c->errorNo() << "\terrorStr: " << c->errorStr(); + LOG_INFO << "result size: " << result->size(); + if (!result->empty()) + { + for (size_t i = 0; i < result->size(); ++i) + { + for (size_t j = 0; j < (*result)[i].size(); ++j) + { + printf("%s\t", (*result)[i][j].c_str()); + } + printf("\n"); + } + } +} + +int main(int argc, char* argv[]) +{ + int err = mysql_library_init(argc, argv, const_cast(g_myGroups)); + if (err) + { + LOG_FATAL << "mysql_library_init() returns error: " << err; + } + + { + EventLoop loop; + EventLoopThread t; + mariadbclient::MariaDBClient mariadbClient(t.startLoop(), InetAddress("127.0.0.1", 3306), "root", "123456", "test"); + CountDownLatch connected(1); + mariadbClient.setConnectCallback(std::bind(&connectCallback, _1, &connected)); + mariadbClient.setDisconnectCallback(std::bind(&disconnectCallback, _1, &loop)); + mariadbClient.connect(); + connected.wait(); + + mariadbClient.query("DROP TABLE muduo_user", + std::bind(&queryCallback, _1, g_id)); + + mariadbClient.query("CREATE TABLE muduo_user (\ +id INT(11) NOT NULL AUTO_INCREMENT,\ +nick VARCHAR(64) NOT NULL,\ +PRIMARY KEY (id)\ +)", + std::bind(&queryCallback, _1, g_id++)); + + mariadbClient.queryFetch("SELECT * FROM muduo_user", + std::bind(&selectCallback, _1, _2, g_id++)); + + mariadbClient.query("INSERT INTO muduo_user \ +(id, nick) \ +VALUES (1, \"ChenShuo\")", + std::bind(&queryCallback, _1, g_id++)); + + mariadbClient.query("INSERT INTO muduo_user \ +(nick) \ +VALUES (\"Jack\")"); + + mariadbClient.query("INSERT INTO muduo_user \ +(nick) \ +VALUES (\"Lucy\")"); + + mariadbClient.query("UPDATE muduo_user \ +SET nick=\"Tom\" \ +WHERE id=2", + std::bind(&queryCallback, _1, g_id++)); + + mariadbClient.queryFetch("SELECT * FROM muduo_user", + std::bind(&selectCallback, _1, _2, g_id++)); + + mariadbClient.query("DELETE FROM muduo_user \ +WHERE id>1", + std::bind(&queryCallback, _1, g_id++)); + + mariadbClient.queryFetch("SELECT * FROM muduo_user", + std::bind(&selectCallback, _1, _2, g_id++)); + + loop.runAfter(5, std::bind(&mariadbclient::MariaDBClient::disconnect, &mariadbClient)); + + loop.loop(); + } + mysql_library_end(); + + return 0; +} diff --git a/muduo/net/Channel.cc b/muduo/net/Channel.cc index 1e9a40ae7..d559d38be 100644 --- a/muduo/net/Channel.cc +++ b/muduo/net/Channel.cc @@ -21,7 +21,7 @@ const int Channel::kNoneEvent = 0; const int Channel::kReadEvent = POLLIN | POLLPRI; const int Channel::kWriteEvent = POLLOUT; -Channel::Channel(EventLoop* loop, int fd__) +Channel::Channel(EventLoop* loop, int fd__, bool classify) : loop_(loop), fd_(fd__), events_(0), @@ -30,7 +30,8 @@ Channel::Channel(EventLoop* loop, int fd__) logHup_(true), tied_(false), eventHandling_(false), - addedToLoop_(false) + addedToLoop_(false), + classify_(classify) { } @@ -84,31 +85,38 @@ void Channel::handleEventWithGuard(Timestamp receiveTime) { eventHandling_ = true; LOG_TRACE << reventsToString(); - if ((revents_ & POLLHUP) && !(revents_ & POLLIN)) + if (classify_) { - if (logHup_) + if ((revents_ & POLLHUP) && !(revents_ & POLLIN)) { - LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP"; + if (logHup_) + { + LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP"; + } + if (closeCallback_) closeCallback_(); } - if (closeCallback_) closeCallback_(); - } - if (revents_ & POLLNVAL) - { - LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL"; - } + if (revents_ & POLLNVAL) + { + LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL"; + } - if (revents_ & (POLLERR | POLLNVAL)) - { - if (errorCallback_) errorCallback_(); - } - if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) - { - if (readCallback_) readCallback_(receiveTime); + if (revents_ & (POLLERR | POLLNVAL)) + { + if (errorCallback_) errorCallback_(); + } + if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) + { + if (readCallback_) readCallback_(receiveTime); + } + if (revents_ & POLLOUT) + { + if (writeCallback_) writeCallback_(); + } } - if (revents_ & POLLOUT) + else { - if (writeCallback_) writeCallback_(); + if (eventsCallback_) eventsCallback_(revents_, receiveTime); } eventHandling_ = false; } diff --git a/muduo/net/Channel.h b/muduo/net/Channel.h index bafa224fa..86f3853b4 100644 --- a/muduo/net/Channel.h +++ b/muduo/net/Channel.h @@ -36,7 +36,9 @@ class Channel : noncopyable typedef std::function EventCallback; typedef std::function ReadEventCallback; - Channel(EventLoop* loop, int fd); + typedef std::function EventsCallback; + + Channel(EventLoop* loop, int fd, bool classify = true); ~Channel(); void handleEvent(Timestamp receiveTime); @@ -49,6 +51,9 @@ class Channel : noncopyable void setErrorCallback(EventCallback cb) { errorCallback_ = std::move(cb); } + void setEventsCallback(const EventsCallback& cb) + { eventsCallback_ = cb; } + /// Tie this channel to the owner object managed by shared_ptr, /// prevent the owner object being destroyed in handleEvent. void tie(const std::shared_ptr&); @@ -67,6 +72,8 @@ class Channel : noncopyable bool isWriting() const { return events_ & kWriteEvent; } bool isReading() const { return events_ & kReadEvent; } + void enableEvents(int events__) { events_ = events__; update(); } + // for Poller int index() { return index_; } void set_index(int idx) { index_ = idx; } @@ -105,6 +112,9 @@ class Channel : noncopyable EventCallback writeCallback_; EventCallback closeCallback_; EventCallback errorCallback_; + + EventsCallback eventsCallback_; + bool classify_; }; } // namespace net diff --git a/muduo/net/poller/EPollPoller.cc b/muduo/net/poller/EPollPoller.cc index b2f913a4c..c5c01087a 100644 --- a/muduo/net/poller/EPollPoller.cc +++ b/muduo/net/poller/EPollPoller.cc @@ -182,11 +182,11 @@ void EPollPoller::update(int operation, Channel* channel) { if (operation == EPOLL_CTL_DEL) { - LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd; + LOG_SYSERR << "epoll_ctl op = " << operationToString(operation) << " fd =" << fd; } else { - LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd; + LOG_SYSFATAL << "epoll_ctl op = " << operationToString(operation) << " fd =" << fd; } } } From 5e7e6a26e22863f4909cae58dd7b1931a5d44eff Mon Sep 17 00:00:00 2001 From: hfcrwx Date: Sat, 30 May 2020 11:53:54 +0800 Subject: [PATCH 2/6] Update .travis.yml & build.sh --- .travis.yml | 1 + build.sh | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/.travis.yml b/.travis.yml index a568370e1..92c3dcfd4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,6 +12,7 @@ install: - sudo apt-get install libboost-test-dev libboost-program-options-dev libboost-system-dev - sudo apt-get install libc-ares-dev libcurl4-openssl-dev - sudo apt-get install zlib1g-dev libgd-dev + - sudo apt-get install libmariadbclient-dev env: - BUILD_TYPE=debug - BUILD_TYPE=release diff --git a/build.sh b/build.sh index a5447f05a..89e72d569 100755 --- a/build.sh +++ b/build.sh @@ -2,6 +2,10 @@ set -x +if [ -f "/usr/lib/x86_64-linux-gnu/libmariadbclient.so" ]; then + sudo ln -sf /usr/lib/x86_64-linux-gnu/libmariadbclient.so /usr/lib/x86_64-linux-gnu/libmysqlclient.so +fi + SOURCE_DIR=`pwd` BUILD_DIR=${BUILD_DIR:-../build} BUILD_TYPE=${BUILD_TYPE:-release} From 4cf062053dae059912b7b540e65b57af522f3dc4 Mon Sep 17 00:00:00 2001 From: hfcrwx Date: Sun, 31 May 2020 09:30:05 +0800 Subject: [PATCH 3/6] Reset build.sh & Update CMakeLists.txt --- CMakeLists.txt | 2 +- build.sh | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4326f2359..225d31467 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -64,7 +64,7 @@ find_library(BOOSTSYSTEM_LIBRARY NAMES boost_system) find_path(TCMALLOC_INCLUDE_DIR gperftools/heap-profiler.h) find_library(TCMALLOC_LIBRARY NAMES tcmalloc_and_profiler) find_path(MARIADBCLIENT_INCLUDE_DIR mysql/mysql.h) -find_library(MARIADBCLIENT_LIBRARY NAMES mysqlclient) +find_library(MARIADBCLIENT_LIBRARY NAMES mariadbclient) find_path(HIREDIS_INCLUDE_DIR hiredis/hiredis.h) find_library(HIREDIS_LIBRARY NAMES hiredis) find_path(GD_INCLUDE_DIR gd.h) diff --git a/build.sh b/build.sh index 89e72d569..a5447f05a 100755 --- a/build.sh +++ b/build.sh @@ -2,10 +2,6 @@ set -x -if [ -f "/usr/lib/x86_64-linux-gnu/libmariadbclient.so" ]; then - sudo ln -sf /usr/lib/x86_64-linux-gnu/libmariadbclient.so /usr/lib/x86_64-linux-gnu/libmysqlclient.so -fi - SOURCE_DIR=`pwd` BUILD_DIR=${BUILD_DIR:-../build} BUILD_TYPE=${BUILD_TYPE:-release} From 8dfdf534de49d430c98253dfc8cd88fedfee49c9 Mon Sep 17 00:00:00 2001 From: hfcrwx Date: Sun, 31 May 2020 09:38:12 +0800 Subject: [PATCH 4/6] Update CMakeLists.txt --- contrib/mariadbclient/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/mariadbclient/CMakeLists.txt b/contrib/mariadbclient/CMakeLists.txt index ca3a0ade2..add2d26a0 100644 --- a/contrib/mariadbclient/CMakeLists.txt +++ b/contrib/mariadbclient/CMakeLists.txt @@ -1,5 +1,5 @@ add_library(muduo_mariadbclient MariaDBClient.cc) -target_link_libraries(muduo_mariadbclient muduo_net mysqlclient) +target_link_libraries(muduo_mariadbclient muduo_net mariadbclient) add_executable(mmariadbclient mmariadbclient.cc) target_link_libraries(mmariadbclient muduo_mariadbclient) From a1c6206c34fab647906fd2960417411fd4d6d85c Mon Sep 17 00:00:00 2001 From: hfcrwx Date: Fri, 26 Jun 2020 01:43:05 +0800 Subject: [PATCH 5/6] zero copy MYSQL_RES --- contrib/mariadbclient/MariaDBClient.cc | 173 +++++++++--------------- contrib/mariadbclient/MariaDBClient.h | 58 ++++---- contrib/mariadbclient/mmariadbclient.cc | 168 +++++++++++------------ muduo/net/Channel.h | 6 +- 4 files changed, 165 insertions(+), 240 deletions(-) diff --git a/contrib/mariadbclient/MariaDBClient.cc b/contrib/mariadbclient/MariaDBClient.cc index c16836a4f..f48c35a3c 100644 --- a/contrib/mariadbclient/MariaDBClient.cc +++ b/contrib/mariadbclient/MariaDBClient.cc @@ -27,9 +27,7 @@ MariaDBClient::MariaDBClient(EventLoop* loop, user_(user), password_(password), db_(db), - isConnected_(false), - res_(NULL), - row_(NULL) + isConnected_(false) { } @@ -43,27 +41,6 @@ MariaDBClient::~MariaDBClient() void MariaDBClient::connect() { - loop_->runInLoop(std::bind(&MariaDBClient::connectInLoop, this)); -} - -void MariaDBClient::disconnect() -{ - loop_->runInLoop(std::bind(&MariaDBClient::disconnectInLoop, this)); -} - -void MariaDBClient::query(StringArg queryStr, const QueryCallback& cb) -{ - loop_->runInLoop(std::bind(&MariaDBClient::queryInLoop, this, string(queryStr.c_str()), cb)); -} - -void MariaDBClient::queryFetch(StringArg queryStr, const QueryFetchCallback& cb) -{ - loop_->runInLoop(std::bind(&MariaDBClient::queryFetchInLoop, this, string(queryStr.c_str()), cb)); -} - -void MariaDBClient::connectInLoop() -{ - loop_->assertInLoopThread(); assert(!isConnected_); ::mysql_init(&mysql_); @@ -72,35 +49,32 @@ void MariaDBClient::connectInLoop() stateMachineHandler(kRealConnectStart); } -void MariaDBClient::disconnectInLoop() +void MariaDBClient::disconnect() { - loop_->assertInLoopThread(); assert(isConnected_); stateMachineHandler(kCloseStart); } -void MariaDBClient::queryInLoop(StringArg queryStr, const QueryCallback& cb) +void MariaDBClient::executeUpdate(StringArg sql, const UpdateCallback& cb) { - loop_->assertInLoopThread(); assert(isConnected_); - queries_.emplace_back(new QueryData(QueryData::kQuery, queryStr.c_str(), cb)); + sqlQueue_.emplace_back(new SQLData(SQLData::kUpdate, sql.c_str(), cb)); - if (queries_.size() == 1) + if (sqlQueue_.size() == 1) { stateMachineHandler(kRealQueryStart); } } -void MariaDBClient::queryFetchInLoop(StringArg queryStr, const QueryFetchCallback& cb) +void MariaDBClient::executeQuery(StringArg sql, const QueryCallback& cb) { - loop_->assertInLoopThread(); assert(isConnected_); - queries_.emplace_back(new QueryData(QueryData::kQueryFetch, queryStr.c_str(), cb)); + sqlQueue_.emplace_back(new SQLData(SQLData::kQuery, sql.c_str(), cb)); - if (queries_.size() == 1) + if (sqlQueue_.size() == 1) { stateMachineHandler(kRealQueryStart); } @@ -111,8 +85,10 @@ void MariaDBClient::stateMachineHandler(int state, int revents, Timestamp receiv loop_->assertInLoopThread(); int mysqlRevents = toMySQLEvents(revents); - MYSQL* ret = NULL; - int err = 0; + + static MYSQL* ret = NULL; + static int err = 0; + static MYSQL_RES* res = NULL; again: switch (state) @@ -160,7 +136,7 @@ void MariaDBClient::stateMachineHandler(int state, int revents, Timestamp receiv case kRealConnectEnd: { - if (!ret) + if (ret == NULL) { LOG_ERROR << "Failed to mysql_real_connect(): " << errorStr(); } @@ -181,11 +157,10 @@ void MariaDBClient::stateMachineHandler(int state, int revents, Timestamp receiv case kRealQueryStart: { - int mysqlEvents = - ::mysql_real_query_start(&err, - &mysql_, - queries_.front()->queryStr_.c_str(), - queries_.front()->queryStr_.size()); + int mysqlEvents = ::mysql_real_query_start(&err, + &mysql_, + sqlQueue_.front()->sql_.c_str(), + sqlQueue_.front()->sql_.size()); if (mysqlEvents != 0) { channel_->setEventsCallback(std::bind(&MariaDBClient::stateMachineHandler, this, kRealQueryCont, _1, _2)); @@ -216,66 +191,64 @@ void MariaDBClient::stateMachineHandler(int state, int revents, Timestamp receiv case kRealQueryEnd: { - if (queries_.front()->type_ == QueryData::kQuery) + if (sqlQueue_.front()->type_ == SQLData::kUpdate) { - if (queries_.front()->queryCb_) + if (sqlQueue_.front()->updateCb_) { - queries_.front()->queryCb_(this); + sqlQueue_.front()->updateCb_(this); } - queries_.pop_front(); + sqlQueue_.pop_front(); - if (!queries_.empty()) + if (!sqlQueue_.empty()) { NEXT_IMMEDIATE(kRealQueryStart); } } else { - assert(queries_.front()->type_ == QueryData::kQueryFetch); - assert(queries_.front()->queryFetchCb_); - if (err) + assert(sqlQueue_.front()->type_ == SQLData::kQuery); + if (err != 0) { LOG_ERROR << "mysql_real_query() returns error: " << errorStr(); - queries_.front()->queryFetchCb_(this, FetchResultPtr()); - } - else - { - res_ = ::mysql_use_result(&mysql_); - if (!res_) + if (sqlQueue_.front()->queryCb_) { - LOG_ERROR << "mysql_use_result() returns error: " << errorStr(); - queries_.front()->queryFetchCb_(this, FetchResultPtr()); + sqlQueue_.front()->queryCb_(this, NULL); } - else + sqlQueue_.pop_front(); + + if (!sqlQueue_.empty()) { - result_.reset(new FetchResult); - NEXT_IMMEDIATE(kFetchRowStart); + NEXT_IMMEDIATE(kRealQueryStart); } } + else + { + NEXT_IMMEDIATE(kStoreResultStart); + } } } break; - case kFetchRowStart: + case kStoreResultStart: { - int mysqlEvents = ::mysql_fetch_row_start(&row_, res_); + int mysqlEvents = ::mysql_store_result_start(&res, &mysql_); if (mysqlEvents != 0) { - channel_->setEventsCallback(std::bind(&MariaDBClient::stateMachineHandler, this, kFetchRowCont, _1, _2)); + channel_->setEventsCallback(std::bind(&MariaDBClient::stateMachineHandler, this, kStoreResultCont, _1, _2)); int events = toEvents(mysqlEvents); channel_->enableEvents(events); } else { - NEXT_IMMEDIATE(kFetchRowEnd); + NEXT_IMMEDIATE(kStoreResultEnd); } } break; - case kFetchRowCont: + case kStoreResultCont: { - int mysqlEvents = ::mysql_fetch_row_cont(&row_, res_, mysqlRevents); + int mysqlEvents = ::mysql_store_result_cont(&res, &mysql_, mysqlRevents); if (mysqlEvents != 0) { int events = toEvents(mysqlEvents); @@ -283,49 +256,36 @@ void MariaDBClient::stateMachineHandler(int state, int revents, Timestamp receiv } else { - NEXT_IMMEDIATE(kFetchRowEnd); + NEXT_IMMEDIATE(kStoreResultEnd); } } break; - case kFetchRowEnd: + case kStoreResultEnd: { - if (row_) + if (sqlQueue_.front()->queryCb_) { - std::vector fields; - for (size_t i = 0; i < ::mysql_num_fields(res_); ++i) - { - if (row_[i]) - { - fields.push_back(row_[i]); - } - else - { - fields.push_back(""); - } - } - result_->push_back(fields); - - NEXT_IMMEDIATE(kFetchRowStart); + sqlQueue_.front()->queryCb_(this, res); } else { - if (errorNo() != 0) + if (res != NULL) { - LOG_ERROR << "Got error while retrieving rows: " << errorStr(); + ::mysql_free_result(res); } - ::mysql_free_result(res_); - - assert(queries_.front()->queryFetchCb_); - queries_.front()->queryFetchCb_(this, result_); - queries_.pop_front(); - result_.reset(); - - if (!queries_.empty()) + else { - NEXT_IMMEDIATE(kRealQueryStart); + assert(::mysql_field_count(&mysql_) != 0); + assert(::mysql_errno(&mysql_) != 0); + LOG_ERROR << "Got error while storing result: " << errorStr(); } } + sqlQueue_.pop_front(); + + if (!sqlQueue_.empty()) + { + NEXT_IMMEDIATE(kRealQueryStart); + } } break; @@ -335,7 +295,7 @@ void MariaDBClient::stateMachineHandler(int state, int revents, Timestamp receiv int mysqlEvents = ::mysql_close_start(&mysql_); if (mysqlEvents != 0) { - channel_->setEventsCallback(std::bind(&MariaDBClient::stateMachineHandler, this, kCloseContinue, _1, _2)); + channel_->setEventsCallback(std::bind(&MariaDBClient::stateMachineHandler, this, kCloseCont, _1, _2)); int events = toEvents(mysqlEvents); channel_->enableEvents(events); } @@ -346,7 +306,7 @@ void MariaDBClient::stateMachineHandler(int state, int revents, Timestamp receiv } break; - case kCloseContinue: + case kCloseCont: { int mysqlEvents = ::mysql_close_cont(&mysql_, mysqlRevents); if (mysqlEvents != 0) @@ -363,18 +323,11 @@ void MariaDBClient::stateMachineHandler(int state, int revents, Timestamp receiv case kCloseEnd: { - if (errorNo() != 0) - { - LOG_ERROR << "Got error while close: " << errorStr(); - } - else - { - isConnected_ = false; - channel_->disableAll(); - channel_->remove(); - loop_->queueInLoop(std::bind(dummy, channel_)); - channel_.reset(); - } + isConnected_ = false; + channel_->disableAll(); + channel_->remove(); + loop_->queueInLoop(std::bind(dummy, channel_)); + channel_.reset(); if (disconnectCb_) { diff --git a/contrib/mariadbclient/MariaDBClient.h b/contrib/mariadbclient/MariaDBClient.h index ae589b972..74dabdbce 100644 --- a/contrib/mariadbclient/MariaDBClient.h +++ b/contrib/mariadbclient/MariaDBClient.h @@ -10,7 +10,6 @@ #include #include #include -#include namespace muduo { @@ -31,11 +30,8 @@ class MariaDBClient : muduo::noncopyable typedef std::function ConnectCallback; typedef std::function DisconnectCallback; - typedef std::function QueryCallback; - - typedef std::vector> FetchResult; - typedef std::shared_ptr FetchResultPtr; - typedef std::function QueryFetchCallback; + typedef std::function UpdateCallback; // similar to WriteCompleteCallback + typedef std::function QueryCallback; enum State { @@ -47,12 +43,12 @@ class MariaDBClient : muduo::noncopyable kRealQueryCont, kRealQueryEnd, - kFetchRowStart, - kFetchRowCont, - kFetchRowEnd, + kStoreResultStart, + kStoreResultCont, + kStoreResultEnd, kCloseStart, - kCloseContinue, + kCloseCont, kCloseEnd, }; @@ -69,19 +65,15 @@ class MariaDBClient : muduo::noncopyable void connect(); void disconnect(); - void query(muduo::StringArg queryStr, const QueryCallback& cb = QueryCallback()); - void queryFetch(muduo::StringArg queryStr, const QueryFetchCallback& cb); + // INSERT, UPDATE, or DELETE + void executeUpdate(muduo::StringArg sql, const UpdateCallback& cb = UpdateCallback()); + // SELECT, SHOW, DESCRIBE, EXPLAIN, CHECK TABLE, and so forth + void executeQuery(muduo::StringArg sql, const QueryCallback& cb = QueryCallback()); uint32_t errorNo() { return ::mysql_errno(&mysql_); } const char* errorStr() { return ::mysql_error(&mysql_); } private: - void connectInLoop(); - void disconnectInLoop(); - - void queryInLoop(muduo::StringArg queryStr, const QueryCallback& cb = QueryCallback()); - void queryFetchInLoop(muduo::StringArg queryStr, const QueryFetchCallback& cb); - void stateMachineHandler(int state, int revents = -1, muduo::Timestamp receiveTime = muduo::Timestamp::invalid()); void logConnection(bool up) const; @@ -102,34 +94,30 @@ class MariaDBClient : muduo::noncopyable MYSQL mysql_; - struct QueryData + struct SQLData { - enum QueryType + enum Type { - kQuery, - kQueryFetch + kUpdate, + kQuery }; - QueryData(QueryType type, const string& queryStr, const QueryCallback& cb) - : type_(type), queryStr_(queryStr), queryCb_(cb) {} - QueryData(QueryType type, const string& queryStr, const QueryFetchCallback& cb) - : type_(type), queryStr_(queryStr), queryFetchCb_(cb) {} - ~QueryData() {} + SQLData(Type type, const string& sql, const UpdateCallback& cb) + : type_(type), sql_(sql), updateCb_(cb) {} + SQLData(Type type, const string& sql, const QueryCallback& cb) + : type_(type), sql_(sql), queryCb_(cb) {} + ~SQLData() {} - QueryType type_; - string queryStr_; + Type type_; + string sql_; union { + UpdateCallback updateCb_; QueryCallback queryCb_; - QueryFetchCallback queryFetchCb_; }; }; - std::deque> queries_; - - MYSQL_RES* res_; - MYSQL_ROW row_; - FetchResultPtr result_; + std::deque> sqlQueue_; }; } // namespace mariadbclient diff --git a/contrib/mariadbclient/mmariadbclient.cc b/contrib/mariadbclient/mmariadbclient.cc index 2c6368457..e246775d7 100644 --- a/contrib/mariadbclient/mmariadbclient.cc +++ b/contrib/mariadbclient/mmariadbclient.cc @@ -1,26 +1,81 @@ #include "MariaDBClient.h" -#include #include #include -#include using namespace muduo; using namespace muduo::net; -static const char* g_myGroups[] = {"client", NULL}; -static uint64_t g_id = 0; +static uint64_t g_seqid = 0; -void connectCallback(mariadbclient::MariaDBClient* c, CountDownLatch* connected) +void updateCallback(mariadbclient::MariaDBClient* c, uint64_t id) +{ + LOG_INFO << "seq id: " << id << "\terrorNo: " << c->errorNo() << "\terrorStr: " << c->errorStr(); +} + +void queryCallback(mariadbclient::MariaDBClient* c, MYSQL_RES* result, uint64_t id) +{ + LOG_INFO << "seq id: " << id << "\terrorNo: " << c->errorNo() << "\terrorStr: " << c->errorStr(); + + uint32_t numFields = ::mysql_num_fields(result); + MYSQL_ROW row; + while ((row = ::mysql_fetch_row(result)) != NULL) + { + uint64_t* lengths = ::mysql_fetch_lengths(result); + for (uint32_t i = 0; i < numFields; ++i) + { + printf("[%.*s] ", static_cast(lengths[i]), row[i] ? row[i] : "NULL"); + } + printf("\n"); + } + + ::mysql_free_result(result); +} + +void connectCallback(mariadbclient::MariaDBClient* c) { if (c->errorNo() == 0) { - LOG_INFO << "Connected... " << c->errorNo() << '\n' << c->errorStr(); - connected->countDown(); + LOG_INFO << "Connected... " << "\terrorNo: " << c->errorNo() << "\terrorStr: " << c->errorStr(); + + string sql0("DROP TABLE muduo_user"); + string sql1("CREATE TABLE muduo_user (" + "id INT(11) NOT NULL AUTO_INCREMENT," + "nick VARCHAR(64) NOT NULL," + "PRIMARY KEY (id)" + ")"); + string sql2("SELECT id, nick " + "FROM muduo_user"); + string sql3("INSERT INTO muduo_user (id, nick)" + "VALUES (1, 'ChenShuo')"); + string sql4("INSERT INTO muduo_user (nick)" + "VALUES ('Jack')"); + string sql5("INSERT INTO muduo_user (nick)" + "VALUES ('Lucy')"); + string sql6("UPDATE muduo_user " + "SET nick = 'Tom' " + "WHERE id = 2"); + string sql7("SELECT id, nick " + "FROM muduo_user"); + string sql8("DELETE FROM muduo_user " + "WHERE id > 1"); + string sql9("SELECT id, nick " + "FROM muduo_user"); + + c->executeUpdate(sql0, std::bind(&updateCallback, _1, g_seqid++)); + c->executeUpdate(sql1, std::bind(&updateCallback, _1, g_seqid++)); + c->executeQuery(sql2, std::bind(&queryCallback, _1, _2, g_seqid++)); + c->executeUpdate(sql3, std::bind(&updateCallback, _1, g_seqid++)); + c->executeUpdate(sql4); g_seqid++; + c->executeUpdate(sql5); g_seqid++; + c->executeUpdate(sql6, std::bind(&updateCallback, _1, g_seqid++)); + c->executeQuery(sql7, std::bind(&queryCallback, _1, _2, g_seqid++)); + c->executeUpdate(sql8, std::bind(&updateCallback, _1, g_seqid++)); + c->executeQuery(sql9, std::bind(&queryCallback, _1, _2, g_seqid++)); } else { - LOG_ERROR << "connectCallback Error: " << c->errorNo() << '\n' << c->errorStr(); + LOG_ERROR << "connectCallback Error: " << "\terrorNo: " << c->errorNo() << "\terrorStr: " << c->errorStr(); } } @@ -28,102 +83,33 @@ void disconnectCallback(mariadbclient::MariaDBClient* c, EventLoop* loop) { if (c->errorNo() == 0) { - LOG_INFO << "Disconnected... " << c->errorNo() << '\n' << c->errorStr(); + LOG_INFO << "Disconnected... " << "\terrorNo: " << c->errorNo() << "\terrorStr: " << c->errorStr(); } else { - LOG_ERROR << "disconnectCallback Error: " << c->errorNo() << '\n' << c->errorStr(); + LOG_ERROR << "disconnectCallback Error: " << "\terrorNo: " << c->errorNo() << "\terrorStr: " << c->errorStr(); } - loop->quit(); -} -void queryCallback(mariadbclient::MariaDBClient* c, uint64_t id) -{ - LOG_INFO << "message id: " << id << "\terrorNo: " << c->errorNo() << "\terrorStr: " << c->errorStr(); -} - -void selectCallback(mariadbclient::MariaDBClient* c, - const mariadbclient::MariaDBClient::FetchResultPtr& result, - uint64_t id) -{ - LOG_INFO << "message id: " << id << "\terrorNo: " << c->errorNo() << "\terrorStr: " << c->errorStr(); - LOG_INFO << "result size: " << result->size(); - if (!result->empty()) - { - for (size_t i = 0; i < result->size(); ++i) - { - for (size_t j = 0; j < (*result)[i].size(); ++j) - { - printf("%s\t", (*result)[i][j].c_str()); - } - printf("\n"); - } - } + loop->quit(); } int main(int argc, char* argv[]) { - int err = mysql_library_init(argc, argv, const_cast(g_myGroups)); - if (err) + int err = mysql_library_init(0, NULL, NULL); + if (err != 0) { LOG_FATAL << "mysql_library_init() returns error: " << err; } - { - EventLoop loop; - EventLoopThread t; - mariadbclient::MariaDBClient mariadbClient(t.startLoop(), InetAddress("127.0.0.1", 3306), "root", "123456", "test"); - CountDownLatch connected(1); - mariadbClient.setConnectCallback(std::bind(&connectCallback, _1, &connected)); - mariadbClient.setDisconnectCallback(std::bind(&disconnectCallback, _1, &loop)); - mariadbClient.connect(); - connected.wait(); - - mariadbClient.query("DROP TABLE muduo_user", - std::bind(&queryCallback, _1, g_id)); - - mariadbClient.query("CREATE TABLE muduo_user (\ -id INT(11) NOT NULL AUTO_INCREMENT,\ -nick VARCHAR(64) NOT NULL,\ -PRIMARY KEY (id)\ -)", - std::bind(&queryCallback, _1, g_id++)); - - mariadbClient.queryFetch("SELECT * FROM muduo_user", - std::bind(&selectCallback, _1, _2, g_id++)); - - mariadbClient.query("INSERT INTO muduo_user \ -(id, nick) \ -VALUES (1, \"ChenShuo\")", - std::bind(&queryCallback, _1, g_id++)); - - mariadbClient.query("INSERT INTO muduo_user \ -(nick) \ -VALUES (\"Jack\")"); - - mariadbClient.query("INSERT INTO muduo_user \ -(nick) \ -VALUES (\"Lucy\")"); - - mariadbClient.query("UPDATE muduo_user \ -SET nick=\"Tom\" \ -WHERE id=2", - std::bind(&queryCallback, _1, g_id++)); - - mariadbClient.queryFetch("SELECT * FROM muduo_user", - std::bind(&selectCallback, _1, _2, g_id++)); - - mariadbClient.query("DELETE FROM muduo_user \ -WHERE id>1", - std::bind(&queryCallback, _1, g_id++)); - - mariadbClient.queryFetch("SELECT * FROM muduo_user", - std::bind(&selectCallback, _1, _2, g_id++)); - - loop.runAfter(5, std::bind(&mariadbclient::MariaDBClient::disconnect, &mariadbClient)); - - loop.loop(); - } + EventLoop loop; + mariadbclient::MariaDBClient mariadbClient(&loop, InetAddress("127.0.0.1", 3306), "root", "123456", "test"); + mariadbClient.setConnectCallback(std::bind(&connectCallback, _1)); + mariadbClient.setDisconnectCallback(std::bind(&disconnectCallback, _1, &loop)); + mariadbClient.connect(); + loop.runAfter(5, std::bind(&mariadbclient::MariaDBClient::disconnect, &mariadbClient)); + + loop.loop(); + mysql_library_end(); return 0; diff --git a/muduo/net/Channel.h b/muduo/net/Channel.h index 86f3853b4..91b9f5154 100644 --- a/muduo/net/Channel.h +++ b/muduo/net/Channel.h @@ -50,9 +50,8 @@ class Channel : noncopyable { closeCallback_ = std::move(cb); } void setErrorCallback(EventCallback cb) { errorCallback_ = std::move(cb); } - - void setEventsCallback(const EventsCallback& cb) - { eventsCallback_ = cb; } + void setEventsCallback(EventsCallback cb) + { eventsCallback_ = std::move(cb); } /// Tie this channel to the owner object managed by shared_ptr, /// prevent the owner object being destroyed in handleEvent. @@ -71,7 +70,6 @@ class Channel : noncopyable void disableAll() { events_ = kNoneEvent; update(); } bool isWriting() const { return events_ & kWriteEvent; } bool isReading() const { return events_ & kReadEvent; } - void enableEvents(int events__) { events_ = events__; update(); } // for Poller From 697e3bae4ab19b8095b70b27bd5f1fbede1f196b Mon Sep 17 00:00:00 2001 From: hfcrwx Date: Wed, 19 May 2021 00:57:14 +0800 Subject: [PATCH 6/6] Update CMakeLists.txt & .travis.yml --- .travis.yml | 1 - CMakeLists.txt | 2 +- contrib/mariadbclient/CMakeLists.txt | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 92c3dcfd4..a568370e1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,6 @@ install: - sudo apt-get install libboost-test-dev libboost-program-options-dev libboost-system-dev - sudo apt-get install libc-ares-dev libcurl4-openssl-dev - sudo apt-get install zlib1g-dev libgd-dev - - sudo apt-get install libmariadbclient-dev env: - BUILD_TYPE=debug - BUILD_TYPE=release diff --git a/CMakeLists.txt b/CMakeLists.txt index 225d31467..4326f2359 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -64,7 +64,7 @@ find_library(BOOSTSYSTEM_LIBRARY NAMES boost_system) find_path(TCMALLOC_INCLUDE_DIR gperftools/heap-profiler.h) find_library(TCMALLOC_LIBRARY NAMES tcmalloc_and_profiler) find_path(MARIADBCLIENT_INCLUDE_DIR mysql/mysql.h) -find_library(MARIADBCLIENT_LIBRARY NAMES mariadbclient) +find_library(MARIADBCLIENT_LIBRARY NAMES mysqlclient) find_path(HIREDIS_INCLUDE_DIR hiredis/hiredis.h) find_library(HIREDIS_LIBRARY NAMES hiredis) find_path(GD_INCLUDE_DIR gd.h) diff --git a/contrib/mariadbclient/CMakeLists.txt b/contrib/mariadbclient/CMakeLists.txt index add2d26a0..ca3a0ade2 100644 --- a/contrib/mariadbclient/CMakeLists.txt +++ b/contrib/mariadbclient/CMakeLists.txt @@ -1,5 +1,5 @@ add_library(muduo_mariadbclient MariaDBClient.cc) -target_link_libraries(muduo_mariadbclient muduo_net mariadbclient) +target_link_libraries(muduo_mariadbclient muduo_net mysqlclient) add_executable(mmariadbclient mmariadbclient.cc) target_link_libraries(mmariadbclient muduo_mariadbclient)