@@ -4,6 +4,7 @@ mod sessions_not_supported_skip_local; // requires mongocryptd
44use std:: {
55 future:: IntoFuture ,
66 sync:: { Arc , Mutex } ,
7+ time:: Duration ,
78} ;
89
910use futures:: TryStreamExt ;
@@ -12,12 +13,18 @@ use futures_util::{future::try_join_all, FutureExt};
1213use crate :: {
1314 bson:: { doc, Document } ,
1415 error:: { ErrorKind , Result } ,
15- event:: command:: { CommandEvent , CommandStartedEvent } ,
16+ event:: {
17+ command:: { CommandEvent , CommandStartedEvent } ,
18+ sdam:: SdamEvent ,
19+ } ,
1620 test:: {
1721 get_client_options,
22+ log_uncaptured,
1823 server_version_gte,
1924 spec:: unified_runner:: run_unified_tests,
25+ topology_is_load_balanced,
2026 topology_is_sharded,
27+ Event ,
2128 } ,
2229 Client ,
2330} ;
@@ -197,3 +204,84 @@ async fn implicit_session_after_connection() {
197204}
198205
199206// Prose tests 18 and 19 in sessions_not_supported_skip_local module
207+
208+ // Sessions prose test 20
209+ #[ tokio:: test]
210+ async fn no_cluster_time_in_sdam ( ) {
211+ if topology_is_load_balanced ( ) . await {
212+ log_uncaptured ( "Skipping no_cluster_time_in_sdam: load-balanced topology" ) ;
213+ return ;
214+ }
215+ let mut options = get_client_options ( ) . await . clone ( ) ;
216+ options. direct_connection = Some ( true ) ;
217+ options. hosts . drain ( 1 ..) ;
218+ let heartbeat_freq = Duration :: from_millis ( 10 ) ;
219+ options. heartbeat_freq = Some ( heartbeat_freq) ;
220+ let c1 = Client :: for_test ( )
221+ . options ( options)
222+ . min_heartbeat_freq ( heartbeat_freq)
223+ . monitor_events ( )
224+ . await ;
225+
226+ // Send a ping on c1
227+ let cluster_time = c1
228+ . database ( "admin" )
229+ . run_command ( doc ! { "ping" : 1 } )
230+ . await
231+ . unwrap ( )
232+ . get ( "$clusterTime" )
233+ . cloned ( ) ;
234+
235+ // Send a write on c2
236+ let c2 = Client :: for_test ( ) . await ;
237+ c2. database ( "test" )
238+ . collection :: < Document > ( "test" )
239+ . insert_one ( doc ! { "advance" : "$clusterTime" } )
240+ . await
241+ . unwrap ( ) ;
242+
243+ // Wait for the next (heartbeat started, heartbeat succeeded) event pair on c1
244+ let mut events = c1. events . stream ( ) ;
245+ const TIMEOUT : Duration = Duration :: from_secs ( 1 ) ;
246+ crate :: runtime:: timeout ( TIMEOUT , async {
247+ loop {
248+ // Find a started event...
249+ let _started = events
250+ . next_match ( TIMEOUT , |ev| {
251+ matches ! ( ev, Event :: Sdam ( SdamEvent :: ServerHeartbeatStarted ( _) ) )
252+ } )
253+ . await
254+ . unwrap ( ) ;
255+ // ... and the next heartbeat event after that ...
256+ let next_hb = events
257+ . next_map ( TIMEOUT , |ev| match ev {
258+ Event :: Sdam ( hb @ SdamEvent :: ServerHeartbeatStarted ( _) ) => Some ( hb) ,
259+ Event :: Sdam ( hb @ SdamEvent :: ServerHeartbeatFailed ( _) ) => Some ( hb) ,
260+ Event :: Sdam ( hb @ SdamEvent :: ServerHeartbeatSucceeded ( _) ) => Some ( hb) ,
261+ _ => None ,
262+ } )
263+ . await
264+ . unwrap ( ) ;
265+ // ... and see if it was a succeeded event.
266+ if matches ! ( next_hb, SdamEvent :: ServerHeartbeatSucceeded ( _) ) {
267+ break ;
268+ }
269+ }
270+ } )
271+ . await
272+ . unwrap ( ) ;
273+
274+ // Send another ping
275+ let mut events = c1. events . stream ( ) ;
276+ c1. database ( "admin" )
277+ . run_command ( doc ! { "ping" : 1 } )
278+ . await
279+ . unwrap ( ) ;
280+ let ( start, _succeded) = events
281+ . next_successful_command_execution ( TIMEOUT , "ping" )
282+ . await
283+ . unwrap ( ) ;
284+
285+ // Assert that the cluster time hasn't changed
286+ assert_eq ! ( cluster_time. as_ref( ) , start. command. get( "$clusterTime" ) ) ;
287+ }
0 commit comments