diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 000000000..39bbd2681 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,4 @@ +{ + "image": "mcr.microsoft.com/devcontainers/universal:2", + "features": {} +} diff --git a/include/co/co/thread.h b/include/co/co/thread.h index 94845e590..b63ad4883 100644 --- a/include/co/co/thread.h +++ b/include/co/co/thread.h @@ -95,4 +95,15 @@ class __coapi sync_event { DISALLOW_COPY_AND_ASSIGN(sync_event); }; +inline int set_thread_name(const char *name) { +#ifdef _WIN32 + return 0; +#elif defined(__APPLE__) + ::pthread_setname_np(name); + return 0; +#else + return ::pthread_setname_np(::pthread_self(), name); +#endif +} + } // co diff --git a/include/co/god.h b/include/co/god.h index 3873f4c39..bc0978d3d 100644 --- a/include/co/god.h +++ b/include/co/god.h @@ -217,7 +217,7 @@ constexpr bool is_scalar() { return std::is_scalar::value; } -#if defined(__GNUC__) && __GNUC__ < 5 +#if defined(__GNUC__) && __GNUC__ < 5 && !defined(__APPLE__) template constexpr bool is_trivially_copyable() { return __has_trivial_copy(T); diff --git a/include/co/tcp.h b/include/co/tcp.h index f331cdef5..aa6f6e524 100644 --- a/include/co/tcp.h +++ b/include/co/tcp.h @@ -115,10 +115,7 @@ class __coapi Server final { uint32 conn_num() const; /** - * start the server - * - The server will loop in a coroutine, and it will not block the calling thread. - * - The user MUST call on_connection() to set a connection callback before start() - * was called. + * config the server * - By default, key and ca are NULL, and ssl is disabled. * * @param ip server ip, either an ipv4 or ipv6 address. @@ -127,7 +124,19 @@ class __coapi Server final { * @param key path of ssl private key file. * @param ca path of ssl certificate file. */ - void start(const char* ip, int port, const char* key=0, const char* ca=0); + Server &set_config(const char *ip, int port, const char *key = nullptr, const char *ca = nullptr); + + uint16 get_port() const; + + int listen(int end_port = 0); + + /** + * start the server + * - The server will loop in a coroutine, and it will not block the calling thread. + * - The user MUST call on_connection() to set a connection callback before start() + * was called. + */ + void start(); /** * exit the server gracefully diff --git a/src/co/co.cc b/src/co/co.cc index 08aad350a..56b6cd31a 100644 --- a/src/co/co.cc +++ b/src/co/co.cc @@ -321,7 +321,12 @@ bool event_impl::wait(uint32 ms) { _wc.pop_front(); !x ? (void)(x = w) : co::free(w, sizeof(*w)); } - x ? (void)(x->state = st_wait) : (void)(x = make_waitx(co)); + if (x) { + x->co = co; + x->state = st_wait; + } else { + x = make_waitx(co); + } co->waitx = x; _wc.push_back(x); } diff --git a/src/co/sched.cc b/src/co/sched.cc index a590fc279..ed9eccbb9 100644 --- a/src/co/sched.cc +++ b/src/co/sched.cc @@ -149,6 +149,8 @@ void Sched::loop() { co::vector ready_tasks(512); co::Timer timer; + co::set_thread_name("Co_Sched"); + while (!_x.stopped) { int n = _x.epoll->wait(_wait_ms); if (_x.stopped) break; diff --git a/src/log/log.cc b/src/log/log.cc index 7e58b888d..cc99e5c1f 100644 --- a/src/log/log.cc +++ b/src/log/log.cc @@ -636,6 +636,7 @@ void Logger::write_topic_logs(LogFile& f, const char* topic, const char* p, size void Logger::thread_fun() { bool signaled; int64 sec; + co::set_thread_name("Co_Logger"); while (atomic_load(&g_init_done, mo_acquire) != true) _log_event.wait(8); while (!_stop) { signaled = _log_event.wait(FLG_log_flush_ms); diff --git a/src/so/http.cc b/src/so/http.cc index d9af98a84..886af5872 100644 --- a/src/so/http.cc +++ b/src/so/http.cc @@ -680,7 +680,9 @@ void ServerImpl::start(const char* ip, int port, const char* key, const char* ca atomic_store(&_started, true, mo_relaxed); _serv.on_connection(&ServerImpl::on_connection, this); _serv.on_exit([this]() { co::del(this); }); - _serv.start(ip, port, key, ca); + int r = _serv.set_config(ip, port, key, ca).listen(); + CHECK(r == 0); + _serv.start(); } inline int hex2int(char c) { diff --git a/src/so/rpc.cc b/src/so/rpc.cc index 8d23ff50b..bdcacd322 100644 --- a/src/so/rpc.cc +++ b/src/so/rpc.cc @@ -70,7 +70,9 @@ class ServerImpl { atomic_store(&_started, true, mo_relaxed); _tcp_serv.on_connection(&ServerImpl::on_connection, this); _tcp_serv.on_exit([this]() { co::del(this); }); - _tcp_serv.start(ip, port, key, ca); + int r = _tcp_serv.set_config(ip, port, key, ca).listen(); + CHECK(r == 0); + _tcp_serv.start(); } bool started() const { return _started; } diff --git a/src/so/tcp.cc b/src/so/tcp.cc index e67220ade..a2645a4c6 100644 --- a/src/so/tcp.cc +++ b/src/so/tcp.cc @@ -185,7 +185,10 @@ class ServerImpl { _exit_cb = std::move(cb); } - void start(const char* ip, int port, const char* key, const char* ca); + void set_config(const char *ip, int port, const char *key, const char *ca); + uint16 get_port() const { return _port; } + int listen(int end_port); + void start(); void exit(); bool started() const { return _started; } @@ -224,7 +227,7 @@ class ServerImpl { } _addr; }; -void ServerImpl::start(const char* ip, int port, const char* key, const char* ca) { +void ServerImpl::set_config(const char *ip, int port, const char *key, const char *ca) { CHECK(_conn_cb != NULL) << "connection callback not set.."; _ip = (ip && *ip) ? ip : "0.0.0.0"; _port = (uint16)port; @@ -244,27 +247,28 @@ void ServerImpl::start(const char* ip, int port, const char* key, const char* ca CHECK_EQ(r, 1) << "ssl check private key error: " << ssl::strerror(); _on_sock = std::bind(&ServerImpl::on_ssl_connection, this, std::placeholders::_1); - this->ref(); - atomic_store(&_started, true, mo_relaxed); - go(&ServerImpl::loop, this); } else { _on_sock = std::bind(&ServerImpl::on_tcp_connection, this, std::placeholders::_1); - this->ref(); - atomic_store(&_started, true, mo_relaxed); - go(&ServerImpl::loop, this); } } +void ServerImpl::start() { + CHECK(_on_sock && _fd >= 0) << "must set_config and listen"; + this->ref(); + atomic_store(&_started, true, mo_relaxed); + go(&ServerImpl::loop, this); +} + void ServerImpl::exit() { int status = atomic_cas(&_status, 0, 1); if (status == 2) return; // already stopped if (status == 0) { - sleep::ms(1); - if (status != 2) go(&ServerImpl::stop, this); + co::sleep(1); + go(&ServerImpl::stop, this); } - while (_status != 2) sleep::ms(1); + while (_status != 2) co::sleep(1); } void ServerImpl::stop() { @@ -273,39 +277,94 @@ void ServerImpl::stop() { c.connect(-1); } -/** - * the server loop - * - It listens on a port and waits for connections. - * - When a connection is accepted, it will start a new coroutine and call - * the connection callback to handle the connection. - */ -void ServerImpl::loop() { - do { - fastring port = str::from(_port); - struct addrinfo* info = 0; - int r = getaddrinfo(_ip.c_str(), port.c_str(), NULL, &info); - CHECK_EQ(r, 0) << "invalid ip address: " << _ip << ':' << _port; - CHECK(info != NULL); +int ServerImpl::listen(int end_port) { + int r = EINVAL; + if (_fd >= 0) return r; + if (end_port < _port) end_port = _port; - _fd = co::tcp_socket(info->ai_family); - CHECK_NE(_fd, (sock_t)-1) << "create socket error: " << co::strerror(); - co::set_reuseaddr(_fd); + auto start_time = now::ms(); + + for (uint16 port_num = _port; port_num <= (uint16) end_port;) { + fastring port = str::from(port_num); + struct addrinfo *info = nullptr; + r = ::getaddrinfo(_ip.c_str(), port.c_str(), nullptr, &info); + if (r) { + ELOG << "getaddrinfo " << _ip << ':' << port << " error: (" << r << ") " << ::gai_strerror(r); + break; + } + CHECK(info); + + bool retry = false; + + sock_t fd = co::tcp_socket(info->ai_family); + if (fd < 0) { + r = errno; + goto end; + } + co::set_reuseaddr(fd); // turn off IPV6_V6ONLY if (info->ai_family == AF_INET6) { int on = 0; - co::setsockopt(_fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)); + co::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)); } - r = co::bind(_fd, info->ai_addr, (int)info->ai_addrlen); - CHECK_EQ(r, 0) << "bind " << _ip << ':' << _port << " failed: " << co::strerror(); + r = co::bind(fd, info->ai_addr, (int) info->ai_addrlen); + if (r < 0) { + r = errno; + if (r == EADDRNOTAVAIL) { + if (now::ms() - start_time <= 20000) { + retry = true; + DLOG << "bind " << _ip << ':' << port << " error: (" << r << ") " << co::strerror(r); + co::sleep(1000); + } else { + ELOG << "bind " << _ip << ':' << port << " error: (" << r << ") " << co::strerror(r); + } + } else if (r == EADDRINUSE && now::ms() - start_time <= 40000) { + retry = true; + DLOG << "bind " << _ip << ':' << port << " error: (" << r << ") " << co::strerror(r); + co::sleep(20); + if (++port_num > (uint16) end_port) port_num = _port; + } else { + ELOG << "bind " << _ip << ':' << port << " error: (" << r << ") " << co::strerror(r); + } + goto end; + } - r = co::listen(_fd, 64 * 1024); - CHECK_EQ(r, 0) << "listen error: " << co::strerror(); + r = co::listen(fd, 64 * 1024); + if (r < 0) { + r = errno; + if (r == EADDRINUSE && now::ms() - start_time <= 40000) { + retry = true; + DLOG << "listen " << _ip << ':' << port << " error: (" << r << ") " << co::strerror(r); + co::sleep(20); + if (++port_num > (uint16) end_port) port_num = _port; + } else { + ELOG << "listen " << _ip << ':' << port << " error: (" << r << ") " << co::strerror(r); + } + goto end; + } - freeaddrinfo(info); - } while (0); + _port = port_num; + _fd = fd; + fd = -1; + end: + if (fd >= 0) co::close(fd); + if (info) ::freeaddrinfo(info); + if (!retry) break; + } + + return r; +} + +/** + * the server loop + * - It listens on a port and waits for connections. + * - When a connection is accepted, it will start a new coroutine and call + * the connection callback to handle the connection. + */ +void ServerImpl::loop() { LOG << "server start: " << _ip << ':' << _port; while (true) { _addrlen = sizeof(_addr); @@ -395,10 +454,17 @@ uint32 Server::conn_num() const { return ((ServerImpl*)_p)->conn_num(); } -void Server::start(const char* ip, int port, const char* key, const char* ca) { - ((ServerImpl*)_p)->start(ip, port, key, ca); +Server &Server::set_config(const char *ip, int port, const char *key, const char *ca) { + ((ServerImpl *) _p)->set_config(ip, port, key, ca); + return *this; } +uint16 Server::get_port() const { return ((ServerImpl *) _p)->get_port(); } + +int Server::listen(int end_port) { return ((ServerImpl *) _p)->listen(end_port); } + +void Server::start() { ((ServerImpl *) _p)->start(); } + void Server::exit() { ((ServerImpl*)_p)->exit(); } diff --git a/src/tasked.cc b/src/tasked.cc index c7f6bed72..7728ff6b0 100644 --- a/src/tasked.cc +++ b/src/tasked.cc @@ -80,6 +80,8 @@ void TaskedImpl::loop() { co::Timer timer; co::vector tmp(32); + co::set_thread_name("Co_Tasked"); + while (!_stop) { timer.restart(); { diff --git a/test/so/echo.cc b/test/so/echo.cc index 2f71d7ddb..9ff1f8952 100644 --- a/test/so/echo.cc +++ b/test/so/echo.cc @@ -77,7 +77,10 @@ int main(int argc, char** argv) { flag::parse(argc, argv); if (FLG_s) { - tcp::Server().on_connection(conn_cb).start("0.0.0.0", FLG_p); + tcp::Server serv; + serv.on_connection(conn_cb).set_config("0.0.0.0", FLG_p); + if (serv.listen() != 0) return 1; + serv.start(); while (true) sleep::sec(1024); } else { g_count = (Count*) co::zalloc(sizeof(Count) * FLG_c); diff --git a/test/so/ssl.cc b/test/so/ssl.cc index 4d874101e..43e1b55f5 100644 --- a/test/so/ssl.cc +++ b/test/so/ssl.cc @@ -118,11 +118,15 @@ int main(int argc, char** argv) { CHECK(!FLG_ca.empty()) << "ssl certificate file not set.."; if (FLG_t == 0) { - serv.start(FLG_ip.c_str(), FLG_port, FLG_key.c_str(), FLG_ca.c_str()); + serv.set_config(FLG_ip.c_str(), FLG_port, FLG_key.c_str(), FLG_ca.c_str()); + if (serv.listen() != 0) return 1; + serv.start(); sleep::ms(32); go(client_fun); } else if (FLG_t == 1) { - serv.start(FLG_ip.c_str(), FLG_port, FLG_key.c_str(), FLG_ca.c_str()); + serv.set_config(FLG_ip.c_str(), FLG_port, FLG_key.c_str(), FLG_ca.c_str()); + if (serv.listen() != 0) return 1; + serv.start(); } else { go(client_fun); } diff --git a/test/so/tcp.cc b/test/so/tcp.cc index 93e2e457e..a80e14535 100644 --- a/test/so/tcp.cc +++ b/test/so/tcp.cc @@ -103,9 +103,15 @@ int main(int argc, char** argv) { [](void* p) { delete (tcp::Client*) p; } ); - tcp::Server().on_connection(conn_cb).start( + tcp::Server serv; + serv.on_connection(conn_cb).set_config( "0.0.0.0", FLG_port, FLG_key.c_str(), FLG_ca.c_str() ); + if (serv.listen() != 0) { + delete gPool; + return 1; + } + serv.start(); sleep::ms(32); diff --git a/test/so/tcp2.cc b/test/so/tcp2.cc index ffe99bfaa..399b9b679 100644 --- a/test/so/tcp2.cc +++ b/test/so/tcp2.cc @@ -117,9 +117,14 @@ int main(int argc, char** argv) { ); tcp::Server serv; - serv.on_connection(conn_cb).start( + serv.on_connection(conn_cb).set_config( FLG_ip.c_str(), FLG_port, FLG_key.c_str(), FLG_ca.c_str() ); + if (serv.listen() != 0) { + delete gPool; + return 1; + } + serv.start(); sleep::ms(32); if (FLG_client_num > 1) {