@@ -72,7 +72,7 @@ class winrt_client : public _websocket_client_impl
72
72
{
73
73
public:
74
74
winrt_client (web::uri address, websocket_client_config client_config)
75
- : _websocket_client_impl(std::move(address), std::move(client_config)), m_scheduled(0 )
75
+ : _websocket_client_impl(std::move(address), std::move(client_config)), m_scheduled(0 ), m_client_closed( false )
76
76
{
77
77
verify_uri (m_uri);
78
78
m_msg_websocket = ref new MessageWebSocket ();
@@ -91,23 +91,54 @@ class winrt_client : public _websocket_client_impl
91
91
92
92
m_context = ref new ReceiveContext ([=](std::shared_ptr<websocket_incoming_message> msg)
93
93
{
94
- std::unique_lock<std::mutex> lock (m_receive_queue_lock);
94
+ pplx::task_completion_event<websocket_incoming_message> tce; // This will be set if there are any tasks waiting to receive a message
95
+ {
96
+ std::unique_lock<std::mutex> lock (m_receive_queue_lock);
97
+ if (m_receive_task_queue.empty ()) // Push message to the queue as no one is waiting to receive
98
+ {
99
+ m_receive_msg_queue.push (std::move (*msg));
100
+ return ;
101
+ }
102
+ else // There are tasks waiting to receive a message.
103
+ {
104
+ tce = m_receive_task_queue.front ();
105
+ m_receive_task_queue.pop ();
106
+ }
107
+ }
108
+ // Setting the tce outside the receive lock for better performance
95
109
if (msg != nullptr )
96
- m_receive_msg_queue.push (std::move (*msg));
97
- m_receive_msg_cv.notify_one ();
110
+ {
111
+ tce.set (*msg);
112
+ }
113
+ else
114
+ {
115
+ tce.set_exception (std::make_exception_ptr (websocket_exception (_XPLATSTR (" Error occured during receive." ))));
116
+ }
98
117
},
99
118
[=]()
100
119
{
101
- m_receive_msg_cv. notify_all ();
120
+ close_pending_tasks_with_error ();
102
121
m_close_tce.set ();
103
122
});
104
123
}
105
124
106
125
~winrt_client ()
107
126
{
108
- m_receive_msg_cv. notify_all ();
127
+ close_pending_tasks_with_error ();
109
128
}
110
129
130
+ void close_pending_tasks_with_error ()
131
+ {
132
+ std::unique_lock<std::mutex> lock (m_receive_queue_lock);
133
+ m_client_closed = true ;
134
+ while (!m_receive_task_queue.empty ()) // There are tasks waiting to receive a message, signal them
135
+ {
136
+ auto tce = m_receive_task_queue.front ();
137
+ m_receive_task_queue.pop ();
138
+ tce.set_exception (std::make_exception_ptr (websocket_exception (_XPLATSTR (" Websocket connection has been closed." ))));
139
+ }
140
+ }
141
+
111
142
pplx::task<void > connect ()
112
143
{
113
144
const auto uri = ref new Windows::Foundation::Uri (ref new Platform::String (m_uri.to_string ().c_str ()));
@@ -124,6 +155,7 @@ class winrt_client : public _websocket_client_impl
124
155
}
125
156
catch (Platform::Exception^ ex)
126
157
{
158
+ close_pending_tasks_with_error ();
127
159
return pplx::task_from_exception<void >(websocket_exception (ex->HResult ));
128
160
}
129
161
return pplx::task_from_result ();
@@ -217,18 +249,24 @@ class winrt_client : public _websocket_client_impl
217
249
218
250
pplx::task<websocket_incoming_message> receive ()
219
251
{
220
- return pplx::create_task ([this ]() ->pplx ::task<websocket_incoming_message>
252
+ std::unique_lock<std::mutex> lock (m_receive_queue_lock);
253
+ if (m_client_closed == true )
221
254
{
222
- std::unique_lock<std::mutex> lock (m_receive_queue_lock);
223
- m_receive_msg_cv.wait (lock);
224
- if (m_receive_msg_queue.empty ()) // If queue is empty -> wait was notified by websocket client close or destructor etc.
225
- {
226
- return pplx::task_from_exception<websocket_incoming_message>(websocket_exception (_XPLATSTR (" Error receiving message, websocket client is closing." )));
227
- }
228
- auto msg = std::move (m_receive_msg_queue.front ());
255
+ return pplx::task_from_exception<websocket_incoming_message>(std::make_exception_ptr (websocket_exception (_XPLATSTR (" Websocket connection has closed." ))));
256
+ }
257
+
258
+ if (m_receive_msg_queue.empty ()) // Push task completion event to the tce queue, so that it gets signaled when we have a message.
259
+ {
260
+ pplx::task_completion_event<websocket_incoming_message> tce;
261
+ m_receive_task_queue.push (tce);
262
+ return pplx::create_task (tce);
263
+ }
264
+ else // Receive message queue is not empty, return a message from the queue.
265
+ {
266
+ auto msg = m_receive_msg_queue.front ();
229
267
m_receive_msg_queue.pop ();
230
268
return pplx::task_from_result<websocket_incoming_message>(msg);
231
- });
269
+ }
232
270
}
233
271
234
272
pplx::task<void > close ()
@@ -260,12 +298,20 @@ class winrt_client : public _websocket_client_impl
260
298
ReceiveContext ^ m_context;
261
299
262
300
pplx::task_completion_event<void > m_close_tce;
263
-
264
- // Incoming messages are maintained in a producer consumer queue.
265
- // m_receive_queue_lock and m_receive_msg_cv : synchronization primitives to guard access to the queue
301
+ // m_client_closed maintains the state of the client. It is set to true when:
302
+ // 1. the client has not connected
303
+ // 2. if it has received a close frame from the server.
304
+ // We may want to keep an enum to maintain the client state in the future.
305
+ bool m_client_closed;
306
+
307
+ // When a message arrives, if there are tasks waiting for a message, signal the topmost one.
308
+ // Else enqueue the message in a queue.
309
+ // m_receive_queue_lock : to guard access to the queue & m_client_closed
266
310
std::mutex m_receive_queue_lock;
267
- std::condition_variable m_receive_msg_cv;
268
- std::queue<websocket_incoming_message> m_receive_msg_queue;
311
+ // Queue to store incoming messages when there are no tasks waiting for a message
312
+ std::queue<websocket_incoming_message> m_receive_msg_queue;
313
+ // Queue to maintain the receive tasks when there are no messages(yet).
314
+ std::queue<pplx::task_completion_event<websocket_incoming_message>> m_receive_task_queue;
269
315
270
316
// The implementation has to ensure ordering of send requests
271
317
std::mutex m_send_lock;
0 commit comments