2
2
#include <mongoc.h>
3
3
4
4
#include "mongoc-tests.h"
5
+ #include "mongoc-socket-private.h"
5
6
#include "mongoc-thread-private.h"
7
+ #include "mongoc-errno-private.h"
6
8
#include "TestSuite.h"
7
9
8
10
#include "test-libmongoc.h"
11
13
#define MONGOC_LOG_DOMAIN "socket-test"
12
14
13
15
#define TIMEOUT 10000
16
+ #define WAIT 1000
14
17
15
18
19
+ static size_t gFourMB = 1024 * 1024 * 4 ;
20
+
16
21
typedef struct
17
22
{
18
23
unsigned short server_port ;
19
24
mongoc_cond_t cond ;
20
25
mongoc_mutex_t cond_mutex ;
21
26
bool closed_socket ;
27
+ int amount ;
22
28
} socket_test_data_t ;
23
29
24
30
@@ -153,6 +159,179 @@ socket_test_client (void *data_)
153
159
}
154
160
155
161
162
+ static void *
163
+ sendv_test_server (void * data_ )
164
+ {
165
+ socket_test_data_t * data = (socket_test_data_t * )data_ ;
166
+ struct sockaddr_in server_addr = { 0 };
167
+ mongoc_socket_t * listen_sock ;
168
+ mongoc_socket_t * conn_sock ;
169
+ mongoc_stream_t * stream ;
170
+ mongoc_iovec_t iov ;
171
+ socklen_t sock_len ;
172
+ int amount = 0 ;
173
+ ssize_t r ;
174
+ char * buf = (char * )bson_malloc (gFourMB );
175
+
176
+ iov .iov_base = buf ;
177
+ iov .iov_len = gFourMB ;
178
+
179
+ listen_sock = mongoc_socket_new (AF_INET , SOCK_STREAM , 0 );
180
+ assert (listen_sock );
181
+
182
+ server_addr .sin_family = AF_INET ;
183
+ server_addr .sin_addr .s_addr = htonl (INADDR_LOOPBACK );
184
+ server_addr .sin_port = htons (0 );
185
+
186
+ r = mongoc_socket_bind (listen_sock ,
187
+ (struct sockaddr * )& server_addr ,
188
+ sizeof server_addr );
189
+ assert (r == 0 );
190
+
191
+ sock_len = sizeof (server_addr );
192
+ r = mongoc_socket_getsockname (listen_sock , (struct sockaddr * )& server_addr , & sock_len );
193
+ assert (r == 0 );
194
+
195
+ r = mongoc_socket_listen (listen_sock , 10 );
196
+ assert (r == 0 );
197
+
198
+ mongoc_mutex_lock (& data -> cond_mutex );
199
+ data -> server_port = ntohs (server_addr .sin_port );
200
+ mongoc_cond_signal (& data -> cond );
201
+ mongoc_mutex_unlock (& data -> cond_mutex );
202
+
203
+ conn_sock = mongoc_socket_accept (listen_sock , -1 );
204
+ assert (conn_sock );
205
+
206
+ stream = mongoc_stream_socket_new (conn_sock );
207
+ assert (stream );
208
+
209
+ /* Wait until the client has pushed so much data he can't write more */
210
+ mongoc_mutex_lock (& data -> cond_mutex );
211
+ while (! data -> amount ) {
212
+ mongoc_cond_wait (& data -> cond , & data -> cond_mutex );
213
+ }
214
+ amount = data -> amount ;
215
+ data -> amount = 0 ;
216
+ mongoc_mutex_unlock (& data -> cond_mutex );
217
+
218
+ /* Start reading everything off the socket to unblock the client */
219
+ do {
220
+ r = mongoc_stream_readv (stream , & iov , 1 , amount , WAIT );
221
+ if (r > 0 ) {
222
+ amount -= r ;
223
+ } else if (MONGOC_ERRNO_IS_AGAIN (errno )) {
224
+ continue ;
225
+ } else {
226
+ fprintf (stderr , "r: %" PRIu64 " , errno: %d" , (uint64_t )r , conn_sock -> errno_ );
227
+ assert (r == amount );
228
+ }
229
+ } while (amount > 0 );
230
+
231
+
232
+ /* Allow the client to finish all its writes */
233
+ mongoc_mutex_lock (& data -> cond_mutex );
234
+ while (! data -> amount ) {
235
+ mongoc_cond_wait (& data -> cond , & data -> cond_mutex );
236
+ }
237
+ /* amount is likely negative value now, we've read more then caused the original blocker */
238
+ amount += data -> amount ;
239
+ data -> amount = 0 ;
240
+ mongoc_mutex_unlock (& data -> cond_mutex );
241
+
242
+ do {
243
+ r = mongoc_stream_readv (stream , & iov , 1 , amount , WAIT );
244
+ if (r > 0 ) {
245
+ amount -= r ;
246
+ } else if (MONGOC_ERRNO_IS_AGAIN (errno )) {
247
+ continue ;
248
+ } else {
249
+ fprintf (stderr , "r: %" PRIu64 " , errno: %d" , (uint64_t )r , errno );
250
+ assert (r == amount );
251
+ }
252
+ } while (amount > 0 );
253
+ ASSERT_CMPINT (0 , = = , amount );
254
+
255
+
256
+ mongoc_stream_destroy (stream );
257
+
258
+ mongoc_socket_destroy (listen_sock );
259
+
260
+ return NULL ;
261
+ }
262
+
263
+
264
+ static void *
265
+ sendv_test_client (void * data_ )
266
+ {
267
+ socket_test_data_t * data = (socket_test_data_t * )data_ ;
268
+ mongoc_socket_t * conn_sock ;
269
+ ssize_t r ;
270
+ int i ;
271
+ int amount = 0 ;
272
+ struct sockaddr_in server_addr = { 0 };
273
+ mongoc_stream_t * stream ;
274
+ mongoc_iovec_t iov ;
275
+ bool done = false;
276
+ char * buf = (char * )bson_malloc (gFourMB );
277
+
278
+ memset (buf , 'a' , (gFourMB )- 1 );
279
+ buf [gFourMB - 1 ] = '\0' ;
280
+
281
+ iov .iov_base = buf ;
282
+ iov .iov_len = gFourMB ;
283
+
284
+ conn_sock = mongoc_socket_new (AF_INET , SOCK_STREAM , 0 );
285
+ assert (conn_sock );
286
+
287
+ mongoc_mutex_lock (& data -> cond_mutex );
288
+ while (! data -> server_port ) {
289
+ mongoc_cond_wait (& data -> cond , & data -> cond_mutex );
290
+ }
291
+ mongoc_mutex_unlock (& data -> cond_mutex );
292
+
293
+ server_addr .sin_family = AF_INET ;
294
+ server_addr .sin_port = htons (data -> server_port );
295
+ server_addr .sin_addr .s_addr = htonl (INADDR_LOOPBACK );
296
+
297
+ r = mongoc_socket_connect (conn_sock , (struct sockaddr * )& server_addr , sizeof (server_addr ), -1 );
298
+ assert (r == 0 );
299
+
300
+ stream = mongoc_stream_socket_new (conn_sock );
301
+
302
+ for (i = 0 ; i < 5 ; i ++ )
303
+ {
304
+ r = mongoc_stream_writev (stream , & iov , 1 , WAIT );
305
+ if (r > 0 ) {
306
+ amount += r ;
307
+ }
308
+ if (r != gFourMB ) {
309
+ if (MONGOC_ERRNO_IS_AGAIN (conn_sock -> errno_ )) {
310
+ if (!done ) {
311
+ mongoc_mutex_lock (& data -> cond_mutex );
312
+ data -> amount = amount ;
313
+ amount = 0 ;
314
+ mongoc_cond_signal (& data -> cond );
315
+ mongoc_mutex_unlock (& data -> cond_mutex );
316
+ done = true;
317
+ }
318
+ } else {
319
+ assert (r == gFourMB );
320
+ }
321
+ }
322
+ }
323
+ assert (true == done );
324
+ mongoc_mutex_lock (& data -> cond_mutex );
325
+ data -> amount = amount ;
326
+ mongoc_cond_signal (& data -> cond );
327
+ mongoc_mutex_unlock (& data -> cond_mutex );
328
+
329
+ mongoc_stream_destroy (stream );
330
+
331
+ return NULL ;
332
+ }
333
+
334
+
156
335
static void
157
336
test_mongoc_socket_check_closed (void )
158
337
{
@@ -178,8 +357,34 @@ test_mongoc_socket_check_closed (void)
178
357
mongoc_cond_destroy (& data .cond );
179
358
}
180
359
360
+ static void
361
+ test_mongoc_socket_sendv (void )
362
+ {
363
+ socket_test_data_t data = { 0 };
364
+ mongoc_thread_t threads [2 ];
365
+ int i , r ;
366
+
367
+ mongoc_mutex_init (& data .cond_mutex );
368
+ mongoc_cond_init (& data .cond );
369
+
370
+ r = mongoc_thread_create (threads , & sendv_test_server , & data );
371
+ assert (r == 0 );
372
+
373
+ r = mongoc_thread_create (threads + 1 , & sendv_test_client , & data );
374
+ assert (r == 0 );
375
+
376
+ for (i = 0 ; i < 2 ; i ++ ) {
377
+ r = mongoc_thread_join (threads [i ]);
378
+ assert (r == 0 );
379
+ }
380
+
381
+ mongoc_mutex_destroy (& data .cond_mutex );
382
+ mongoc_cond_destroy (& data .cond );
383
+ }
384
+
181
385
void
182
386
test_socket_install (TestSuite * suite )
183
387
{
184
388
TestSuite_Add (suite , "/Socket/check_closed" , test_mongoc_socket_check_closed );
389
+ TestSuite_Add (suite , "/Socket/sendv" , test_mongoc_socket_sendv );
185
390
}
0 commit comments