Skip to content

Commit dfcfbfd

Browse files
falucocodebot
authored andcommitted
WS: Fix race condition on server shutdown
1 parent 5f18227 commit dfcfbfd

File tree

1 file changed

+66
-49
lines changed

1 file changed

+66
-49
lines changed

apps/services/remote_control/remote_server.cpp

Lines changed: 66 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
#include "apps/services/remote_control/remote_control_appconfig.h"
1414
#include "nlohmann/json.hpp"
1515
#include "srsran/srslog/srslog.h"
16+
#include "srsran/support/synchronization/stop_event.h"
17+
#include "srsran/support/synchronization/sync_event.h"
1618
#ifndef __clang__
1719
#pragma GCC diagnostic push
1820
#pragma GCC diagnostic ignored "-Wstringop-overflow"
@@ -111,6 +113,7 @@ class remote_server_impl : public remote_server
111113
std::unordered_map<std::string, std::unique_ptr<remote_command>> commands;
112114
std::set<socket_type*> metrics_subscribers;
113115
socket_type* current_cmd_client = nullptr;
116+
stop_event_source stop_control;
114117

115118
/// Metrics subscription command.
116119
class metrics_subscribe_command : public remote_command
@@ -189,51 +192,56 @@ class remote_server_impl : public remote_server
189192
cmd = std::move(remote_cmd);
190193
}
191194

192-
thread = unique_thread("ws_server", [this, bind_addr, port, enable_metrics_subscription]() {
193-
uWS::App ws_server;
194-
ws_server
195-
.ws<dummy_type>("/*",
196-
{.compression = uWS::CompressOptions(uWS::DISABLED),
197-
.maxPayloadLength = 16 * 1024,
198-
.idleTimeout = 120,
199-
.maxBackpressure = 16 * 1024 * 1024,
200-
.closeOnBackpressureLimit = false,
201-
.resetIdleTimeoutOnSend = false,
202-
.sendPingsAutomatically = true,
203-
.message =
204-
[this](socket_type* ws, std::string_view message, uWS::OpCode opCode) {
205-
// Only parse text based messages.
206-
if (opCode != uWS::OpCode::TEXT) {
207-
ws->send(
208-
build_error_response("This WebSocket server only supports opcode TEXT messages"),
209-
uWS::OpCode::TEXT,
210-
false);
211-
return;
212-
}
213-
214-
current_cmd_client = ws;
215-
auto restore_client = make_scope_exit([this]() { current_cmd_client = nullptr; });
216-
217-
// Handle the incoming message and return back the response.
218-
std::string response = handle_command(message);
219-
ws->send(response, uWS::OpCode::TEXT, false);
220-
},
221-
.close = [this](socket_type* ws, int, std::string_view) { metrics_subscribers.erase(ws); }})
222-
.listen(bind_addr, port, [bind_addr, port](auto* listen_socket) {
223-
if (listen_socket) {
224-
fmt::println("Remote control server listening on {}:{}", bind_addr, port);
225-
} else {
226-
fmt::println("Remote control server cannot listen on {}:{}", bind_addr, port);
227-
}
228-
});
229-
230-
server.store(&ws_server, std::memory_order_relaxed);
231-
server_loop.store(uWS::Loop::get(), std::memory_order_relaxed);
232-
if (enable_metrics_subscription) {
233-
static_cast<remote_server_sink*>(srslog::find_sink(remote_server_sink::name()))->set_server(this);
234-
}
235-
ws_server.run();
236-
});
195+
sync_event start_event;
196+
thread = unique_thread(
197+
"ws_server", [this, bind_addr, port, enable_metrics_subscription, token = start_event.get_token()]() mutable {
198+
uWS::App ws_server;
199+
ws_server
200+
.ws<dummy_type>(
201+
"/*",
202+
{.compression = uWS::CompressOptions(uWS::DISABLED),
203+
.maxPayloadLength = 16 * 1024,
204+
.idleTimeout = 120,
205+
.maxBackpressure = 16 * 1024 * 1024,
206+
.closeOnBackpressureLimit = false,
207+
.resetIdleTimeoutOnSend = false,
208+
.sendPingsAutomatically = true,
209+
.message =
210+
[this](socket_type* ws, std::string_view message, uWS::OpCode opCode) {
211+
// Only parse text based messages.
212+
if (opCode != uWS::OpCode::TEXT) {
213+
ws->send(build_error_response("This WebSocket server only supports opcode TEXT messages"),
214+
uWS::OpCode::TEXT,
215+
false);
216+
return;
217+
}
218+
219+
current_cmd_client = ws;
220+
auto restore_client = make_scope_exit([this]() { current_cmd_client = nullptr; });
221+
222+
// Handle the incoming message and return back the response.
223+
std::string response = handle_command(message);
224+
ws->send(response, uWS::OpCode::TEXT, false);
225+
},
226+
.close = [this](socket_type* ws, int, std::string_view) { metrics_subscribers.erase(ws); }})
227+
.listen(bind_addr, port, [bind_addr, port](auto* listen_socket) {
228+
if (listen_socket) {
229+
fmt::println("Remote control server listening on {}:{}", bind_addr, port);
230+
} else {
231+
fmt::println("Remote control server cannot listen on {}:{}", bind_addr, port);
232+
}
233+
});
234+
235+
server = &ws_server;
236+
server_loop = uWS::Loop::get();
237+
if (enable_metrics_subscription) {
238+
static_cast<remote_server_sink*>(srslog::find_sink(remote_server_sink::name()))->set_server(this);
239+
}
240+
token.reset();
241+
ws_server.run();
242+
});
243+
244+
start_event.wait();
237245
}
238246

239247
// See interface for documentation.
@@ -244,21 +252,30 @@ class remote_server_impl : public remote_server
244252
{
245253
// Wait for completion.
246254
if (thread.running()) {
247-
server_loop.load(std::memory_order_relaxed)->defer([this]() {
255+
// Make sure all pending tasks have been completed before closing the server.
256+
stop_control.stop();
257+
258+
server_loop.load()->defer([this]() {
248259
// Disconnect remote sink from the server, this will prevent metrics being processed after the server is closed.
249260
static_cast<remote_server_sink*>(srslog::find_sink(remote_server_sink::name()))->set_server(nullptr);
250261
server.load()->close();
251262
});
263+
252264
thread.join();
253-
server_loop.store(nullptr, std::memory_order_relaxed);
254-
server.store(nullptr, std::memory_order_relaxed);
265+
server_loop = nullptr;
266+
server = nullptr;
255267
}
256268
}
257269

258270
/// Sends the given metrics to all registered metrics subscribers.
259271
void send_metrics(std::string metrics_)
260272
{
261-
server_loop.load(std::memory_order_relaxed)->defer([metrics = std::move(metrics_), this]() {
273+
auto token = stop_control.get_token();
274+
if (SRSRAN_UNLIKELY(token.is_stop_requested())) {
275+
return;
276+
}
277+
278+
server_loop.load()->defer([this, metrics = std::move(metrics_), tk = std::move(token)]() {
262279
for (auto* subscriber : metrics_subscribers) {
263280
subscriber->send(metrics, uWS::OpCode::TEXT, false);
264281
}

0 commit comments

Comments
 (0)