Skip to content

Commit 5d5a41a

Browse files
committed
Stop gossiping cluster time on SDAM commands.
1 parent 056ba3c commit 5d5a41a

File tree

7 files changed

+80
-23
lines changed

7 files changed

+80
-23
lines changed

internal/integration/mtest/mongotest.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,9 @@ func (t *T) ResetClient(opts *options.ClientOptions) {
364364
t.clientOpts = opts
365365
}
366366

367-
_ = t.Client.Disconnect(context.Background())
367+
if t.Client != nil {
368+
_ = t.Client.Disconnect(context.Background())
369+
}
368370
t.createTestClient()
369371
t.DB = t.Client.Database(t.dbName)
370372
t.Coll = t.DB.Collection(t.collName, t.collOpts)

internal/integration/sessions_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ import (
1313
"fmt"
1414
"reflect"
1515
"sync"
16+
"sync/atomic"
1617
"testing"
1718
"time"
1819

1920
"go.mongodb.org/mongo-driver/v2/bson"
21+
"go.mongodb.org/mongo-driver/v2/event"
2022
"go.mongodb.org/mongo-driver/v2/internal/assert"
2123
"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
2224
"go.mongodb.org/mongo-driver/v2/internal/mongoutil"
@@ -508,6 +510,72 @@ func TestSessionsProse(t *testing.T) {
508510
assert.True(mt, limitedSessionUse, limitedSessMsg, len(ops))
509511

510512
})
513+
514+
mt.ResetClient(options.Client())
515+
client := mt.Client
516+
heartbeatStarted := make(chan struct{})
517+
heartbeatSucceeded := make(chan struct{})
518+
var clusterTimeAdvanced uint32
519+
serverMonitor := &event.ServerMonitor{
520+
ServerHeartbeatStarted: func(e *event.ServerHeartbeatStartedEvent) {
521+
fmt.Println("Server heartbeat started:", e.ConnectionID)
522+
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
523+
fmt.Println("ServerHeartbeatStartedEvent: cluster time advanced")
524+
select {
525+
case heartbeatStarted <- struct{}{}:
526+
// NOOP
527+
default:
528+
// NOOP
529+
}
530+
}
531+
},
532+
ServerHeartbeatSucceeded: func(e *event.ServerHeartbeatSucceededEvent) {
533+
fmt.Println("Server heartbeat succeeded:", e.ConnectionID, e.Duration, e.Reply)
534+
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
535+
fmt.Println("ServerHeartbeatSucceededEvent: cluster time advanced")
536+
select {
537+
case heartbeatSucceeded <- struct{}{}:
538+
// NOOP
539+
default:
540+
// NOOP
541+
}
542+
}
543+
},
544+
}
545+
pingOpts := mtest.NewOptions().
546+
CreateCollection(false).
547+
ClientOptions(options.Client().
548+
SetServerMonitor(serverMonitor).
549+
SetHeartbeatInterval(500 * time.Millisecond). // Minimum interval
550+
SetDirect(true)).
551+
ClientType(mtest.Pinned)
552+
mt.RunOpts("20 Drivers do not gossip $clusterTime on SDAM commands", pingOpts, func(mt *mtest.T) {
553+
err := mt.Client.Ping(context.Background(), readpref.Primary())
554+
assert.NoError(mt, err, "expected no error, got: %v", err)
555+
556+
_, err = client.Database("test").Collection("test").InsertOne(context.Background(), bson.D{{"advance", "$clusterTime"}})
557+
require.NoError(mt, err, "expected no error inserting document, got: %v", err)
558+
559+
atomic.StoreUint32(&clusterTimeAdvanced, 1)
560+
<-heartbeatStarted
561+
<-heartbeatSucceeded
562+
563+
err = mt.Client.Ping(context.Background(), readpref.Primary())
564+
require.NoError(mt, err, "expected no error, got: %v", err)
565+
566+
succeededEvents := mt.GetAllSucceededEvents()
567+
require.Len(mt, succeededEvents, 2, "expected 2 succeeded events, got: %v", len(succeededEvents))
568+
require.Equal(mt, "ping", succeededEvents[0].CommandName, "expected first command to be ping, got: %v", succeededEvents[0].CommandName)
569+
initialClusterTime, err := succeededEvents[0].Reply.LookupErr("$clusterTime")
570+
require.NoError(mt, err, "$clusterTime not found in response")
571+
572+
startedEvents := mt.GetAllStartedEvents()
573+
require.Len(mt, startedEvents, 2, "expected 2 started events, got: %v", len(startedEvents))
574+
require.Equal(mt, "ping", startedEvents[1].CommandName, "expected second command to be ping, got: %v", startedEvents[1].CommandName)
575+
currentClusterTime, err := startedEvents[1].Command.LookupErr("$clusterTime")
576+
require.NoError(mt, err, "$clusterTime not found in commane")
577+
assert.Equal(mt, initialClusterTime, currentClusterTime, "expected same cluster time, got %v and %v", initialClusterTime, currentClusterTime)
578+
})
511579
}
512580

