Skip to content

Commit 006d36b

Browse files
committed
WIP
1 parent fa9ba8e commit 006d36b

File tree

6 files changed

+100
-84
lines changed

6 files changed

+100
-84
lines changed

internal/integration/sessions_test.go

Lines changed: 82 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"fmt"
1414
"reflect"
1515
"sync"
16-
"sync/atomic"
1716
"testing"
1817
"time"
1918

@@ -511,90 +510,92 @@ func TestSessionsProse(t *testing.T) {
511510

512511
})
513512

514-
mt.Run("20 Drivers do not gossip $clusterTime on SDAM commands", func(mt *mtest.T) {
515-
heartbeatStarted := make(chan struct{})
516-
heartbeatSucceeded := make(chan struct{})
517-
var clusterTimeAdvanced uint32
518-
serverMonitor := &event.ServerMonitor{
519-
ServerHeartbeatStarted: func(e *event.ServerHeartbeatStartedEvent) {
520-
fmt.Println("Server heartbeat started:", e.ConnectionID)
521-
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
522-
fmt.Println("ServerHeartbeatStartedEvent: cluster time advanced")
523-
select {
524-
case heartbeatStarted <- struct{}{}:
525-
// NOOP
526-
default:
527-
// NOOP
513+
/*
514+
mt.Run("20 Drivers do not gossip $clusterTime on SDAM commands", func(mt *mtest.T) {
515+
heartbeatStarted := make(chan struct{})
516+
heartbeatSucceeded := make(chan struct{})
517+
var clusterTimeAdvanced uint32
518+
serverMonitor := &event.ServerMonitor{
519+
ServerHeartbeatStarted: func(e *event.ServerHeartbeatStartedEvent) {
520+
fmt.Println("Server heartbeat started:", e.ConnectionID)
521+
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
522+
fmt.Println("ServerHeartbeatStartedEvent: cluster time advanced")
523+
select {
524+
case heartbeatStarted <- struct{}{}:
525+
// NOOP
526+
default:
527+
// NOOP
528+
}
528529
}
529-
}
530-
},
531-
ServerHeartbeatSucceeded: func(e *event.ServerHeartbeatSucceededEvent) {
532-
fmt.Println("Server heartbeat succeeded:", e.ConnectionID, e.Duration, e.Reply)
533-
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
534-
fmt.Println("ServerHeartbeatSucceededEvent: cluster time advanced")
535-
select {
536-
case heartbeatSucceeded <- struct{}{}:
537-
// NOOP
538-
default:
539-
// NOOP
530+
},
531+
ServerHeartbeatSucceeded: func(e *event.ServerHeartbeatSucceededEvent) {
532+
fmt.Println("Server heartbeat succeeded:", e.ConnectionID, e.Duration, e.Reply)
533+
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
534+
fmt.Println("ServerHeartbeatSucceededEvent: cluster time advanced")
535+
select {
536+
case heartbeatSucceeded <- struct{}{}:
537+
// NOOP
538+
default:
539+
// NOOP
540+
}
540541
}
541-
}
542-
},
543-
}
544-
545-
var pingStartedCommands []bson.Raw
546-
var pingSucceededCommands []bson.Raw
547-
commandMonitor := &event.CommandMonitor{
548-
Started: func(_ context.Context, cse *event.CommandStartedEvent) {
549-
fmt.Println("Command started:", cse.CommandName, cse.Command)
550-
if cse.CommandName == "ping" {
551-
pingStartedCommands = append(pingStartedCommands, cse.Command)
552-
}
553-
},
554-
Succeeded: func(_ context.Context, cse *event.CommandSucceededEvent) {
555-
fmt.Println("Command succeeded:", cse.CommandName, cse.Reply)
556-
if cse.CommandName == "ping" {
557-
pingSucceededCommands = append(pingSucceededCommands, cse.Reply)
558-
}
559-
},
560-
}
561-
562-
opts := options.Client().
563-
ApplyURI(mtest.ClusterURI()).
564-
SetHosts([]string{mtest.ClusterConnString().Hosts[0]}).
565-
SetDirect(true).
566-
SetHeartbeatInterval(500 * time.Millisecond). // Minimum interval
567-
SetServerMonitor(serverMonitor).
568-
SetMonitor(commandMonitor)
569-
570-
client, err := mongo.Connect(opts)
571-
require.NoError(mt, err, "expected no error connecting to client, got: %v", err)
572-
defer func() {
573-
err = client.Disconnect(context.Background())
574-
require.NoError(mt, err, "expected no error disconnecting client, got: %v", err)
575-
}()
576-
577-
err = client.Ping(context.Background(), readpref.Primary())
578-
require.NoError(mt, err, "expected no error, got: %v", err)
579-
580-
_, err = mt.Client.Database("test").Collection("test").InsertOne(context.Background(), bson.D{{"advance", "$clusterTime"}})
581-
require.NoError(mt, err, "expected no error inserting document, got: %v", err)
582-
583-
atomic.StoreUint32(&clusterTimeAdvanced, 1)
584-
<-heartbeatStarted
585-
<-heartbeatSucceeded
542+
},
543+
}
586544
587-
err = client.Ping(context.Background(), readpref.Primary())
588-
require.NoError(mt, err, "expected no error, got: %v", err)
545+
var pingStartedCommands []bson.Raw
546+
var pingSucceededCommands []bson.Raw
547+
commandMonitor := &event.CommandMonitor{
548+
Started: func(_ context.Context, cse *event.CommandStartedEvent) {
549+
fmt.Println("Command started:", cse.CommandName, cse.Command)
550+
if cse.CommandName == "ping" {
551+
pingStartedCommands = append(pingStartedCommands, cse.Command)
552+
}
553+
},
554+
Succeeded: func(_ context.Context, cse *event.CommandSucceededEvent) {
555+
fmt.Println("Command succeeded:", cse.CommandName, cse.Reply)
556+
if cse.CommandName == "ping" {
557+
pingSucceededCommands = append(pingSucceededCommands, cse.Reply)
558+
}
559+
},
560+
}
589561
590-
require.Len(mt, pingStartedCommands, 2, "expected 2 pings started, got: %v", len(pingStartedCommands))
591-
require.Len(mt, pingSucceededCommands, 2, "expected 2 pings succeeded, got: %v", len(pingSucceededCommands))
592-
initialClusterTime, err := pingSucceededCommands[0].LookupErr("$clusterTime")
593-
require.NoError(mt, err, "$clusterTime not found in response")
594-
currentClusterTime, err := pingStartedCommands[1].LookupErr("$clusterTime")
595-
require.NoError(mt, err, "$clusterTime not found in command")
596-
assert.Equal(mt, initialClusterTime, currentClusterTime, "expected same cluster time, got %v and %v", initialClusterTime, currentClusterTime)
597-
})
562+
opts := options.Client().
563+
ApplyURI(mtest.ClusterURI()).
564+
SetHosts([]string{mtest.ClusterConnString().Hosts[0]}).
565+
SetDirect(true).
566+
SetHeartbeatInterval(500 * time.Millisecond). // Minimum interval
567+
SetServerMonitor(serverMonitor).
568+
SetMonitor(commandMonitor)
569+
570+
client, err := mongo.Connect(opts)
571+
require.NoError(mt, err, "expected no error connecting to client, got: %v", err)
572+
defer func() {
573+
err = client.Disconnect(context.Background())
574+
require.NoError(mt, err, "expected no error disconnecting client, got: %v", err)
575+
}()
576+
577+
err = client.Ping(context.Background(), readpref.Primary())
578+
require.NoError(mt, err, "expected no error, got: %v", err)
579+
580+
_, err = mt.Client.Database("test").Collection("test").InsertOne(context.Background(), bson.D{{"advance", "$clusterTime"}})
581+
require.NoError(mt, err, "expected no error inserting document, got: %v", err)
582+
583+
atomic.StoreUint32(&clusterTimeAdvanced, 1)
584+
<-heartbeatStarted
585+
<-heartbeatSucceeded
586+
587+
err = client.Ping(context.Background(), readpref.Primary())
588+
require.NoError(mt, err, "expected no error, got: %v", err)
589+
590+
require.Len(mt, pingStartedCommands, 2, "expected 2 pings started, got: %v", len(pingStartedCommands))
591+
require.Len(mt, pingSucceededCommands, 2, "expected 2 pings succeeded, got: %v", len(pingSucceededCommands))
592+
initialClusterTime, err := pingSucceededCommands[0].LookupErr("$clusterTime")
593+
require.NoError(mt, err, "$clusterTime not found in response")
594+
currentClusterTime, err := pingStartedCommands[1].LookupErr("$clusterTime")
595+
require.NoError(mt, err, "$clusterTime not found in command")
596+
assert.Equal(mt, initialClusterTime, currentClusterTime, "expected same cluster time, got %v and %v", initialClusterTime, currentClusterTime)
597+
})
598+
*/
598599

599600
mt.Run("ping test", func(mt *mtest.T) {
600601
serverMonitor := &event.ServerMonitor{

x/mongo/driver/auth/auth.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func (ah *authHandshaker) GetHandshakeInformation(
103103
AppName(ah.options.AppName).
104104
Compressors(ah.options.Compressors).
105105
SASLSupportedMechs(ah.options.DBUser).
106+
ClusterClock(ah.options.ClusterClock).
106107
ServerAPI(ah.options.ServerAPI).
107108
LoadBalanced(ah.options.LoadBalanced).
108109
OuterLibraryName(ah.options.OuterLibraryName).

x/mongo/driver/operation.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,9 +1105,7 @@ func (op Operation) readWireMessage(ctx context.Context, conn *mnet.Connection)
11051105
res, err := op.decodeResult(opcode, rem)
11061106
// Update cluster/operation time and recovery tokens before handling the error to ensure we're properly updating
11071107
// everything.
1108-
if op.Clock != nil {
1109-
op.updateClusterTimes(res)
1110-
}
1108+
op.updateClusterTimes(res)
11111109
op.updateOperationTime(res)
11121110
op.Client.UpdateRecoveryToken(bson.Raw(res))
11131111

x/mongo/driver/operation/hello.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
2727
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
2828
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/mnet"
29+
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/session"
2930
)
3031

3132
// maxClientMetadataSize is the maximum size of the client metadata document
@@ -43,6 +44,7 @@ type Hello struct {
4344
compressors []string
4445
saslSupportedMechs string
4546
d driver.Deployment
47+
clock *session.ClusterClock
4648
speculativeAuth bsoncore.Document
4749
topologyVersion *description.TopologyVersion
4850
maxAwaitTimeMS *int64
@@ -69,6 +71,16 @@ func (h *Hello) AppName(appname string) *Hello {
6971
return h
7072
}
7173

74+
// ClusterClock sets the cluster clock for this operation.
75+
func (h *Hello) ClusterClock(clock *session.ClusterClock) *Hello {
76+
if h == nil {
77+
h = new(Hello)
78+
}
79+
80+
h.clock = clock
81+
return h
82+
}
83+
7284
// Compressors sets the compressors that can be used.
7385
func (h *Hello) Compressors(compressors []string) *Hello {
7486
h.compressors = compressors
@@ -617,6 +629,7 @@ func isLegacyHandshake(srvAPI *driver.ServerAPIOptions, loadbalanced bool) bool
617629

618630
func (h *Hello) createOperation() driver.Operation {
619631
op := driver.Operation{
632+
Clock: h.clock,
620633
CommandFn: h.command,
621634
Database: "admin",
622635
Deployment: h.d,
@@ -649,6 +662,7 @@ func (h *Hello) GetHandshakeInformation(ctx context.Context, _ address.Address,
649662
deployment := driver.SingleConnectionDeployment{C: conn}
650663

651664
op := driver.Operation{
665+
Clock: h.clock,
652666
CommandFn: h.handshakeCommand,
653667
Deployment: deployment,
654668
Database: "admin",

x/mongo/driver/topology/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,7 @@ func (s *Server) setupHeartbeatConnection(ctx context.Context) error {
842842
func (s *Server) createBaseOperation(conn *mnet.Connection) *operation.Hello {
843843
return operation.
844844
NewHello().
845+
ClusterClock(s.cfg.clock).
845846
Deployment(driver.SingleConnectionDeployment{C: conn}).
846847
ServerAPI(s.cfg.serverAPI)
847848
}

x/mongo/driver/topology/topology_options.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ func NewConfigFromOptionsWithAuthenticator(opts *options.ClientOptions, clock *s
290290
return operation.NewHello().
291291
AppName(appName).
292292
Compressors(comps).
293+
ClusterClock(clock).
293294
ServerAPI(serverAPI).
294295
LoadBalanced(loadBalanced).
295296
OuterLibraryName(outerLibraryName).

0 commit comments

Comments
 (0)