Skip to content

Commit 3e7ac07

Browse files
committed
More improvements
1 parent 6774a76 commit 3e7ac07

File tree

5 files changed

+45
-62
lines changed

5 files changed

+45
-62
lines changed

Release/include/cpprest/ws_msg.h

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ namespace client
5959
namespace details
6060
{
6161
class winrt_client;
62+
class ReceiveContext;
6263
class ws_desktop_client;
6364
}
6465

@@ -83,10 +84,6 @@ class _websocket_message
8384
{
8485
public:
8586

86-
concurrency::streams::streambuf<uint8_t> & body() { return m_buf; }
87-
88-
void set_body(const concurrency::streams::streambuf<uint8_t> &buf) { m_buf = buf; }
89-
9087
void set_msg_type(websocket_message_type msg_type) { m_msg_type = msg_type; }
9188

9289
void set_length(size_t len) { m_length = len; }
@@ -97,10 +94,7 @@ class _websocket_message
9794

9895
private:
9996

100-
concurrency::streams::streambuf<uint8_t> m_buf;
101-
10297
websocket_message_type m_msg_type;
103-
10498
size_t m_length;
10599
};
106100
}
@@ -176,13 +170,13 @@ class websocket_outgoing_message
176170
}
177171

178172
private:
179-
180173
friend class details::winrt_client;
181174
friend class details::ws_desktop_client;
182175

183176
std::shared_ptr<details::_websocket_message> _m_impl;
184177

185178
pplx::task_completion_event<void> m_body_sent;
179+
concurrency::streams::streambuf<uint8_t> m_body;
186180

187181
void signal_body_sent()
188182
{
@@ -200,21 +194,21 @@ class websocket_outgoing_message
200194
{
201195
_m_impl->set_msg_type(msg_type);
202196
_m_impl->set_length(data.length());
203-
_m_impl->set_body(concurrency::streams::container_buffer<std::string>(std::move(data)));
197+
m_body = concurrency::streams::container_buffer<std::string>(std::move(data));
204198
}
205199

206200
void _set_message(const std::string &data, websocket_message_type msg_type)
207201
{
208202
_m_impl->set_msg_type(msg_type);
209203
_m_impl->set_length(data.length());
210-
_m_impl->set_body(concurrency::streams::container_buffer<std::string>(data));
204+
m_body = concurrency::streams::container_buffer<std::string>(data);
211205
}
212206

213207
void _set_message(const concurrency::streams::istream &istream, size_t len, websocket_message_type msg_type)
214208
{
215209
_m_impl->set_msg_type(msg_type);
216-
_m_impl->set_body(istream.streambuf());
217210
_m_impl->set_length(len);
211+
m_body = istream.streambuf();
218212
}
219213
};
220214

