Skip to content

Commit 2dc318f

Browse files
committed
CDRIVER-2606 array arg on collection_watch
1 parent 5a18c3c commit 2dc318f

File tree

3 files changed

+111
-2
lines changed

3 files changed

+111
-2
lines changed

src/mongoc/mongoc-change-stream.c

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,10 +271,22 @@ _mongoc_change_stream_new (const mongoc_collection_t *coll,
271271
}
272272
}
273273

274+
/* Accept two forms of user pipeline:
275+
* 1. A document like: { "pipeline": [...] }
276+
* 2. An array-like document: { "0": {}, "1": {}, ... }
277+
* If the passed pipeline is invalid, we pass it along and let the server
278+
* error instead.
279+
*/
274280
if (!bson_empty (pipeline)) {
275281
bson_iter_t iter;
276-
if (bson_iter_init_find (&iter, pipeline, "pipeline")) {
282+
if (bson_iter_init_find (&iter, pipeline, "pipeline") &&
283+
BSON_ITER_HOLDS_ARRAY (&iter)) {
277284
SET_BSON_OR_ERR (&stream->pipeline_to_append, "pipeline");
285+
} else {
286+
if (!BSON_APPEND_ARRAY (
287+
&stream->pipeline_to_append, "pipeline", pipeline)) {
288+
CHANGE_STREAM_ERR ("pipeline");
289+
}
278290
}
279291
}
280292

tests/test-mongoc-change-stream.c

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,6 +1095,97 @@ typedef struct {
10951095
int agg_count;
10961096
} array_started_ctx_t;
10971097

1098+
static void
1099+
_accepts_array_started (const mongoc_apm_command_started_t *event)
1100+
{
1101+
const bson_t *cmd = mongoc_apm_command_started_get_command (event);
1102+
const char *cmd_name = mongoc_apm_command_started_get_command_name (event);
1103+
array_started_ctx_t *ctx =
1104+
(array_started_ctx_t *) mongoc_apm_command_started_get_context (event);
1105+
if (strcmp (cmd_name, "aggregate") != 0) {
1106+
return;
1107+
}
1108+
ctx->agg_count++;
1109+
ASSERT_MATCH (cmd, ctx->pattern);
1110+
}
1111+
1112+
/* Test that watch accepts an array document {0: {}, 1: {}} as the pipeline,
1113+
* similar to mongoc_collection_aggregate */
1114+
static void
1115+
test_change_stream_accepts_array (void *test_ctx)
1116+
{
1117+
mongoc_client_t *client = test_framework_client_new ();
1118+
mongoc_apm_callbacks_t *callbacks = mongoc_apm_callbacks_new ();
1119+
array_started_ctx_t ctx = {0};
1120+
mongoc_collection_t *coll;
1121+
mongoc_change_stream_t *stream;
1122+
const bson_t *bson;
1123+
bson_error_t err;
1124+
bson_t *opts =
1125+
tmp_bson ("{'maxAwaitTimeMS': 1}"); /* to speed up the test. */
1126+
1127+
mongoc_client_set_error_api (client, MONGOC_ERROR_API_VERSION_2);
1128+
/* set up apm callbacks to listen for the agg commands. */
1129+
ctx.pattern =
1130+
bson_strdup ("{'aggregate': 'coll', 'pipeline': [ {'$changeStream': {}}, "
1131+
"{'$match': {'x': 1}}, {'$project': {'x': 1}}]}");
1132+
mongoc_apm_set_command_started_cb (callbacks, _accepts_array_started);
1133+
mongoc_client_set_apm_callbacks (client, callbacks, &ctx);
1134+
coll = mongoc_client_get_collection (client, "db", "coll");
1135+
ASSERT_OR_PRINT (
1136+
mongoc_collection_insert_one (coll, tmp_bson (NULL), NULL, NULL, &err),
1137+
err);
1138+
/* try starting a change stream with a { "pipeline": [...] } argument */
1139+
stream = mongoc_collection_watch (
1140+
coll,
1141+
tmp_bson ("{'pipeline': [{'$match': {'x': 1}}, {'$project': {'x': 1}}]}"),
1142+
opts);
1143+
(void) mongoc_change_stream_next (stream, &bson);
1144+
ASSERT_OR_PRINT (!mongoc_change_stream_error_document (stream, &err, &bson),
1145+
err);
1146+
ASSERT_CMPINT32 (ctx.agg_count, ==, 1);
1147+
mongoc_change_stream_destroy (stream);
1148+
/* try with an array like document. */
1149+
stream = mongoc_collection_watch (
1150+
coll,
1151+
tmp_bson ("{'0': {'$match': {'x': 1}}, '1': {'$project': {'x': 1}}}"),
1152+
opts);
1153+
(void) mongoc_change_stream_next (stream, &bson);
1154+
ASSERT_OR_PRINT (!mongoc_change_stream_error_document (stream, &err, &bson),
1155+
err);
1156+
ASSERT_CMPINT32 (ctx.agg_count, ==, 2);
1157+
mongoc_change_stream_destroy (stream);
1158+
/* try with malformed { "pipeline": [...] } argument. */
1159+
bson_free (ctx.pattern);
1160+
ctx.pattern = bson_strdup (
1161+
"{'aggregate': 'coll', 'pipeline': [ {'$changeStream': {}}, 42 ]}");
1162+
stream =
1163+
mongoc_collection_watch (coll, tmp_bson ("{'pipeline': [42] }"), NULL);
1164+
(void) mongoc_change_stream_next (stream, &bson);
1165+
BSON_ASSERT (mongoc_change_stream_error_document (stream, &err, &bson));
1166+
ASSERT_ERROR_CONTAINS (
1167+
err,
1168+
MONGOC_ERROR_SERVER,
1169+
14,
1170+
"Each element of the 'pipeline' array must be an object");
1171+
ASSERT_CMPINT32 (ctx.agg_count, ==, 3);
1172+
mongoc_change_stream_destroy (stream);
1173+
/* try with malformed array doc argument. */
1174+
stream = mongoc_collection_watch (coll, tmp_bson ("{'0': 42 }"), NULL);
1175+
(void) mongoc_change_stream_next (stream, &bson);
1176+
BSON_ASSERT (mongoc_change_stream_error_document (stream, &err, &bson));
1177+
ASSERT_ERROR_CONTAINS (
1178+
err,
1179+
MONGOC_ERROR_SERVER,
1180+
14,
1181+
"Each element of the 'pipeline' array must be an object");
1182+
ASSERT_CMPINT32 (ctx.agg_count, ==, 4);
1183+
mongoc_change_stream_destroy (stream);
1184+
bson_free (ctx.pattern);
1185+
mongoc_apm_callbacks_destroy (callbacks);
1186+
mongoc_collection_destroy (coll);
1187+
mongoc_client_destroy (client);
1188+
}
10981189

10991190
void
11001191
test_change_stream_install (TestSuite *suite)
@@ -1165,4 +1256,11 @@ test_change_stream_install (TestSuite *suite)
11651256
NULL,
11661257
NULL,
11671258
test_framework_skip_if_not_rs_version_6);
1259+
1260+
TestSuite_AddFull (suite,
1261+
"/change_stream/accepts_array",
1262+
test_change_stream_accepts_array,
1263+
NULL,
1264+
NULL,
1265+
test_framework_skip_if_not_rs_version_6);
11681266
}

tests/test-mongoc-sample-commands.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2598,7 +2598,6 @@ test_example_change_stream (mongoc_database_t *db)
25982598
/* End Changestream Example 3 */
25992599
bson_destroy (&opts);
26002600
bson_destroy (&pipeline);
2601-
bson_destroy (&opts);
26022601
mongoc_collection_destroy (collection);
26032602
}
26042603

0 commit comments

Comments
 (0)