Skip to content

Commit adfff3a

Browse files
committed
CDRIVER-1704 Aggregate should apply collection [read|write]Concern if not in opts
1 parent eb64540 commit adfff3a

File tree

7 files changed

+261
-100
lines changed

7 files changed

+261
-100
lines changed

src/mongoc/mongoc-client-private.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ BSON_BEGIN_DECLS
4444

4545
/* protocol versions this driver can speak */
4646
#define WIRE_VERSION_MIN 0
47-
#define WIRE_VERSION_MAX 4
47+
#define WIRE_VERSION_MAX 5
4848

4949
/* first version that supported aggregation cursors */
5050
#define WIRE_VERSION_AGG_CURSOR 1
@@ -163,6 +163,11 @@ _mongoc_client_command_with_opts (mongoc_client_t *client,
163163
mongoc_write_concern_t *default_wc,
164164
bson_t *reply,
165165
bson_error_t *error);
166+
bool
167+
_mongoc_client_command_append_iterator_opts_to_command (bson_iter_t *iter,
168+
int max_wire_version,
169+
bson_t *command,
170+
bson_error_t *error);
166171

167172
BSON_END_DECLS
168173

src/mongoc/mongoc-client.c

Lines changed: 55 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,6 +1279,54 @@ mongoc_client_command_simple (mongoc_client_t *client,
12791279
RETURN (ret);
12801280
}
12811281

1282+
bool
1283+
_mongoc_client_command_append_iterator_opts_to_command (bson_iter_t *iter,
1284+
int max_wire_version,
1285+
bson_t *command,
1286+
bson_error_t *error)
1287+
{
1288+
ENTRY;
1289+
1290+
while (bson_iter_next (iter)) {
1291+
if (BSON_ITER_IS_KEY (iter, "collation")) {
1292+
if (max_wire_version < WIRE_VERSION_COLLATION) {
1293+
bson_set_error (error,
1294+
MONGOC_ERROR_COMMAND,
1295+
MONGOC_ERROR_COMMAND_INVALID_ARG,
1296+
"The selected server does not support collation");
1297+
RETURN (false);
1298+
}
1299+
1300+
}
1301+
else if (BSON_ITER_IS_KEY (iter, "writeConcern")) {
1302+
if (!_mongoc_write_concern_iter_is_valid (iter)) {
1303+
bson_set_error (error,
1304+
MONGOC_ERROR_COMMAND,
1305+
MONGOC_ERROR_COMMAND_INVALID_ARG,
1306+
"Invalid writeConcern");
1307+
RETURN (false);
1308+
}
1309+
1310+
if (max_wire_version < WIRE_VERSION_CMD_WRITE_CONCERN) {
1311+
continue;
1312+
}
1313+
1314+
}
1315+
else if (BSON_ITER_IS_KEY (iter, "readConcern")) {
1316+
if (max_wire_version < WIRE_VERSION_READ_CONCERN) {
1317+
bson_set_error (error,
1318+
MONGOC_ERROR_COMMAND,
1319+
MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
1320+
"The selected server does not support readConcern");
1321+
RETURN (false);
1322+
}
1323+
}
1324+
1325+
bson_append_iter (command, bson_iter_key (iter), -1, iter);
1326+
}
1327+
1328+
RETURN (true);
1329+
}
12821330

