Skip to content

Commit 38bc087

Browse files
committed
[websockets] Use lws_cancel() to trigger send so that it works event if the lib is compiled againt libuv and libevents
1 parent cac5220 commit 38bc087

File tree

3 files changed

+37
-10
lines changed

3 files changed

+37
-10
lines changed

src/rpc/RpcBase.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ bool RpcBase::call(const std::string& action,
8888
std::stringstream expected_id;
8989
expected_id << m_transaction_id;
9090
std::shared_ptr<RpcMessage> rpc_message;
91-
auto wait_time = std::chrono::steady_clock().now() + timeout;
91+
auto wait_time = std::chrono::steady_clock().now() + timeout;
9292
do
9393
{
9494
// Compute timeout
@@ -179,8 +179,11 @@ void RpcBase::stop()
179179
// Check if already started
180180
if (m_rx_thread)
181181
{
182-
// Stop reception thread
182+
// Stop queues
183+
m_results_queue.setEnable(false);
183184
m_requests_queue.setEnable(false);
185+
186+
// Stop reception thread
184187
m_rx_thread->join();
185188
delete m_rx_thread;
186189
m_rx_thread = nullptr;
@@ -313,7 +316,7 @@ bool RpcBase::decodeCall(const std::string& unique_id,
313316
{
314317
// Add request to the queue
315318
auto msg = std::make_shared<RpcMessage>(unique_id, action.GetString(), rpc_frame, payload);
316-
m_requests_queue.push(msg);
319+
m_requests_queue.push(std::move(msg));
317320

318321
ret = true;
319322
}
@@ -331,7 +334,7 @@ bool RpcBase::decodeCallResult(const std::string& unique_id, rapidjson::Document
331334
{
332335
// Add result to the queue
333336
auto msg = std::make_shared<RpcMessage>(unique_id, rpc_frame, payload);
334-
m_results_queue.push(msg);
337+
m_results_queue.push(std::move(msg));
335338

336339
ret = true;
337340
}
@@ -353,7 +356,7 @@ bool RpcBase::decodeCallError(const std::string& unique_id,
353356
{
354357
// Add error to the queue
355358
auto msg = std::make_shared<RpcMessage>(unique_id, rpc_frame, payload, &error, &message);
356-
m_results_queue.push(msg);
359+
m_results_queue.push(std::move(msg));
357360

358361
ret = true;
359362
}

src/websockets/libwebsockets/LibWebsocketClient.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ bool LibWebsocketClient::send(const void* data, size_t size)
217217
ret = m_send_msgs.push(msg);
218218

219219
// Schedule a send
220-
lws_callback_on_writable(m_wsi);
220+
lws_cancel_service(m_context);
221221
}
222222

223223
return ret;
@@ -372,11 +372,21 @@ int LibWebsocketClient::eventCallback(struct lws* wsi, enum lws_callback_reasons
372372
client->m_listener->wsClientDataReceived(in, len);
373373
break;
374374

375+
case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
376+
{
377+
// Triggers a send
378+
if (!client->m_end && !client->m_send_msgs.empty())
379+
{
380+
lws_callback_on_writable(client->m_wsi);
381+
}
382+
}
383+
break;
384+
375385
case LWS_CALLBACK_CLIENT_WRITEABLE:
376386
{
377387
// Send data if any ready
378388
bool error = false;
379-
SendMsg* msg = nullptr;
389+
SendMsg* msg = nullptr;
380390
while (client->m_send_msgs.pop(msg, 0) && !error)
381391
{
382392
if (lws_write(wsi, msg->payload, msg->size, LWS_WRITE_TEXT) < static_cast<int>(msg->size))

src/websockets/libwebsockets/LibWebsocketServer.cpp

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,20 @@ int LibWebsocketServer::eventCallback(struct lws* wsi, enum lws_callback_reasons
414414
}
415415
break;
416416

417+
case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
418+
{
419+
// Trigger close or send
420+
for (const auto& iter_client : server->m_clients)
421+
{
422+
Client* client = dynamic_cast<Client*>(iter_client.second.get());
423+
if (!client->m_connected || !client->m_send_msgs.empty())
424+
{
425+
lws_callback_on_writable(client->m_wsi);
426+
}
427+
}
428+
}
429+
break;
430+
417431
case LWS_CALLBACK_SERVER_WRITEABLE:
418432
{
419433
// Get corresponding client
@@ -425,7 +439,7 @@ int LibWebsocketServer::eventCallback(struct lws* wsi, enum lws_callback_reasons
425439
{
426440
// Send data if any ready
427441
bool error = false;
428-
SendMsg* msg = nullptr;
442+
SendMsg* msg = nullptr;
429443
while (client->m_send_msgs.pop(msg, 0) && !error)
430444
{
431445
if (lws_write(client->m_wsi, msg->payload, msg->size, LWS_WRITE_TEXT) < static_cast<int>(msg->size))
@@ -511,7 +525,7 @@ bool LibWebsocketServer::Client::disconnect(bool notify_disconnected)
511525
}
512526

513527
// Schedule a close
514-
lws_callback_on_writable(m_wsi);
528+
lws_cancel_service_pt(m_wsi);
515529
}
516530

517531
// Empty message queue
@@ -543,7 +557,7 @@ bool LibWebsocketServer::Client::send(const void* data, size_t size)
543557
ret = m_send_msgs.push(msg);
544558

545559
// Schedule a send
546-
lws_callback_on_writable(m_wsi);
560+
lws_cancel_service_pt(m_wsi);
547561
}
548562

549563
return ret;

0 commit comments

Comments
 (0)