33
33
#include < limits>
34
34
35
35
#include " cpprest/xxpublic.h"
36
- #include " cpprest/containerstream .h"
36
+ #include " cpprest/producerconsumerstream .h"
37
37
38
38
#if _NOT_PHONE8_
39
39
#if defined(_MSC_VER) && (_MSC_VER >= 1800)
@@ -51,7 +51,7 @@ namespace web
51
51
{
52
52
namespace experimental
53
53
{
54
- namespace web_sockets
54
+ namespace websockets
55
55
{
56
56
namespace client
57
57
{
@@ -62,7 +62,6 @@ namespace details
62
62
class ws_desktop_client ;
63
63
}
64
64
65
-
66
65
// / <summary>
67
66
// / The different types of websocket message.
68
67
// / Text type contains UTF-8 encoded data.
@@ -83,47 +82,26 @@ namespace details
83
82
class _websocket_message
84
83
{
85
84
public:
86
- _ASYNCRTIMP void set_body (concurrency::streams::istream instream);
87
85
88
- // / <summary>
89
- // / Get the streambuf for the message
90
- // / </summary>
91
- concurrency::streams::streambuf<uint8_t >& streambuf () { return m_buf; }
86
+ concurrency::streams::streambuf<uint8_t > & body () { return m_buf; }
92
87
93
- // / <summary>
94
- // / Set the streambuf for the message
95
- // / </summary>
96
- void set_streambuf (concurrency::streams::streambuf<uint8_t > buf) { m_buf = buf; }
88
+ void set_body (const concurrency::streams::streambuf<uint8_t > &buf) { m_buf = buf; }
97
89
98
90
void set_msg_type (websocket_message_type msg_type) { m_msg_type = msg_type; }
99
91
100
92
void set_length (size_t len) { m_length = len; }
101
93
102
- size_t length () { return m_length; }
94
+ size_t length () const { return m_length; }
103
95
104
- websocket_message_type message_type () { return m_msg_type; }
96
+ websocket_message_type message_type () const { return m_msg_type; }
105
97
106
- pplx::task_completion_event<void > _get_data_available () { return m_data_available; }
107
-
108
- void _set_data_available () { m_data_available.set (); }
109
-
110
- _ASYNCRTIMP std::string _extract_string ();
111
-
112
- // / <summary>
113
- // / Prepare the message with an output stream to receive network data
114
- // / </summary>
115
- _ASYNCRTIMP void _prepare_to_receive_data ();
116
-
117
- protected:
98
+ private:
118
99
119
100
concurrency::streams::streambuf<uint8_t > m_buf;
120
101
121
102
websocket_message_type m_msg_type;
122
103
123
104
size_t m_length;
124
-
125
- // / <summary> The TCE is used to signal the availability of the message body. </summary>
126
- pplx::task_completion_event<void > m_data_available;
127
105
};
128
106
}
129
107
@@ -133,24 +111,36 @@ class _websocket_message
133
111
class websocket_outgoing_message
134
112
{
135
113
public:
136
- websocket_outgoing_message ()
137
- : _m_impl(std::make_shared<details::_websocket_message>()) {}
114
+
115
+ // / <summary>
116
+ // / Creates an initially empty message for sending.
117
+ // / </summary>
118
+ websocket_outgoing_message () : _m_impl(std::make_shared<details::_websocket_message>()) {}
138
119
139
120
// / <summary>
140
121
// / Sets a UTF-8 message as the message body.
141
122
// / </summary>
142
123
// / <param name="data">UTF-8 String containing body of the message.</param>
143
- void set_utf8_message (std::string data)
124
+ void set_utf8_message (std::string && data)
144
125
{
145
126
this ->_set_message (std::move (data), websocket_message_type::text_message);
146
127
}
147
128
129
+ // / <summary>
130
+ // / Sets a UTF-8 message as the message body.
131
+ // / </summary>
132
+ // / <param name="data">UTF-8 String containing body of the message.</param>
133
+ void set_utf8_message (const std::string &data)
134
+ {
135
+ this ->_set_message (data, websocket_message_type::text_message);
136
+ }
137
+
148
138
// / <summary>
149
139
// / Sets a UTF-8 message as the message body.
150
140
// / </summary>
151
141
// / <param name="istream">casablanca input stream representing the body of the message.</param>
152
142
// / <remarks>Upon sending, the entire stream may be buffered to determine the length.</remarks>
153
- void set_utf8_message (concurrency::streams::istream istream)
143
+ void set_utf8_message (const concurrency::streams::istream & istream)
154
144
{
155
145
this ->_set_message (istream, SIZE_MAX, websocket_message_type::text_message);
156
146
}
@@ -160,7 +150,7 @@ class websocket_outgoing_message
160
150
// / </summary>
161
151
// / <param name="istream">casablanca input stream representing the body of the message.</param>
162
152
// / <param name="len">number of bytes to send.</param>
163
- void set_utf8_message (concurrency::streams::istream istream, size_t len)
153
+ void set_utf8_message (const concurrency::streams::istream & istream, size_t len)
164
154
{
165
155
this ->_set_message (istream, len, websocket_message_type::text_message);
166
156
}
@@ -170,7 +160,7 @@ class websocket_outgoing_message
170
160
// / </summary>
171
161
// / <param name="istream">casablanca input stream representing the body of the message.</param>
172
162
// / <param name="len">number of bytes to send.</param>
173
- void set_binary_message (concurrency::streams::istream istream, size_t len)
163
+ void set_binary_message (const concurrency::streams::istream & istream, size_t len)
174
164
{
175
165
this ->_set_message (istream, len, websocket_message_type::binary_message);
176
166
}
@@ -180,7 +170,7 @@ class websocket_outgoing_message
180
170
// / </summary>
181
171
// / <param name="istream">casablanca input stream representing the body of the message.</param>
182
172
// / <remarks>Upon sending, the entire stream may be buffered to determine the length.</remarks>
183
- void set_binary_message (concurrency::streams::istream istream)
173
+ void set_binary_message (const concurrency::streams::istream & istream)
184
174
{
185
175
this ->_set_message (istream, SIZE_MAX, websocket_message_type::binary_message);
186
176
}
@@ -192,22 +182,40 @@ class websocket_outgoing_message
192
182
193
183
std::shared_ptr<details::_websocket_message> _m_impl;
194
184
195
- void _set_message (std::string data, websocket_message_type msg_type)
185
+ pplx::task_completion_event<void > m_body_sent;
186
+
187
+ void signal_body_sent ()
188
+ {
189
+ m_body_sent.set ();
190
+ }
191
+
192
+ void signal_body_sent (const std::exception_ptr &e)
193
+ {
194
+ m_body_sent.set_exception (e);
195
+ }
196
+
197
+ pplx::task_completion_event<void > & body_sent () { return m_body_sent; }
198
+
199
+ void _set_message (std::string &&data, websocket_message_type msg_type)
196
200
{
197
201
_m_impl->set_msg_type (msg_type);
198
202
_m_impl->set_length (data.length ());
199
- auto istream = concurrency::streams::bytestream::open_istream<std::string>(std::move (data));
200
- _m_impl->set_body (istream);
203
+ _m_impl->set_body (concurrency::streams::container_buffer<std::string>(std::move (data)));
201
204
}
202
205
203
- void _set_message (concurrency::streams::istream istream, size_t len , websocket_message_type msg_type)
206
+ void _set_message (const std::string &data , websocket_message_type msg_type)
204
207
{
205
208
_m_impl->set_msg_type (msg_type);
206
- _m_impl->set_body (istream );
207
- _m_impl->set_length (len );
209
+ _m_impl->set_length (data. length () );
210
+ _m_impl->set_body (concurrency::streams::container_buffer<std::string>(data) );
208
211
}
209
212
210
- pplx::task_completion_event<void > m_send_tce;
213
+ void _set_message (const concurrency::streams::istream &istream, size_t len, websocket_message_type msg_type)
214
+ {
215
+ _m_impl->set_msg_type (msg_type);
216
+ _m_impl->set_body (istream.streambuf ());
217
+ _m_impl->set_length (len);
218
+ }
211
219
};
212
220
213
221
// / <summary>
@@ -216,7 +224,12 @@ class websocket_outgoing_message
216
224
class websocket_incoming_message
217
225
{
218
226
public:
219
- websocket_incoming_message (): _m_impl(std::make_shared<details::_websocket_message>()) { }
227
+ websocket_incoming_message () : _m_impl(std::make_shared<details::_websocket_message>())
228
+ {
229
+ // Body defaults to producer_consumer_buffer.
230
+ // Perhaps in the future options could be exposed to allow the user to set.
231
+ _m_impl->set_body (concurrency::streams::producer_consumer_buffer<uint8_t >());
232
+ }
220
233
221
234
// / <summary>
222
235
// / Extracts the body of the incoming message as a string value, only if the message type is UTF-8.
@@ -235,7 +248,7 @@ class websocket_incoming_message
235
248
// / </remarks>
236
249
concurrency::streams::istream body () const
237
250
{
238
- return _m_impl->streambuf ().create_istream ();
251
+ return _m_impl->body ().create_istream ();
239
252
}
240
253
241
254
// / <summary>
0 commit comments