Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/integration/mtest/mongotest.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,9 @@ func (t *T) ResetClient(opts *options.ClientOptions) {
t.clientOpts = opts
}

_ = t.Client.Disconnect(context.Background())
if t.Client != nil {
_ = t.Client.Disconnect(context.Background())
}
t.createTestClient()
t.DB = t.Client.Database(t.dbName)
t.Coll = t.DB.Collection(t.collName, t.collOpts)
Expand Down
74 changes: 74 additions & 0 deletions internal/integration/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"

"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/event"
"go.mongodb.org/mongo-driver/v2/internal/assert"
"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
"go.mongodb.org/mongo-driver/v2/internal/mongoutil"
Expand Down Expand Up @@ -508,6 +510,78 @@ func TestSessionsProse(t *testing.T) {
assert.True(mt, limitedSessionUse, limitedSessMsg, len(ops))

})

mt.ResetClient(options.Client())
client := mt.Client
heartbeatStarted := make(chan struct{}, 1)
heartbeatSucceeded := make(chan struct{}, 1)
var clusterTimeAdvanced uint32
serverMonitor := &event.ServerMonitor{
ServerHeartbeatStarted: func(*event.ServerHeartbeatStartedEvent) {
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
select {
case heartbeatStarted <- struct{}{}:
// NOOP
default:
// NOOP
}
}
},
ServerHeartbeatSucceeded: func(*event.ServerHeartbeatSucceededEvent) {
if atomic.LoadUint32(&clusterTimeAdvanced) == 1 {
select {
case heartbeatSucceeded <- struct{}{}:
// NOOP
default:
// NOOP
}
}
},
}
pingOpts := mtest.NewOptions().
CreateCollection(false).
ClientOptions(options.Client().
SetServerMonitor(serverMonitor).
SetHeartbeatInterval(500 * time.Millisecond). // Minimum interval
SetDirect(true)).
ClientType(mtest.Pinned)
mt.RunOpts("20 Drivers do not gossip $clusterTime on SDAM commands", pingOpts, func(mt *mtest.T) {
wait := func(mt *mtest.T, ch <-chan struct{}, label string) {
mt.Helper()

select {
case <-ch:
case <-time.After(5 * time.Second):
mt.Fatalf("timed out waiting for %s", label)
}
}

err := mt.Client.Ping(context.Background(), readpref.Primary())
assert.NoError(mt, err, "expected no error, got: %v", err)

_, err = client.Database("test").Collection("test").InsertOne(context.Background(), bson.D{{"advance", "$clusterTime"}})
require.NoError(mt, err, "expected no error inserting document, got: %v", err)

atomic.StoreUint32(&clusterTimeAdvanced, 1)
wait(mt, heartbeatStarted, "ServerHeartbeatStartedEvent")
wait(mt, heartbeatSucceeded, "ServerHeartbeatSucceededEvent")

err = mt.Client.Ping(context.Background(), readpref.Primary())
require.NoError(mt, err, "expected no error, got: %v", err)

succeededEvents := mt.GetAllSucceededEvents()
require.Len(mt, succeededEvents, 2, "expected 2 succeeded events, got: %v", len(succeededEvents))
require.Equal(mt, "ping", succeededEvents[0].CommandName, "expected first command to be ping, got: %v", succeededEvents[0].CommandName)
initialClusterTime, err := succeededEvents[0].Reply.LookupErr("$clusterTime")
require.NoError(mt, err, "$clusterTime not found in response")

startedEvents := mt.GetAllStartedEvents()
require.Len(mt, startedEvents, 2, "expected 2 started events, got: %v", len(startedEvents))
require.Equal(mt, "ping", startedEvents[1].CommandName, "expected second command to be ping, got: %v", startedEvents[1].CommandName)
currentClusterTime, err := startedEvents[1].Command.LookupErr("$clusterTime")
require.NoError(mt, err, "$clusterTime not found in command")
assert.Equal(mt, initialClusterTime, currentClusterTime, "expected same cluster time, got %v and %v", initialClusterTime, currentClusterTime)
})
}

type sessionFunction struct {
Expand Down
14 changes: 9 additions & 5 deletions x/mongo/driver/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,9 +1103,11 @@ func (op Operation) readWireMessage(ctx context.Context, conn *mnet.Connection)

// decode
res, err := op.decodeResult(opcode, rem)
// Update cluster/operation time and recovery tokens before handling the error to ensure we're properly updating
// everything.
op.updateClusterTimes(res)
// When a cluster clock is given, update cluster/operation time and recovery tokens before handling the error
// to ensure we're properly updating everything.
if op.Clock != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this block? Wouldn't op.updateClusterTimes(res) be a no-op if op.Click == nil for hello commands?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to stop updating the cluster time in session too, so L1709 in addClusterTime() will not append a “$clusterTime”.

op.updateClusterTimes(res)
}
op.updateOperationTime(res)
op.Client.UpdateRecoveryToken(bson.Raw(res))

Expand Down Expand Up @@ -1699,7 +1701,10 @@ func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer)
if (clock == nil && client == nil) || !sessionsSupported(desc.WireVersion) {
return dst
}
clusterTime := clock.GetClusterTime()
var clusterTime bson.Raw
if clock != nil {
clusterTime = clock.GetClusterTime()
}
if client != nil {
clusterTime = session.MaxClusterTime(clusterTime, client.ClusterTime)
}
Expand All @@ -1711,7 +1716,6 @@ func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer)
return dst
}
return append(bsoncore.AppendHeader(dst, bsoncore.Type(val.Type), "$clusterTime"), val.Value...)
// return bsoncore.AppendDocumentElement(dst, "$clusterTime", clusterTime)
}

// calculateMaxTimeMS calculates the value of the 'maxTimeMS' field to potentially append
Expand Down
1 change: 0 additions & 1 deletion x/mongo/driver/topology/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,6 @@ func (s *Server) setupHeartbeatConnection(ctx context.Context) error {
func (s *Server) createBaseOperation(conn *mnet.Connection) *operation.Hello {
return operation.
NewHello().
ClusterClock(s.cfg.clock).
Deployment(driver.SingleConnectionDeployment{C: conn}).
ServerAPI(s.cfg.serverAPI)
}
Expand Down
Loading