@@ -2635,6 +2635,180 @@ test_example_change_stream (mongoc_database_t *db)
2635
2635
}
2636
2636
2637
2637
2638
+ static void
2639
+ test_sample_causal_consistency (mongoc_client_t * client )
2640
+ {
2641
+ mongoc_session_opt_t * session_opts = NULL ;
2642
+ mongoc_client_session_t * session1 = NULL ;
2643
+ mongoc_client_session_t * session2 = NULL ;
2644
+ mongoc_read_prefs_t * read_prefs = NULL ;
2645
+ const bson_t * cluster_time = NULL ;
2646
+ mongoc_write_concern_t * wc = NULL ;
2647
+ mongoc_read_concern_t * rc = NULL ;
2648
+ mongoc_collection_t * coll = NULL ;
2649
+ mongoc_cursor_t * cursor = NULL ;
2650
+ const bson_t * result = NULL ;
2651
+ bson_t * update_opts = NULL ;
2652
+ bson_t * insert_opts = NULL ;
2653
+ bson_t * find_query = NULL ;
2654
+ bson_t * find_opts = NULL ;
2655
+ bson_t * insert = NULL ;
2656
+ bson_t * update = NULL ;
2657
+ bson_t * query = NULL ;
2658
+ bson_t * doc = NULL ;
2659
+ char * json = NULL ;
2660
+ uint32_t timestamp ;
2661
+ uint32_t increment ;
2662
+ bson_error_t error ;
2663
+ bool res ;
2664
+
2665
+ if (!test_framework_skip_if_no_txns ()) {
2666
+ return ;
2667
+ }
2668
+
2669
+ /* Seed the 'db.items' collection with a document. */
2670
+ coll = mongoc_client_get_collection (client , "db" , "items" );
2671
+ mongoc_collection_drop (coll , & error );
2672
+
2673
+ doc = BCON_NEW ("sku" , "111" , "name" , "Peanuts" ,
2674
+ "start" , BCON_DATE_TIME (bson_get_monotonic_time ()));
2675
+
2676
+ res = mongoc_collection_insert_one (coll , doc , NULL , NULL , & error );
2677
+ if (!res ) {
2678
+ fprintf (stderr , "insert failed: %s\n" , error .message );
2679
+ goto cleanup ;
2680
+ }
2681
+
2682
+ /* Example 1:
2683
+ * ----------
2684
+ * Use a causally-consistent session to run some operations. */
2685
+
2686
+ wc = mongoc_write_concern_new ();
2687
+ mongoc_write_concern_set_wmajority (wc , 1000 );
2688
+ mongoc_collection_set_write_concern (coll , wc );
2689
+
2690
+ rc = mongoc_read_concern_new ();
2691
+ mongoc_read_concern_set_level (rc , MONGOC_READ_CONCERN_LEVEL_MAJORITY );
2692
+ mongoc_collection_set_read_concern (coll , rc );
2693
+
2694
+ session_opts = mongoc_session_opts_new ();
2695
+ mongoc_session_opts_set_causal_consistency (session_opts , true);
2696
+
2697
+ session1 = mongoc_client_start_session (client , session_opts , & error );
2698
+ if (!session1 ) {
2699
+ fprintf (stderr , "couldn't start session: %s\n" , error .message );
2700
+ goto cleanup ;
2701
+ }
2702
+
2703
+ /* Run an update_one with our causally-consistent session */
2704
+ update_opts = bson_new ();
2705
+ res = mongoc_client_session_append (session1 , update_opts , & error );
2706
+ if (!res ) {
2707
+ fprintf (stderr , "couldn't add session to opts: %s\n" , error .message );
2708
+ goto cleanup ;
2709
+ }
2710
+
2711
+ query = BCON_NEW ("sku" , "111" );
2712
+ update = BCON_NEW ("$set" , "{" , "end" ,
2713
+ BCON_DATE_TIME (bson_get_monotonic_time ()), "}" );
2714
+ res = mongoc_collection_update_one (coll ,
2715
+ query ,
2716
+ update ,
2717
+ update_opts ,
2718
+ NULL , /* reply */
2719
+ & error );
2720
+
2721
+ if (!res ) {
2722
+ fprintf (stderr , "update failed: %s\n" , error .message );
2723
+ goto cleanup ;
2724
+ }
2725
+
2726
+ /* Run an insert with our causally-consistent session */
2727
+ insert_opts = bson_new ();
2728
+ res = mongoc_client_session_append (session1 , insert_opts , & error );
2729
+ if (!res ) {
2730
+ fprintf (stderr , "couldn't add session to opts: %s\n" , error .message );
2731
+ goto cleanup ;
2732
+ }
2733
+
2734
+ insert = BCON_NEW ("sku" , "nuts-111" , "name" , "Pecans" ,
2735
+ "start" , BCON_DATE_TIME (bson_get_monotonic_time ()));
2736
+ res = mongoc_collection_insert_one (coll , insert , insert_opts , NULL , & error );
2737
+ if (!res ) {
2738
+ fprintf (stderr , "insert failed: %s\n" , error .message );
2739
+ goto cleanup ;
2740
+ }
2741
+
2742
+ /* Example 2:
2743
+ * ----------
2744
+ * Make a new session, session2, and make it causally-consistent
2745
+ * with session1, so that session2 will read session1's writes. */
2746
+
2747
+ session2 = mongoc_client_start_session (client , session_opts , & error );
2748
+ if (!session2 ) {
2749
+ fprintf (stderr , "couldn't start session: %s\n" , error .message );
2750
+ goto cleanup ;
2751
+ }
2752
+
2753
+ /* Set the cluster time for session2 to session1's cluster time */
2754
+ cluster_time = mongoc_client_session_get_cluster_time (session1 );
2755
+ mongoc_client_session_advance_cluster_time (session2 , cluster_time );
2756
+
2757
+ /* Set the operation time for session2 to session2's operation time */
2758
+ mongoc_client_session_get_operation_time (session1 , & timestamp , & increment );
2759
+ mongoc_client_session_advance_operation_time (session2 ,
2760
+ timestamp ,
2761
+ increment );
2762
+
2763
+ /* Run a find on session2, which should now find all writes done
2764
+ * inside of session1 */
2765
+ find_opts = bson_new ();
2766
+ res = mongoc_client_session_append (session2 , find_opts , & error );
2767
+ if (!res ) {
2768
+ fprintf (stderr , "couldn't add session to opts: %s\n" , error .message );
2769
+ goto cleanup ;
2770
+ }
2771
+
2772
+ find_query = BCON_NEW ("end" , BCON_NULL );
2773
+ read_prefs = mongoc_read_prefs_new (MONGOC_READ_SECONDARY );
2774
+ cursor = mongoc_collection_find_with_opts (coll ,
2775
+ query ,
2776
+ find_opts ,
2777
+ read_prefs );
2778
+
2779
+ while (mongoc_cursor_next (cursor , & result )) {
2780
+ json = bson_as_json (result , NULL );
2781
+ fprintf (stdout , "Document: %s\n" , json );
2782
+ bson_free (json );
2783
+ }
2784
+
2785
+ if (mongoc_cursor_error (cursor , & error )) {
2786
+ fprintf (stderr , "cursor failure: %s\n" , error .message );
2787
+ goto cleanup ;
2788
+ }
2789
+
2790
+ cleanup :
2791
+
2792
+ bson_destroy (doc );
2793
+ bson_destroy (query );
2794
+ bson_destroy (insert );
2795
+ bson_destroy (update );
2796
+ bson_destroy (find_query );
2797
+ bson_destroy (update_opts );
2798
+ bson_destroy (find_opts );
2799
+ bson_destroy (insert_opts );
2800
+
2801
+ mongoc_read_concern_destroy (rc );
2802
+ mongoc_read_prefs_destroy (read_prefs );
2803
+ mongoc_write_concern_destroy (wc );
2804
+ mongoc_collection_destroy (coll );
2805
+ mongoc_cursor_destroy (cursor );
2806
+ mongoc_session_opts_destroy (session_opts );
2807
+ mongoc_client_session_destroy (session1 );
2808
+ mongoc_client_session_destroy (session2 );
2809
+ }
2810
+
2811
+
2638
2812
static void
2639
2813
test_sample_aggregation (mongoc_database_t * db )
2640
2814
{
@@ -3319,6 +3493,7 @@ test_sample_commands (void)
3319
3493
test_sample_command (test_example_58 , 58 , db , collection , false);
3320
3494
test_sample_command (test_example_56 , 56 , db , collection , true);
3321
3495
test_sample_change_stream_command (test_example_change_stream , db );
3496
+ test_sample_causal_consistency (client );
3322
3497
test_sample_aggregation (db );
3323
3498
test_sample_indexes (db );
3324
3499
test_sample_run_command (db );
0 commit comments