Skip to content

Commit 367c488

Browse files
committed
Misc improvements to websockets.
1. Rename namespace from 'web_sockets' to 'websockets' create alias to keep from breaking customers. Will be removed at next major release. 2. Deleted unnecessary move constructor and move assignment operator from websocket_client class. 3. Removed unnecessary storage of task completion event for sending messages from websocket_outgoing_message, moved instead to websocket_incoming_message. 4. Added overload to websocket_outgoing_message::set_utf8_message for r-value references and const reference instead of just taking by value and moving. 5. websocket_outgoing_message::extract_string() - in our implementation the string always arrives immediately and there was no need to use a task completion event, so I deleted it. Results in less code and storage space. 6. Removed unnecessary websocket_incoming_message::_prepare_to_receive_data() function. 7. Changed our websocket desktop implementation for incoming messages to construct a container_buffer backed by a std::string and move the raw std::string from websocketpp. This saves a data copy on all incoming messages and is less code. 8. Changed our winrt implementation for incoming messages to use a container_buffer backed by std::string. 9. We deliver incoming websocket messages to customers in two forms: UTF-8 string and binary stream. In all cases we control allocation of the underlying stream buffer being used. Previously we were using a producer_consumer_buffer. Due to changes #8 and #9 before the data is stored in a std::string. This allows an optimization in websocket_incoming_message::extract_string. We now move the std::string out of the buffer instead of doing an asychronous read. Also saves locking overhead from producer_consumer_buffer, which is thread safe. And less code! 10. Removed some unnecessary string conversions for constructing websocket_exceptions. 11. Replaced some uses of std::unique_lock with std::lock_guard. 12. Misc adding more comments, const to some APIs, passing by reference, make_unique, etc...
1 parent 3751cbf commit 367c488

File tree

13 files changed

+161
-222
lines changed

13 files changed

+161
-222
lines changed

Release/include/cpprest/web_utilities.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ namespace http { namespace client {
3737
class http_client_config;
3838
}}
3939

