@@ -13,6 +13,7 @@ import (
1313 "fmt"
1414 "reflect"
1515 "sync"
16+ "sync/atomic"
1617 "testing"
1718 "time"
1819
@@ -597,46 +598,70 @@ func TestSessionsProse(t *testing.T) {
597598 })
598599 */
599600
601+ mt .ResetClient (options .Client ())
602+ client := mt .Client
603+ heartbeatStarted := make (chan struct {})
604+ heartbeatSucceeded := make (chan struct {})
605+ var clusterTimeAdvanced uint32
606+ serverMonitor := & event.ServerMonitor {
607+ ServerHeartbeatStarted : func (e * event.ServerHeartbeatStartedEvent ) {
608+ fmt .Println ("Server heartbeat started:" , e .ConnectionID )
609+ if atomic .LoadUint32 (& clusterTimeAdvanced ) == 1 {
610+ fmt .Println ("ServerHeartbeatStartedEvent: cluster time advanced" )
611+ select {
612+ case heartbeatStarted <- struct {}{}:
613+ // NOOP
614+ default :
615+ // NOOP
616+ }
617+ }
618+ },
619+ ServerHeartbeatSucceeded : func (e * event.ServerHeartbeatSucceededEvent ) {
620+ fmt .Println ("Server heartbeat succeeded:" , e .ConnectionID , e .Duration , e .Reply )
621+ if atomic .LoadUint32 (& clusterTimeAdvanced ) == 1 {
622+ fmt .Println ("ServerHeartbeatSucceededEvent: cluster time advanced" )
623+ select {
624+ case heartbeatSucceeded <- struct {}{}:
625+ // NOOP
626+ default :
627+ // NOOP
628+ }
629+ }
630+ },
631+ }
600632 pingOpts := mtest .NewOptions ().
601633 CreateCollection (false ).
602- Topologies (mtest .ReplicaSet , mtest .LoadBalanced )
634+ ClientOptions (options .Client ().
635+ SetServerMonitor (serverMonitor ).
636+ SetHeartbeatInterval (500 * time .Millisecond ). // Minimum interval
637+ SetDirect (true )).
638+ ClientType (mtest .Pinned )
603639 mt .RunOpts ("ping test" , pingOpts , func (mt * mtest.T ) {
604- serverMonitor := & event.ServerMonitor {
605- ServerHeartbeatStarted : func (e * event.ServerHeartbeatStartedEvent ) {
606- fmt .Println ("Server heartbeat started:" , e .ConnectionID )
607- },
608- ServerHeartbeatSucceeded : func (e * event.ServerHeartbeatSucceededEvent ) {
609- fmt .Println ("Server heartbeat succeeded:" , e .ConnectionID , e .Duration , e .Reply )
610- },
611- }
640+ err := mt .Client .Ping (context .Background (), readpref .Primary ())
641+ assert .NoError (mt , err , "expected no error, got: %v" , err )
612642
613- commandMonitor := & event.CommandMonitor {
614- Started : func (_ context.Context , cse * event.CommandStartedEvent ) {
615- fmt .Println ("Command started:" , cse .CommandName , cse .Command )
616- },
617- Succeeded : func (_ context.Context , cse * event.CommandSucceededEvent ) {
618- fmt .Println ("Command succeeded:" , cse .CommandName , cse .Reply )
619- },
620- }
643+ _ , err = client .Database ("test" ).Collection ("test" ).InsertOne (context .Background (), bson.D {{"advance" , "$clusterTime" }})
644+ require .NoError (mt , err , "expected no error inserting document, got: %v" , err )
621645
622- opts := options .Client ().
623- ApplyURI (mtest .ClusterURI ()).
624- SetHosts ([]string {mtest .ClusterConnString ().Hosts [0 ]}).
625- SetDirect (true ).
626- SetHeartbeatInterval (500 * time .Millisecond ). // Minimum interval
627- SetServerMonitor (serverMonitor ).
628- SetMonitor (commandMonitor )
646+ atomic .StoreUint32 (& clusterTimeAdvanced , 1 )
647+ <- heartbeatStarted
648+ <- heartbeatSucceeded
629649
630- client , err := mongo .Connect (opts )
631- require .NoError (mt , err , "expected no error connecting to client, got: %v" , err )
632- defer func () {
633- err = client .Disconnect (context .Background ())
634- require .NoError (mt , err , "expected no error disconnecting client, got: %v" , err )
635- }()
650+ err = mt .Client .Ping (context .Background (), readpref .Primary ())
651+ require .NoError (mt , err , "expected no error, got: %v" , err )
636652
637- err = client .Ping (context .Background (), readpref .Primary ())
638- assert .NoError (mt , err , "expected no error, got: %v" , err )
639- // mt.Fatal(mtest.ClusterURI(), mtest.ClusterConnString().Hosts, mtest.GlobalTopology().Description().Servers)
653+ succeededEvents := mt .GetAllSucceededEvents ()
654+ require .Len (mt , succeededEvents , 2 , "expected 2 succeeded events, got: %v" , len (succeededEvents ))
655+ require .Equal (mt , "ping" , succeededEvents [0 ].CommandName , "expected first command to be ping, got: %v" , succeededEvents [0 ].CommandName )
656+ initialClusterTime , err := succeededEvents [0 ].Reply .LookupErr ("$clusterTime" )
657+ require .NoError (mt , err , "$clusterTime not found in response" )
658+
659+ startedEvents := mt .GetAllStartedEvents ()
660+ require .Len (mt , startedEvents , 2 , "expected 2 started events, got: %v" , len (startedEvents ))
661+ require .Equal (mt , "ping" , startedEvents [1 ].CommandName , "expected second command to be ping, got: %v" , startedEvents [1 ].CommandName )
662+ currentClusterTime , err := startedEvents [1 ].Command .LookupErr ("$clusterTime" )
663+ require .NoError (mt , err , "$clusterTime not found in commane" )
664+ assert .Equal (mt , initialClusterTime , currentClusterTime , "expected same cluster time, got %v and %v" , initialClusterTime , currentClusterTime )
640665 })
641666}
642667
0 commit comments