@@ -224,11 +218,8 @@ class websocket_outgoing_message
224218
class websocket_incoming_message
225219
{
226220
public:
227-
websocket_incoming_message() : _m_impl(std::make_shared<details::_websocket_message>())
228-
{
229-
// Body defaults to producer_consumer_buffer.
230-
// Perhaps in the future options could be exposed to allow the user to set.
231-
_m_impl->set_body(concurrency::streams::producer_consumer_buffer<uint8_t>());
221+
websocket_incoming_message() : _m_impl(std::make_shared<details::_websocket_message>())
222+
{
232223
}
233224

234225
/// <summary>
@@ -237,7 +228,7 @@ class websocket_incoming_message
237228
/// </summary>
238229
/// <returns>String containing body of the message.</returns>
239230
_ASYNCRTIMP pplx::task<std::string> extract_string() const;
240-
231+
241232
/// <summary>
242233
/// Produces a stream which the caller may use to retrieve body from an incoming message.
243234
/// Can be used for both UTF-8 (text) and binary message types.
@@ -248,7 +239,11 @@ class websocket_incoming_message
248239
/// </remarks>
249240
concurrency::streams::istream body() const
250241
{
251-
return _m_impl->body().create_istream();
242+
auto to_uint8_t_stream = [](const concurrency::streams::streambuf<uint8_t> &buf) -> concurrency::streams::istream
243+
{
244+
return buf.create_istream();
245+
};
246+
return to_uint8_t_stream(m_body);
252247
}
253248

254249
/// <summary>
@@ -270,6 +265,12 @@ class websocket_incoming_message
270265
private:
271266
friend class details::winrt_client;
272267
friend class details::ws_desktop_client;
268+
friend class details::ReceiveContext;
269+
270+
// Store message body in a container buffer backed by a string.
271+
// Allows for optimization in the string message cases.
272+
concurrency::streams::container_buffer<std::string> m_body;
273+
273274
std::shared_ptr<details::_websocket_message> _m_impl;
274275
};
275276

Release/src/websockets/client/ws_client.cpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ class ws_desktop_client : public _websocket_client_impl, public std::enable_shar
220220
// 'move' the payload into a container buffer to avoid any copies.
221221
auto &payload = msg->get_raw_payload();
222222
const auto len = payload.size();
223-
incmsg->set_body(concurrency::streams::container_buffer<std::string>(std::move(payload)));
223+
ws_incoming_message.m_body = concurrency::streams::container_buffer<std::string>(std::move(payload));
224224
incmsg->set_length(len);
225225

226226
std::unique_lock<std::mutex> lock(m_receive_queue_lock);
@@ -374,7 +374,7 @@ class ws_desktop_client : public _websocket_client_impl, public std::enable_shar
374374
void send_msg(websocket_outgoing_message msg)
375375
{
376376
auto this_client = this->shared_from_this();
377-
auto& is_buf = msg._m_impl->body();
377+
auto& is_buf = msg.m_body;
378378
auto length = msg._m_impl->length();
379379

380380
if (length == SIZE_MAX)
@@ -395,10 +395,9 @@ class ws_desktop_client : public _websocket_client_impl, public std::enable_shar
395395
else
396396
{
397397
// The stream needs to be buffered.
398-
concurrency::streams::container_buffer<std::vector<uint8_t>> stbuf;
399398
auto is_buf_istream = is_buf.create_istream();
400-
msg._m_impl->set_body(stbuf);
401-
is_buf_istream.read_to_end(stbuf).then([this_client, msg](pplx::task<size_t> t) mutable
399+
msg.m_body = concurrency::streams::container_buffer<std::vector<uint8_t>>();
400+
is_buf_istream.read_to_end(msg.m_body).then([this_client, msg](pplx::task<size_t> t) mutable
402401
{
403402
try
404403
{
@@ -476,7 +475,7 @@ class ws_desktop_client : public _websocket_client_impl, public std::enable_shar
476475
}
477476

478477
return ec;
479-
}).then([this_client, msg, acquired, sp_allocated, length](pplx::task<websocketpp::lib::error_code> previousTask) mutable
478+
}).then([this_client, msg, is_buf, acquired, sp_allocated, length](pplx::task<websocketpp::lib::error_code> previousTask) mutable
480479
{
481480
std::exception_ptr eptr;
482481
try
@@ -495,7 +494,7 @@ class ws_desktop_client : public _websocket_client_impl, public std::enable_shar
495494

496495
if (acquired)
497496
{
498-
msg._m_impl->body().release(sp_allocated.get(), length);
497+
is_buf.release(sp_allocated.get(), length);
499498
}
500499

501500
// Set the send_task_completion_event after calling release.

Release/src/websockets/client/ws_msg.cpp

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,7 @@ pplx::task<std::string> websocket_incoming_message::extract_string() const
7878
{
7979
return pplx::task_from_exception<std::string>(websocket_exception("Invalid message type"));
8080
}
81-
82-
auto& buf_r = _m_impl->body();
83-
if (buf_r.in_avail() == 0)
84-
{
85-
return pplx::task_from_result<std::string>();
86-
}
87-
88-
// TODO all store as strings than can make optimization...
89-
std::string body;
90-
body.resize(static_cast<std::string::size_type>(buf_r.in_avail()));
91-
buf_r.getn(reinterpret_cast<uint8_t*>(&body[0]), body.size()).get();
92-
return pplx::task_from_result(std::move(body));
81+
return pplx::task_from_result(std::move(m_body.collection()));
9382
}
9483

9584
}}}}

Release/src/websockets/client/ws_winrt.cpp

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,10 @@ ref class ReceiveContext sealed
6161

6262
private:
6363
// Public members cannot have native types
64-
ReceiveContext(std::function<void(std::shared_ptr<websocket_incoming_message>)> receive_handler, std::function<void()> close_handler): m_receive_handler(receive_handler), m_close_handler(close_handler) {}
64+
ReceiveContext(std::function<void(websocket_incoming_message &>)> receive_handler, std::function<void()> close_handler): m_receive_handler(receive_handler), m_close_handler(close_handler) {}
6565

6666
// Handler to be executed when a message has been received by the client
67-
std::function<void(std::shared_ptr<websocket_incoming_message>)> m_receive_handler;
67+
std::function<void(websocket_incoming_message &)> m_receive_handler;
6868

6969
// Handler to be executed when a close message has been received by the client
7070
std::function<void()> m_close_handler;
@@ -108,15 +108,14 @@ class winrt_client : public _websocket_client_impl, public std::enable_shared_fr
108108
Platform::StringReference(client_config.credentials().password().c_str()));
109109
}
110110

