Skip to content

Commit 08cbe76

Browse files
committed
Check for closed sockets before attempting a rpc
Ensure that the socket we've selected has been read from within a second. If it hasn't, check if it's closed by polling for readability and if that works, MSG_PEEK'ing a single byte. If we get eof, or an error, we can assume it's broken. This lets us avoid a large class of transient network failures. Closes #136
1 parent 4d4316e commit 08cbe76

14 files changed

+320
-13
lines changed

src/mongoc/mongoc-cluster-private.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ typedef struct
7878
int32_t max_wire_version;
7979
int32_t max_write_batch_size;
8080
char *replSet;
81+
int64_t last_read_msec;
8182
} mongoc_cluster_node_t;
8283

8384

src/mongoc/mongoc-cluster.c

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
#define MIN_WIRE_VERSION 0
6464
#define MAX_WIRE_VERSION 3
6565

66+
#define CHECK_CLOSED_DURATION_MSEC 1000
67+
6668

6769
#ifndef DEFAULT_SOCKET_TIMEOUT_MSEC
6870
/*
@@ -2656,20 +2658,34 @@ _mongoc_cluster_sendv (mongoc_cluster_t *cluster,
26562658
}
26572659
}
26582660

2659-
/*
2660-
* Try to find a node to deliver to. Since we are allowed to block in this
2661-
* version of sendv, we try to reconnect if we cannot select a node.
2662-
*/
2663-
while (!(node = _mongoc_cluster_select (cluster, rpcs, rpcs_len, hint,
2664-
write_concern, read_prefs,
2665-
error))) {
2666-
if ((retry_count++ == MAX_RETRY_COUNT) ||
2667-
!_mongoc_cluster_reconnect (cluster, error)) {
2668-
RETURN (false);
2661+
for (;;) {
2662+
/*
2663+
* Try to find a node to deliver to. Since we are allowed to block in this
2664+
* version of sendv, we try to reconnect if we cannot select a node.
2665+
*/
2666+
while (!(node = _mongoc_cluster_select (cluster, rpcs, rpcs_len, hint,
2667+
write_concern, read_prefs,
2668+
error))) {
2669+
if ((retry_count++ == MAX_RETRY_COUNT) ||
2670+
!_mongoc_cluster_reconnect (cluster, error)) {
2671+
RETURN (false);
2672+
}
26692673
}
2670-
}
26712674

2672-
BSON_ASSERT(node->stream);
2675+
BSON_ASSERT(node->stream);
2676+
2677+
if (node->last_read_msec + CHECK_CLOSED_DURATION_MSEC < now) {
2678+
if (mongoc_stream_check_closed (node->stream)) {
2679+
_mongoc_cluster_disconnect_node (cluster, node);
2680+
_mongoc_cluster_reconnect (cluster, NULL);
2681+
} else {
2682+
node->last_read_msec = now;
2683+
break;
2684+
}
2685+
} else {
2686+
break;
2687+
}
2688+
}
26732689

26742690
_mongoc_array_clear (&cluster->iov);
26752691

@@ -3005,6 +3021,8 @@ _mongoc_cluster_try_recv (mongoc_cluster_t *cluster,
30053021
RETURN (false);
30063022
}
30073023

3024+
node->last_read_msec = bson_get_monotonic_time ();
3025+
30083026
DUMP_BYTES (buffer, buffer->data + buffer->off, buffer->len);
30093027

30103028
_mongoc_rpc_swab_from_le (rpc);

src/mongoc/mongoc-socket.c

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1069,3 +1069,28 @@ mongoc_socket_getnameinfo (mongoc_socket_t *sock) /* IN */
10691069

10701070
RETURN (NULL);
10711071
}
1072+
1073+
1074+
bool
1075+
mongoc_socket_check_closed (mongoc_socket_t *sock) /* IN */
1076+
{
1077+
bool closed = false;
1078+
char buf [1];
1079+
ssize_t r;
1080+
1081+
if (_mongoc_socket_wait (sock->sd, POLLIN, 0)) {
1082+
sock->errno_ = 0;
1083+
1084+
r = recv (sock->sd, buf, 1, MSG_PEEK);
1085+
1086+
if (r < 0) {
1087+
_mongoc_socket_capture_errno (sock);
1088+
}
1089+
1090+
if (r < 1) {
1091+
closed = true;
1092+
}
1093+
}
1094+
1095+
return closed;
1096+
}

src/mongoc/mongoc-socket.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ ssize_t mongoc_socket_sendv (mongoc_socket_t *sock,
8686
mongoc_iovec_t *iov,
8787
size_t iovcnt,
8888
int64_t expire_at);
89+
bool mongoc_socket_check_closed (mongoc_socket_t *sock);
8990

9091

9192
BSON_END_DECLS

src/mongoc/mongoc-stream-buffered.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,15 @@ _mongoc_stream_buffered_get_base_stream (mongoc_stream_t *stream) /* IN */
244244
}
245245

246246

247+
static bool
248+
_mongoc_stream_buffered_check_closed (mongoc_stream_t *stream) /* IN */
249+
{
250+
mongoc_stream_buffered_t *buffered = (mongoc_stream_buffered_t *)stream;
251+
bson_return_val_if_fail(stream, -1);
252+
return mongoc_stream_check_closed (buffered->base_stream);
253+
}
254+
255+
247256
/*
248257
*--------------------------------------------------------------------------
249258
*
@@ -283,6 +292,7 @@ mongoc_stream_buffered_new (mongoc_stream_t *base_stream, /* IN */
283292
stream->stream.writev = mongoc_stream_buffered_writev;
284293
stream->stream.readv = mongoc_stream_buffered_readv;
285294
stream->stream.get_base_stream = _mongoc_stream_buffered_get_base_stream;
295+
stream->stream.check_closed = _mongoc_stream_buffered_check_closed;
286296

