@@ -2255,6 +2255,25 @@ prose_test_13 (void *ctx)
2255
2255
mongoc_collection_destroy (coll );
2256
2256
}
2257
2257
2258
+ static void
2259
+ _save_operation_time_from_agg (const mongoc_apm_command_succeeded_t * event )
2260
+ {
2261
+ if (0 == strcmp ("aggregate" ,
2262
+ mongoc_apm_command_succeeded_get_command_name (event ))) {
2263
+ mongoc_timestamp_t * timestamp ;
2264
+ bson_iter_t iter ;
2265
+ const bson_t * cmd ;
2266
+
2267
+ cmd = mongoc_apm_command_succeeded_get_reply (event );
2268
+ timestamp = mongoc_apm_command_succeeded_get_context (event );
2269
+ /* Capture the operationTime from the first aggregate reply. */
2270
+ if (timestamp -> timestamp == 0 ) {
2271
+ BSON_ASSERT (bson_iter_init_find (& iter , cmd , "operationTime" ));
2272
+ _mongoc_timestamp_set_from_bson (timestamp , & iter );
2273
+ }
2274
+ }
2275
+ }
2276
+
2258
2277
void
2259
2278
prose_test_14 (void * test_ctx )
2260
2279
{
@@ -2266,10 +2285,21 @@ prose_test_14 (void *test_ctx)
2266
2285
const bson_t * resume_token ;
2267
2286
bson_t expected_token ;
2268
2287
const bson_t * doc = NULL ;
2288
+ mongoc_timestamp_t optime = {0 };
2289
+ mongoc_apm_callbacks_t * callbacks ;
2290
+
2291
+ callbacks = mongoc_apm_callbacks_new ();
2292
+ mongoc_apm_set_command_succeeded_cb (callbacks ,
2293
+ _save_operation_time_from_agg );
2294
+ mongoc_client_set_apm_callbacks (client , callbacks , & optime );
2295
+ mongoc_apm_callbacks_destroy (callbacks );
2269
2296
2270
2297
coll = drop_and_get_coll (client , "db" , "coll" );
2271
2298
bson_init (& opts );
2272
2299
stream = mongoc_collection_watch (coll , tmp_bson ("{}" ), & opts );
2300
+ /* The _save_operation_time_from_agg listener must have stored the operation
2301
+ * time. */
2302
+ BSON_ASSERT (optime .timestamp != 0 );
2273
2303
2274
2304
ASSERT_OR_PRINT (mongoc_collection_insert_one (
2275
2305
coll , tmp_bson ("{'_id': 0}" ), & opts , NULL , & error ),
@@ -2309,6 +2339,17 @@ prose_test_14 (void *test_ctx)
2309
2339
2310
2340
resume_token = mongoc_change_stream_get_resume_token (stream );
2311
2341
ASSERT (bson_equal (resume_token , & expected_token ));
2342
+ mongoc_change_stream_destroy (stream );
2343
+
2344
+ /* Finally, with neither. */
2345
+ bson_destroy (& opts );
2346
+ bson_init (& opts );
2347
+ BSON_APPEND_TIMESTAMP (
2348
+ & opts , "startAtOperationTime" , optime .timestamp , optime .increment );
2349
+ stream = mongoc_collection_watch (coll , tmp_bson ("{}" ), & opts );
2350
+
2351
+ resume_token = mongoc_change_stream_get_resume_token (stream );
2352
+ ASSERT (resume_token == NULL );
2312
2353
2313
2354
bson_destroy (& expected_token );
2314
2355
bson_destroy (& opts );
0 commit comments