111-
m_context = ref new ReceiveContext([=](std::shared_ptr<websocket_incoming_message> msg)
111+
m_context = ref new ReceiveContext([=](websocket_incoming_message &msg)
112112
{
113-
_ASSERTE(msg != nullptr);
114113
pplx::task_completion_event<websocket_incoming_message> tce; // This will be set if there are any tasks waiting to receive a message
115114
{
116115
std::lock_guard<std::mutex> lock(m_receive_queue_lock);
117116
if (m_receive_task_queue.empty()) // Push message to the queue as no one is waiting to receive
118117
{
119-
m_receive_msg_queue.push(std::move(*msg));
118+
m_receive_msg_queue.push(msg);
120119
return;
121120
}
122121
else // There are tasks waiting to receive a message.
@@ -126,7 +125,7 @@ class winrt_client : public _websocket_client_impl, public std::enable_shared_fr
126125
}
127126
}
128127
// Setting the tce outside the receive lock for better performance
129-
tce.set(*msg);
128+
tce.set(msg);
130129
},
131130
[=]() // Close handler called upon receiving a close frame from the server.
132131
{
@@ -253,7 +252,7 @@ class winrt_client : public _websocket_client_impl, public std::enable_shared_fr
253252
void send_msg(websocket_outgoing_message &msg)
254253
{
255254
auto this_client = this->shared_from_this();
256-
auto& is_buf = msg._m_impl->body().create_istream();
255+
auto& is_buf = msg.m_body.create_istream();
257256
auto length = msg._m_impl->length();
258257

259258
if (length == SIZE_MAX)
@@ -274,10 +273,9 @@ class winrt_client : public _websocket_client_impl, public std::enable_shared_fr
274273
else
275274
{
276275
// The stream needs to be buffered.
277-
concurrency::streams::container_buffer<std::vector<uint8_t>> stbuf;
278276
auto is_buf_istream = is_buf.create_istream();
279-
msg._m_impl->set_body(stbuf);
280-
is_buf_istream.read_to_end(stbuf).then([this_client, msg](pplx::task<size_t> t)
277+
msg.m_body = concurrency::streams::container_buffer<std::vector<uint8_t>>();
278+
is_buf_istream.read_to_end(msg.m_body).then([this_client, msg](pplx::task<size_t> t)
281279
{
282280
try
283281
{
@@ -336,7 +334,7 @@ class winrt_client : public _websocket_client_impl, public std::enable_shared_fr
336334

337335
// Send the data as one complete message, in WinRT we do not have an option to send fragments.
338336
return pplx::task<unsigned int>(this_client->m_messageWriter->StoreAsync());
339-
}).then([this_client, msg, acquired, sp_allocated, length] (pplx::task<unsigned int> previousTask)
337+
}).then([this_client, msg, is_buf, acquired, sp_allocated, length](pplx::task<unsigned int> previousTask)
340338
{
341339
std::exception_ptr eptr;
342340
unsigned int bytes_written = 0;
@@ -360,7 +358,7 @@ class winrt_client : public _websocket_client_impl, public std::enable_shared_fr
360358

361359
if (acquired)
362360
{
363-
msg._m_impl->body().release(sp_allocated.get(), bytes_written);
361+
is_buf.release(sp_allocated.get(), bytes_written);
364362
}
365363

366364
// Set the send_task_completion_event after calling release.
@@ -420,11 +418,6 @@ class winrt_client : public _websocket_client_impl, public std::enable_shared_fr
420418
return pplx::create_task(m_close_tce);
421419
}
422420

423-
static std::shared_ptr<details::_websocket_message> &get_impl(websocket_incoming_message msg)
424-
{
425-
return msg._m_impl;
426-
}
427-
428421
private:
429422

430423
// WinRT MessageWebSocket object
@@ -469,7 +462,7 @@ class winrt_client : public _websocket_client_impl, public std::enable_shared_fr
469462
void ReceiveContext::OnReceive(MessageWebSocket^ sender, MessageWebSocketMessageReceivedEventArgs^ args)
470463
{
471464
websocket_incoming_message ws_incoming_message;
472-
auto &msg = winrt_client::get_impl(ws_incoming_message);
465+
auto &msg = ws_incoming_message->_m_impl;
473466

474467
switch(args->MessageType)
475468
{
@@ -481,18 +474,16 @@ void ReceiveContext::OnReceive(MessageWebSocket^ sender, MessageWebSocketMessage
481474
break;
482475
}
483476

484-
auto &writebuf = msg->body();
485-
486477
try
487478
{
488479
DataReader^ reader = args->GetDataReader();
489480
const auto len = reader->UnconsumedBufferLength;
490-
auto block = writebuf.alloc(len);
491-
reader->ReadBytes(Platform::ArrayReference<uint8_t>(block, len));
492-
writebuf.commit(len);
493-
writebuf.close(std::ios::out).wait(); // Since this is an in-memory stream, this call is not blocking.
481+
std::string payload;
482+
payload.resize(len);
483+
reader->ReadBytes(Platform::ArrayReference<uint8_t>(payload.c_str(), len));
484+
ws_incoming_message.m_body = concurrency::streams::container_buffer<std::string>(std::move(payload));
494485
msg->signal_msg_received(len);
495-
m_receive_handler(std::make_shared<websocket_incoming_message>(ws_incoming_message));
486+
m_receive_handler(ws_incoming_message);
496487
}
497488
catch(...)
498489
{

Release/tests/Functional/websockets/client/receive_msg_tests.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ TEST_FIXTURE(uri_address, receive_text_msg)
108108

109109
receive_text_msg_helper(client, server, m_uri, "hello").wait();
110110
client.close().wait();
111+
112+
// TODO stgates
113+
VERIFY_ARE_EQUAL(false, true);
111114
}
112115

113116
// Receive text message (no fragmentation)

0 commit comments

Comments
 (0)