Skip to content

Commit 0cfa5f1

Browse files
committed
Start client worker thread on connect, stop client worker thread after ws closed
1 parent 1a3a57c commit 0cfa5f1

File tree

2 files changed

+56
-29
lines changed

2 files changed

+56
-29
lines changed

zorro_websocket_proxy_client/include/zorro_websocket_proxy_client.h

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,14 @@ namespace websocket {
4343
ZorroWebsocketProxyClient(WebsocketProxyCallback* callback, std::string&& name, broker_error broker_err, broker_progress broker_prog);
4444
virtual ~ZorroWebsocketProxyClient();
4545

46-
void setLogger(log log_func) noexcept { log_ = log_func; }
46+
void setLogger(log log_func = NoLog) noexcept {
47+
log_ = log_func;
48+
}
49+
4750
DWORD serverId() const noexcept { return server_pid_; }
48-
std::pair<uint32_t, bool> openWs(const std::string& url);
49-
bool closeWs(uint32_t id = 0);
51+
52+
std::pair<uint32_t, bool> openWebSocket(const std::string& url);
53+
bool closeWebSocket(uint32_t id = 0);
5054
void send(uint32_t id, const char* msg, size_t len);
5155

5256
private:
@@ -57,9 +61,10 @@ namespace websocket {
5761
bool connect();
5862
bool spawnWebsocketsProxyServer();
5963
bool _register();
60-
void unregister();
64+
void unregister(bool destroying = false);
6165
void sendMessage(Message* msg, uint64_t index, uint32_t size);
62-
bool waitForResponse(Message* msg, uint32_t timeout = 10000) const;
66+
bool waitForResponse(Message* msg, uint32_t timeout = 10000);
67+
bool sendHeartbeat(uint64_t now);
6368
void doWork();
6469
void handleWsOpen(Message* msg);
6570
void handleWsClose(Message* msg);
@@ -97,10 +102,10 @@ namespace websocket {
97102
uint32_t id_ = 0;
98103
const DWORD pid_;
99104
std::atomic<DWORD> server_pid_ = 0;
100-
std::atomic_bool run_{ true };
105+
std::atomic_bool run_{ false };
101106
std::string name_;
102107
std::unordered_set<uint32_t> websockets_;
103-
std::thread worker_thread_;
108+
std::unique_ptr<std::thread> worker_thread_;
104109
log log_ = NoLog;
105110
};
106111

zorro_websocket_proxy_client/src/zorro_websocket_proxy_client.cpp

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,20 @@ ZorroWebsocketProxyClient::ZorroWebsocketProxyClient(WebsocketProxyCallback* cal
2020
, broker_progress_(broker_prog)
2121
, pid_(GetCurrentProcessId())
2222
, name_(std::move(name))
23-
, worker_thread_([this]() { doWork(); })
24-
{}
23+
{
24+
}
2525

2626
ZorroWebsocketProxyClient::~ZorroWebsocketProxyClient() {
2727
if (server_pid_.load(std::memory_order_relaxed)) {
2828
unregister();
2929
}
30-
run_.store(false, std::memory_order_release);
31-
if (worker_thread_.joinable()) {
32-
// join causes dead lock
33-
//worker_thread_.join();
34-
worker_thread_.detach();
35-
}
3630
}
3731

3832
bool ZorroWebsocketProxyClient::connect() {
33+
if (!worker_thread_) {
34+
worker_thread_ = std::make_unique<std::thread>([this]() { doWork(); });
35+
}
36+
3937
if (!spawnWebsocketsProxyServer()) {
4038
log_(L_ERROR, "Failed to spawn websocket proxy");
4139
return false;
@@ -50,14 +48,17 @@ void ZorroWebsocketProxyClient::sendMessage(Message* msg, uint64_t index, uint32
5048
last_heartbeat_time_ = get_timestamp();
5149
}
5250

53-
bool ZorroWebsocketProxyClient::waitForResponse(Message* msg, uint32_t timeout) const {
51+
bool ZorroWebsocketProxyClient::waitForResponse(Message* msg, uint32_t timeout) {
5452
auto start = get_timestamp();
55-
while (msg->status.load(std::memory_order_relaxed()) == Message::Status::PENDING) {
56-
if ((get_timestamp() - start) > timeout) {
53+
while (msg->status.load(std::memory_order_relaxed) == Message::Status::PENDING) {
54+
auto now = get_timestamp();
55+
if ((now - start) > timeout) {
5756
return false;
5857
}
5958
broker_progress_(1);
60-
std::this_thread::yield();
59+
if (!sendHeartbeat(now)) {
60+
std::this_thread::yield();
61+
}
6162
}
6263
return true;
6364
}
@@ -138,15 +139,26 @@ bool ZorroWebsocketProxyClient::_register() {
138139
return true;
139140
}
140141

141-
void ZorroWebsocketProxyClient::unregister() {
142+
void ZorroWebsocketProxyClient::unregister(bool destroying) {
142143
auto [msg, index, size] = reserveMessage();
143144
msg->type = Message::Type::Unregister;
144145
sendMessage(msg, index, size);
145146
server_pid_.store(0, std::memory_order_release);
146147
log_(L_INFO, "Unregistered, pid=" + std::to_string(pid_));
148+
run_.store(false, std::memory_order_release);
149+
if (worker_thread_ && worker_thread_->joinable()) {
150+
if (destroying) {
151+
#ifdef _WIN32
152+
worker_thread_->detach();
153+
return;
154+
#endif // _WIN32
155+
}
156+
worker_thread_->join();
157+
worker_thread_.reset();
158+
}
147159
}
148160

149-
std::pair<uint32_t, bool> ZorroWebsocketProxyClient::openWs(const std::string& url) {
161+
std::pair<uint32_t, bool> ZorroWebsocketProxyClient::openWebSocket(const std::string& url) {
150162
if (url.size() > 255) {
151163
brokerError("URL is to long. limit is 255 character");
152164
return std::make_pair(0, false);
@@ -181,13 +193,18 @@ std::pair<uint32_t, bool> ZorroWebsocketProxyClient::openWs(const std::string& u
181193
return std::make_pair(req->id, req->new_connection);
182194
}
183195

184-
bool ZorroWebsocketProxyClient::closeWs(uint32_t id) {
196+
bool ZorroWebsocketProxyClient::closeWebSocket(uint32_t id) {
185197
auto [msg, index, size] = reserveMessage<WsClose>();
186198
msg->type = Message::Type::CloseWs;
187199
auto req = reinterpret_cast<WsClose*>(msg->data);
188200
req->id = !id ? id_ : id;
189201
log_(L_INFO, "Close ws " + std::to_string(req->id));
190202
sendMessage(msg, index, size);
203+
if (!waitForResponse(msg)) {
204+
log_(L_DEBUG, "Close ws timedout");
205+
return false;
206+
}
207+
unregister();
191208
return true;
192209
}
193210

@@ -202,6 +219,7 @@ void ZorroWebsocketProxyClient::send(uint32_t id, const char* data, size_t len)
202219
}
203220

204221
void ZorroWebsocketProxyClient::doWork() {
222+
run_.store(true, std::memory_order_release);
205223
while (run_.load(std::memory_order_relaxed)) {
206224
auto server_pid = server_pid_.load(std::memory_order_relaxed);
207225
if (!server_pid) {
@@ -234,13 +252,7 @@ void ZorroWebsocketProxyClient::doWork() {
234252
}
235253
}
236254

237-
bool heartbeat_sent = false;
238-
if (server_pid && (now - last_heartbeat_time_) > HEARTBEAT_INTERVAL) {
239-
auto [msg, index, size] = reserveMessage();
240-
msg->type = Message::Type::Heartbeat;
241-
sendMessage(msg, index, size);
242-
heartbeat_sent = true;
243-
}
255+
bool heartbeat_sent = sendHeartbeat(now);
244256

245257
if (!result.first) {
246258
if (last_server_heartbeat_time_ && (now - last_server_heartbeat_time_) > HEARTBEAT_TIMEOUT) {
@@ -261,6 +273,16 @@ void ZorroWebsocketProxyClient::doWork() {
261273
}
262274
}
263275

276+
bool ZorroWebsocketProxyClient::sendHeartbeat(uint64_t now) {
277+
if (server_pid_.load(std::memory_order_relaxed) && (now - last_heartbeat_time_) > HEARTBEAT_INTERVAL) {
278+
auto [msg, index, size] = reserveMessage();
279+
msg->type = Message::Type::Heartbeat;
280+
sendMessage(msg, index, size);
281+
return true;
282+
}
283+
return false;
284+
}
285+
264286
void ZorroWebsocketProxyClient::handleWsOpen(Message* msg) {
265287
auto open = reinterpret_cast<WsOpen*>(msg->data);
266288
log_(L_DEBUG, "handleWsOPen, initiator=" + std::to_string(open->initiator));

0 commit comments

Comments
 (0)