Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/integration/mtest/mongotest.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,9 @@ func (t *T) ResetClient(opts *options.ClientOptions) {
t.clientOpts = opts
}

_ = t.Client.Disconnect(context.Background())
if t.Client != nil {
_ = t.Client.Disconnect(context.Background())
}
t.createTestClient()
t.DB = t.Client.Database(t.dbName)
t.Coll = t.DB.Collection(t.collName, t.collOpts)
Expand Down
64 changes: 64 additions & 0 deletions internal/integration/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"

"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/event"
"go.mongodb.org/mongo-driver/v2/internal/assert"
"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
"go.mongodb.org/mongo-driver/v2/internal/mongoutil"
Expand Down Expand Up @@ -508,6 +510,68 @@ func TestSessionsProse(t *testing.T) {
assert.True(mt, limitedSessionUse, limitedSessMsg, len(ops))

})

mt.ResetClient(options.Client())
client := mt.Client
heartbeatStarted := make(chan struct{})
heartbeatSucceeded := make(chan struct{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest buffering these channels and adding a wait function:

wait := func(mt *mtest.T, ch <-chan struct{}, label string) {
	mt.Helper()

	select {
	case <-ch:
	case <-time.After(5 * time.Second):
		mt.Fatalf("timed out waiting for %s", label)
	}
}
wait(mt, heartbeatStarted, "heartbeat started")
wait(mt, heartbeatSucceeded, "heartbeat succeeded")

var clusterTimeAdvanced uint32
serverMonitor := &event.ServerMonitor{
ServerHeartbeatStarted: func(*event.ServerHeartbeatStartedEvent) {
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
select {
case heartbeatStarted <- struct{}{}:
// NOOP
default:
// NOOP
}
}
},
ServerHeartbeatSucceeded: func(*event.ServerHeartbeatSucceededEvent) {
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
select {
case heartbeatSucceeded <- struct{}{}:
// NOOP
default:
// NOOP
}
}
},
}
pingOpts := mtest.NewOptions().
CreateCollection(false).
ClientOptions(options.Client().
SetServerMonitor(serverMonitor).
SetHeartbeatInterval(500 * time.Millisecond). // Minimum interval
SetDirect(true)).
ClientType(mtest.Pinned)
mt.RunOpts("20 Drivers do not gossip $clusterTime on SDAM commands", pingOpts, func(mt *mtest.T) {
err := mt.Client.Ping(context.Background(), readpref.Primary())
assert.NoError(mt, err, "expected no error, got: %v", err)

_, err = client.Database("test").Collection("test").InsertOne(context.Background(), bson.D{{"advance", "$clusterTime"}})
require.NoError(mt, err, "expected no error inserting document, got: %v", err)

atomic.StoreUint32(&clusterTimeAdvanced, 1)
<-heartbeatStarted
<-heartbeatSucceeded

err = mt.Client.Ping(context.Background(), readpref.Primary())
require.NoError(mt, err, "expected no error, got: %v", err)

succeededEvents := mt.GetAllSucceededEvents()
require.Len(mt, succeededEvents, 2, "expected 2 succeeded events, got: %v", len(succeededEvents))
require.Equal(mt, "ping", succeededEvents[0].CommandName, "expected first command to be ping, got: %v", succeededEvents[0].CommandName)
initialClusterTime, err := succeededEvents[0].Reply.LookupErr("$clusterTime")
require.NoError(mt, err, "$clusterTime not found in response")

startedEvents := mt.GetAllStartedEvents()
require.Len(mt, startedEvents, 2, "expected 2 started events, got: %v", len(startedEvents))
require.Equal(mt, "ping", startedEvents[1].CommandName, "expected second command to be ping, got: %v", startedEvents[1].CommandName)
currentClusterTime, err := startedEvents[1].Command.LookupErr("$clusterTime")
require.NoError(mt, err, "$clusterTime not found in commane")
assert.Equal(mt, initialClusterTime, currentClusterTime, "expected same cluster time, got %v and %v", initialClusterTime, currentClusterTime)
})
}

