Skip to content

Commit 8e60a37

Browse files
committed
CDRIVER-2670 add more change streams prose tests
1 parent 486c148 commit 8e60a37

File tree

2 files changed

+130
-6
lines changed

2 files changed

+130
-6
lines changed

src/libmongoc/src/mongoc/mongoc-change-stream.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ _make_cursor (mongoc_change_stream_t *stream)
280280
* command when it returns." */
281281
if (bson_empty (&stream->resume_token) &&
282282
bson_empty (&stream->operation_time) && max_wire_version >= 7 &&
283-
bson_iter_init_find (&iter, &reply, "startAtOperationTime")) {
283+
bson_iter_init_find (&iter, &reply, "operationTime")) {
284284
bson_append_value (&stream->operation_time,
285285
"startAtOperationTime",
286286
20,

src/libmongoc/tests/test-mongoc-change-stream.c

Lines changed: 129 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,7 +1039,7 @@ test_change_stream_live_read_prefs (void *test_ctx)
10391039
NULL /* session */);
10401040

10411041
/* Change stream client will resume with another cursor. */
1042-
ASSERT (!mongoc_change_stream_next (stream, &next_doc));
1042+
ASSERT (mongoc_change_stream_next (stream, &next_doc));
10431043
ASSERT_OR_PRINT (
10441044
!mongoc_change_stream_error_document (stream, &err, &next_doc), err);
10451045

@@ -1311,13 +1311,137 @@ _skip_if_no_change_stream_updates (void)
13111311
return 0;
13121312
}
13131313
if (test_framework_get_server_version () >=
1314-
test_framework_str_to_version ("4.0.0")) {
1314+
test_framework_str_to_version ("3.8.0")) {
13151315
return 1;
13161316
}
13171317
return 0;
13181318
}
13191319

13201320

