11#include " ipc/unix_socket_server.h"
22
3+ #include " common/logging.h"
4+
35std::shared_ptr<UnixSocketServer> UnixSocketServer::Create (const std::string &socket_path) {
46 return std::make_shared<UnixSocketServer>(socket_path);
57}
@@ -43,11 +45,13 @@ void UnixSocketServer::Start() {
4345
4446 if (bind (server_fd_, (sockaddr *)&addr, sizeof (addr)) == -1 ) {
4547 perror (" bind" );
48+ close (server_fd_);
4649 return ;
4750 }
4851
4952 if (listen (server_fd_, 16 ) == -1 ) {
5053 perror (" listen" );
54+ close (server_fd_);
5155 return ;
5256 }
5357
@@ -57,48 +61,85 @@ void UnixSocketServer::Start() {
5761
5862void UnixSocketServer::Stop () {
5963 running_ = false ;
64+
65+ if (server_fd_ >= 0 ) {
66+ shutdown (server_fd_, SHUT_RDWR);
67+ close (server_fd_);
68+ server_fd_ = -1 ;
69+ }
70+
6071 if (accept_thread_.joinable ())
6172 accept_thread_.join ();
6273
63- for (auto &[fd, t] : client_threads_) {
74+ std::unordered_map<int , std::thread> local_clients;
75+ {
76+ std::lock_guard<std::mutex> lock (mutex_);
77+ for (const auto &[fd, _] : client_threads_) {
78+ shutdown (fd, SHUT_RDWR);
79+ }
80+
81+ local_clients.swap (client_threads_);
82+ }
83+
84+ for (auto &[fd, thread] : local_clients) {
85+ if (thread.joinable ())
86+ thread.join (); // already detached ones won't be in here
6487 close (fd);
65- if (t.joinable ())
66- t.join ();
6788 }
6889
69- if (server_fd_ >= 0 )
70- close (server_fd_);
7190 unlink (socket_path_.c_str ());
7291}
7392
7493void UnixSocketServer::AcceptLoop () {
7594 while (running_) {
7695 int client_fd = accept (server_fd_, nullptr , nullptr );
77- if (client_fd < 0 )
96+ if (client_fd < 0 ) {
97+ if (running_) {
98+ perror (" accept" );
99+ std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
100+ }
78101 continue ;
79- std::lock_guard<std::mutex> lock (mutex_);
80- client_threads_[client_fd] = std::thread (&UnixSocketServer::HandleClient, this , client_fd);
102+ }
103+
104+ std::thread t (&UnixSocketServer::HandleClient, this , client_fd);
105+ {
106+ std::lock_guard<std::mutex> lock (mutex_);
107+ client_threads_[client_fd] = std::move (t);
108+ }
81109 }
82110}
83111
84112void UnixSocketServer::HandleClient (int client_fd) {
85113 char buffer[1024 ];
86114 while (running_) {
87115 int n = read (client_fd, buffer, sizeof (buffer));
88- if (n <= 0 )
116+ if (n <= 0 ) {
89117 break ;
118+ }
90119
91120 std::string msg (buffer, n);
92- std::cout << " [IPC ] Received: " << msg << std::endl ;
121+ DEBUG_PRINT ( " [%d ] Received: %s " , client_fd, msg. c_str ()) ;
93122
94123 std::lock_guard<std::mutex> lock (mutex_);
95- for (auto &[_, callback] : peer_callbacks_) {
124+ for (const auto &[_, callback] : peer_callbacks_) {
96125 if (callback) {
97126 callback (msg);
98127 }
99128 }
100129 }
130+
101131 close (client_fd);
102- std::lock_guard<std::mutex> lock (mutex_);
103- client_threads_.erase (client_fd);
132+
133+ {
134+ std::lock_guard<std::mutex> lock (mutex_);
135+ auto it = client_threads_.find (client_fd);
136+ if (it != client_threads_.end ()) {
137+ if (std::this_thread::get_id () == it->second .get_id ()) {
138+ it->second .detach ();
139+ }
140+ client_threads_.erase (it);
141+ }
142+ }
143+
144+ DEBUG_PRINT (" [%d] leaved!" , client_fd);
104145}
0 commit comments