Skip to content

GODRIVER-3288 Stop gossiping $clusterTime on SDAM commands. #2150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions internal/integration/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"

"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/event"
"go.mongodb.org/mongo-driver/v2/internal/assert"
"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
"go.mongodb.org/mongo-driver/v2/internal/mongoutil"
Expand Down Expand Up @@ -508,6 +510,82 @@ func TestSessionsProse(t *testing.T) {
assert.True(mt, limitedSessionUse, limitedSessMsg, len(ops))

})

mt.Run("20 Drivers do not gossip $clusterTime on SDAM commands", func(mt *mtest.T) {
heartbeatStarted := make(chan struct{})
heartbeatSucceeded := make(chan struct{})
var clusterTimeAdvanced uint32
serverMonitor := &event.ServerMonitor{
ServerHeartbeatStarted: func(e *event.ServerHeartbeatStartedEvent) {
fmt.Println("Server heartbeat started:", e.ConnectionID)
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
fmt.Println("ServerHeartbeatStartedEvent: cluster time advanced")
select {
case heartbeatStarted <- struct{}{}:
// NOOP
default:
// NOOP
}
}
},
ServerHeartbeatSucceeded: func(e *event.ServerHeartbeatSucceededEvent) {
fmt.Println("Server heartbeat succeeded:", e.ConnectionID, e.Duration, e.Reply)
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
fmt.Println("ServerHeartbeatSucceededEvent: cluster time advanced")
select {
case heartbeatSucceeded <- struct{}{}:
// NOOP
default:
// NOOP
}
}
},
}

opts := options.Client().
ApplyURI(mtest.ClusterURI()).
SetHosts([]string{mtest.ClusterConnString().Hosts[0]}).
SetDirect(true).
SetHeartbeatInterval(500 * time.Millisecond). // Minimum interval
SetMonitor(&event.CommandMonitor{
Started: func(_ context.Context, cse *event.CommandStartedEvent) {
fmt.Println("Command started:", cse.CommandName, cse.Command)
},
Succeeded: func(_ context.Context, cse *event.CommandSucceededEvent) {
fmt.Println("Command succeeded:", cse.CommandName, cse.Reply)
},
}).
SetServerMonitor(serverMonitor)

client, err := mongo.Connect(opts)
require.NoError(mt, err, "expected no error connecting to client, got: %v", err)
defer func() {
err = client.Disconnect(context.Background())
require.NoError(mt, err, "expected no error disconnecting client, got: %v", err)
}()

res, err := client.Database("admin").RunCommand(context.Background(), bson.D{
{"ping", 1},
}).Raw()
require.NoError(mt, err, "expected no error, got: %v", err)
replyClusterTime, err := res.LookupErr("$clusterTime")
require.NoError(mt, err, "$clusterTime not found in response")

_, err = mt.Client.Database("test").Collection("test").InsertOne(context.Background(), bson.D{{"advance", "$clusterTime"}})
require.NoError(mt, err, "expected no error inserting document, got: %v", err)

atomic.StoreUint32(&clusterTimeAdvanced, 1)
<-heartbeatStarted
<-heartbeatSucceeded

res, err = client.Database("admin").RunCommand(context.Background(), bson.D{
{"ping", 1},
}).Raw()
require.NoError(mt, err, "expected no error, got: %v", err)
replyClusterTimeNew, err := res.LookupErr("$clusterTime")
require.NoError(mt, err, "$clusterTime not found in response")
mt.Fatalf("$clusterTime: %v %v", replyClusterTime, replyClusterTimeNew)
})
}

type sessionFunction struct {
Expand Down
1 change: 0 additions & 1 deletion x/mongo/driver/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (ah *authHandshaker) GetHandshakeInformation(
AppName(ah.options.AppName).
Compressors(ah.options.Compressors).
SASLSupportedMechs(ah.options.DBUser).
ClusterClock(ah.options.ClusterClock).
ServerAPI(ah.options.ServerAPI).
LoadBalanced(ah.options.LoadBalanced).
OuterLibraryName(ah.options.OuterLibraryName).
Expand Down
10 changes: 7 additions & 3 deletions x/mongo/driver/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,9 @@ func (op Operation) readWireMessage(ctx context.Context, conn *mnet.Connection)
res, err := op.decodeResult(opcode, rem)
// Update cluster/operation time and recovery tokens before handling the error to ensure we're properly updating
// everything.
op.updateClusterTimes(res)
if op.Clock != nil {
op.updateClusterTimes(res)
}
op.updateOperationTime(res)
op.Client.UpdateRecoveryToken(bson.Raw(res))

Expand Down Expand Up @@ -1699,7 +1701,10 @@ func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer)
if (clock == nil && client == nil) || !sessionsSupported(desc.WireVersion) {
return dst
}
clusterTime := clock.GetClusterTime()
var clusterTime bson.Raw
if clock != nil {
clusterTime = clock.GetClusterTime()
}
if client != nil {
clusterTime = session.MaxClusterTime(clusterTime, client.ClusterTime)
}
Expand All @@ -1711,7 +1716,6 @@ func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer)
return dst
}
return append(bsoncore.AppendHeader(dst, bsoncore.Type(val.Type), "$clusterTime"), val.Value...)
// return bsoncore.AppendDocumentElement(dst, "$clusterTime", clusterTime)
}

