Skip to content

Commit 15a1985

Browse files
committed
CDRIVER-3230 fix opquery batch splitting
OP_QUERY batch splitting was not taking command options into account when computing the size to split. So, for example, a write concern document could put the batch over the maxBsonObjectSize + 16k limit.
1 parent 4ca77f6 commit 15a1985

File tree

5 files changed

+167
-35
lines changed

5 files changed

+167
-35
lines changed

src/libmongoc/src/mongoc/mongoc-write-command-private.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ struct _mongoc_crud_opts_t;
3737
#define MONGOC_WRITE_COMMAND_INSERT 1
3838
#define MONGOC_WRITE_COMMAND_UPDATE 2
3939

40+
/* MongoDB has a extra allowance to allow updating 16mb document, as the update
41+
* operators would otherwise overflow the 16mb object limit. See SERVER-10643
42+
* for context. */
43+
#define BSON_OBJECT_ALLOWANCE (16 * 1024)
44+
4045
struct _mongoc_bulk_write_flags_t {
4146
bool ordered;
4247
bool bypass_document_validation;

src/libmongoc/src/mongoc/mongoc-write-command.c

Lines changed: 68 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ _mongoc_write_command_will_overflow (uint32_t len_so_far,
418418
/* max BSON object size + 16k bytes.
419419
* server guarantees there is enough room: SERVER-10643
420420
*/
421-
int32_t max_cmd_size = max_bson_size + 16384;
421+
int32_t max_cmd_size = max_bson_size + BSON_OBJECT_ALLOWANCE;
422422

423423
BSON_ASSERT (max_bson_size);
424424

@@ -469,10 +469,6 @@ _mongoc_write_opmsg (mongoc_write_command_t *command,
469469
BSON_ASSERT (server_stream);
470470
BSON_ASSERT (collection);
471471

472-
/* MongoDB has a extra allowance to allow updating 16mb document,
473-
* as the update operators would otherwise overflow the 16mb object limit
474-
*/
475-
#define BSON_OBJECT_ALLOWANCE (16 * 1024)
476472
max_bson_obj_size = mongoc_server_stream_max_bson_obj_size (server_stream);
477473
max_msg_size = mongoc_server_stream_max_msg_size (server_stream);
478474
max_document_count =
@@ -683,6 +679,41 @@ _append_array_from_command (mongoc_write_command_t *command, bson_t *bson)
683679
bson_reader_destroy (reader);
684680
}
685681

682+
/* Assemble the base @cmd with all of the command options.
683+
* @parts is always initialized, even on error.
684+
* This is called twice in _mongoc_write_opquery.
685+
* Once with no payload documents, to determine the total size. And once with
686+
* payload documents, to send the final command. */
687+
static bool
688+
_assemble_cmd (bson_t *cmd,
689+
mongoc_write_command_t *command,
690+
mongoc_client_t *client,
691+
mongoc_server_stream_t *server_stream,
692+
const char *database,
693+
const mongoc_write_concern_t *write_concern,
694+
mongoc_cmd_parts_t *parts,
695+
bson_error_t *error)
696+
{
697+
bool ret;
698+
bson_iter_t iter;
699+
700+
mongoc_cmd_parts_init (parts, client, database, MONGOC_QUERY_NONE, cmd);
701+
parts->is_write_command = true;
702+
parts->assembled.operation_id = command->operation_id;
703+
704+
ret = mongoc_cmd_parts_set_write_concern (
705+
parts, write_concern, server_stream->sd->max_wire_version, error);
706+
if (ret) {
707+
BSON_ASSERT (bson_iter_init (&iter, &command->cmd_opts));
708+
ret = mongoc_cmd_parts_append_opts (
709+
parts, &iter, server_stream->sd->max_wire_version, error);
710+
}
711+
if (ret) {
712+
ret = mongoc_cmd_parts_assemble (parts, server_stream, error);
713+
}
714+
return ret;
715+
}
716+
686717
static void
687718
_mongoc_write_opquery (mongoc_write_command_t *command,
688719
mongoc_client_t *client,
@@ -695,12 +726,10 @@ _mongoc_write_opquery (mongoc_write_command_t *command,
695726
bson_error_t *error)
696727
{
697728
mongoc_cmd_parts_t parts;
698-
bson_iter_t iter;
699729
const char *key;
700730
uint32_t len = 0;
701731
bson_t ar;
702732
bson_t cmd;
703-
bson_t reply;
704733
char str[16];
705734
bool has_more;
706735
bool ret = false;
@@ -732,10 +761,30 @@ _mongoc_write_opquery (mongoc_write_command_t *command,
732761
i = 0;
733762

734763
_mongoc_write_command_init (&cmd, command, collection);
764+
/* If any part of assembling failed, return with failure. */
765+
if (!_assemble_cmd (&cmd,
766+
command,
767+
client,
768+
server_stream,
769+
database,
770+
write_concern,
771+
&parts,
772+
error)) {
773+
result->failed = true;
774+
bson_destroy (&cmd);
775+
mongoc_cmd_parts_cleanup (&parts);
776+
EXIT;
777+
}
735778

736-
/* 1 byte to specify array type, 1 byte for field name's null terminator */
737-
overhead = cmd.len + 2 + gCommandFieldLens[command->type];
738-
779+
/* Use the assembled command to compute the overhead, since it may be a new
780+
* BSON document with options applied. If no options were applied, then
781+
* parts.assembled.command points to cmd. The constant 2 is due to 1 byte to
782+
* specify array type and 1 byte for field name's null terminator. */
783+
overhead =
784+
parts.assembled.command->len + 2 + gCommandFieldLens[command->type];
785+
/* Toss out the assembled command, we'll assemble again after adding all of
786+
* the payload documents. */
787+
mongoc_cmd_parts_cleanup (&parts);
739788

740789
reader = bson_reader_new_from_data (command->payload.data + data_offset,
741790
command->payload.len - data_offset);
@@ -773,36 +822,20 @@ _mongoc_write_opquery (mongoc_write_command_t *command,
773822
data_offset += len;
774823
}
775824
} else {
776-
mongoc_cmd_parts_init (&parts, client, database, MONGOC_QUERY_NONE, &cmd);
777-
parts.is_write_command = true;
778-
parts.assembled.operation_id = command->operation_id;
779-
if (!mongoc_cmd_parts_set_write_concern (
780-
&parts,
781-
write_concern,
782-
server_stream->sd->max_wire_version,
783-
error)) {
784-
bson_reader_destroy (reader);
785-
bson_destroy (&cmd);
786-
mongoc_cmd_parts_cleanup (&parts);
787-
EXIT;
788-
}
825+
bson_t reply;
789826

790-
BSON_ASSERT (bson_iter_init (&iter, &command->cmd_opts));
791-
if (!mongoc_cmd_parts_append_opts (
792-
&parts, &iter, server_stream->sd->max_wire_version, error)) {
793-
bson_reader_destroy (reader);
794-
bson_destroy (&cmd);
795-
mongoc_cmd_parts_cleanup (&parts);
796-
EXIT;
797-
}
798-
799-
ret = mongoc_cmd_parts_assemble (&parts, server_stream, error);
827+
ret = _assemble_cmd (&cmd,
828+
command,
829+
client,
830+
server_stream,
831+
database,
832+
write_concern,
833+
&parts,
834+
error);
800835
if (ret) {
801836
ret = mongoc_cluster_run_command_monitored (
802837
&client->cluster, &parts.assembled, &reply, error);
803838
} else {
804-
/* assembling failed */
805-
result->must_stop = true;
806839
bson_init (&reply);
807840
}
808841

src/libmongoc/tests/mock_server/request.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ request_new (const mongoc_buffer_t *buffer,
131131
const bson_t *
132132
request_get_doc (const request_t *request, int n)
133133
{
134+
BSON_ASSERT (request);
134135
return _mongoc_array_index (&request->docs, const bson_t *, n);
135136
}
136137

src/libmongoc/tests/test-mongoc-client.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3458,6 +3458,7 @@ test_client_reset_sessions (void)
34583458
/* Add an autoresponder for endSessions to unblock the test. */
34593459
mock_server_auto_endsessions (server);
34603460

3461+
bson_destroy (&opts);
34613462
bson_destroy (&lsid);
34623463
request_destroy (request);
34633464
future_destroy (future);

src/libmongoc/tests/test-mongoc-write-commands.c

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010

1111
#include "test-libmongoc.h"
1212
#include "test-conveniences.h"
13+
#include "mock_server/mock-server.h"
14+
#include "mock_server/future.h"
15+
#include "mock_server/future-functions.h"
1316

1417

1518
static void
@@ -369,6 +372,92 @@ test_bypass_not_sent ()
369372
mongoc_client_destroy (client);
370373
}
371374

375+
static void
376+
test_split_opquery_with_options (void)
377+
{
378+
mock_server_t *server;
379+
mongoc_client_t *client;
380+
mongoc_collection_t *coll;
381+
bson_t **docs;
382+
int i;
383+
bson_error_t error;
384+
future_t *future;
385+
request_t *request;
386+
const bson_t *insert;
387+
bson_t opts;
388+
mongoc_write_concern_t *wc;
389+
int n_docs;
390+
391+
/* Use a reduced maxBsonObjectSize, and wire version for OP_QUERY */
392+
const char *ismaster = "{'ok': 1.0,"
393+
" 'ismaster': true,"
394+
" 'minWireVersion': 0,"
395+
" 'maxWireVersion': 5,"
396+
" 'maxBsonObjectSize': 100}";
397+
398+
server = mock_server_new ();
399+
mock_server_auto_ismaster (server, ismaster);
400+
mock_server_run (server);
401+
402+
/* Create an insert with two batches. Because of the reduced
403+
* maxBsonObjectSize, each document must be less than 100 bytes.
404+
* Because of the hardcoded allowance (see SERVER-10643), and our current
405+
* incorrect batching logic (see CDRIVER-3310) the complete insert
406+
* command can be can be 16k + 100 bytes.
407+
* After CDRIVER-3310, update this test, since the allowance will not be
408+
* taken into consideration for document batching.
409+
* So create enough documents to fill up at least one batch.
410+
*/
411+
n_docs = ((BSON_OBJECT_ALLOWANCE) / tmp_bson ("{ '_id': 1 }")->len) +
412+
1; /* inexact, but errs towards more than enough documents. */
413+
docs = bson_malloc (sizeof (bson_t *) * n_docs);
414+
for (i = 0; i < n_docs; i++) {
415+
docs[i] = BCON_NEW ("_id", BCON_INT64 (i));
416+
}
417+
418+
client = mongoc_client_new_from_uri (mock_server_get_uri (server));
419+
coll = mongoc_client_get_collection (client, "db", "coll");
420+
421+
/* Add a write concern, to ensure that it is taken into account during
422+
* splitting. */
423+
bson_init (&opts);
424+
wc = mongoc_write_concern_new ();
425+
mongoc_write_concern_set_wmajority (wc, 100);
426+
mongoc_write_concern_append (wc, &opts);
427+
428+
future = future_collection_insert_many (
429+
coll, (const bson_t **) docs, n_docs, &opts, NULL, &error);
430+
/* Mock server recieves first insert. */
431+
request = mock_server_receives_request (server);
432+
BSON_ASSERT (request);
433+
insert = request_get_doc (request, 0);
434+
/* The total command size is just a hair under BSON_OBJECT_ALLOWANCE (16384)
435+
* + 100 */
436+
BSON_ASSERT (insert->len == 16482);
437+
mock_server_replies_ok_and_destroys (request);
438+
439+
/* Mock server recieves second insert. */
440+
request = mock_server_receives_request (server);
441+
BSON_ASSERT (request);
442+
insert = request_get_doc (request, 0);
443+
/* The size doesn't really matter for the purpose of the test, but check it
444+
* anyway. */
445+
BSON_ASSERT (insert->len == 10433);
446+
mock_server_replies_ok_and_destroys (request);
447+
BSON_ASSERT (future_get_bool (future));
448+
449+
future_destroy (future);
450+
for (i = 0; i < n_docs; i++) {
451+
bson_destroy (docs[i]);
452+
}
453+
bson_free (docs);
454+
bson_destroy (&opts);
455+
mongoc_collection_destroy (coll);
456+
mongoc_write_concern_destroy (wc);
457+
mongoc_client_destroy (client);
458+
mock_server_destroy (server);
459+
}
460+
372461
void
373462
test_write_command_install (TestSuite *suite)
374463
{
@@ -383,4 +472,7 @@ test_write_command_install (TestSuite *suite)
383472
NULL,
384473
NULL,
385474
test_framework_skip_if_max_wire_version_less_than_4);
475+
TestSuite_AddMockServerTest (suite,
476+
"/WriteCommand/split_opquery_with_options",
477+
test_split_opquery_with_options);
386478
}

0 commit comments

Comments
 (0)