Skip to content

Commit dc2f9c1

Browse files
committed
CDRIVER-3319 fix resume with startAfter
- Also implement prose tests of CDRIVER-3281
1 parent 2e6c3df commit dc2f9c1

File tree

3 files changed

+199
-25
lines changed

3 files changed

+199
-25
lines changed

src/libmongoc/src/mongoc/mongoc-change-stream-private.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ struct _mongoc_change_stream_t {
5555
int64_t max_await_time_ms;
5656
int32_t batch_size;
5757

58+
bool has_returned_results;
59+
5860
/* Track whether the change stream has resumed after an error, as this
5961
* determines how we construct an initial or resuming aggregate command. */
6062
bool resumed;

src/libmongoc/src/mongoc/mongoc-change-stream.c

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,17 +120,25 @@ _make_command (mongoc_change_stream_t *stream,
120120

121121
if (stream->resumed) {
122122
/* Change stream spec: Resume Process */
123+
/* If there is a cached resumeToken: */
123124
if (!bson_empty (&stream->resume_token)) {
124-
BSON_APPEND_DOCUMENT (
125-
&change_stream_doc, "resumeAfter", &stream->resume_token);
126-
} else if (!bson_empty (&stream->opts.startAfter)) {
127-
BSON_APPEND_DOCUMENT (
128-
&change_stream_doc, "resumeAfter", &stream->opts.startAfter);
129-
} else if (!bson_empty (&stream->opts.resumeAfter)) {
130-
BSON_APPEND_DOCUMENT (
131-
&change_stream_doc, "resumeAfter", &stream->opts.resumeAfter);
125+
/* If the ChangeStream was started with startAfter
126+
and has yet to return a result document: */
127+
if (!bson_empty (&stream->opts.startAfter) &&
128+
!stream->has_returned_results) {
129+
/* The driver MUST set startAfter to the cached resumeToken */
130+
BSON_APPEND_DOCUMENT (
131+
&change_stream_doc, "startAfter", &stream->resume_token);
132+
} else {
133+
/* The driver MUST set resumeAfter to the cached resumeToken */
134+
BSON_APPEND_DOCUMENT (
135+
&change_stream_doc, "resumeAfter", &stream->resume_token);
136+
}
132137
} else if (!_mongoc_timestamp_empty (&stream->operation_time) &&
133138
max_wire_version >= 7) {
139+
/* Else if there is no cached resumeToken and the ChangeStream
140+
has a saved operation time and the max wire version is >= 7,
141+
the driver MUST set startAtOperationTime */
134142
_mongoc_timestamp_append (&stream->operation_time,
135143
&change_stream_doc,
136144
"startAtOperationTime");
@@ -553,6 +561,8 @@ mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
553561

554562
/* we have received documents, either from the first call to next or after a
555563
* resume. */
564+
stream->has_returned_results = true;
565+
556566
if (!bson_iter_init_find (&iter, *bson, "_id") ||
557567
!BSON_ITER_HOLDS_DOCUMENT (&iter)) {
558568
bson_set_error (&stream->err,

src/libmongoc/tests/test-mongoc-change-stream.c

Lines changed: 179 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1887,11 +1887,11 @@ test_resume_cases (void)
18871887

18881888
/* only 'startAfter' specified. */
18891889
/* - if no doc recv'ed, use the startAfter option for the original aggregate
1890-
* but resumeAfter with the same value when resuming. */
1890+
* whether or not we are resuming. */
18911891
_test_resume ("{" OPT_SA "}",
18921892
OPT_SA "," NO_OPT_OP "," NO_OPT_RA ",",
18931893
"",
1894-
OPT_RA "," NO_OPT_OP "," NO_OPT_SA ",",
1894+
OPT_SA "," NO_OPT_OP "," NO_OPT_RA ",",
18951895
"");
18961896
/* - if doc recv'ed and iterated, use the doc's resume token. */
18971897
_test_resume ("{" OPT_SA "}",
@@ -1902,13 +1902,14 @@ test_resume_cases (void)
19021902

19031903
/* 'resumeAfter', 'startAfter', and 'startAtOperationTime' are all specified.
19041904
* All should be passed (although the server currently returns an error). */
1905-
/* - if no doc recv'ed, use the resumeAfter option. */
1905+
/* - if no doc recv'ed, use startAfter. */
19061906
_test_resume ("{" OPT_RA "," OPT_SA "," OPT_OP "}",
19071907
OPT_RA "," OPT_SA "," OPT_OP ",",
19081908
"",
1909-
OPT_RA "," NO_OPT_OP "," NO_OPT_SA ",",
1909+
OPT_SA "," NO_OPT_OP "," NO_OPT_RA ",",
19101910
"");
1911-
/* - if doc recv'ed and iterated, use the doc's resume token. */
1911+
/* - if one doc recv'ed and iterated, use resumeAfter with doc's resume
1912+
* token. */
19121913
_test_resume ("{" OPT_RA "," OPT_SA "," OPT_OP "}",
19131914
OPT_RA "," OPT_SA "," OPT_OP ",",
19141915
DOC,
@@ -1924,6 +1925,7 @@ test_resume_cases_with_post_batch_resume_token (void)
19241925
{
19251926
#define CURSOR_PBR "'postBatchResumeToken': {'resume': 'pbr'}"
19261927
#define PBR_RA "'resumeAfter': {'resume': 'pbr'}"
1928+
#define PBR_SA "'startAfter': {'resume': 'pbr'}"
19271929

19281930
/* test features:
19291931
* - whether the change stream returns a document before resuming.
@@ -1937,55 +1939,59 @@ test_resume_cases_with_post_batch_resume_token (void)
19371939
* last document in the batch (if _test_resume() iterates to that point). */
19381940

19391941
/* no options specified. */
1940-
/* - if no doc recv'ed, use postBatchResumeToken. */
1942+
/* - if no doc recv'ed, use resumeAfter with postBatchResumeToken. */
19411943
_test_resume ("{}",
19421944
NO_OPT_OP "," NO_OPT_RA "," NO_OPT_SA ",",
19431945
"",
19441946
PBR_RA "," NO_OPT_OP "," NO_OPT_SA ",",
19451947
"," CURSOR_PBR);
1946-
/* - if one doc recv'ed and iterated, use postBatchResumeToken. */
1948+
/* - if one doc recv'ed and iterated, use resumeAfter with
1949+
* postBatchResumeToken. */
19471950
_test_resume ("{}",
19481951
NO_OPT_OP "," NO_OPT_RA "," NO_OPT_SA ",",
19491952
DOC,
19501953
PBR_RA "," NO_OPT_OP "," NO_OPT_SA ",",
19511954
"," CURSOR_PBR);
19521955

19531956
/* only 'startAtOperationTime' specified. */
1954-
/* - if no doc recv'ed, use postBatchResumeToken. */
1957+
/* - if no doc recv'ed, use resumeAfter with postBatchResumeToken. */
19551958
_test_resume ("{" OPT_OP "}",
19561959
OPT_OP "," NO_OPT_RA "," NO_OPT_SA ",",
19571960
"",
19581961
PBR_RA "," NO_OPT_OP "," NO_OPT_SA ",",
19591962
"," CURSOR_PBR);
1960-
/* - if one doc recv'ed and iterated, use postBatchResumeToken. */
1963+
/* - if one doc recv'ed and iterated, use resumeAfter with
1964+
* postBatchResumeToken. */
19611965
_test_resume ("{" OPT_OP "}",
19621966
OPT_OP "," NO_OPT_RA "," NO_OPT_SA ",",
19631967
DOC,
19641968
PBR_RA "," NO_OPT_OP "," NO_OPT_SA ",",
19651969
"," CURSOR_PBR);
19661970

19671971
/* only 'resumeAfter' specified. */
1968-
/* - if no doc recv'ed, use postBatchResumeToken. */
1972+
/* - if no doc recv'ed, use resumeAfter with postBatchResumeToken. */
19691973
_test_resume ("{" OPT_RA "}",
19701974
OPT_RA "," NO_OPT_OP "," NO_OPT_SA ",",
19711975
"",
19721976
PBR_RA "," NO_OPT_OP "," NO_OPT_SA ",",
19731977
"," CURSOR_PBR);
1974-
/* - if one doc recv'ed and iterated, use postBatchResumeToken. */
1978+
/* - if one doc recv'ed and iterated, use resumeAfter with
1979+
* postBatchResumeToken. */
19751980
_test_resume ("{" OPT_RA "}",
19761981
OPT_RA "," NO_OPT_OP "," NO_OPT_SA ",",
19771982
DOC,
19781983
PBR_RA "," NO_OPT_OP "," NO_OPT_SA ",",
19791984
"," CURSOR_PBR);
19801985

19811986
/* only 'startAfter' specified. */
1982-
/* - if no doc recv'ed, use postBatchResumeToken. */
1987+
/* - if no doc recv'ed, use startAfter with postBatchResumeToken. */
19831988
_test_resume ("{" OPT_SA "}",
19841989
OPT_SA "," NO_OPT_OP "," NO_OPT_RA ",",
19851990
"",
1986-
PBR_RA "," NO_OPT_OP "," NO_OPT_SA ",",
1991+
PBR_SA "," NO_OPT_OP "," NO_OPT_RA ",",
19871992
"," CURSOR_PBR);
1988-
/* - if one doc recv'ed and iterated, use postBatchResumeToken. */
1993+
/* - if one doc recv'ed and iterated, use resumeAfter with
1994+
* postBatchResumeToken. */
19891995
_test_resume ("{" OPT_SA "}",
19901996
OPT_SA "," NO_OPT_OP "," NO_OPT_RA ",",
19911997
DOC,
@@ -1994,13 +2000,14 @@ test_resume_cases_with_post_batch_resume_token (void)
19942000

19952001
/* 'resumeAfter', 'startAfter', and 'startAtOperationTime' are all specified.
19962002
* All should be passed (although the server currently returns an error). */
1997-
/* - if no doc recv'ed, use postBatchResumeToken. */
2003+
/* - if no doc recv'ed, use startAfter with postBatchResumeToken. */
19982004
_test_resume ("{" OPT_RA "," OPT_SA "," OPT_OP "}",
19992005
OPT_RA "," OPT_SA "," OPT_OP ",",
20002006
"",
2001-
PBR_RA "," NO_OPT_OP "," NO_OPT_SA ",",
2007+
PBR_SA "," NO_OPT_OP "," NO_OPT_RA ",",
20022008
"," CURSOR_PBR);
2003-
/* - if one doc recv'ed and iterated, use postBatchResumeToken. */
2009+
/* - if one doc recv'ed and iterated, use resumeAfter with
2010+
* postBatchResumeToken. */
20042011
_test_resume ("{" OPT_RA "," OPT_SA "," OPT_OP "}",
20052012
OPT_RA "," OPT_SA "," OPT_OP ",",
20062013
DOC,
@@ -2311,6 +2318,156 @@ prose_test_14 (void *test_ctx)
23112318
}
23122319

23132320

2321+
void
2322+
prose_test_17 (void)
2323+
{
2324+
mock_server_t *server;
2325+
request_t *request;
2326+
future_t *future;
2327+
mongoc_client_t *client;
2328+
mongoc_collection_t *coll;
2329+
mongoc_change_stream_t *stream;
2330+
const bson_t *next_doc = NULL;
2331+
2332+
server = mock_server_with_autoismaster (WIRE_VERSION_MAX);
2333+
mock_server_run (server);
2334+
client = mongoc_client_new_from_uri (mock_server_get_uri (server));
2335+
2336+
coll = mongoc_client_get_collection (client, "db", "coll");
2337+
/* Pass an arbitrary document as the resume token, like {'x': 1} */
2338+
future = future_collection_watch (
2339+
coll, tmp_bson ("{}"), tmp_bson ("{'startAfter': {'x': 1}}"));
2340+
2341+
request = mock_server_receives_msg (
2342+
server,
2343+
MONGOC_QUERY_NONE,
2344+
tmp_bson ("{ 'aggregate': 'coll', 'pipeline' : [ { '$changeStream': { "
2345+
"'startAfter': {'x': 1} , 'resumeAfter': { '$exists': false }, "
2346+
"'startAtOperationTime': { '$exists': false } } } ]}"));
2347+
2348+
mock_server_replies_simple (
2349+
request,
2350+
"{'cursor': {'id': 123, 'ns': 'db.coll', 'firstBatch': []}, 'ok': 1 }");
2351+
2352+
request_destroy (request);
2353+
2354+
stream = future_get_mongoc_change_stream_ptr (future);
2355+
ASSERT (stream);
2356+
future_destroy (future);
2357+
2358+
future = future_change_stream_next (stream, &next_doc);
2359+
2360+
request = mock_server_receives_msg (
2361+
server,
2362+
MONGOC_QUERY_NONE,
2363+
tmp_bson ("{ 'getMore': {'$numberLong': '123'}, 'collection': 'coll' }"));
2364+
2365+
mock_server_replies_simple (
2366+
request, "{ 'code': 10107, 'errmsg': 'not master', 'ok': 0 }");
2367+
2368+
request_destroy (request);
2369+
2370+
/* Resume occurs. */
2371+
request = mock_server_receives_msg (
2372+
server,
2373+
MONGOC_QUERY_NONE,
2374+
tmp_bson ("{ 'aggregate': 'coll', 'pipeline': [ { "
2375+
"'$changeStream': { 'startAfter': {'x': 1}, 'resumeAfter': { "
2376+
"'$exists': false }, 'startAtOperationTime': { '$exists': "
2377+
"false } } "
2378+
"}]}"));
2379+
2380+
/* Reply with a 0 cursor ID to prevent a killCursors command. */
2381+
mock_server_replies_simple (
2382+
request,
2383+
"{'cursor': {'id': 0, 'ns': 'db.coll', 'firstBatch': []}, 'ok': 1 }");
2384+
request_destroy (request);
2385+
BSON_ASSERT (!future_get_bool (future));
2386+
future_destroy (future);
2387+
mongoc_change_stream_destroy (stream);
2388+
2389+
mongoc_collection_destroy (coll);
2390+
mongoc_client_destroy (client);
2391+
mock_server_destroy (server);
2392+
}
2393+
2394+
2395+
void
2396+
prose_test_18 (void)
2397+
{
2398+
mock_server_t *server;
2399+
request_t *request;
2400+
future_t *future;
2401+
mongoc_client_t *client;
2402+
mongoc_collection_t *coll;
2403+
mongoc_change_stream_t *stream;
2404+
const bson_t *next_doc = NULL;
2405+
2406+
server = mock_server_with_autoismaster (WIRE_VERSION_MAX);
2407+
mock_server_run (server);
2408+
client = mongoc_client_new_from_uri (mock_server_get_uri (server));
2409+
2410+
coll = mongoc_client_get_collection (client, "db", "coll");
2411+
/* Pass an arbitrary document as the resume token, like {'x': 1} */
2412+
future = future_collection_watch (
2413+
coll, tmp_bson ("{}"), tmp_bson ("{'startAfter': {'x': 1}}"));
2414+
2415+
request = mock_server_receives_msg (
2416+
server,
2417+
MONGOC_QUERY_NONE,
2418+
tmp_bson ("{ 'aggregate': 'coll', 'pipeline' : [ { '$changeStream': { "
2419+
"'startAfter': {'x': 1}, 'resumeAfter': { '$exists': false }, "
2420+
"'startAtOperationTime': { '$exists': false } } } ]}"));
2421+
2422+
mock_server_replies_simple (request,
2423+
"{'cursor': {'id': 123, 'ns': "
2424+
"'db.coll', 'firstBatch': [{'_id': "
2425+
"{'y': 1}}]}, 'ok': 1 }");
2426+
2427+
request_destroy (request);
2428+
stream = future_get_mongoc_change_stream_ptr (future);
2429+
ASSERT (stream);
2430+
future_destroy (future);
2431+
2432+
/* The first call to mongoc_change_stream_next returns the batched document.
2433+
*/
2434+
mongoc_change_stream_next (stream, &next_doc);
2435+
2436+
future = future_change_stream_next (stream, &next_doc);
2437+
2438+
request = mock_server_receives_msg (
2439+
server,
2440+
MONGOC_QUERY_NONE,
2441+
tmp_bson ("{ 'getMore': {'$numberLong': '123'}, 'collection': 'coll' }"));
2442+
2443+
mock_server_replies_simple (
2444+
request, "{ 'code': 10107, 'errmsg': 'not master', 'ok': 0 }");
2445+
2446+
request_destroy (request);
2447+
2448+
request = mock_server_receives_msg (
2449+
server,
2450+
MONGOC_QUERY_NONE,
2451+
tmp_bson ("{ 'aggregate': 'coll', 'pipeline': [ { "
2452+
"'$changeStream': { 'resumeAfter': {'y': 1}, 'startAfter': { "
2453+
"'$exists': false }, 'startAtOperationTime': { '$exists': "
2454+
"false } } "
2455+
"}]}"));
2456+
/* Reply with a 0 cursor ID to prevent a killCursors command. */
2457+
mock_server_replies_simple (
2458+
request,
2459+
"{'cursor': {'id': 0, 'ns': 'db.coll', 'firstBatch': []}, 'ok': 1 }");
2460+
request_destroy (request);
2461+
BSON_ASSERT (!future_get_bool (future));
2462+
future_destroy (future);
2463+
mongoc_change_stream_destroy (stream);
2464+
2465+
mongoc_collection_destroy (coll);
2466+
mongoc_client_destroy (client);
2467+
mock_server_destroy (server);
2468+
}
2469+
2470+
23142471
void
23152472
test_change_stream_install (TestSuite *suite)
23162473
{
@@ -2469,6 +2626,11 @@ test_change_stream_install (TestSuite *suite)
24692626
NULL,
24702627
test_framework_skip_if_mongos,
24712628
test_framework_skip_if_not_rs_version_7);
2629+
TestSuite_AddMockServerTest (
2630+
suite, "/change_streams/prose_test_17", prose_test_17);
2631+
TestSuite_AddMockServerTest (
2632+
suite, "/change_streams/prose_test_18", prose_test_18);
2633+
24722634

24732635
test_framework_resolve_path (JSON_DIR "/change_streams", resolved);
24742636
install_json_test_suite (suite, resolved, &test_change_stream_spec_cb);

0 commit comments

Comments
 (0)