1321+
static void
1322+
_test_resume (const char *opts,
1323+
const char *expected_change_stream_opts,
1324+
const char *first_doc,
1325+
const char *expected_resume_change_stream_opts)
1326+
{
1327+
mock_server_t *server;
1328+
request_t *request;
1329+
future_t *future;
1330+
mongoc_client_t *client;
1331+
mongoc_collection_t *coll;
1332+
mongoc_change_stream_t *stream;
1333+
bson_error_t err;
1334+
char *msg;
1335+
const bson_t *doc = NULL;
1336+
1337+
server = mock_server_with_autoismaster (7);
1338+
mock_server_run (server);
1339+
client = mongoc_client_new_from_uri (mock_server_get_uri (server));
1340+
mongoc_client_set_error_api (client, MONGOC_ERROR_API_VERSION_2);
1341+
coll = mongoc_client_get_collection (client, "db", "coll");
1342+
future = future_collection_watch (coll, tmp_bson ("{}"), tmp_bson (opts));
1343+
request = mock_server_receives_msg (
1344+
server,
1345+
MONGOC_QUERY_NONE,
1346+
tmp_bson ("{ 'aggregate': 'coll', 'pipeline' : [ { '$changeStream': { %s "
1347+
"'fullDocument': 'default' } } ], 'cursor': { } }",
1348+
expected_change_stream_opts));
1349+
msg = bson_strdup_printf ("{'cursor': {'id': 123, 'ns': "
1350+
"'db.coll','firstBatch': [%s] }, 'operationTime': "
1351+
"{ '$timestamp': {'t': 1, 'i': 2} }, 'ok': 1 }",
1352+
first_doc);
1353+
mock_server_replies_simple (request, msg);
1354+
bson_free (msg);
1355+
stream = future_get_mongoc_change_stream_ptr (future);
1356+
BSON_ASSERT (stream);
1357+
future_destroy (future);
1358+
request_destroy (request);
1359+
/* if a first document was returned, the first call to next returns it. */
1360+
if (*first_doc) {
1361+
mongoc_change_stream_next (stream, &doc);
1362+
ASSERT_MATCH (doc, first_doc);
1363+
}
1364+
future = future_change_stream_next (stream, &doc);
1365+
request = mock_server_receives_msg (
1366+
server,
1367+
MONGOC_QUERY_NONE,
1368+
tmp_bson ("{ 'getMore': {'$numberLong': '123'}, 'collection': 'coll' }"));
1369+
mock_server_hangs_up (request);
1370+
request_destroy (request);
1371+
/* since the server closed the connection, a resume is attempted. */
1372+
request = mock_server_receives_msg (
1373+
server,
1374+
MONGOC_QUERY_NONE,
1375+
tmp_bson ("{ 'aggregate': 'coll', 'pipeline' : [ { '$changeStream': { %s "
1376+
"'fullDocument': 'default' }} ], 'cursor': { } }",
1377+
expected_resume_change_stream_opts));
1378+
mock_server_replies_simple (
1379+
request,
1380+
"{'cursor': {'id': 0,'ns': 'db.coll','firstBatch': []},'ok': 1 }");
1381+
request_destroy (request);
1382+
1383+
BSON_ASSERT (!future_get_bool (future));
1384+
ASSERT_OR_PRINT (!mongoc_change_stream_error_document (stream, &err, NULL),
1385+
err);
1386+
BSON_ASSERT (doc == NULL);
1387+
future_destroy (future);
1388+
1389+
mongoc_change_stream_destroy (stream);
1390+
mongoc_collection_destroy (coll);
1391+
mongoc_client_destroy (client);
1392+
mock_server_destroy (server);
1393+
}
1394+
1395+
1396+
/* test resume behavior before and after the first document is receieved. */
1397+
static void
1398+
test_resume_cases (void)
1399+
{
1400+
#define NO_OPT_RA "'resumeAfter': {'$exists': false}"
1401+
#define NO_OPT_OP "'startAtOperationTime': {'$exists': false}"
1402+
#define AGG_OP "'startAtOperationTime': {'$timestamp': {'t': 1, 'i': 2}}"
1403+
#define DOC "{'_id': {'resume': 'example'}}"
1404+
#define OPT_OP "'startAtOperationTime': {'$timestamp': {'t': 111, 'i': 222}}"
1405+
#define DOC_RA "'resumeAfter': {'resume': 'example'}"
1406+
#define OPT_RA "'resumeAfter': {'resume': 'after'}"
1407+
1408+
/* test features:
1409+
* - whether the change stream returns a document before resuming.
1410+
* - whether 'startAtOperationTime' is specified
1411+
* - whether 'resumeAfter' is specified
1412+
* total of 2 * 2 * 2 = 8 test cases. */
1413+
1414+
/* neither 'startAtOperationTime' nor 'resumeAfter' specified. */
1415+
/* - if no doc recv'ed use the operationTime returned by aggregate. */
1416+
_test_resume ("{}", NO_OPT_OP "," NO_OPT_RA ",", "", AGG_OP ",");
1417+
/* - if doc recv'ed use the doc's resume token. */
1418+
_test_resume ("{}", NO_OPT_OP "," NO_OPT_RA ",", DOC, DOC_RA ",");
1419+
1420+
/* 'startAtOperationTime' specified
1421+
* - if no doc recv'ed use the startAtOperationTime in the options. */
1422+
_test_resume ("{" OPT_OP "}", OPT_OP "," NO_OPT_RA ",", "", OPT_OP ",");
1423+
/* - if doc recv'ed use the docs resume token. */
1424+
_test_resume ("{" OPT_OP "}", OPT_OP "," NO_OPT_RA ",", DOC, DOC_RA ",");
1425+
1426+
/* 'resumeAfter' specified. */
1427+
/* - if no doc recv'ed use the resumeAfter in the options. */
1428+
_test_resume ("{" OPT_RA "}", NO_OPT_OP "," OPT_RA ",", "", OPT_RA ",");
1429+
/* - if doc recv'ed use the docs resume token. */
1430+
_test_resume ("{" OPT_RA "}", NO_OPT_OP "," OPT_RA ",", DOC, DOC_RA ",");
1431+
1432+
/* both 'resumeAfter' and 'startAtOperationTime' specified. They both
1433+
* should be passed (although the server currently returns an error). */
1434+
/* - if no doc recv'ed use both. */
1435+
_test_resume ("{" OPT_RA "," OPT_OP "}",
1436+
OPT_RA "," OPT_OP ",",
1437+
"",
1438+
OPT_RA "," OPT_OP ",");
1439+
/* - if doc recv'ed use the docs resume token. */
1440+
_test_resume (
1441+
"{" OPT_RA "," OPT_OP "}", OPT_RA "," OPT_OP ",", DOC, DOC_RA ",");
1442+
}
1443+
1444+
13211445
void
13221446
test_change_stream_install (TestSuite *suite)
13231447
{
@@ -1398,8 +1522,6 @@ test_change_stream_install (TestSuite *suite)
13981522
NULL,
13991523
NULL,
14001524
test_framework_skip_if_not_rs_version_7,
1401-
test_framework_skip_if_no_sessions,
1402-
test_framework_skip_if_no_crypto,
14031525
_skip_if_no_change_stream_updates);
14041526
TestSuite_AddFull (suite,
14051527
"/change_stream/database",
@@ -1410,9 +1532,11 @@ test_change_stream_install (TestSuite *suite)
14101532
_skip_if_no_change_stream_updates);
14111533
TestSuite_AddFull (suite,
14121534
"/change_stream/client",
1413-
test_change_stream_database_watch,
1535+
test_change_stream_client_watch,
14141536
NULL,
14151537
NULL,
14161538
test_framework_skip_if_not_rs_version_7,
14171539
_skip_if_no_change_stream_updates);
1540+
TestSuite_AddMockServerTest (
1541+
suite, "/change_stream/resume_with_first_doc", test_resume_cases);
14181542
}

0 commit comments

Comments
 (0)