Skip to content

Commit df781c0

Browse files
committed
CDRIVER-830 do blocking reconnect if single-threaded
The new mongoc_cluster_node_reconnect initiates a non-blocking reconnect but doesn't wait for connection to complete. If the driver is in single- threaded mode (so the scanner and the application share a connection) and immediately uses that server, connection hasn't completed: cursor error: Failure during socket delivery: Socket not connected (57) With this change, apply connectTimeoutMS to the reconnection in single- threaded mode.
1 parent 1410525 commit df781c0

File tree

3 files changed

+104
-1
lines changed

3 files changed

+104
-1
lines changed

src/mongoc/mongoc-cluster.c

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1241,6 +1241,8 @@ mongoc_cluster_node_reconnect (mongoc_cluster_t *cluster, uint32_t server_id, bs
12411241
mongoc_topology_t *topology = cluster->client->topology;
12421242
mongoc_cluster_node_t *cluster_node;
12431243
mongoc_server_description_t *sd = NULL;
1244+
int64_t expire_at;
1245+
12441246
ENTRY;
12451247

12461248
bson_return_val_if_fail(cluster, false);
@@ -1259,7 +1261,24 @@ mongoc_cluster_node_reconnect (mongoc_cluster_t *cluster, uint32_t server_id, bs
12591261

12601262
mongoc_stream_failed (scanner_node->stream);
12611263
scanner_node->stream = NULL;
1262-
mongoc_topology_scanner_node_setup (scanner_node, error);
1264+
1265+
/* begin a non-blocking connect, then await connection */
1266+
if (!mongoc_topology_scanner_node_setup (scanner_node, error)) {
1267+
RETURN(false);
1268+
}
1269+
1270+
expire_at = bson_get_monotonic_time ()
1271+
+ topology->connect_timeout_msec * 1000;
1272+
1273+
if (!mongoc_stream_wait (scanner_node->stream, expire_at)) {
1274+
bson_set_error (error,
1275+
MONGOC_ERROR_STREAM,
1276+
MONGOC_ERROR_STREAM_NOT_ESTABLISHED,
1277+
"Could not reconnect to %s",
1278+
scanner_node->host.host_and_port);
1279+
RETURN(false);
1280+
}
1281+
12631282
if (scanner_node->stream && cluster->requires_auth) {
12641283
sd = mongoc_topology_server_by_id (topology, server_id);
12651284
if (!sd) {

src/mongoc/mongoc-stream.c

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "mongoc-array-private.h"
2121
#include "mongoc-buffer-private.h"
2222
#include "mongoc-error.h"
23+
#include "mongoc-errno-private.h"
2324
#include "mongoc-flags.h"
2425
#include "mongoc-log.h"
2526
#include "mongoc-opcode.h"
@@ -353,6 +354,87 @@ mongoc_stream_poll (mongoc_stream_poll_t *streams,
353354
}
354355

355356

357+
/*
358+
*--------------------------------------------------------------------------
359+
*
360+
* mongoc_stream_wait --
361+
*
362+
* Internal helper, poll a single stream until it connects.
363+
*
364+
* For now, only the POLLOUT (connected) event is supported.
365+
*
366+
* @expire_at should be an absolute time at which to expire using
367+
* the monotonic clock (bson_get_monotonic_time(), which is in
368+
* microseconds). expire_at of 0 or -1 is prohibited.
369+
*
370+
* Returns:
371+
* true if an event matched. otherwise false.
372+
* a timeout will return false.
373+
*
374+
* Side effects:
375+
* None.
376+
*
377+
*--------------------------------------------------------------------------
378+
*/
379+
380+
bool
381+
mongoc_stream_wait (mongoc_stream_t *stream,
382+
int64_t expire_at)
383+
{
384+
mongoc_stream_poll_t poller;
385+
int64_t now;
386+
int32_t timeout_msec;
387+
ssize_t ret;
388+
389+
ENTRY;
390+
391+
BSON_ASSERT (stream);
392+
BSON_ASSERT (expire_at > 0);
393+
394+
poller.stream = stream;
395+
poller.events = POLLOUT;
396+
poller.revents = 0;
397+
398+
now = bson_get_monotonic_time();
399+
400+
for (;;) {
401+
/* TODO CDRIVER-804 use int64_t for timeouts consistently */
402+
timeout_msec = (int32_t) BSON_MIN ((expire_at - now) / 1000L, INT32_MAX);
403+
if (timeout_msec < 0) {
404+
timeout_msec = 0;
405+
}
406+
407+
ret = mongoc_stream_poll (&poller, 1, timeout_msec);
408+
409+
if (ret > 0) {
410+
/* an event happened, return true if POLLOUT else false */
411+
RETURN (0 != (poller.revents & POLLOUT));
412+
} else if (ret < 0) {
413+
/* poll itself failed */
414+
415+
TRACE("errno is: %d", errno);
416+
if (MONGOC_ERRNO_IS_AGAIN(errno)) {
417+
now = bson_get_monotonic_time();
418+
419+
if (expire_at < now) {
420+
RETURN (false);
421+
} else {
422+
continue;
423+
}
424+
} else {
425+
/* poll failed for some non-transient reason */
426+
RETURN (false);
427+
}
428+
} else {
429+
/* poll timed out */
430+
RETURN (false);
431+
}
432+
}
433+
434+
return true;
435+
}
436+
437+
356438
bool
357439
mongoc_stream_check_closed (mongoc_stream_t *stream)
358440
{

src/mongoc/mongoc-stream.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ int mongoc_stream_setsockopt (mongoc_stream_t *stream,
9494
int optname,
9595
void *optval,
9696
socklen_t optlen);
97+
bool mongoc_stream_wait (mongoc_stream_t *stream,
98+
int64_t expire_at);
9799
bool mongoc_stream_check_closed (mongoc_stream_t *stream);
98100
ssize_t mongoc_stream_poll (mongoc_stream_poll_t *streams,
99101
size_t nstreams,

0 commit comments

Comments
 (0)