Skip to content

Commit 2451e7d

Browse files
committed
adding OP_QUERY rpc exhaust support
Provide suport for exhaust queries Found a couple of bugs in: * buffers didn't handle already buffered data well (negative unsigned ints * gridfs files were still on possibly negative min_bytes for readv
1 parent b0df529 commit 2451e7d

14 files changed

+279
-57
lines changed

doc/mongoc_gridfs_file_readv.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ ssize_t
1515
mongoc_gridfs_file_readv (mongoc_gridfs_file_t *file,
1616
struct iovec *iov,
1717
size_t iovcnt,
18-
ssize_t min_bytes,
18+
size_t min_bytes,
1919
bson_uint32_t timeout_msec);
2020

2121
ssize_t

mongoc/mongoc-buffer.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ _mongoc_buffer_append_from_stream (mongoc_buffer_t *buffer,
156156
}
157157

158158
buf = &buffer->data[buffer->off + buffer->len];
159-
ret = mongoc_stream_read(stream, buf, size, -1, timeout_msec);
159+
ret = mongoc_stream_read(stream, buf, size, size, timeout_msec);
160160
if (ret != size) {
161161
bson_set_error(error,
162162
MONGOC_ERROR_STREAM,
@@ -200,11 +200,12 @@ _mongoc_buffer_fill (mongoc_buffer_t *buffer,
200200
BSON_ASSERT(buffer->data);
201201
BSON_ASSERT(buffer->datalen);
202202

203-
min_bytes -= buffer->len;
204-
if (min_bytes <= 0) {
203+
if (min_bytes <= buffer->len) {
205204
return buffer->len;
206205
}
207206

207+
min_bytes -= buffer->len;
208+
208209
if (buffer->len) {
209210
memmove(&buffer->data[0], &buffer->data[buffer->off], buffer->len);
210211
}

mongoc/mongoc-client-private.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ struct _mongoc_client_t
4545
mongoc_list_t *conns;
4646
mongoc_uri_t *uri;
4747
mongoc_cluster_t cluster;
48+
bson_bool_t in_exhaust;
4849

4950
mongoc_stream_initiator_t initiator;
5051
void *initiator_data;

mongoc/mongoc-client.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,14 @@ _mongoc_client_sendv (mongoc_client_t *client,
404404
bson_return_val_if_fail(rpcs, FALSE);
405405
bson_return_val_if_fail(rpcs_len, FALSE);
406406

407+
if (client->in_exhaust) {
408+
bson_set_error(error,
409+
MONGOC_ERROR_CLIENT,
410+
MONGOC_ERROR_CLIENT_IN_EXHAUST,
411+
"A cursor derived from this client is in exhaust.");
412+
RETURN(FALSE);
413+
}
414+
407415
for (i = 0; i < rpcs_len; i++) {
408416
rpcs[i].header.msg_len = 0;
409417
rpcs[i].header.request_id = ++client->request_id;
@@ -1073,6 +1081,8 @@ _mongoc_client_warm_up (mongoc_client_t *client,
10731081
ret = _mongoc_cluster_command_early (&client->cluster, "admin", &cmd,
10741082
NULL, error);
10751083
bson_destroy (&cmd);
1084+
} else if (client->cluster.state == MONGOC_CLUSTER_STATE_DEAD) {
1085+
ret = _mongoc_cluster_reconnect(&client->cluster, error);
10761086
}
10771087

10781088
return ret;

mongoc/mongoc-cluster-private.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ typedef struct
9090
bson_bool_t requires_auth : 1;
9191

9292
bson_int32_t wire_version;
93+
bson_bool_t isdbgrid;
9394

9495
mongoc_cluster_node_t nodes[MONGOC_CLUSTER_MAX_NODES];
9596
mongoc_client_t *client;
@@ -164,6 +165,15 @@ _mongoc_cluster_command_early (mongoc_cluster_t *cluster,
164165
bson_error_t *error)
165166
BSON_GNUC_INTERNAL;
166167

168+
void
169+
_mongoc_cluster_disconnect_node (mongoc_cluster_t *cluster,
170+
mongoc_cluster_node_t *node)
171+
BSON_GNUC_INTERNAL;
172+
173+
bson_bool_t
174+
_mongoc_cluster_reconnect (mongoc_cluster_t *cluster,
175+
bson_error_t *error)
176+
BSON_GNUC_INTERNAL;
167177

168178
BSON_END_DECLS
169179

mongoc/mongoc-cluster.c

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ _mongoc_cluster_build_basic_auth_digest (mongoc_cluster_t *cluster,
458458
*--------------------------------------------------------------------------
459459
*/
460460

461-
static void
461+
void
462462
_mongoc_cluster_disconnect_node (mongoc_cluster_t *cluster,
463463
mongoc_cluster_node_t *node)
464464
{
@@ -1044,13 +1044,18 @@ _mongoc_cluster_ismaster (mongoc_cluster_t *cluster,
10441044
if (bson_iter_init_find (&iter, &reply, "msg") &&
10451045
BSON_ITER_HOLDS_UTF8 (&iter) &&
10461046
(strcmp ("isdbgrid", bson_iter_utf8 (&iter, NULL)) == 0)) {
1047+
/* TODO: is this sufficient to detect sharded clusters? */
1048+
1049+
cluster->isdbgrid = TRUE;
10471050
/*
10481051
* TODO: This is actually a sharded cluster!
10491052
*/
10501053
if (cluster->mode != MONGOC_CLUSTER_SHARDED_CLUSTER) {
10511054
MONGOC_INFO ("Unexpectedly connected to sharded cluster: %s",
10521055
node->host.host_and_port);
10531056
}
1057+
} else {
1058+
cluster->isdbgrid = FALSE;
10541059
}
10551060

10561061
/*
@@ -1645,7 +1650,7 @@ _mongoc_cluster_reconnect_sharded_cluster (mongoc_cluster_t *cluster,
16451650
*--------------------------------------------------------------------------
16461651
*/
16471652

1648-
static bson_bool_t
1653+
bson_bool_t
16491654
_mongoc_cluster_reconnect (mongoc_cluster_t *cluster,
16501655
bson_error_t *error)
16511656
{

mongoc/mongoc-cursor-private.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ struct _mongoc_cursor_t
6060
bson_bool_t done : 1;
6161
bson_bool_t failed : 1;
6262
bson_bool_t end_of_event : 1;
63+
bson_bool_t in_exhaust : 1;
6364

6465
bson_t query;
6566
bson_t fields;

mongoc/mongoc-cursor.c

Lines changed: 79 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,15 @@ _mongoc_cursor_new (mongoc_client_t *client,
7070

7171
ENTRY;
7272

73-
bson_return_val_if_fail(client, NULL);
74-
bson_return_val_if_fail(db_and_collection, NULL);
75-
bson_return_val_if_fail(query, NULL);
73+
BSON_ASSERT(client);
74+
BSON_ASSERT(db_and_collection);
75+
BSON_ASSERT(query);
76+
77+
/* we can't have exhaust queries with limits */
78+
BSON_ASSERT (!((flags & MONGOC_QUERY_EXHAUST) && limit));
79+
80+
/* we can't have exhaust queries with sharded clusters */
81+
BSON_ASSERT (!((flags & MONGOC_QUERY_EXHAUST) && client->cluster.isdbgrid));
7682

7783
/*
7884
* Cursors execute their query lazily. This sadly means that we must copy
@@ -180,7 +186,15 @@ _mongoc_cursor_destroy (mongoc_cursor_t *cursor)
180186

181187
bson_return_if_fail(cursor);
182188

183-
if (cursor->rpc.reply.cursor_id) {
189+
if (cursor->in_exhaust) {
190+
cursor->client->in_exhaust = FALSE;
191+
192+
if (!cursor->done) {
193+
_mongoc_cluster_disconnect_node (
194+
&cursor->client->cluster,
195+
&cursor->client->cluster.nodes[cursor->hint - 1]);
196+
}
197+
} else if (cursor->rpc.reply.cursor_id) {
184198
_mongoc_cursor_kill_cursor(cursor, cursor->rpc.reply.cursor_id);
185199
}
186200

@@ -361,6 +375,11 @@ _mongoc_cursor_query (mongoc_cursor_t *cursor)
361375
cursor->reader = bson_reader_new_from_data(cursor->rpc.reply.documents,
362376
cursor->rpc.reply.documents_len);
363377

378+
if (cursor->flags & MONGOC_QUERY_EXHAUST) {
379+
cursor->in_exhaust = TRUE;
380+
cursor->client->in_exhaust = TRUE;
381+
}
382+
364383
cursor->done = FALSE;
365384
cursor->end_of_event = FALSE;
366385
cursor->sent = TRUE;
@@ -382,53 +401,57 @@ _mongoc_cursor_get_more (mongoc_cursor_t *cursor)
382401

383402
ENTRY;
384403

385-
bson_return_val_if_fail(cursor, FALSE);
404+
BSON_ASSERT(cursor);
386405

387-
if (!_mongoc_client_warm_up (cursor->client, &cursor->error)) {
388-
cursor->failed = TRUE;
389-
RETURN (FALSE);
390-
}
406+
if (! cursor->in_exhaust) {
407+
if (!_mongoc_client_warm_up (cursor->client, &cursor->error)) {
408+
cursor->failed = TRUE;
409+
RETURN (FALSE);
410+
}
391411

392-
if (!(cursor_id = cursor->rpc.reply.cursor_id)) {
393-
bson_set_error(&cursor->error,
394-
MONGOC_ERROR_CURSOR,
395-
MONGOC_ERROR_CURSOR_INVALID_CURSOR,
396-
"No valid cursor was provided.");
397-
goto failure;
398-
}
412+
if (!(cursor_id = cursor->rpc.reply.cursor_id)) {
413+
bson_set_error(&cursor->error,
414+
MONGOC_ERROR_CURSOR,
415+
MONGOC_ERROR_CURSOR_INVALID_CURSOR,
416+
"No valid cursor was provided.");
417+
goto failure;
418+
}
419+
420+
rpc.get_more.msg_len = 0;
421+
rpc.get_more.request_id = 0;
422+
rpc.get_more.response_to = 0;
423+
rpc.get_more.opcode = MONGOC_OPCODE_GET_MORE;
424+
rpc.get_more.zero = 0;
425+
rpc.get_more.collection = cursor->ns;
426+
if ((cursor->flags & MONGOC_QUERY_TAILABLE_CURSOR)) {
427+
rpc.get_more.n_return = 0;
428+
} else {
429+
/*
430+
* TODO: We need to apply the limit to this so we don't
431+
* overshoot our target.
432+
*/
433+
rpc.get_more.n_return = cursor->batch_size;
434+
}
435+
rpc.get_more.cursor_id = cursor_id;
399436

400-
rpc.get_more.msg_len = 0;
401-
rpc.get_more.request_id = 0;
402-
rpc.get_more.response_to = 0;
403-
rpc.get_more.opcode = MONGOC_OPCODE_GET_MORE;
404-
rpc.get_more.zero = 0;
405-
rpc.get_more.collection = cursor->ns;
406-
if ((cursor->flags & MONGOC_QUERY_TAILABLE_CURSOR)) {
407-
rpc.get_more.n_return = 0;
408-
} else {
409437
/*
410-
* TODO: We need to apply the limit to this so we don't
411-
* overshoot our target.
438+
* TODO: Stamp protections for disconnections.
412439
*/
413-
rpc.get_more.n_return = cursor->batch_size;
414-
}
415-
rpc.get_more.cursor_id = cursor_id;
416440

417-
/*
418-
* TODO: Stamp protections for disconnections.
419-
*/
441+
if (!_mongoc_client_sendv(cursor->client, &rpc, 1, cursor->hint,
442+
NULL, cursor->read_prefs, &cursor->error)) {
443+
cursor->done = TRUE;
444+
cursor->failed = TRUE;
445+
RETURN(FALSE);
446+
}
420447

421-
if (!_mongoc_client_sendv(cursor->client, &rpc, 1, cursor->hint,
422-
NULL, cursor->read_prefs, &cursor->error)) {
423-
cursor->done = TRUE;
424-
cursor->failed = TRUE;
425-
RETURN(FALSE);
448+
request_id = BSON_UINT32_FROM_LE(rpc.header.request_id);
449+
} else {
450+
request_id = BSON_UINT32_FROM_LE(cursor->rpc.header.request_id);
426451
}
427452

428453
_mongoc_buffer_clear(&cursor->buffer, FALSE);
429454

430-
request_id = BSON_UINT32_FROM_LE(rpc.header.request_id);
431-
432455
if (!_mongoc_client_recv(cursor->client,
433456
&cursor->rpc,
434457
&cursor->buffer,
@@ -536,7 +559,17 @@ _mongoc_cursor_next (mongoc_cursor_t *cursor,
536559

537560
ENTRY;
538561

539-
bson_return_val_if_fail(cursor, FALSE);
562+
BSON_ASSERT(cursor);
563+
564+
565+
if (cursor->client->in_exhaust && ! cursor->in_exhaust) {
566+
bson_set_error(&cursor->error,
567+
MONGOC_ERROR_CLIENT,
568+
MONGOC_ERROR_CLIENT_IN_EXHAUST,
569+
"Another cursor derived from this client is in exhaust.");
570+
cursor->failed = TRUE;
571+
RETURN(FALSE);
572+
}
540573

541574
if (bson) {
542575
*bson = NULL;
@@ -568,9 +601,11 @@ _mongoc_cursor_next (mongoc_cursor_t *cursor,
568601
eof = FALSE;
569602
b = bson_reader_read(cursor->reader, &eof);
570603
cursor->end_of_event = eof;
571-
cursor->done = (cursor->end_of_event &&
572-
!b &&
573-
!(cursor->flags & MONGOC_QUERY_TAILABLE_CURSOR));
604+
605+
cursor->done = cursor->end_of_event && (
606+
(cursor->in_exhaust && !cursor->rpc.reply.cursor_id) ||
607+
(!b && !(cursor->flags & MONGOC_QUERY_TAILABLE_CURSOR))
608+
);
574609

575610
/*
576611
* Do a supplimental check to see if we had a corrupted reply in the

mongoc/mongoc-error.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ typedef enum
5151
MONGOC_ERROR_CLIENT_GETNONCE,
5252
MONGOC_ERROR_CLIENT_AUTHENTICATE,
5353
MONGOC_ERROR_CLIENT_NO_ACCEPTABLE_PEER,
54+
MONGOC_ERROR_CLIENT_IN_EXHAUST,
5455

5556
MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
5657
MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,

mongoc/mongoc-gridfs-file.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ ssize_t
349349
mongoc_gridfs_file_readv (mongoc_gridfs_file_t *file,
350350
struct iovec *iov,
351351
size_t iovcnt,
352-
ssize_t min_bytes,
352+
size_t min_bytes,
353353
bson_uint32_t timeout_msec)
354354
{
355355
bson_uint32_t bytes_read = 0;
@@ -389,7 +389,7 @@ mongoc_gridfs_file_readv (mongoc_gridfs_file_t *file,
389389
} else if (file->length == file->pos) {
390390
/* we're at the end of the file. So we're done */
391391
RETURN (bytes_read);
392-
} else if (min_bytes > -1 && bytes_read >= min_bytes) {
392+
} else if (bytes_read >= min_bytes) {
393393
/* we need a new page, but we've read enough bytes to stop */
394394
RETURN (bytes_read);
395395
} else {

0 commit comments

Comments
 (0)