Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"image": "mcr.microsoft.com/devcontainers/universal:2",
"features": {}
}
11 changes: 11 additions & 0 deletions include/co/co/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,15 @@ class __coapi sync_event {
DISALLOW_COPY_AND_ASSIGN(sync_event);
};

inline int set_thread_name(const char *name) {
#ifdef _WIN32
return 0;
#elif defined(__APPLE__)
::pthread_setname_np(name);
return 0;
#else
return ::pthread_setname_np(::pthread_self(), name);
#endif
}

} // co
2 changes: 1 addition & 1 deletion include/co/god.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ constexpr bool is_scalar() {
return std::is_scalar<T>::value;
}

#if defined(__GNUC__) && __GNUC__ < 5
#if defined(__GNUC__) && __GNUC__ < 5 && !defined(__APPLE__)
template<typename T>
constexpr bool is_trivially_copyable() {
return __has_trivial_copy(T);
Expand Down
19 changes: 14 additions & 5 deletions include/co/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,7 @@ class __coapi Server final {
uint32 conn_num() const;

/**
* start the server
* - The server will loop in a coroutine, and it will not block the calling thread.
* - The user MUST call on_connection() to set a connection callback before start()
* was called.
* config the server
* - By default, key and ca are NULL, and ssl is disabled.
*
* @param ip server ip, either an ipv4 or ipv6 address.
Expand All @@ -127,7 +124,19 @@ class __coapi Server final {
* @param key path of ssl private key file.
* @param ca path of ssl certificate file.
*/
void start(const char* ip, int port, const char* key=0, const char* ca=0);
Server &set_config(const char *ip, int port, const char *key = nullptr, const char *ca = nullptr);

uint16 get_port() const;

int listen(int end_port = 0);

/**
* start the server
* - The server will loop in a coroutine, and it will not block the calling thread.
* - The user MUST call on_connection() to set a connection callback before start()
* was called.
*/
void start();

/**
* exit the server gracefully
Expand Down
7 changes: 6 additions & 1 deletion src/co/co.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,12 @@ bool event_impl::wait(uint32 ms) {
_wc.pop_front();
!x ? (void)(x = w) : co::free(w, sizeof(*w));
}
x ? (void)(x->state = st_wait) : (void)(x = make_waitx(co));
if (x) {
x->co = co;
x->state = st_wait;
} else {
x = make_waitx(co);
}
co->waitx = x;
_wc.push_back(x);
}
Expand Down
2 changes: 2 additions & 0 deletions src/co/sched.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ void Sched::loop() {
co::vector<Coroutine*> ready_tasks(512);
co::Timer timer;

co::set_thread_name("Co_Sched");

while (!_x.stopped) {
int n = _x.epoll->wait(_wait_ms);
if (_x.stopped) break;
Expand Down
1 change: 1 addition & 0 deletions src/log/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ void Logger::write_topic_logs(LogFile& f, const char* topic, const char* p, size
void Logger::thread_fun() {
bool signaled;
int64 sec;
co::set_thread_name("Co_Logger");
while (atomic_load(&g_init_done, mo_acquire) != true) _log_event.wait(8);
while (!_stop) {
signaled = _log_event.wait(FLG_log_flush_ms);
Expand Down
4 changes: 3 additions & 1 deletion src/so/http.cc
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,9 @@ void ServerImpl::start(const char* ip, int port, const char* key, const char* ca
atomic_store(&_started, true, mo_relaxed);
_serv.on_connection(&ServerImpl::on_connection, this);
_serv.on_exit([this]() { co::del(this); });
_serv.start(ip, port, key, ca);
int r = _serv.set_config(ip, port, key, ca).listen();
CHECK(r == 0);
_serv.start();
}

inline int hex2int(char c) {
Expand Down
4 changes: 3 additions & 1 deletion src/so/rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ class ServerImpl {
atomic_store(&_started, true, mo_relaxed);
_tcp_serv.on_connection(&ServerImpl::on_connection, this);
_tcp_serv.on_exit([this]() { co::del(this); });
_tcp_serv.start(ip, port, key, ca);
int r = _tcp_serv.set_config(ip, port, key, ca).listen();
CHECK(r == 0);
_tcp_serv.start();
}

bool started() const { return _started; }
Expand Down
138 changes: 102 additions & 36 deletions src/so/tcp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ class ServerImpl {
_exit_cb = std::move(cb);
}

void start(const char* ip, int port, const char* key, const char* ca);
void set_config(const char *ip, int port, const char *key, const char *ca);
uint16 get_port() const { return _port; }
int listen(int end_port);
void start();
void exit();
bool started() const { return _started; }

Expand Down Expand Up @@ -224,7 +227,7 @@ class ServerImpl {
} _addr;
};

void ServerImpl::start(const char* ip, int port, const char* key, const char* ca) {
void ServerImpl::set_config(const char *ip, int port, const char *key, const char *ca) {
CHECK(_conn_cb != NULL) << "connection callback not set..";
_ip = (ip && *ip) ? ip : "0.0.0.0";
_port = (uint16)port;
Expand All @@ -244,27 +247,28 @@ void ServerImpl::start(const char* ip, int port, const char* key, const char* ca
CHECK_EQ(r, 1) << "ssl check private key error: " << ssl::strerror();

_on_sock = std::bind(&ServerImpl::on_ssl_connection, this, std::placeholders::_1);
this->ref();
atomic_store(&_started, true, mo_relaxed);
go(&ServerImpl::loop, this);
} else {
_on_sock = std::bind(&ServerImpl::on_tcp_connection, this, std::placeholders::_1);
this->ref();
atomic_store(&_started, true, mo_relaxed);
go(&ServerImpl::loop, this);
}
}

void ServerImpl::start() {
CHECK(_on_sock && _fd >= 0) << "must set_config and listen";
this->ref();
atomic_store(&_started, true, mo_relaxed);
go(&ServerImpl::loop, this);
}

void ServerImpl::exit() {
int status = atomic_cas(&_status, 0, 1);
if (status == 2) return; // already stopped

if (status == 0) {
sleep::ms(1);
if (status != 2) go(&ServerImpl::stop, this);
co::sleep(1);
go(&ServerImpl::stop, this);
}

while (_status != 2) sleep::ms(1);
while (_status != 2) co::sleep(1);
}

void ServerImpl::stop() {
Expand All @@ -273,39 +277,94 @@ void ServerImpl::stop() {
c.connect(-1);
}

/**
* the server loop
* - It listens on a port and waits for connections.
* - When a connection is accepted, it will start a new coroutine and call
* the connection callback to handle the connection.
*/
void ServerImpl::loop() {
do {
fastring port = str::from(_port);
struct addrinfo* info = 0;
int r = getaddrinfo(_ip.c_str(), port.c_str(), NULL, &info);
CHECK_EQ(r, 0) << "invalid ip address: " << _ip << ':' << _port;
CHECK(info != NULL);
int ServerImpl::listen(int end_port) {
int r = EINVAL;
if (_fd >= 0) return r;
if (end_port < _port) end_port = _port;

_fd = co::tcp_socket(info->ai_family);
CHECK_NE(_fd, (sock_t)-1) << "create socket error: " << co::strerror();
co::set_reuseaddr(_fd);
auto start_time = now::ms();

for (uint16 port_num = _port; port_num <= (uint16) end_port;) {
fastring port = str::from(port_num);
struct addrinfo *info = nullptr;
r = ::getaddrinfo(_ip.c_str(), port.c_str(), nullptr, &info);
if (r) {
ELOG << "getaddrinfo " << _ip << ':' << port << " error: (" << r << ") " << ::gai_strerror(r);
break;
}
CHECK(info);

bool retry = false;

sock_t fd = co::tcp_socket(info->ai_family);
if (fd < 0) {
r = errno;
goto end;
}
co::set_reuseaddr(fd);

// turn off IPV6_V6ONLY
if (info->ai_family == AF_INET6) {
int on = 0;
co::setsockopt(_fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on));
co::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on));
}

r = co::bind(_fd, info->ai_addr, (int)info->ai_addrlen);
CHECK_EQ(r, 0) << "bind " << _ip << ':' << _port << " failed: " << co::strerror();
r = co::bind(fd, info->ai_addr, (int) info->ai_addrlen);
if (r < 0) {
r = errno;
if (r == EADDRNOTAVAIL) {
if (now::ms() - start_time <= 20000) {
retry = true;
DLOG << "bind " << _ip << ':' << port << " error: (" << r << ") " << co::strerror(r);
co::sleep(1000);
} else {
ELOG << "bind " << _ip << ':' << port << " error: (" << r << ") " << co::strerror(r);
}
} else if (r == EADDRINUSE && now::ms() - start_time <= 40000) {
retry = true;
DLOG << "bind " << _ip << ':' << port << " error: (" << r << ") " << co::strerror(r);
co::sleep(20);
if (++port_num > (uint16) end_port) port_num = _port;
} else {
ELOG << "bind " << _ip << ':' << port << " error: (" << r << ") " << co::strerror(r);
}
goto end;
}

r = co::listen(_fd, 64 * 1024);
CHECK_EQ(r, 0) << "listen error: " << co::strerror();
r = co::listen(fd, 64 * 1024);
if (r < 0) {
r = errno;
if (r == EADDRINUSE && now::ms() - start_time <= 40000) {
retry = true;
DLOG << "listen " << _ip << ':' << port << " error: (" << r << ") " << co::strerror(r);
co::sleep(20);
if (++port_num > (uint16) end_port) port_num = _port;
} else {
ELOG << "listen " << _ip << ':' << port << " error: (" << r << ") " << co::strerror(r);
}
goto end;
}

freeaddrinfo(info);
} while (0);
_port = port_num;
_fd = fd;
fd = -1;

end:
if (fd >= 0) co::close(fd);
if (info) ::freeaddrinfo(info);
if (!retry) break;
}

return r;
}

