Skip to content

Commit 08e4cc9

Browse files
ajdavisbjori
authored andcommitted
CDRIVER-1571 slow initiator cancels topology scan
Async ismaster commands started before a slow call to a blocking stream initiator had the initiator's duration subtracted from their timeouts. The initiator and the async commands use the same initial timeout value, so once an initiator times out, all async commands started beforehand now have 0 seconds remaining. They're canceled before they run. This change relies on the overall timeout applied to mongoc_async_run instead of per-command timeouts.
1 parent 618dee5 commit 08e4cc9

File tree

4 files changed

+167
-68
lines changed

4 files changed

+167
-68
lines changed

src/mongoc/mongoc-async-cmd-private.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ typedef struct _mongoc_async_cmd
5555
void *data;
5656
bson_error_t error;
5757
int64_t start_time;
58-
int64_t expire_at;
58+
int64_t timeout_msec;
5959
bson_t cmd;
6060
mongoc_buffer_t buffer;
6161
mongoc_array_t array;

src/mongoc/mongoc-async-cmd.c

Lines changed: 8 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -170,16 +170,14 @@ mongoc_async_cmd_new (mongoc_async_t *async,
170170
int32_t timeout_msec)
171171
{
172172
mongoc_async_cmd_t *acmd;
173-
mongoc_async_cmd_t *tmp;
174-
bool found = false;
175173

176174
BSON_ASSERT (cmd);
177175
BSON_ASSERT (dbname);
178176
BSON_ASSERT (stream);
179177

180178
acmd = (mongoc_async_cmd_t *)bson_malloc0 (sizeof (*acmd));
181179
acmd->async = async;
182-
acmd->expire_at = bson_get_monotonic_time () + (int64_t) timeout_msec * 1000;
180+
acmd->timeout_msec = timeout_msec;
183181
acmd->stream = stream;
184182
acmd->setup = setup;
185183
acmd->setup_ctx = setup_ctx;
@@ -194,22 +192,8 @@ mongoc_async_cmd_new (mongoc_async_t *async,
194192

195193
_mongoc_async_cmd_state_start (acmd);
196194

197-
/* slot the cmd into the right place in the expiration list */
198-
{
199-
async->ncmds++;
200-
DL_FOREACH (async->cmds, tmp)
201-
{
202-
if (tmp->expire_at >= acmd->expire_at) {
203-
DL_PREPEND_ELEM (async->cmds, tmp, acmd);
204-
found = true;
205-
break;
206-
}
207-
}
208-
209-
if (! found) {
210-
DL_APPEND (async->cmds, acmd);
211-
}
212-
}
195+
async->ncmds++;
196+
DL_APPEND (async->cmds, acmd);
213197

214198
return acmd;
215199
}
@@ -238,16 +222,12 @@ mongoc_async_cmd_destroy (mongoc_async_cmd_t *acmd)
238222
mongoc_async_cmd_result_t
239223
_mongoc_async_cmd_phase_setup (mongoc_async_cmd_t *acmd)
240224
{
241-
int64_t now;
242-
int64_t timeout_msec;
243-
244-
now = bson_get_monotonic_time ();
245-
timeout_msec = (acmd->expire_at - now) / 1000;
246-
247-
BSON_ASSERT (timeout_msec < INT32_MAX);
225+
int retval;
248226

249-
switch (acmd->setup (acmd->stream, &acmd->events, acmd->setup_ctx,
250-
(int32_t) timeout_msec, &acmd->error)) {
227+
BSON_ASSERT (acmd->timeout_msec < INT32_MAX);
228+
retval = acmd->setup (acmd->stream, &acmd->events, acmd->setup_ctx,
229+
(int32_t) acmd->timeout_msec, &acmd->error);
230+
switch (retval) {
251231
case -1:
252232
return MONGOC_ASYNC_CMD_ERROR;
253233
break;

src/mongoc/mongoc-async.c

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -67,42 +67,21 @@ mongoc_async_run (mongoc_async_t *async,
6767
mongoc_async_cmd_t *acmd, *tmp;
6868
mongoc_stream_poll_t *poller = NULL;
6969
int i;
70-
ssize_t nactive = 0;
70+
ssize_t nactive;
7171
int64_t now;
72-
int64_t expire_at = 0;
72+
int64_t expire_at;
73+
int64_t poll_timeout_msec;
74+
size_t poll_size;
7375

74-
size_t poll_size = 0;
76+
BSON_ASSERT (timeout_msec > 0);
7577

76-
for (;;) {
77-
now = bson_get_monotonic_time ();
78-
79-
if (expire_at == 0) {
80-
if (timeout_msec >= 0) {
81-
expire_at = now + ((int64_t) timeout_msec * 1000);
82-
} else {
83-
expire_at = -1;
84-
}
85-
} else if (timeout_msec >= 0) {
86-
timeout_msec = (expire_at - now) / 1000;
87-
}
88-
89-
if (now > expire_at) {
90-
break;
91-
}
92-
93-
DL_FOREACH_SAFE (async->cmds, acmd, tmp)
94-
{
95-
/* async commands are sorted by expire_at */
96-
if (now > acmd->expire_at) {
97-
acmd->cb (MONGOC_ASYNC_CMD_TIMEOUT, NULL, (now - acmd->start_time), acmd->data,
98-
&acmd->error);
99-
mongoc_async_cmd_destroy (acmd);
100-
} else {
101-
break;
102-
}
103-
}
78+
now = bson_get_monotonic_time ();
79+
expire_at = now + ((int64_t) timeout_msec * 1000);
80+
poll_size = 0;
10481

82+
for (;;) {
10583
if (!async->ncmds) {
84+
/* work complete */
10685
break;
10786
}
10887

@@ -120,13 +99,10 @@ mongoc_async_run (mongoc_async_t *async,
12099
i++;
121100
}
122101

123-
if (timeout_msec >= 0) {
124-
timeout_msec = BSON_MIN (timeout_msec, (async->cmds->expire_at - now) / 1000);
125-
} else {
126-
timeout_msec = (async->cmds->expire_at - now) / 1000;
127-
}
128-
129-
nactive = mongoc_stream_poll (poller, async->ncmds, timeout_msec);
102+
poll_timeout_msec = (expire_at - now) / 1000;
103+
BSON_ASSERT (poll_timeout_msec < INT32_MAX);
104+
nactive = mongoc_stream_poll (poller, async->ncmds,
105+
(int32_t) poll_timeout_msec);
130106

131107
if (nactive) {
132108
i = 0;
@@ -151,11 +127,25 @@ mongoc_async_run (mongoc_async_t *async,
151127
i++;
152128
}
153129
}
130+
131+
now = bson_get_monotonic_time ();
132+
if (now > expire_at) {
133+
break;
134+
}
154135
}
155136

156137
if (poll_size) {
157138
bson_free (poller);
158139
}
159140

160-
return async->ncmds;
141+
/* commands that succeeded or failed already have been removed from the
142+
* list and freed. therefore, all remaining commands have timed out. */
143+
DL_FOREACH_SAFE (async->cmds, acmd, tmp) {
144+
acmd->cb (MONGOC_ASYNC_CMD_TIMEOUT, NULL, (now - acmd->start_time),
145+
acmd->data, &acmd->error);
146+
mongoc_async_cmd_destroy (acmd);
147+
}
148+
149+
/* cancel the loop in the caller, mongoc_topology_scanner_work() */
150+
return false;
161151
}

tests/test-mongoc-topology-scanner.c

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "mongoc-tests.h"
77
#include "TestSuite.h"
88
#include "mock_server/mock-server.h"
9+
#include "mock_server/mock-rs.h"
910
#include "mock_server/future.h"
1011
#include "mock_server/future-functions.h"
1112

@@ -296,6 +297,126 @@ test_topology_scanner_oscillate ()
296297
}
297298

298299

300+
/* skip on Windows: https://daniel.haxx.se/blog/2012/10/10/wsapoll-is-broken/ */
301+
#ifndef _WIN32
302+
void
303+
test_topology_scanner_connection_error (void)
304+
{
305+
mongoc_client_t *client;
306+
bson_error_t error;
307+
308+
/* assuming nothing is listening on this port */
309+
client = mongoc_client_new (
310+
"mongodb://localhost:9876/?connectTimeoutMS=10");
311+
312+
ASSERT (!mongoc_client_command_simple (client, "db", tmp_bson ("{'foo': 1}"),
313+
NULL, NULL, &error));
314+
315+
ASSERT_ERROR_CONTAINS (error, MONGOC_ERROR_SERVER_SELECTION,
316+
MONGOC_ERROR_SERVER_SELECTION_FAILURE,
317+
"connection error calling ismaster on "
318+
"'localhost:9876'");
319+
320+
mongoc_client_destroy (client);
321+
}
322+
#endif
323+
324+
325+
void
326+
test_topology_scanner_connection_timeout (void)
327+
{
328+
mock_server_t *server;
329+
mongoc_client_t *client;
330+
mongoc_uri_t *uri;
331+
bson_error_t error;
332+
char *expected_msg;
333+
334+
/* server does NOT automatically reply to ismaster */
335+
server = mock_server_new ();
336+
mock_server_run (server);
337+
338+
uri = mongoc_uri_copy (mock_server_get_uri (server));
339+
mongoc_uri_set_option_as_int32 (uri, "connectTimeoutMS", 10);
340+
client = mongoc_client_new_from_uri (uri);
341+
342+
ASSERT (!mongoc_client_command_simple (client, "db", tmp_bson ("{'foo': 1}"),
343+
NULL, NULL, &error));
344+
345+
expected_msg = bson_strdup_printf (
346+
"connection timeout calling ismaster on '%s'",
347+
mongoc_uri_get_hosts (uri)->host_and_port);
348+
349+
ASSERT_ERROR_CONTAINS (error, MONGOC_ERROR_SERVER_SELECTION,
350+
MONGOC_ERROR_SERVER_SELECTION_FAILURE,
351+
expected_msg);
352+
353+
bson_free (expected_msg);
354+
mongoc_client_destroy (client);
355+
mongoc_uri_destroy (uri);
356+
mock_server_destroy (server);
357+
}
358+
359+
360+
typedef struct
361+
{
362+
uint16_t slow_port;
363+
mongoc_client_t *client;
364+
} initiator_data_t;
365+
366+
367+
static mongoc_stream_t *
368+
slow_initiator (const mongoc_uri_t *uri,
369+
const mongoc_host_list_t *host,
370+
void *user_data,
371+
bson_error_t *err)
372+
{
373+
initiator_data_t *data;
374+
375+
data = (initiator_data_t *) user_data;
376+
377+
if (host->port == data->slow_port) {
378+
_mongoc_usleep (500 * 1000); /* 500 ms is longer than connectTimeoutMS */
379+
}
380+
381+
return mongoc_client_default_stream_initiator (uri, host, data->client, err);
382+
}
383+
384+
385+
static void
386+
test_topology_scanner_blocking_initiator (void)
387+
{
388+
mock_rs_t *rs;
389+
mongoc_uri_t *uri;
390+
mongoc_client_t *client;
391+
initiator_data_t data;
392+
bson_error_t error;
393+
394+
rs = mock_rs_with_autoismaster (0, /* wire version */
395+
true, /* has primary */
396+
1, /* n_secondaries */
397+
0 /* n_arbiters */);
398+
399+
mock_rs_run (rs);
400+
uri = mongoc_uri_copy (mock_rs_get_uri (rs));
401+
mongoc_uri_set_option_as_int32 (uri, "connectTimeoutMS", 100);
402+
client = mongoc_client_new_from_uri (uri);
403+
404+
/* pretend last host in linked list is slow */
405+
data.slow_port = mongoc_uri_get_hosts (uri)->next->port;
406+
data.client = client;
407+
mongoc_client_set_stream_initiator (client, slow_initiator, &data);
408+
409+
ASSERT_OR_PRINT (mongoc_client_command_simple (client, "admin",
410+
tmp_bson ("{'ismaster': 1}"),
411+
NULL,
412+
NULL, &error), error);
413+
414+
mongoc_client_destroy (client);
415+
mongoc_uri_destroy (uri);
416+
mock_rs_destroy (rs);
417+
}
418+
419+
299420
void
300421
test_topology_scanner_install (TestSuite *suite)
301422
{
@@ -307,4 +428,12 @@ test_topology_scanner_install (TestSuite *suite)
307428
test_topology_scanner_discovery);
308429
TestSuite_Add (suite, "/TOPOLOGY/scanner_oscillate",
309430
test_topology_scanner_oscillate);
431+
#ifndef _WIN32
432+
TestSuite_Add (suite, "/TOPOLOGY/scanner_connection_error",
433+
test_topology_scanner_connection_error);
434+
#endif
435+
TestSuite_Add (suite, "/TOPOLOGY/scanner_connection_timeout",
436+
test_topology_scanner_connection_timeout);
437+
TestSuite_Add (suite, "/TOPOLOGY/blocking_initiator",
438+
test_topology_scanner_blocking_initiator);
310439
}

0 commit comments

Comments
 (0)