Skip to content

Commit 7b9dbec

Browse files
author
Divjot Arora
authored
GODRIVER-1584 Ensure cluster time is updated from handshakes (#390)
1 parent cc4c7eb commit 7b9dbec

22 files changed

+210
-68
lines changed

.evergreen/config.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,6 +1235,20 @@ tasks:
12351235
AUTH: "auth"
12361236
SSL: "ssl"
12371237

1238+
- name: test-replicaset-auth-nossl
1239+
tags: ["test", "replicaset", "authssl"]
1240+
commands:
1241+
- func: bootstrap-mongo-orchestration
1242+
vars:
1243+
TOPOLOGY: "replica_set"
1244+
AUTH: "auth"
1245+
SSL: "nossl"
1246+
- func: run-tests
1247+
vars:
1248+
TOPOLOGY: "replica_set"
1249+
AUTH: "auth"
1250+
SSL: "nossl"
1251+
12381252
- name: test-sharded-noauth-nossl
12391253
tags: ["test", "sharded"]
12401254
commands:

mongo/client.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,9 @@ func (c *Client) configure(opts *options.ClientOptions) error {
324324

325325
// TODO(GODRIVER-814): Add tests for topology, server, and connection related options.
326326

327+
// ClusterClock
328+
c.clock = new(session.ClusterClock)
329+
327330
// Pass down URI so topology can determine whether or not SRV polling is required
328331
topologyOpts = append(topologyOpts, topology.WithURI(func(uri string) string {
329332
return opts.GetURI()
@@ -368,7 +371,7 @@ func (c *Client) configure(opts *options.ClientOptions) error {
368371
}
369372
// Handshaker
370373
var handshaker = func(driver.Handshaker) driver.Handshaker {
371-
return operation.NewIsMaster().AppName(appName).Compressors(comps)
374+
return operation.NewIsMaster().AppName(appName).Compressors(comps).ClusterClock(c.clock)
372375
}
373376
// Auth & Database & Password & Username
374377
if opts.Auth != nil {
@@ -399,6 +402,7 @@ func (c *Client) configure(opts *options.ClientOptions) error {
399402
AppName: appName,
400403
Authenticator: authenticator,
401404
Compressors: comps,
405+
ClusterClock: c.clock,
402406
}
403407
if mechanism == "" {
404408
// Required for SASL mechanism negotiation during handshake
@@ -553,9 +557,6 @@ func (c *Client) configure(opts *options.ClientOptions) error {
553557
}
554558
}
555559

556-
// ClusterClock
557-
c.clock = new(session.ClusterClock)
558-
559560
// OCSP cache
560561
ocspCache := ocsp.NewCache()
561562
connOpts = append(

mongo/integration/client_test.go

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func TestClient(t *testing.T) {
7171

7272
assert.Equal(mt, int64(-10), got.ID, "expected ID -10, got %v", got.ID)
7373
})
74-
mt.RunOpts("tls connection", mtest.NewOptions().MinServerVersion("3.0").Auth(true), func(mt *mtest.T) {
74+
mt.RunOpts("tls connection", mtest.NewOptions().MinServerVersion("3.0").SSL(true), func(mt *mtest.T) {
7575
var result bson.Raw
7676
err := mt.Coll.Database().RunCommand(mtest.Background, bson.D{
7777
{"serverStatus", 1},
@@ -86,7 +86,7 @@ func TestClient(t *testing.T) {
8686
_, found = security.Document().LookupErr("SSLServerHasCertificateAuthority")
8787
assert.Nil(mt, found, "SSLServerHasCertificateAuthority not found in result")
8888
})
89-
mt.RunOpts("x509", mtest.NewOptions().Auth(true), func(mt *mtest.T) {
89+
mt.RunOpts("x509", mtest.NewOptions().Auth(true).SSL(true), func(mt *mtest.T) {
9090
const user = "C=US,ST=New York,L=New York City,O=MongoDB,OU=other,CN=external"
9191
db := mt.Client.Database("$external")
9292

@@ -396,13 +396,13 @@ func TestClient(t *testing.T) {
396396
err := mt.Client.Ping(mtest.Background, mtest.PrimaryRp)
397397
assert.Nil(mt, err, "Ping error: %v", err)
398398

399-
sent := appNameProxyDialer.sent
400-
assert.True(mt, len(sent) >= 2, "expected at least 2 events sent, got %v", len(sent))
399+
msgPairs := appNameProxyDialer.messages
400+
assert.True(mt, len(msgPairs) >= 2, "expected at least 2 events sent, got %v", len(msgPairs))
401401

402402
// First two messages should be connection handshakes: one for the heartbeat connection and the other for the
403403
// application connection.
404-
for idx, wm := range sent[:2] {
405-
cmd, err := drivertest.GetCommandFromQueryWireMessage(wm)
404+
for idx, pair := range msgPairs[:2] {
405+
cmd, err := drivertest.GetCommandFromQueryWireMessage(pair.sent)
406406
assert.Nil(mt, err, "GetCommandFromQueryWireMessage error at index %d: %v", idx, err)
407407
heartbeatCmdName := cmd.Index(0).Key()
408408
assert.Equal(mt, "isMaster", heartbeatCmdName,
@@ -441,13 +441,19 @@ func TestClient(t *testing.T) {
441441
})
442442
}
443443

444+
type proxyMessage struct {
445+
serverAddress string
446+
sent wiremessage.WireMessage
447+
received wiremessage.WireMessage
448+
}
449+
444450
// proxyDialer is a ContextDialer implementation that wraps a net.Dialer and records the messages sent and received
445451
// using connections created through it.
446452
type proxyDialer struct {
447453
*net.Dialer
448454
sync.Mutex
449-
sent []wiremessage.WireMessage
450-
received []wiremessage.WireMessage
455+
messages []proxyMessage
456+
sentMap sync.Map
451457
}
452458

453459
var _ options.ContextDialer = (*proxyDialer)(nil)
@@ -480,7 +486,9 @@ func (p *proxyDialer) storeSentMessage(msg []byte) {
480486

481487
msgCopy := make(wiremessage.WireMessage, len(msg))
482488
copy(msgCopy, msg)
483-
p.sent = append(p.sent, msgCopy)
489+
490+
_, requestID, _, _, _, _ := wiremessage.ReadHeader(msgCopy)
491+
p.sentMap.Store(requestID, msgCopy)
484492
}
485493

486494
// storeReceivedMessage stores a copy of the wire message being received from the server.
@@ -490,7 +498,16 @@ func (p *proxyDialer) storeReceivedMessage(msg []byte) {
490498

491499
msgCopy := make(wiremessage.WireMessage, len(msg))
492500
copy(msgCopy, msg)
493-
p.received = append(p.received, msgCopy)
501+
502+
_, _, responseTo, _, _, _ := wiremessage.ReadHeader(msgCopy)
503+
sentMsg, _ := p.sentMap.Load(responseTo)
504+
p.sentMap.Delete(responseTo)
505+
506+
proxyMsg := proxyMessage{
507+
sent: sentMsg.(wiremessage.WireMessage),
508+
received: msgCopy,
509+
}
510+
p.messages = append(p.messages, proxyMsg)
494511
}
495512

496513
// proxyConn is a net.Conn that wraps a network connection. All messages sent/received through a proxyConn are stored

mongo/integration/mtest/mongotest.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ type T struct {
9292
maxServerVersion string
9393
validTopologies []TopologyKind
9494
auth *bool
95+
ssl *bool
9596
enterprise *bool
9697
collCreateOpts bson.D
9798
connsCheckedOut int // net number of connections checked out during test execution
@@ -493,6 +494,11 @@ func (t *T) AuthEnabled() bool {
493494
return testContext.authEnabled
494495
}
495496

497+
// SSLEnabled returns whether or not this test is running in an environment with SSL.
498+
func (t *T) SSLEnabled() bool {
499+
return testContext.sslEnabled
500+
}
501+
496502
// TopologyKind returns the topology kind of the environment
497503
func (t *T) TopologyKind() TopologyKind {
498504
return testContext.topoKind
@@ -662,6 +668,9 @@ func (t *T) shouldSkip() bool {
662668
if t.auth != nil && *t.auth != testContext.authEnabled {
663669
return true
664670
}
671+
if t.ssl != nil && *t.ssl != testContext.sslEnabled {
672+
return true
673+
}
665674
if t.enterprise != nil && *t.enterprise != testContext.enterpriseServer {
666675
return true
667676
}

mongo/integration/mtest/options.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,15 @@ func (op *Options) Auth(auth bool) *Options {
184184
return op
185185
}
186186

187+
// SSL specifies whether or not SSL should be enabled for this test to run. By default, a test will run regardless
188+
// of whether or not SSL is enabled.
189+
func (op *Options) SSL(ssl bool) *Options {
190+
op.optFuncs = append(op.optFuncs, func(t *T) {
191+
t.ssl = &ssl
192+
})
193+
return op
194+
}
195+
187196
// Enterprise specifies whether or not this test should only be run on enterprise server variants. Defaults to false.
188197
func (op *Options) Enterprise(ent bool) *Options {
189198
op.optFuncs = append(op.optFuncs, func(t *T) {

mongo/integration/mtest/setup.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ var testContext struct {
4242
client *mongo.Client // client used for setup and teardown
4343
serverVersion string
4444
authEnabled bool
45+
sslEnabled bool
4546
enterpriseServer bool
4647
}
4748

@@ -121,7 +122,8 @@ func Setup() error {
121122
}
122123
}
123124

124-
testContext.authEnabled = len(os.Getenv("MONGO_GO_DRIVER_CA_FILE")) != 0
125+
testContext.authEnabled = os.Getenv("AUTH") == "auth"
126+
testContext.sslEnabled = os.Getenv("SSL") == "ssl"
125127
biRes, err := testContext.client.Database("admin").RunCommand(Background, bson.D{{"buildInfo", 1}}).DecodeBytes()
126128
if err != nil {
127129
return fmt.Errorf("buildInfo error: %v", err)

mongo/integration/sessions_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package integration
88

99
import (
1010
"bytes"
11+
"os"
1112
"reflect"
1213
"testing"
1314
"time"
@@ -18,6 +19,7 @@ import (
1819
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
1920
"go.mongodb.org/mongo-driver/mongo/options"
2021
"go.mongodb.org/mongo-driver/mongo/readpref"
22+
"go.mongodb.org/mongo-driver/x/mongo/driver/drivertest"
2123
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
2224
)
2325

@@ -113,6 +115,53 @@ func TestSessions(t *testing.T) {
113115
})
114116
}
115117
})
118+
119+
clusterTimeDialer := newProxyDialer()
120+
hosts := options.Client().ApplyURI(mt.ConnString()).Hosts
121+
clusterTimeHandshakeOpts := options.Client().
122+
SetDialer(clusterTimeDialer).
123+
SetHosts(hosts[:1]).
124+
SetDirect(true)
125+
clusterTimeHandshakeMtOpts := mtest.NewOptions().
126+
ClientOptions(clusterTimeHandshakeOpts).
127+
CreateCollection(false).
128+
SSL(false) // The proxy dialer doesn't work for SSL connections.
129+
mt.RunOpts("cluster time is updated from handshakes", clusterTimeHandshakeMtOpts, func(mt *mtest.T) {
130+
// Compression uses a different opcode that the existing drivertest helpers can't handle and doesn't affect
131+
// the functionality tested here, so we can skip the test if compression is enabled.
132+
if len(os.Getenv("MONGO_GO_DRIVER_COMPRESSOR")) > 0 {
133+
mt.Skip("skipping for compression")
134+
}
135+
136+
err := mt.Client.Ping(mtest.Background, mtest.PrimaryRp)
137+
assert.Nil(mt, err, "Ping error: %v", err)
138+
139+
msgPairs := clusterTimeDialer.messages
140+
for idx, pair := range msgPairs {
141+
// Get the command sent to the server.
142+
cmd, err := drivertest.GetCommandFromQueryWireMessage(pair.sent)
143+
if err != nil {
144+
cmd, err = drivertest.GetCommandFromMsgWireMessage(pair.sent)
145+
}
146+
if err != nil {
147+
mt.Fatalf("error reading command document from wire message: %v", err)
148+
}
149+
150+
// Get the $clusterTime value sent to the server. The first two messages are the handshakes for the
151+
// heartbeat and application connections. These should not contain $clusterTime because they happen on
152+
// connections that don't know the server's wire version and therefore don't know if the server supports
153+
// $clusterTime.
154+
_, err = cmd.LookupErr("$clusterTime")
155+
if idx <= 1 {
156+
assert.NotNil(mt, err, "expected no $clusterTime field in command %s", cmd)
157+
continue
158+
}
159+
160+
// All messages after the first two should contain $clusterTime.
161+
assert.Nil(mt, err, "expected $clusterTime field in command %s", cmd)
162+
}
163+
})
164+
116165
mt.RunOpts("explicit implicit session arguments", noClientOpts, func(mt *mtest.T) {
117166
// lsid is included in commands with explicit and implicit sessions
118167

x/mongo/driver/auth/auth.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.mongodb.org/mongo-driver/x/mongo/driver/address"
1616
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
1717
"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
18+
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
1819
)
1920

2021
// AuthenticatorFactory constructs an authenticator.
@@ -56,6 +57,7 @@ type HandshakeOptions struct {
5657
Compressors []string
5758
DBUser string
5859
PerformAuthentication func(description.Server) bool
60+
ClusterClock *session.ClusterClock
5961
}
6062

6163
type authHandshaker struct {
@@ -74,7 +76,8 @@ func (ah *authHandshaker) GetDescription(ctx context.Context, addr address.Addre
7476
op := operation.NewIsMaster().
7577
AppName(ah.options.AppName).
7678
Compressors(ah.options.Compressors).
77-
SASLSupportedMechs(ah.options.DBUser)
79+
SASLSupportedMechs(ah.options.DBUser).
80+
ClusterClock(ah.options.ClusterClock)
7881

7982
if ah.options.Authenticator != nil {
8083
if speculativeAuth, ok := ah.options.Authenticator.(SpeculativeAuthenticator); ok {
@@ -112,7 +115,13 @@ func (ah *authHandshaker) FinishHandshake(ctx context.Context, conn driver.Conne
112115

113116
desc := conn.Description()
114117
if performAuth(desc) && ah.options.Authenticator != nil {
115-
if err := ah.authenticate(ctx, desc, conn); err != nil {
118+
cfg := &Config{
119+
Description: desc,
120+
Connection: conn,
121+
ClusterClock: ah.options.ClusterClock,
122+
}
123+
124+
if err := ah.authenticate(ctx, cfg); err != nil {
116125
return newAuthError("auth error", err)
117126
}
118127
}
@@ -123,20 +132,20 @@ func (ah *authHandshaker) FinishHandshake(ctx context.Context, conn driver.Conne
123132
return ah.wrapped.FinishHandshake(ctx, conn)
124133
}
125134

126-
func (ah *authHandshaker) authenticate(ctx context.Context, desc description.Server, conn driver.Connection) error {
135+
func (ah *authHandshaker) authenticate(ctx context.Context, cfg *Config) error {
127136
// If the initial isMaster reply included a response to the speculative authentication attempt, we only need to
128137
// conduct the remainder of the conversation.
129-
if desc.SpeculativeAuthenticate != nil {
138+
if speculativeResponse := cfg.Description.SpeculativeAuthenticate; speculativeResponse != nil {
130139
// Defensively ensure that the server did not include a response if speculative auth was not attempted.
131140
if ah.conversation == nil {
132141
return errors.New("speculative auth was not attempted but the server included a response")
133142
}
134-
return ah.conversation.Finish(ctx, desc.SpeculativeAuthenticate, conn)
143+
return ah.conversation.Finish(ctx, cfg, speculativeResponse)
135144
}
136145

137146
// If the server does not support speculative authentication or the first attempt was not successful, we need to
138147
// perform authentication from scratch.
139-
return ah.options.Authenticator.Auth(ctx, desc, conn)
148+
return ah.options.Authenticator.Auth(ctx, cfg)
140149
}
141150

142151
// Handshaker creates a connection handshaker for the given authenticator.
@@ -147,10 +156,17 @@ func Handshaker(h driver.Handshaker, options *HandshakeOptions) driver.Handshake
147156
}
148157
}
149158

159+
// Config holds the information necessary to perform an authentication attempt.
160+
type Config struct {
161+
Description description.Server
162+
Connection driver.Connection
163+
ClusterClock *session.ClusterClock
164+
}
165+
150166
// Authenticator handles authenticating a connection.
151167
type Authenticator interface {
152168
// Auth authenticates the connection.
153-
Auth(context.Context, description.Server, driver.Connection) error
169+
Auth(context.Context, *Config) error
154170
}
155171

156172
func newAuthError(msg string, inner error) error {

x/mongo/driver/auth/conversation.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"context"
1111

1212
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
13-
"go.mongodb.org/mongo-driver/x/mongo/driver"
1413
)
1514

1615
// SpeculativeConversation represents an authentication conversation that can be merged with the initial connection
@@ -23,7 +22,7 @@ import (
2322
// authenticate the provided connection.
2423
type SpeculativeConversation interface {
2524
FirstMessage() (bsoncore.Document, error)
26-
Finish(ctx context.Context, firstResponse bsoncore.Document, conn driver.Connection) error
25+
Finish(ctx context.Context, cfg *Config, firstResponse bsoncore.Document) error
2726
}
2827

2928
// SpeculativeAuthenticator represents an authenticator that supports speculative authentication.

0 commit comments

Comments
 (0)