Skip to content

Commit 48a8599

Browse files
author
Christian Hergert
committed
write-commands: automatically split OP_INSERT when over batch or max size.
1 parent 67a8bae commit 48a8599

File tree

1 file changed

+72
-10
lines changed

1 file changed

+72
-10
lines changed

src/mongoc/mongoc-write-command.c

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
*/
3333

3434

35+
#define MAX_INSERT_BATCH 1000
3536
#define SUPPORTS_WRITE_COMMANDS(n) \
3637
(((n)->min_wire_version <= 2) && ((n)->max_wire_version >= 2))
3738
#define WRITE_CONCERN_DOC(wc) \
@@ -200,8 +201,11 @@ _mongoc_write_command_insert_legacy (mongoc_write_command_t *command,
200201
bson_iter_t iter;
201202
uint32_t len;
202203
bson_t *gle = NULL;
204+
size_t size = 0;
205+
bool has_more = false;
203206
char ns [MONGOC_NAMESPACE_MAX + 1];
204-
int i = 0;
207+
bool r;
208+
int i;
205209

206210
ENTRY;
207211

@@ -212,7 +216,13 @@ _mongoc_write_command_insert_legacy (mongoc_write_command_t *command,
212216
BSON_ASSERT (collection);
213217
BSON_ASSERT (command->type == MONGOC_WRITE_COMMAND_INSERT);
214218

215-
if (!command->u.insert.n_documents) {
219+
r = bson_iter_init (&iter, command->u.insert.documents);
220+
if (!r) {
221+
BSON_ASSERT (false);
222+
EXIT;
223+
}
224+
225+
if (!command->u.insert.n_documents || !bson_iter_next (&iter)) {
216226
bson_set_error (error,
217227
MONGOC_ERROR_COLLECTION,
218228
MONGOC_ERROR_COLLECTION_INSERT_FAILED,
@@ -225,16 +235,50 @@ _mongoc_write_command_insert_legacy (mongoc_write_command_t *command,
225235

226236
iov = bson_malloc ((sizeof *iov) * command->u.insert.n_documents);
227237

228-
bson_iter_init (&iter, command->u.insert.documents);
229-
230-
while (bson_iter_next (&iter)) {
238+
again:
239+
has_more = false;
240+
i = 0;
241+
size = (sizeof (mongoc_rpc_header_t) +
242+
4 +
243+
strlen (database) +
244+
1 +
245+
strlen (collection) +
246+
1);
247+
248+
do {
231249
BSON_ASSERT (BSON_ITER_HOLDS_DOCUMENT (&iter));
232250
BSON_ASSERT (i < command->u.insert.n_documents);
251+
233252
bson_iter_document (&iter, &len, &data);
253+
254+
/*
255+
* Check that the server can receive this document.
256+
*/
257+
if ((len > client->cluster.max_bson_size) ||
258+
(len > client->cluster.max_msg_size)) {
259+
bson_set_error (error,
260+
MONGOC_ERROR_BSON,
261+
MONGOC_ERROR_BSON_INVALID,
262+
"Document %u is too large for the cluster. "
263+
"Document is %u bytes, max is %u.",
264+
i, (unsigned)len, client->cluster.max_bson_size);
265+
}
266+
267+
/*
268+
* Check that we will not overflow our max message size.
269+
*/
270+
if ((i == MAX_INSERT_BATCH) ||
271+
(size > (client->cluster.max_msg_size - len))) {
272+
has_more = true;
273+
break;
274+
}
275+
234276
iov [i].iov_base = (void *)data;
235277
iov [i].iov_len = len;
278+
279+
size += len;
236280
i++;
237-
}
281+
} while (bson_iter_next (&iter));
238282

239283
rpc.insert.msg_len = 0;
240284
rpc.insert.request_id = 0;
@@ -244,7 +288,7 @@ _mongoc_write_command_insert_legacy (mongoc_write_command_t *command,
244288
(command->u.insert.ordered ? 0 : MONGOC_INSERT_CONTINUE_ON_ERROR);
245289
rpc.insert.collection = ns;
246290
rpc.insert.documents = iov;
247-
rpc.insert.n_documents = command->u.insert.n_documents;
291+
rpc.insert.n_documents = i;
248292

249293
hint = _mongoc_client_sendv (client, &rpc, 1, hint, write_concern,
250294
NULL, error);
@@ -255,16 +299,33 @@ _mongoc_write_command_insert_legacy (mongoc_write_command_t *command,
255299
}
256300

257301
if (_mongoc_write_concern_has_gle (write_concern)) {
302+
bson_iter_t iter;
303+
258304
if (!_mongoc_client_recv_gle (client, hint, &gle, error)) {
259305
result->failed = true;
260306
GOTO (cleanup);
261307
}
308+
309+
/*
310+
* Overwrite the "n" field since it will be zero. Otherwise, our
311+
* merge_legacy code will not know how many we tried in this batch.
312+
*/
313+
if (bson_iter_init_find (&iter, gle, "n") &&
314+
BSON_ITER_HOLDS_INT32 (&iter) &&
315+
!bson_iter_int32 (&iter)) {
316+
bson_iter_overwrite_int32 (&iter, i);
317+
}
262318
}
263319

264320
cleanup:
265321
if (gle) {
266322
_mongoc_write_result_merge_legacy (result, command, gle);
267323
bson_destroy (gle);
324+
gle = NULL;
325+
}
326+
327+
if (has_more) {
328+
GOTO (again);
268329
}
269330

270331
bson_free (iov);
@@ -542,6 +603,10 @@ _mongoc_write_command_execute (mongoc_write_command_t *command, /* I
542603
BSON_ASSERT (collection);
543604
BSON_ASSERT (result);
544605

606+
if (!write_concern) {
607+
write_concern = client->write_concern;
608+
}
609+
545610
if (!hint) {
546611
hint = _mongoc_client_preselect (client, MONGOC_OPCODE_INSERT,
547612
write_concern, NULL, &result->error);
@@ -686,9 +751,6 @@ _mongoc_write_result_merge_legacy (mongoc_write_result_t *result, /* IN */
686751
case MONGOC_WRITE_COMMAND_INSERT:
687752
if (n) {
688753
result->nInserted += n;
689-
} else if (bson_iter_init_find (&iter, reply, "ok") &&
690-
bson_iter_as_bool (&iter)) {
691-
result->nInserted += command->u.insert.n_documents;
692754
}
693755
break;
694756
case MONGOC_WRITE_COMMAND_DELETE:

0 commit comments

Comments
 (0)