40-
namespace experimental { namespace web_sockets { namespace client {
40+
namespace experimental { namespace websockets { namespace client {
4141
class websocket_client_config;
4242
}}}
4343

@@ -75,7 +75,7 @@ class credentials
7575
private:
7676
friend class web::web_proxy;
7777
friend class web::http::client::http_client_config;
78-
friend class web::experimental::web_sockets::client::websocket_client_config;
78+
friend class web::experimental::websockets::client::websocket_client_config;
7979

8080
credentials() : m_is_set(false) {}
8181

Release/include/cpprest/ws_client.h

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,12 @@ namespace web
5757
/// WebSocket client is currently in beta.
5858
namespace experimental
5959
{
60-
namespace web_sockets
60+
61+
// In the past namespace was accidentally called 'web_sockets'. To avoid breaking code
62+
// alias it. At our next major release this should be deleted.
63+
namespace web_sockets = websockets;
64+
65+
namespace websockets
6166
{
6267
/// WebSocket client side library.
6368
namespace client
@@ -359,29 +364,10 @@ class websocket_client
359364
_ASYNCRTIMP websocket_client(websocket_client_config client_config);
360365

361366
/// <summary>
367+
/// Destructor
362368
/// </summary>
363369
~websocket_client() { }
364370

365-
/// <summary>
366-
/// Move constructor.
367-
/// </summary>
368-
websocket_client(websocket_client &&other)
369-
: m_client(std::move(other.m_client))
370-
{
371-
}
372-
373-
/// <summary>
374-
/// Move assignment operator.
375-
/// </summary>
376-
websocket_client &operator=(websocket_client &&other)
377-
{
378-
if(this != &other)
379-
{
380-
m_client = std::move(other.m_client);
381-
}
382-
return *this;
383-
}
384-
385371
/// <summary>
386372
/// Connects to the remote network destination. The connect method initiates the websocket handshake with the
387373
/// remote network destination, takes care of the protocol upgrade request.

Release/include/cpprest/ws_msg.h

Lines changed: 71 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
#include <limits>
3434

3535
#include "cpprest/xxpublic.h"
36-
#include "cpprest/containerstream.h"
36+
#include "cpprest/producerconsumerstream.h"
3737

3838
#if _NOT_PHONE8_
3939
#if defined(_MSC_VER) && (_MSC_VER >= 1800)
@@ -51,7 +51,7 @@ namespace web
5151
{
5252
namespace experimental
5353
{
54-
namespace web_sockets
54+
namespace websockets
5555
{
5656
namespace client
5757
{
@@ -60,8 +60,11 @@ namespace details
6060
{
6161
class winrt_client;
6262
class ws_desktop_client;
63-
}
6463

64+
#if defined(__cplusplus_winrt)
65+
ref class ReceiveContext;
66+
#endif
67+
}
6568

6669
/// <summary>
6770
/// The different types of websocket message.
@@ -83,47 +86,19 @@ namespace details
8386
class _websocket_message
8487
{
8588
public:
86-
_ASYNCRTIMP void set_body(concurrency::streams::istream instream);
87-
88-
/// <summary>
89-
/// Get the streambuf for the message
90-
/// </summary>
91-
concurrency::streams::streambuf<uint8_t>& streambuf() { return m_buf; }
92-
93-
/// <summary>
94-
/// Set the streambuf for the message
95-
/// </summary>
96-
void set_streambuf(concurrency::streams::streambuf<uint8_t> buf) { m_buf = buf; }
9789

9890
void set_msg_type(websocket_message_type msg_type) { m_msg_type = msg_type; }
9991

10092
void set_length(size_t len) { m_length = len; }
10193

102-
size_t length() { return m_length; }
103-
104-
websocket_message_type message_type() { return m_msg_type; }
105-
106-
pplx::task_completion_event<void> _get_data_available() { return m_data_available; }
107-
108-
void _set_data_available() { m_data_available.set(); }
94+
size_t length() const { return m_length; }
10995

110-
_ASYNCRTIMP std::string _extract_string();
96+
websocket_message_type message_type() const { return m_msg_type; }
11197

112-
/// <summary>
113-
/// Prepare the message with an output stream to receive network data
114-
/// </summary>
115-
_ASYNCRTIMP void _prepare_to_receive_data();
116-
117-
protected:
118-
119-
concurrency::streams::streambuf<uint8_t> m_buf;
98+
private:
12099

121100
websocket_message_type m_msg_type;
122-
123101
size_t m_length;
124-
125-
/// <summary> The TCE is used to signal the availability of the message body. </summary>
126-
pplx::task_completion_event<void> m_data_available;
127102
};
128103
}
129104

@@ -133,24 +108,36 @@ class _websocket_message
133108
class websocket_outgoing_message
134109
{
135110
public:
136-
websocket_outgoing_message()
137-
: _m_impl(std::make_shared<details::_websocket_message>()) {}
111+
112+
/// <summary>
113+
/// Creates an initially empty message for sending.
114+
/// </summary>
115+
websocket_outgoing_message() : _m_impl(std::make_shared<details::_websocket_message>()) {}
138116

139117
/// <summary>
140118
/// Sets a UTF-8 message as the message body.
141119
/// </summary>
142120
/// <param name="data">UTF-8 String containing body of the message.</param>
143-
void set_utf8_message(std::string data)
121+
void set_utf8_message(std::string &&data)
144122
{
145123
this->_set_message(std::move(data), websocket_message_type::text_message);
146124
}
147125

126+
/// <summary>
127+
/// Sets a UTF-8 message as the message body.
128+
/// </summary>
129+
/// <param name="data">UTF-8 String containing body of the message.</param>
130+
void set_utf8_message(const std::string &data)
131+
{
132+
this->_set_message(data, websocket_message_type::text_message);
133+
}
134+
148135
/// <summary>
149136
/// Sets a UTF-8 message as the message body.
150137
/// </summary>
151138
/// <param name="istream">casablanca input stream representing the body of the message.</param>
152139
/// <remarks>Upon sending, the entire stream may be buffered to determine the length.</remarks>
153-
void set_utf8_message(concurrency::streams::istream istream)
140+
void set_utf8_message(const concurrency::streams::istream &istream)
154141
{
155142
this->_set_message(istream, SIZE_MAX, websocket_message_type::text_message);
156143
}
@@ -160,7 +147,7 @@ class websocket_outgoing_message
160147
/// </summary>
161148
/// <param name="istream">casablanca input stream representing the body of the message.</param>
162149
/// <param name="len">number of bytes to send.</param>
163-
void set_utf8_message(concurrency::streams::istream istream, size_t len)
150+
void set_utf8_message(const concurrency::streams::istream &istream, size_t len)
164151
{
165152
this->_set_message(istream, len, websocket_message_type::text_message);
166153
}
@@ -170,7 +157,7 @@ class websocket_outgoing_message
170157
/// </summary>
171158
/// <param name="istream">casablanca input stream representing the body of the message.</param>
172159
/// <param name="len">number of bytes to send.</param>
173-
void set_binary_message(concurrency::streams::istream istream, size_t len)
160+
void set_binary_message(const concurrency::streams::istream &istream, size_t len)
174161
{
175162
this->_set_message(istream, len, websocket_message_type::binary_message);
176163
}
@@ -180,34 +167,52 @@ class websocket_outgoing_message
180167
/// </summary>
181168
/// <param name="istream">casablanca input stream representing the body of the message.</param>
182169
/// <remarks>Upon sending, the entire stream may be buffered to determine the length.</remarks>
183-
void set_binary_message(concurrency::streams::istream istream)
170+
void set_binary_message(const concurrency::streams::istream &istream)
184171
{
185172
this->_set_message(istream, SIZE_MAX, websocket_message_type::binary_message);
186173
}
187174

188175
private:
189-
190176
friend class details::winrt_client;
191177
friend class details::ws_desktop_client;
192178

193179
std::shared_ptr<details::_websocket_message> _m_impl;
194180

195-
void _set_message(std::string data, websocket_message_type msg_type)
181+
pplx::task_completion_event<void> m_body_sent;
182+
concurrency::streams::streambuf<uint8_t> m_body;
183+
184+
void signal_body_sent()
185+
{
186+
m_body_sent.set();
187+
}
188+
189+
void signal_body_sent(const std::exception_ptr &e)
190+
{
191+
m_body_sent.set_exception(e);
192+
}
193+
194+
pplx::task_completion_event<void> & body_sent() { return m_body_sent; }
195+
196+
void _set_message(std::string &&data, websocket_message_type msg_type)
196197
{
197198
_m_impl->set_msg_type(msg_type);
198199
_m_impl->set_length(data.length());
199-
auto istream = concurrency::streams::bytestream::open_istream<std::string>(std::move(data));
200-
_m_impl->set_body(istream);
200+
m_body = concurrency::streams::container_buffer<std::string>(std::move(data));
201201
}
202202

203-
void _set_message(concurrency::streams::istream istream, size_t len, websocket_message_type msg_type)
203+
void _set_message(const std::string &data, websocket_message_type msg_type)
204204
{
205205
_m_impl->set_msg_type(msg_type);
206-
_m_impl->set_body(istream);
207-
_m_impl->set_length(len);
206+
_m_impl->set_length(data.length());
207+
m_body = concurrency::streams::container_buffer<std::string>(data);
208208
}
209209

210-
pplx::task_completion_event<void> m_send_tce;
210+
void _set_message(const concurrency::streams::istream &istream, size_t len, websocket_message_type msg_type)
211+
{
212+
_m_impl->set_msg_type(msg_type);
213+
_m_impl->set_length(len);
214+
m_body = istream.streambuf();
215+
}
211216
};
212217

213218
/// <summary>
@@ -216,15 +221,17 @@ class websocket_outgoing_message
216221
class websocket_incoming_message
217222
{
218223
public:
219-
websocket_incoming_message(): _m_impl(std::make_shared<details::_websocket_message>()) { }
224+
websocket_incoming_message() : _m_impl(std::make_shared<details::_websocket_message>())
225+
{
226+
}
220227

221228
/// <summary>
222229
/// Extracts the body of the incoming message as a string value, only if the message type is UTF-8.
223230
/// A body can only be extracted once because in some cases an optimization is made where the data is 'moved' out.
224231
/// </summary>
225232
/// <returns>String containing body of the message.</returns>
226233
_ASYNCRTIMP pplx::task<std::string> extract_string() const;
227-
234+
228235
/// <summary>
229236
/// Produces a stream which the caller may use to retrieve body from an incoming message.
230237
/// Can be used for both UTF-8 (text) and binary message types.
@@ -235,7 +242,11 @@ class websocket_incoming_message
235242
/// </remarks>
236243
concurrency::streams::istream body() const
237244
{
238-
return _m_impl->streambuf().create_istream();
245+
auto to_uint8_t_stream = [](const concurrency::streams::streambuf<uint8_t> &buf) -> concurrency::streams::istream
246+
{
247+
return buf.create_istream();
248+
};
249+
return to_uint8_t_stream(m_body);
239250
}
240251

241252
/// <summary>
@@ -257,6 +268,14 @@ class websocket_incoming_message
257268
private:
258269
friend class details::winrt_client;
259270
friend class details::ws_desktop_client;
271+
#if defined(__cplusplus_winrt)
272+
friend ref class details::ReceiveContext;
273+
#endif
274+
275+
// Store message body in a container buffer backed by a string.
276+
// Allows for optimization in the string message cases.
277+
concurrency::streams::container_buffer<std::string> m_body;
278+
260279
std::shared_ptr<details::_websocket_message> _m_impl;
261280
};
262281

0 commit comments

Comments
 (0)