12831331
static void
12841332
_ensure_copied (bson_t **dst,
@@ -1368,46 +1416,14 @@ _mongoc_client_command_with_opts (mongoc_client_t *client,
13681416
bson_iter_t iter;
13691417

13701418
if (opts && bson_iter_init (&iter, opts)) {
1419+
bool ok = false;
13711420
_ensure_copied (&command_with_opts, command);
1372-
while (bson_iter_next (&iter)) {
1373-
if (BSON_ITER_IS_KEY (&iter, "collation")) {
1374-
if (server_stream->sd->max_wire_version < WIRE_VERSION_COLLATION) {
1375-
bson_set_error (error,
1376-
MONGOC_ERROR_COMMAND,
1377-
MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
1378-
"The selected server does not support collation");
1379-
GOTO (err);
1380-
}
1381-
1382-
}
1383-
else if (BSON_ITER_IS_KEY (&iter, "writeConcern")) {
1384-
if (!_mongoc_write_concern_iter_is_valid (&iter)) {
1385-
bson_set_error (error,
1386-
MONGOC_ERROR_COMMAND,
1387-
MONGOC_ERROR_COMMAND_INVALID_ARG,
1388-
"Invalid writeConcern");
1389-
GOTO (err);
1390-
}
1391-
1392-
if (server_stream->sd->max_wire_version < WIRE_VERSION_CMD_WRITE_CONCERN) {
1393-
continue;
1394-
}
1395-
1396-
opts_has_write_concern = true;
1397-
}
1398-
else if (BSON_ITER_IS_KEY (&iter, "readConcern")) {
1399-
if (server_stream->sd->max_wire_version < WIRE_VERSION_READ_CONCERN) {
1400-
bson_set_error (error,
1401-
MONGOC_ERROR_COMMAND,
1402-
MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
1403-
"The selected server does not support readConcern");
1404-
GOTO (err);
1405-
}
1406-
1407-
opts_has_read_concern = true;
1408-
}
1409-
1410-
bson_append_iter (command_with_opts, bson_iter_key (&iter), -1, &iter);
1421+
ok = _mongoc_client_command_append_iterator_opts_to_command (&iter,
1422+
server_stream->sd->max_wire_version,
1423+
command_with_opts,
1424+
error);
1425+
if (!ok) {
1426+
GOTO (err);
14111427
}
14121428
}
14131429

src/mongoc/mongoc-collection.c

Lines changed: 41 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -304,11 +304,15 @@ mongoc_collection_aggregate (mongoc_collection_t *collection, /* IN */
304304
const mongoc_read_prefs_t *read_prefs) /* IN */
305305
{
306306
mongoc_server_description_t *selected_server = NULL;
307+
bool has_batch_size = false;
308+
bool has_out_key = false;
309+
bson_iter_t kiter;
310+
bson_iter_t ar;
307311
mongoc_cursor_t *cursor;
312+
int32_t batch_size = 0;
308313
bson_iter_t iter;
309314
bson_t command;
310315
bson_t child;
311-
int32_t batch_size = 0;
312316
bool use_cursor;
313317

314318
ENTRY;
@@ -360,88 +364,65 @@ mongoc_collection_aggregate (mongoc_collection_t *collection, /* IN */
360364
BSON_APPEND_ARRAY (&command, "pipeline", pipeline);
361365
}
362366

367+
if (bson_iter_init_find (&iter, pipeline, "pipeline") &&
368+
BSON_ITER_HOLDS_ARRAY (&iter) && bson_iter_recurse (&iter, &ar)) {
369+
while (bson_iter_next (&ar)) {
370+
if (BSON_ITER_HOLDS_DOCUMENT (&ar) && bson_iter_recurse (&ar, &kiter)) {
371+
has_out_key |= bson_iter_find (&kiter, "$out");
372+
}
373+
}
374+
}
375+
363376
/* for newer version, we include a cursor subdocument */
364377
if (use_cursor) {
365378
bson_append_document_begin (&command, "cursor", 6, &child);
366379

367-
if (opts && bson_iter_init (&iter, opts)) {
368-
while (bson_iter_next (&iter)) {
369-
if (BSON_ITER_IS_KEY (&iter, "batchSize") &&
370-
(BSON_ITER_HOLDS_INT32 (&iter) ||
380+
if (opts
381+
&& bson_iter_init_find (&iter, opts, "batchSize")
382+
&& (BSON_ITER_HOLDS_INT32 (&iter) ||
371383
BSON_ITER_HOLDS_INT64 (&iter) ||
372384
BSON_ITER_HOLDS_DOUBLE (&iter))) {
373385
batch_size = (int32_t)bson_iter_as_int64 (&iter);
374386
BSON_APPEND_INT32 (&child, "batchSize", batch_size);
375-
}
376-
}
387+
has_batch_size = true;
377388
}
378389

379390
bson_append_document_end (&command, &child);
380391
}
381392

382-
if (opts && bson_iter_init (&iter, opts)) {
383-
while (bson_iter_next (&iter)) {
384-
if (!(BSON_ITER_IS_KEY (&iter, "batchSize") ||
385-
BSON_ITER_IS_KEY (&iter, "cursor") ||
386-
BSON_ITER_IS_KEY (&iter, "writeConcern"))) {
387-
if (!bson_append_iter (&command, bson_iter_key (&iter), -1, &iter)) {
388-
bson_set_error (&cursor->error,
389-
MONGOC_ERROR_COMMAND,
390-
MONGOC_ERROR_COMMAND_INVALID_ARG,
391-
"Failed to append \"batchSize\" or \"cursor\" to create command.");
392-
GOTO (done);
393-
}
394-
}
395-
}
396-
}
397-
398-
if (collection->read_concern->level != NULL) {
399-
const bson_t *read_concern_bson;
393+
if (opts) {
394+
bool ok = false;
395+
bson_t opts_dupe = BSON_INITIALIZER;
400396

401-
if (selected_server->max_wire_version < WIRE_VERSION_READ_CONCERN) {
402-
bson_set_error (&cursor->error,
403-
MONGOC_ERROR_COMMAND,
404-
MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
405-
"The selected server does not support readConcern");
406-
GOTO (done);
397+
if (has_batch_size) {
398+
bson_copy_to_excluding_noinit (opts, &opts_dupe, "batchSize", NULL);
399+
bson_iter_init (&iter, &opts_dupe);
400+
} else {
401+
bson_iter_init (&iter, opts);
407402
}
408403

409-
read_concern_bson = _mongoc_read_concern_get_bson (collection->read_concern);
410-
BSON_APPEND_DOCUMENT (&command, "readConcern", read_concern_bson);
411-
}
404+
ok = _mongoc_client_command_append_iterator_opts_to_command (&iter,
405+
selected_server->max_wire_version,
406+
&command,
407+
&cursor->error);
412408

413-
if (opts && bson_iter_init_find (&iter, opts, "writeConcern")
414-
&& BSON_ITER_HOLDS_DOCUMENT (&iter)) {
415-
if (!_mongoc_write_concern_iter_is_valid (&iter)) {
416-
bson_set_error (&cursor->error,
417-
MONGOC_ERROR_COMMAND,
418-
MONGOC_ERROR_COMMAND_INVALID_ARG,
419-
"Invalid writeConcern");
409+
bson_destroy (&opts_dupe);
410+
411+
if (!ok) {
420412
GOTO (done);
421413
}
414+
}
422415

423-
if (selected_server->max_wire_version >= WIRE_VERSION_CMD_WRITE_CONCERN) {
424-
cursor->write_concern = _mongoc_write_concern_new_from_iter (&iter);
425-
}
416+
/* Only inherit WriteConcern when for aggregate with $out */
417+
if (!bson_has_field (&command, "writeConcern") && has_out_key) {
418+
cursor->write_concern = mongoc_write_concern_copy (mongoc_collection_get_write_concern (collection));
426419
}
427-
if (opts && bson_iter_init_find (&iter, opts, "collation")
428-
&& BSON_ITER_HOLDS_DOCUMENT (&iter)) {
429-
if (selected_server->max_wire_version < WIRE_VERSION_COLLATION) {
430-
bson_set_error (&cursor->error,
431-
MONGOC_ERROR_COMMAND,
432-
MONGOC_ERROR_COMMAND_INVALID_ARG,
433-
"The selected server does not support collation");
434-
GOTO (done);
435-
}
436-
if (!bson_append_iter (&command, bson_iter_key (&iter), -1, &iter)) {
437-
bson_set_error (&cursor->error,
438-
MONGOC_ERROR_BSON,
439-
MONGOC_ERROR_BSON_INVALID,
440-
"Failed to append \"collation\" to create command.");
441-
GOTO (done);
442-
}
420+
421+
if (!bson_has_field (&command, "readConcern")) {
422+
cursor->read_concern = mongoc_read_concern_copy (mongoc_collection_get_read_concern (collection));
443423
}
444424

425+
445426
if (use_cursor) {
446427
_mongoc_cursor_cursorid_init (cursor, &command);
447428
} else {

src/mongoc/mongoc-cursor.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1240,6 +1240,12 @@ _mongoc_cursor_run_command (mongoc_cursor_t *cursor,
12401240
read_prefs_result.query_with_read_prefs);
12411241
}
12421242

1243+
if (cursor->read_concern &&
1244+
server_stream->sd->max_wire_version >= WIRE_VERSION_READ_CONCERN) {
1245+
mongoc_read_concern_append (cursor->read_concern,
1246+
read_prefs_result.query_with_read_prefs);
1247+
}
1248+
12431249
ret = mongoc_cluster_run_command_monitored (
12441250
cluster,
12451251
server_stream,

src/mongoc/mongoc-read-concern-private.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ struct _mongoc_read_concern_t
3838

3939
bool _mongoc_read_concern_is_default (const mongoc_read_concern_t *read_concern);
4040
const bson_t *_mongoc_read_concern_get_bson (mongoc_read_concern_t *read_concern);
41+
bool mongoc_read_concern_append (mongoc_read_concern_t *read_concern,
42+
bson_t *command);
4143

4244
BSON_END_DECLS
4345

src/mongoc/mongoc-read-concern.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,10 @@ mongoc_read_concern_append (mongoc_read_concern_t *read_concern,
135135
{
136136
BSON_ASSERT (read_concern);
137137

138+
if (!read_concern->level) {
139+
return true;
140+
}
141+
138142
if (!bson_append_document (command, "readConcern", 11,
139143
_mongoc_read_concern_get_bson (read_concern))) {
140144
MONGOC_ERROR ("Could not append readConcern to command.");

0 commit comments

Comments
 (0)