/**
* the server loop
* - It listens on a port and waits for connections.
* - When a connection is accepted, it will start a new coroutine and call
* the connection callback to handle the connection.
*/
void ServerImpl::loop() {
LOG << "server start: " << _ip << ':' << _port;
while (true) {
_addrlen = sizeof(_addr);
Expand Down Expand Up @@ -395,10 +454,17 @@ uint32 Server::conn_num() const {
return ((ServerImpl*)_p)->conn_num();
}

void Server::start(const char* ip, int port, const char* key, const char* ca) {
((ServerImpl*)_p)->start(ip, port, key, ca);
Server &Server::set_config(const char *ip, int port, const char *key, const char *ca) {
((ServerImpl *) _p)->set_config(ip, port, key, ca);
return *this;
}

uint16 Server::get_port() const { return ((ServerImpl *) _p)->get_port(); }

int Server::listen(int end_port) { return ((ServerImpl *) _p)->listen(end_port); }

void Server::start() { ((ServerImpl *) _p)->start(); }

void Server::exit() {
((ServerImpl*)_p)->exit();
}
Expand Down
2 changes: 2 additions & 0 deletions src/tasked.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ void TaskedImpl::loop() {
co::Timer timer;
co::vector<Task> tmp(32);

co::set_thread_name("Co_Tasked");

while (!_stop) {
timer.restart();
{
Expand Down
5 changes: 4 additions & 1 deletion test/so/echo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ int main(int argc, char** argv) {
flag::parse(argc, argv);

if (FLG_s) {
tcp::Server().on_connection(conn_cb).start("0.0.0.0", FLG_p);
tcp::Server serv;
serv.on_connection(conn_cb).set_config("0.0.0.0", FLG_p);
if (serv.listen() != 0) return 1;
serv.start();
while (true) sleep::sec(1024);
} else {
g_count = (Count*) co::zalloc(sizeof(Count) * FLG_c);
Expand Down
8 changes: 6 additions & 2 deletions test/so/ssl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,15 @@ int main(int argc, char** argv) {
CHECK(!FLG_ca.empty()) << "ssl certificate file not set..";

if (FLG_t == 0) {
serv.start(FLG_ip.c_str(), FLG_port, FLG_key.c_str(), FLG_ca.c_str());
serv.set_config(FLG_ip.c_str(), FLG_port, FLG_key.c_str(), FLG_ca.c_str());
if (serv.listen() != 0) return 1;
serv.start();
sleep::ms(32);
go(client_fun);
} else if (FLG_t == 1) {
serv.start(FLG_ip.c_str(), FLG_port, FLG_key.c_str(), FLG_ca.c_str());
serv.set_config(FLG_ip.c_str(), FLG_port, FLG_key.c_str(), FLG_ca.c_str());
if (serv.listen() != 0) return 1;
serv.start();
} else {
go(client_fun);
}
Expand Down
8 changes: 7 additions & 1 deletion test/so/tcp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,15 @@ int main(int argc, char** argv) {
[](void* p) { delete (tcp::Client*) p; }
);

tcp::Server().on_connection(conn_cb).start(
tcp::Server serv;
serv.on_connection(conn_cb).set_config(
"0.0.0.0", FLG_port, FLG_key.c_str(), FLG_ca.c_str()
);
if (serv.listen() != 0) {
delete gPool;
return 1;
}
serv.start();

sleep::ms(32);

Expand Down
7 changes: 6 additions & 1 deletion test/so/tcp2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,14 @@ int main(int argc, char** argv) {
);

tcp::Server serv;
serv.on_connection(conn_cb).start(
serv.on_connection(conn_cb).set_config(
FLG_ip.c_str(), FLG_port, FLG_key.c_str(), FLG_ca.c_str()
);
if (serv.listen() != 0) {
delete gPool;
return 1;
}
serv.start();
sleep::ms(32);

if (FLG_client_num > 1) {
Expand Down