287297
stream->base_stream = base_stream;
288298

src/mongoc/mongoc-stream-file.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,13 @@ _mongoc_stream_file_writev (mongoc_stream_t *stream, /* IN */
167167
}
168168

169169

170+
static bool
171+
_mongoc_stream_file_check_closed (mongoc_stream_t *stream) /* IN */
172+
{
173+
return false;
174+
}
175+
176+
170177
mongoc_stream_t *
171178
mongoc_stream_file_new (int fd) /* IN */
172179
{
@@ -181,6 +188,7 @@ mongoc_stream_file_new (int fd) /* IN */
181188
stream->vtable.flush = _mongoc_stream_file_flush;
182189
stream->vtable.readv = _mongoc_stream_file_readv;
183190
stream->vtable.writev = _mongoc_stream_file_writev;
191+
stream->vtable.check_closed = _mongoc_stream_file_check_closed;
184192
stream->fd = fd;
185193

186194
return (mongoc_stream_t *)stream;

src/mongoc/mongoc-stream-gridfs.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,13 @@ _mongoc_stream_gridfs_writev (mongoc_stream_t *stream,
139139
}
140140

141141

142+
static bool
143+
_mongoc_stream_gridfs_check_closed (mongoc_stream_t *stream) /* IN */
144+
{
145+
return false;
146+
}
147+
148+
142149
mongoc_stream_t *
143150
mongoc_stream_gridfs_new (mongoc_gridfs_file_t *file)
144151
{
@@ -156,6 +163,7 @@ mongoc_stream_gridfs_new (mongoc_gridfs_file_t *file)
156163
stream->stream.flush = _mongoc_stream_gridfs_flush;
157164
stream->stream.writev = _mongoc_stream_gridfs_writev;
158165
stream->stream.readv = _mongoc_stream_gridfs_readv;
166+
stream->stream.check_closed = _mongoc_stream_gridfs_check_closed;
159167

160168
mongoc_counter_streams_active_inc ();
161169

src/mongoc/mongoc-stream-socket.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,23 @@ mongoc_stream_socket_get_socket (mongoc_stream_socket_t *stream) /* IN */
210210
}
211211

212212

213+
static bool
214+
_mongoc_stream_socket_check_closed (mongoc_stream_t *stream) /* IN */
215+
{
216+
mongoc_stream_socket_t *ss = (mongoc_stream_socket_t *)stream;
217+
218+
ENTRY;
219+
220+
bson_return_val_if_fail (stream, true);
221+
222+
if (ss->sock) {
223+
RETURN (mongoc_socket_check_closed (ss->sock));
224+
}
225+
226+
RETURN (true);
227+
}
228+
229+
213230
/*
214231
*--------------------------------------------------------------------------
215232
*
@@ -242,6 +259,7 @@ mongoc_stream_socket_new (mongoc_socket_t *sock) /* IN */
242259
stream->vtable.readv = _mongoc_stream_socket_readv;
243260
stream->vtable.writev = _mongoc_stream_socket_writev;
244261
stream->vtable.setsockopt = _mongoc_stream_socket_setsockopt;
262+
stream->vtable.check_closed = _mongoc_stream_socket_check_closed;
245263
stream->sock = sock;
246264

247265
return (mongoc_stream_t *)stream;

src/mongoc/mongoc-stream-tls.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,15 @@ _mongoc_stream_tls_get_base_stream (mongoc_stream_t *stream)
721721
}
722722

723723

724+
static bool
725+
_mongoc_stream_tls_check_closed (mongoc_stream_t *stream) /* IN */
726+
{
727+
mongoc_stream_tls_t *tls = (mongoc_stream_tls_t *)stream;
728+
bson_return_val_if_fail(stream, -1);
729+
return mongoc_stream_check_closed (tls->base_stream);
730+
}
731+
732+
724733
/*
725734
*--------------------------------------------------------------------------
726735
*
@@ -779,6 +788,7 @@ mongoc_stream_tls_new (mongoc_stream_t *base_stream,
779788
tls->parent.readv = _mongoc_stream_tls_readv;
780789
tls->parent.setsockopt = _mongoc_stream_tls_setsockopt;
781790
tls->parent.get_base_stream = _mongoc_stream_tls_get_base_stream;
791+
tls->parent.check_closed = _mongoc_stream_tls_check_closed;
782792
tls->weak_cert_validation = opt->weak_cert_validation;
783793
tls->bio = bio_ssl;
784794
tls->ctx = ssl_ctx;

src/mongoc/mongoc-stream.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,3 +239,18 @@ mongoc_stream_get_base_stream (mongoc_stream_t *stream) /* IN */
239239

240240
return NULL;
241241
}
242+
243+
244+
bool
245+
mongoc_stream_check_closed (mongoc_stream_t *stream)
246+
{
247+
int ret;
248+
249+
ENTRY;
250+
251+
bson_return_val_if_fail(stream, -1);
252+
253+
ret = stream->check_closed(stream);
254+
255+
RETURN (ret);
256+
}

0 commit comments

Comments
 (0)