Skip to content

Commit f7babc7

Browse files
authored
GODRIVER-3288 Stop gossiping $clusterTime on SDAM commands. (#2150)
1 parent b7fee90 commit f7babc7

File tree

4 files changed

+86
-7
lines changed

4 files changed

+86
-7
lines changed

internal/integration/mtest/mongotest.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,9 @@ func (t *T) ResetClient(opts *options.ClientOptions) {
368368
t.clientOpts = opts
369369
}
370370

371-
_ = t.Client.Disconnect(context.Background())
371+
if t.Client != nil {
372+
_ = t.Client.Disconnect(context.Background())
373+
}
372374
t.createTestClient()
373375
t.DB = t.Client.Database(t.dbName)
374376
t.Coll = t.DB.Collection(t.collName, t.collOpts)

internal/integration/sessions_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ import (
1313
"fmt"
1414
"reflect"
1515
"sync"
16+
"sync/atomic"
1617
"testing"
1718
"time"
1819

1920
"go.mongodb.org/mongo-driver/v2/bson"
21+
"go.mongodb.org/mongo-driver/v2/event"
2022
"go.mongodb.org/mongo-driver/v2/internal/assert"
2123
"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
2224
"go.mongodb.org/mongo-driver/v2/internal/mongoutil"
@@ -508,6 +510,78 @@ func TestSessionsProse(t *testing.T) {
508510
assert.True(mt, limitedSessionUse, limitedSessMsg, len(ops))
509511

510512
})
513+
514+
mt.ResetClient(options.Client())
515+
client := mt.Client
516+
heartbeatStarted := make(chan struct{}, 1)
517+
heartbeatSucceeded := make(chan struct{}, 1)
518+
var clusterTimeAdvanced uint32
519+
serverMonitor := &event.ServerMonitor{
520+
ServerHeartbeatStarted: func(*event.ServerHeartbeatStartedEvent) {
521+
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
522+
select {
523+
case heartbeatStarted <- struct{}{}:
524+
// NOOP
525+
default:
526+
// NOOP
527+
}
528+
}
529+
},
530+
ServerHeartbeatSucceeded: func(*event.ServerHeartbeatSucceededEvent) {
531+
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
532+
select {
533+
case heartbeatSucceeded <- struct{}{}:
534+
// NOOP
535+
default:
536+
// NOOP
537+
}
538+
}
539+
},
540+
}
541+
pingOpts := mtest.NewOptions().
542+
CreateCollection(false).
543+
ClientOptions(options.Client().
544+
SetServerMonitor(serverMonitor).
545+
SetHeartbeatInterval(500 * time.Millisecond). // Minimum interval
546+
SetDirect(true)).
547+
ClientType(mtest.Pinned)
548+
mt.RunOpts("20 Drivers do not gossip $clusterTime on SDAM commands", pingOpts, func(mt *mtest.T) {
549+
wait := func(mt *mtest.T, ch <-chan struct{}, label string) {
550+
mt.Helper()
551+
552+
select {
553+
case <-ch:
554+
case <-time.After(5 * time.Second):
555+
mt.Fatalf("timed out waiting for %s", label)
556+
}
557+
}
558+
559+
err := mt.Client.Ping(context.Background(), readpref.Primary())
560+
assert.NoError(mt, err, "expected no error, got: %v", err)
561+
562+
_, err = client.Database("test").Collection("test").InsertOne(context.Background(), bson.D{{"advance", "$clusterTime"}})
563+
require.NoError(mt, err, "expected no error inserting document, got: %v", err)
564+
565+
atomic.StoreUint32(&clusterTimeAdvanced, 1)
566+
wait(mt, heartbeatStarted, "ServerHeartbeatStartedEvent")
567+
wait(mt, heartbeatSucceeded, "ServerHeartbeatSucceededEvent")
568+
569+
err = mt.Client.Ping(context.Background(), readpref.Primary())
570+
require.NoError(mt, err, "expected no error, got: %v", err)
571+
572+
succeededEvents := mt.GetAllSucceededEvents()
573+
require.Len(mt, succeededEvents, 2, "expected 2 succeeded events, got: %v", len(succeededEvents))
574+
require.Equal(mt, "ping", succeededEvents[0].CommandName, "expected first command to be ping, got: %v", succeededEvents[0].CommandName)
575+
initialClusterTime, err := succeededEvents[0].Reply.LookupErr("$clusterTime")
576+
require.NoError(mt, err, "$clusterTime not found in response")
577+
578+
startedEvents := mt.GetAllStartedEvents()
579+
require.Len(mt, startedEvents, 2, "expected 2 started events, got: %v", len(startedEvents))
580+
require.Equal(mt, "ping", startedEvents[1].CommandName, "expected second command to be ping, got: %v", startedEvents[1].CommandName)
581+
currentClusterTime, err := startedEvents[1].Command.LookupErr("$clusterTime")
582+
require.NoError(mt, err, "$clusterTime not found in command")
583+
assert.Equal(mt, initialClusterTime, currentClusterTime, "expected same cluster time, got %v and %v", initialClusterTime, currentClusterTime)
584+
})
511585
}
512586

513587
type sessionFunction struct {

x/mongo/driver/operation.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1133,9 +1133,11 @@ func (op Operation) readWireMessage(ctx context.Context, conn *mnet.Connection)
11331133

11341134
// decode
11351135
res, err := op.decodeResult(opcode, rem)
1136-
// Update cluster/operation time and recovery tokens before handling the error to ensure we're properly updating
1137-
// everything.
1138-
op.updateClusterTimes(res)
1136+
// When a cluster clock is given, update cluster/operation time and recovery tokens before handling the error
1137+
// to ensure we're properly updating everything.
1138+
if op.Clock != nil {
1139+
op.updateClusterTimes(res)
1140+
}
11391141
op.updateOperationTime(res)
11401142
op.Client.UpdateRecoveryToken(bson.Raw(res))
11411143

@@ -1729,7 +1731,10 @@ func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer)
17291731
if (clock == nil && client == nil) || !sessionsSupported(desc.WireVersion) {
17301732
return dst
17311733
}
1732-
clusterTime := clock.GetClusterTime()
1734+
var clusterTime bson.Raw
1735+
if clock != nil {
1736+
clusterTime = clock.GetClusterTime()
1737+
}
17331738
if client != nil {
17341739
clusterTime = session.MaxClusterTime(clusterTime, client.ClusterTime)
17351740
}
@@ -1741,7 +1746,6 @@ func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer)
17411746
return dst
17421747
}
17431748
return append(bsoncore.AppendHeader(dst, bsoncore.Type(val.Type), "$clusterTime"), val.Value...)
1744-
// return bsoncore.AppendDocumentElement(dst, "$clusterTime", clusterTime)
17451749
}
17461750

17471751
// calculateMaxTimeMS calculates the value of the 'maxTimeMS' field to potentially append

x/mongo/driver/topology/server.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -842,7 +842,6 @@ func (s *Server) setupHeartbeatConnection(ctx context.Context) error {
842842
func (s *Server) createBaseOperation(conn *mnet.Connection) *operation.Hello {
843843
return operation.
844844
NewHello().
845-
ClusterClock(s.cfg.clock).
846845
Deployment(driver.SingleConnectionDeployment{C: conn}).
847846
ServerAPI(s.cfg.serverAPI)
848847
}

0 commit comments

Comments
 (0)