Skip to content

Commit 8fd9f17

Browse files
Haris Sheikhharis-sheikh
authored andcommitted
CDRIVER-3088 add tests for postbatchresumetoken support
1 parent 9bc4b5c commit 8fd9f17

File tree

1 file changed

+315
-0
lines changed

1 file changed

+315
-0
lines changed

src/libmongoc/tests/test-mongoc-change-stream.c

Lines changed: 315 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@
4242
request_destroy (_request); \
4343
} while (0);
4444

45+
46+
typedef struct _data_change_stream_t {
47+
mongoc_cursor_response_t response;
48+
bson_t post_batch_resume_token;
49+
} _data_change_stream_t;
50+
51+
4552
static int
4653
test_framework_skip_if_not_single_version_5 (void)
4754
{
@@ -2009,6 +2016,286 @@ test_error_null_doc (void *ctx)
20092016
}
20102017

20112018

2019+
void
2020+
_check_doc_resume_token (const bson_t *doc, const bson_t *resume_token)
2021+
{
2022+
bson_t document_resume_token;
2023+
2024+
bson_lookup_doc (doc, "_id", &document_resume_token);
2025+
ASSERT (bson_equal (resume_token, &document_resume_token));
2026+
2027+
bson_destroy (&document_resume_token);
2028+
}
2029+
2030+
2031+
void
2032+
prose_test_11 (void *ctx)
2033+
{
2034+
mongoc_client_t *client;
2035+
mongoc_collection_t *coll;
2036+
mongoc_change_stream_t *stream;
2037+
bson_error_t error;
2038+
const bson_t *next_doc = NULL;
2039+
mongoc_write_concern_t *wc = mongoc_write_concern_new ();
2040+
bson_t opts = BSON_INITIALIZER;
2041+
const bson_t *resume_token;
2042+
_data_change_stream_t *post_batch_expected;
2043+
2044+
client = test_framework_client_new ();
2045+
ASSERT (client);
2046+
2047+
coll = drop_and_get_coll (client, "db", "coll_resume");
2048+
ASSERT (coll);
2049+
2050+
/* Set the batch size to 1 so we only get one document per call to next. */
2051+
stream = mongoc_collection_watch (
2052+
coll, tmp_bson ("{}"), tmp_bson ("{'batchSize': 1}"));
2053+
ASSERT (stream);
2054+
ASSERT_OR_PRINT (!mongoc_change_stream_error_document (stream, &error, NULL),
2055+
error);
2056+
2057+
/* The resume token should be updated to the post batch resume token */
2058+
ASSERT (!mongoc_change_stream_next (stream, &next_doc));
2059+
ASSERT (!next_doc);
2060+
resume_token = mongoc_change_stream_get_resume_token (stream);
2061+
ASSERT (!bson_empty0 (resume_token));
2062+
2063+
/* Look into the struct and get the actual post batch resume token, assert it
2064+
* is equal to our resume token */
2065+
post_batch_expected = (_data_change_stream_t *) stream->cursor->impl.data;
2066+
ASSERT (bson_compare (resume_token,
2067+
&post_batch_expected->post_batch_resume_token) == 0);
2068+
2069+
mongoc_write_concern_set_wmajority (wc, 30000);
2070+
mongoc_write_concern_append (wc, &opts);
2071+
ASSERT_OR_PRINT (mongoc_collection_insert_one (
2072+
coll, tmp_bson ("{'_id': 0}"), &opts, NULL, &error),
2073+
error);
2074+
2075+
/* Checking that a resume token is returned */
2076+
ASSERT (mongoc_change_stream_next (stream, &next_doc));
2077+
ASSERT (next_doc);
2078+
resume_token = mongoc_change_stream_get_resume_token (stream);
2079+
ASSERT (!bson_empty0 (resume_token));
2080+
ASSERT (bson_compare (resume_token,
2081+
&post_batch_expected->post_batch_resume_token) == 0);
2082+
2083+
bson_destroy (&opts);
2084+
mongoc_write_concern_destroy (wc);
2085+
mongoc_change_stream_destroy (stream);
2086+
mongoc_client_destroy (client);
2087+
mongoc_collection_destroy (coll);
2088+
}
2089+
2090+
2091+
void
2092+
prose_test_12 (void *ctx)
2093+
{
2094+
mongoc_client_t *client;
2095+
mongoc_collection_t *coll;
2096+
mongoc_change_stream_t *stream;
2097+
bson_error_t error;
2098+
const bson_t *next_doc = NULL;
2099+
mongoc_write_concern_t *wc = mongoc_write_concern_new ();
2100+
bson_t opts = BSON_INITIALIZER;
2101+
const bson_t *resume_token;
2102+
bson_iter_t iter, child;
2103+
bson_t expected_token;
2104+
bson_t expected_doc;
2105+
2106+
client = test_framework_client_new ();
2107+
ASSERT (client);
2108+
2109+
coll = drop_and_get_coll (client, "db", "coll_resume");
2110+
ASSERT (coll);
2111+
2112+
/* Set the batch size to 1 so we only get one document per call to next. */
2113+
stream = mongoc_collection_watch (
2114+
coll, tmp_bson ("{}"), tmp_bson ("{'batchSize': 1}"));
2115+
ASSERT (stream);
2116+
ASSERT_OR_PRINT (!mongoc_change_stream_error_document (stream, &error, NULL),
2117+
error);
2118+
2119+
mongoc_write_concern_set_wmajority (wc, 30000);
2120+
mongoc_write_concern_append (wc, &opts);
2121+
ASSERT_OR_PRINT (mongoc_collection_insert_one (
2122+
coll, tmp_bson ("{'_id': 0}"), &opts, NULL, &error),
2123+
error);
2124+
2125+
/* Checking that a resume token is returned */
2126+
ASSERT (mongoc_change_stream_next (stream, &next_doc));
2127+
ASSERT (next_doc);
2128+
resume_token = mongoc_change_stream_get_resume_token (stream);
2129+
ASSERT (!bson_empty0 (resume_token));
2130+
2131+
/* Need to now check that we are getting back the _id of the last inserted
2132+
* document when we iterate to the last document */
2133+
bson_copy_to (next_doc, &expected_doc);
2134+
_check_doc_resume_token (&expected_doc, resume_token);
2135+
2136+
ASSERT (bson_iter_init_find (&iter, next_doc, "documentKey"));
2137+
ASSERT (bson_iter_recurse (&iter, &child));
2138+
ASSERT (bson_iter_find (&child, "_id") && bson_iter_int32 (&child) == 0);
2139+
2140+
/* Must check that getResumeToken returns resumeAfter correctly when
2141+
* specified. */
2142+
bson_copy_to (resume_token, &expected_token);
2143+
mongoc_change_stream_destroy (stream);
2144+
bson_destroy (&opts);
2145+
bson_init (&opts);
2146+
BSON_APPEND_DOCUMENT (&opts, "resumeAfter", &expected_token);
2147+
2148+
stream = mongoc_collection_watch (coll, tmp_bson ("{}"), &opts);
2149+
ASSERT (stream);
2150+
2151+
resume_token = mongoc_change_stream_get_resume_token (stream);
2152+
ASSERT (bson_equal (resume_token, &expected_token));
2153+
2154+
bson_destroy (&expected_doc);
2155+
bson_destroy (&expected_token);
2156+
bson_destroy (&opts);
2157+
mongoc_write_concern_destroy (wc);
2158+
mongoc_change_stream_destroy (stream);
2159+
mongoc_client_destroy (client);
2160+
mongoc_collection_destroy (coll);
2161+
}
2162+
2163+
2164+
void
2165+
prose_test_13 (void *ctx)
2166+
{
2167+
mongoc_client_t *client;
2168+
mongoc_collection_t *coll;
2169+
mongoc_change_stream_t *stream;
2170+
bson_error_t error;
2171+
const bson_t *next_doc = NULL;
2172+
mongoc_apm_callbacks_t *callbacks;
2173+
mongoc_write_concern_t *wc = mongoc_write_concern_new ();
2174+
bson_t opts = BSON_INITIALIZER;
2175+
const bson_t *resume_token;
2176+
bson_iter_t iter, child;
2177+
2178+
client = test_framework_client_new ();
2179+
ASSERT (client);
2180+
2181+
callbacks = mongoc_apm_callbacks_new ();
2182+
mongoc_apm_set_command_started_cb (callbacks,
2183+
test_resume_token_command_start);
2184+
mongoc_client_set_apm_callbacks (client, callbacks, &ctx);
2185+
2186+
coll = drop_and_get_coll (client, "db", "coll_resume");
2187+
ASSERT (coll);
2188+
ASSERT_OR_PRINT (
2189+
mongoc_collection_insert_one (coll, tmp_bson (NULL), NULL, NULL, &error),
2190+
error);
2191+
2192+
/* Set the batch size to 1 so we only get one document per call to next. */
2193+
stream = mongoc_collection_watch (
2194+
coll, tmp_bson ("{}"), tmp_bson ("{'batchSize': 1}"));
2195+
ASSERT (stream);
2196+
ASSERT_OR_PRINT (!mongoc_change_stream_error_document (stream, &error, NULL),
2197+
error);
2198+
2199+
/* Insert a few docs to listen for. Use write concern majority, so subsequent
2200+
* call to watch will be guaranteed to retrieve them. */
2201+
mongoc_write_concern_set_wmajority (wc, 30000);
2202+
mongoc_write_concern_append (wc, &opts);
2203+
ASSERT_OR_PRINT (mongoc_collection_insert_one (
2204+
coll, tmp_bson ("{'_id': 0}"), &opts, NULL, &error),
2205+
error);
2206+
2207+
ASSERT_OR_PRINT (mongoc_collection_insert_one (
2208+
coll, tmp_bson ("{'_id': 1}"), &opts, NULL, &error),
2209+
error);
2210+
2211+
/* The resume token should be updated to the most recently iterated doc */
2212+
ASSERT (mongoc_change_stream_next (stream, &next_doc));
2213+
ASSERT (next_doc);
2214+
resume_token = mongoc_change_stream_get_resume_token (stream);
2215+
ASSERT (!bson_empty0 (resume_token));
2216+
_check_doc_resume_token (next_doc, resume_token);
2217+
2218+
ASSERT (mongoc_change_stream_next (stream, &next_doc));
2219+
ASSERT (next_doc);
2220+
resume_token = mongoc_change_stream_get_resume_token (stream);
2221+
ASSERT (!bson_empty0 (resume_token));
2222+
_check_doc_resume_token (next_doc, resume_token);
2223+
2224+
ASSERT (bson_iter_init_find (&iter, next_doc, "documentKey"));
2225+
ASSERT (bson_iter_recurse (&iter, &child));
2226+
ASSERT (bson_iter_find (&child, "_id") && bson_iter_int32 (&child) == 1);
2227+
2228+
bson_destroy (&opts);
2229+
mongoc_write_concern_destroy (wc);
2230+
mongoc_apm_callbacks_destroy (callbacks);
2231+
mongoc_change_stream_destroy (stream);
2232+
mongoc_client_destroy (client);
2233+
mongoc_collection_destroy (coll);
2234+
}
2235+
2236+
void
2237+
prose_test_14 (void *test_ctx)
2238+
{
2239+
mongoc_client_t *client = test_framework_client_new ();
2240+
mongoc_collection_t *coll;
2241+
mongoc_change_stream_t *stream;
2242+
bson_t opts;
2243+
bson_error_t error;
2244+
const bson_t *resume_token;
2245+
bson_t expected_token;
2246+
const bson_t *doc = NULL;
2247+
2248+
coll = drop_and_get_coll (client, "db", "coll");
2249+
bson_init (&opts);
2250+
stream = mongoc_collection_watch (coll, tmp_bson ("{}"), &opts);
2251+
2252+
ASSERT_OR_PRINT (mongoc_collection_insert_one (
2253+
coll, tmp_bson ("{'_id': 0}"), &opts, NULL, &error),
2254+
error);
2255+
ASSERT_OR_PRINT (mongoc_collection_insert_one (
2256+
coll, tmp_bson ("{'_id': 1}"), &opts, NULL, &error),
2257+
error);
2258+
ASSERT_OR_PRINT (mongoc_collection_insert_one (
2259+
coll, tmp_bson ("{'_id': 2}"), &opts, NULL, &error),
2260+
error);
2261+
ASSERT_OR_PRINT (mongoc_collection_insert_one (
2262+
coll, tmp_bson ("{'_id': 3}"), &opts, NULL, &error),
2263+
error);
2264+
2265+
ASSERT (mongoc_change_stream_next (stream, &doc));
2266+
resume_token = mongoc_change_stream_get_resume_token (stream);
2267+
2268+
bson_copy_to (resume_token, &expected_token);
2269+
BSON_APPEND_DOCUMENT (&opts, "startAfter", &expected_token);
2270+
2271+
mongoc_change_stream_destroy (stream);
2272+
2273+
/* Start a new change stream using "startAfter" set to a previously obtained
2274+
resume token to guarantee a non-empty initial batch */
2275+
stream = mongoc_collection_watch (coll, tmp_bson ("{}"), &opts);
2276+
2277+
resume_token = mongoc_change_stream_get_resume_token (stream);
2278+
ASSERT (bson_equal (resume_token, &expected_token));
2279+
2280+
/* Doing the same using "resumeAfter" instead */
2281+
mongoc_change_stream_destroy (stream);
2282+
bson_destroy (&opts);
2283+
bson_init (&opts);
2284+
BSON_APPEND_DOCUMENT (&opts, "resumeAfter", &expected_token);
2285+
2286+
stream = mongoc_collection_watch (coll, tmp_bson ("{}"), &opts);
2287+
2288+
resume_token = mongoc_change_stream_get_resume_token (stream);
2289+
ASSERT (bson_equal (resume_token, &expected_token));
2290+
2291+
bson_destroy (&expected_token);
2292+
bson_destroy (&opts);
2293+
mongoc_change_stream_destroy (stream);
2294+
mongoc_collection_destroy (coll);
2295+
mongoc_client_destroy (client);
2296+
}
2297+
2298+
20122299
void
20132300
test_change_stream_install (TestSuite *suite)
20142301
{
@@ -2139,6 +2426,34 @@ test_change_stream_install (TestSuite *suite)
21392426
NULL,
21402427
NULL,
21412428
_skip_if_no_client_watch);
2429+
TestSuite_AddFull (suite,
2430+
"/change_stream/live/prose_test_11",
2431+
prose_test_11,
2432+
NULL,
2433+
NULL,
2434+
test_framework_skip_if_not_rs_version_6,
2435+
test_framework_skip_if_max_wire_version_less_than_8);
2436+
TestSuite_AddFull (suite,
2437+
"/change_stream/live/prose_test_12",
2438+
prose_test_12,
2439+
NULL,
2440+
NULL,
2441+
test_framework_skip_if_not_rs_version_6,
2442+
test_framework_skip_if_max_wire_version_more_than_7);
2443+
TestSuite_AddFull (suite,
2444+
"/change_stream/live/prose_test_13",
2445+
prose_test_13,
2446+
NULL,
2447+
NULL,
2448+
test_framework_skip_if_not_rs_version_6,
2449+
_skip_if_no_start_at_optime);
2450+
TestSuite_AddFull (suite,
2451+
"/change_stream/live/prose_test_14",
2452+
prose_test_14,
2453+
NULL,
2454+
NULL,
2455+
test_framework_skip_if_mongos,
2456+
test_framework_skip_if_not_rs_version_7);
21422457

21432458
test_framework_resolve_path (JSON_DIR "/change_streams", resolved);
21442459
install_json_test_suite (suite, resolved, &test_change_stream_spec_cb);

0 commit comments

Comments
 (0)