513581
type sessionFunction struct {

x/mongo/driver/auth/auth.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ func (ah *authHandshaker) GetHandshakeInformation(
103103
AppName(ah.options.AppName).
104104
Compressors(ah.options.Compressors).
105105
SASLSupportedMechs(ah.options.DBUser).
106-
ClusterClock(ah.options.ClusterClock).
107106
ServerAPI(ah.options.ServerAPI).
108107
LoadBalanced(ah.options.LoadBalanced).
109108
OuterLibraryName(ah.options.OuterLibraryName).

x/mongo/driver/operation.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,9 +1103,11 @@ func (op Operation) readWireMessage(ctx context.Context, conn *mnet.Connection)
11031103

11041104
// decode
11051105
res, err := op.decodeResult(opcode, rem)
1106-
// Update cluster/operation time and recovery tokens before handling the error to ensure we're properly updating
1107-
// everything.
1108-
op.updateClusterTimes(res)
1106+
// When a cluster clock is given, update cluster/operation time and recovery tokens before handling the error
1107+
// to ensure we're properly updating everything.
1108+
if op.Clock != nil {
1109+
op.updateClusterTimes(res)
1110+
}
11091111
op.updateOperationTime(res)
11101112
op.Client.UpdateRecoveryToken(bson.Raw(res))
11111113

@@ -1699,7 +1701,10 @@ func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer)
16991701
if (clock == nil && client == nil) || !sessionsSupported(desc.WireVersion) {
17001702
return dst
17011703
}
1702-
clusterTime := clock.GetClusterTime()
1704+
var clusterTime bson.Raw
1705+
if clock != nil {
1706+
clusterTime = clock.GetClusterTime()
1707+
}
17031708
if client != nil {
17041709
clusterTime = session.MaxClusterTime(clusterTime, client.ClusterTime)
17051710
}
@@ -1711,7 +1716,6 @@ func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer)
17111716
return dst
17121717
}
17131718
return append(bsoncore.AppendHeader(dst, bsoncore.Type(val.Type), "$clusterTime"), val.Value...)
1714-
// return bsoncore.AppendDocumentElement(dst, "$clusterTime", clusterTime)
17151719
}
17161720

17171721
// calculateMaxTimeMS calculates the value of the 'maxTimeMS' field to potentially append

x/mongo/driver/operation/hello.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
2525
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
2626
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/mnet"
27-
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/session"
2827
)
2928

3029
// maxClientMetadataSize is the maximum size of the client metadata document
@@ -42,7 +41,6 @@ type Hello struct {
4241
compressors []string
4342
saslSupportedMechs string
4443
d driver.Deployment
45-
clock *session.ClusterClock
4644
speculativeAuth bsoncore.Document
4745
topologyVersion *description.TopologyVersion
4846
maxAwaitTimeMS *int64
@@ -69,16 +67,6 @@ func (h *Hello) AppName(appname string) *Hello {
6967
return h
7068
}
7169

72-
// ClusterClock sets the cluster clock for this operation.
73-
func (h *Hello) ClusterClock(clock *session.ClusterClock) *Hello {
74-
if h == nil {
75-
h = new(Hello)
76-
}
77-
78-
h.clock = clock
79-
return h
80-
}
81-
8270
// Compressors sets the compressors that can be used.
8371
func (h *Hello) Compressors(compressors []string) *Hello {
8472
h.compressors = compressors
@@ -627,7 +615,6 @@ func isLegacyHandshake(srvAPI *driver.ServerAPIOptions, loadbalanced bool) bool
627615

628616
func (h *Hello) createOperation() driver.Operation {
629617
op := driver.Operation{
630-
Clock: h.clock,
631618
CommandFn: h.command,
632619
Database: "admin",
633620
Deployment: h.d,
@@ -652,7 +639,6 @@ func (h *Hello) GetHandshakeInformation(ctx context.Context, _ address.Address,
652639
deployment := driver.SingleConnectionDeployment{C: conn}
653640

654641
op := driver.Operation{
655-
Clock: h.clock,
656642
CommandFn: h.handshakeCommand,
657643
Deployment: deployment,
658644
Database: "admin",

x/mongo/driver/topology/server.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -842,7 +842,6 @@ func (s *Server) setupHeartbeatConnection(ctx context.Context) error {
842842
func (s *Server) createBaseOperation(conn *mnet.Connection) *operation.Hello {
843843
return operation.
844844
NewHello().
845-
ClusterClock(s.cfg.clock).
846845
Deployment(driver.SingleConnectionDeployment{C: conn}).
847846
ServerAPI(s.cfg.serverAPI)
848847
}

x/mongo/driver/topology/topology_options.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,6 @@ func NewConfigFromOptionsWithAuthenticator(opts *options.ClientOptions, clock *s
290290
return operation.NewHello().
291291
AppName(appName).
292292
Compressors(comps).
293-
ClusterClock(clock).
294293
ServerAPI(serverAPI).
295294
LoadBalanced(loadBalanced).
296295
OuterLibraryName(outerLibraryName).

0 commit comments

Comments
 (0)