type sessionFunction struct {
Expand Down
1 change: 0 additions & 1 deletion x/mongo/driver/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (ah *authHandshaker) GetHandshakeInformation(
AppName(ah.options.AppName).
Compressors(ah.options.Compressors).
SASLSupportedMechs(ah.options.DBUser).
ClusterClock(ah.options.ClusterClock).
ServerAPI(ah.options.ServerAPI).
LoadBalanced(ah.options.LoadBalanced).
OuterLibraryName(ah.options.OuterLibraryName).
Expand Down
14 changes: 9 additions & 5 deletions x/mongo/driver/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,9 +1103,11 @@ func (op Operation) readWireMessage(ctx context.Context, conn *mnet.Connection)

// decode
res, err := op.decodeResult(opcode, rem)
// Update cluster/operation time and recovery tokens before handling the error to ensure we're properly updating
// everything.
op.updateClusterTimes(res)
// When a cluster clock is given, update cluster/operation time and recovery tokens before handling the error
// to ensure we're properly updating everything.
if op.Clock != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this block? Wouldn't op.updateClusterTimes(res) be a no-op if op.Click == nil for hello commands?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to stop updating the cluster time in session too, so L1709 in addClusterTime() will not append a “$clusterTime”.

op.updateClusterTimes(res)
}
op.updateOperationTime(res)
op.Client.UpdateRecoveryToken(bson.Raw(res))

Expand Down Expand Up @@ -1699,7 +1701,10 @@ func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer)
if (clock == nil && client == nil) || !sessionsSupported(desc.WireVersion) {
return dst
}
clusterTime := clock.GetClusterTime()
var clusterTime bson.Raw
if clock != nil {
clusterTime = clock.GetClusterTime()
}
if client != nil {
clusterTime = session.MaxClusterTime(clusterTime, client.ClusterTime)
}
Expand All @@ -1711,7 +1716,6 @@ func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer)
return dst
}
return append(bsoncore.AppendHeader(dst, bsoncore.Type(val.Type), "$clusterTime"), val.Value...)
// return bsoncore.AppendDocumentElement(dst, "$clusterTime", clusterTime)
}

// calculateMaxTimeMS calculates the value of the 'maxTimeMS' field to potentially append
Expand Down
14 changes: 0 additions & 14 deletions x/mongo/driver/operation/hello.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/mnet"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/session"
)

// maxClientMetadataSize is the maximum size of the client metadata document
Expand All @@ -42,7 +41,6 @@ type Hello struct {
compressors []string
saslSupportedMechs string
d driver.Deployment
clock *session.ClusterClock
speculativeAuth bsoncore.Document
topologyVersion *description.TopologyVersion
maxAwaitTimeMS *int64
Expand All @@ -69,16 +67,6 @@ func (h *Hello) AppName(appname string) *Hello {
return h
}

// ClusterClock sets the cluster clock for this operation.
func (h *Hello) ClusterClock(clock *session.ClusterClock) *Hello {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC we still want to gossip cluster times when establishing connections.

if h == nil {
h = new(Hello)
}

h.clock = clock
return h
}

// Compressors sets the compressors that can be used.
func (h *Hello) Compressors(compressors []string) *Hello {
h.compressors = compressors
Expand Down Expand Up @@ -627,7 +615,6 @@ func isLegacyHandshake(srvAPI *driver.ServerAPIOptions, loadbalanced bool) bool

func (h *Hello) createOperation() driver.Operation {
op := driver.Operation{
Clock: h.clock,
CommandFn: h.command,
Database: "admin",
Deployment: h.d,
Expand All @@ -652,7 +639,6 @@ func (h *Hello) GetHandshakeInformation(ctx context.Context, _ address.Address,
deployment := driver.SingleConnectionDeployment{C: conn}

op := driver.Operation{
Clock: h.clock,
CommandFn: h.handshakeCommand,
Deployment: deployment,
Database: "admin",
Expand Down
1 change: 0 additions & 1 deletion x/mongo/driver/topology/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,6 @@ func (s *Server) setupHeartbeatConnection(ctx context.Context) error {
func (s *Server) createBaseOperation(conn *mnet.Connection) *operation.Hello {
return operation.
NewHello().
ClusterClock(s.cfg.clock).
Deployment(driver.SingleConnectionDeployment{C: conn}).
ServerAPI(s.cfg.serverAPI)
}
Expand Down
1 change: 0 additions & 1 deletion x/mongo/driver/topology/topology_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ func NewConfigFromOptionsWithAuthenticator(opts *options.ClientOptions, clock *s
return operation.NewHello().
AppName(appName).
Compressors(comps).
ClusterClock(clock).
ServerAPI(serverAPI).
LoadBalanced(loadBalanced).
OuterLibraryName(outerLibraryName).
Expand Down
Loading