// calculateMaxTimeMS calculates the value of the 'maxTimeMS' field to potentially append
Expand Down
24 changes: 10 additions & 14 deletions x/mongo/driver/operation/hello.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ package operation
import (
"context"
"errors"
"fmt"
"os"
"runtime"
"strconv"
"strings"

"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/event"
"go.mongodb.org/mongo-driver/v2/internal/bsonutil"
"go.mongodb.org/mongo-driver/v2/internal/driverutil"
"go.mongodb.org/mongo-driver/v2/internal/handshake"
Expand All @@ -24,7 +26,6 @@ import (
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/mnet"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/session"
)

// maxClientMetadataSize is the maximum size of the client metadata document
Expand All @@ -42,7 +43,6 @@ type Hello struct {
compressors []string
saslSupportedMechs string
d driver.Deployment
clock *session.ClusterClock
speculativeAuth bsoncore.Document
topologyVersion *description.TopologyVersion
maxAwaitTimeMS *int64
Expand All @@ -69,16 +69,6 @@ func (h *Hello) AppName(appname string) *Hello {
return h
}

// ClusterClock sets the cluster clock for this operation.
func (h *Hello) ClusterClock(clock *session.ClusterClock) *Hello {
if h == nil {
h = new(Hello)
}

h.clock = clock
return h
}

// Compressors sets the compressors that can be used.
func (h *Hello) Compressors(compressors []string) *Hello {
h.compressors = compressors
Expand Down Expand Up @@ -627,7 +617,6 @@ func isLegacyHandshake(srvAPI *driver.ServerAPIOptions, loadbalanced bool) bool

func (h *Hello) createOperation() driver.Operation {
op := driver.Operation{
Clock: h.clock,
CommandFn: h.command,
Database: "admin",
Deployment: h.d,
Expand All @@ -637,6 +626,14 @@ func (h *Hello) createOperation() driver.Operation {
},
ServerAPI: h.serverAPI,
OmitMaxTimeMS: h.omitMaxTimeMS,
CommandMonitor: &event.CommandMonitor{
Started: func(_ context.Context, e *event.CommandStartedEvent) {
fmt.Println("Hello started:", e.CommandName, e.Command)
},
Succeeded: func(_ context.Context, e *event.CommandSucceededEvent) {
fmt.Println("Hello succeeded:", e.CommandName, e.Reply)
},
},
}

if isLegacyHandshake(h.serverAPI, h.loadBalanced) {
Expand All @@ -652,7 +649,6 @@ func (h *Hello) GetHandshakeInformation(ctx context.Context, _ address.Address,
deployment := driver.SingleConnectionDeployment{C: conn}

op := driver.Operation{
Clock: h.clock,
CommandFn: h.handshakeCommand,
Deployment: deployment,
Database: "admin",
Expand Down
1 change: 0 additions & 1 deletion x/mongo/driver/topology/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,6 @@ func (s *Server) setupHeartbeatConnection(ctx context.Context) error {
func (s *Server) createBaseOperation(conn *mnet.Connection) *operation.Hello {
return operation.
NewHello().
ClusterClock(s.cfg.clock).
Deployment(driver.SingleConnectionDeployment{C: conn}).
ServerAPI(s.cfg.serverAPI)
}
Expand Down
1 change: 0 additions & 1 deletion x/mongo/driver/topology/topology_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ func NewConfigFromOptionsWithAuthenticator(opts *options.ClientOptions, clock *s
return operation.NewHello().
AppName(appName).
Compressors(comps).
ClusterClock(clock).
ServerAPI(serverAPI).
LoadBalanced(loadBalanced).
OuterLibraryName(outerLibraryName).
Expand Down
Loading