Skip to content

Commit 19a4ed1

Browse files
committed
GODRIVER-2037 Don't clear the connection pool on client-side connect timeout errors. (#688)
1 parent be28a1c commit 19a4ed1

File tree

8 files changed

+535
-65
lines changed

8 files changed

+535
-65
lines changed

mongo/integration/primary_stepdown_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package integration
88

99
import (
10+
"sync"
1011
"testing"
1112

1213
"go.mongodb.org/mongo-driver/bson"
@@ -23,7 +24,65 @@ const (
2324
errorInterruptedAtShutdown int32 = 11600
2425
)
2526

27+
// testPoolMonitor exposes an *event.PoolMonitor and collects all events logged to that
28+
// *event.PoolMonitor. It is safe to use from multiple concurrent goroutines.
29+
type testPoolMonitor struct {
30+
*event.PoolMonitor
31+
32+
events []*event.PoolEvent
33+
mu sync.RWMutex
34+
}
35+
36+
func newTestPoolMonitor() *testPoolMonitor {
37+
tpm := &testPoolMonitor{
38+
events: make([]*event.PoolEvent, 0),
39+
}
40+
tpm.PoolMonitor = &event.PoolMonitor{
41+
Event: func(evt *event.PoolEvent) {
42+
tpm.mu.Lock()
43+
defer tpm.mu.Unlock()
44+
tpm.events = append(tpm.events, evt)
45+
},
46+
}
47+
return tpm
48+
}
49+
50+
// Events returns a copy of the events collected by the testPoolMonitor. Filters can optionally be
51+
// applied to the returned events set and are applied using AND logic (i.e. all filters must return
52+
// true to include the event in the result).
53+
func (tpm *testPoolMonitor) Events(filters ...func(*event.PoolEvent) bool) []*event.PoolEvent {
54+
filtered := make([]*event.PoolEvent, 0, len(tpm.events))
55+
tpm.mu.RLock()
56+
defer tpm.mu.RUnlock()
57+
58+
for _, evt := range tpm.events {
59+
keep := true
60+
for _, filter := range filters {
61+
if !filter(evt) {
62+
keep = false
63+
break
64+
}
65+
}
66+
if keep {
67+
filtered = append(filtered, evt)
68+
}
69+
}
70+
71+
return filtered
72+
}
73+
74+
// IsPoolCleared returns true if there are any events of type "event.PoolCleared" in the events
75+
// recorded by the testPoolMonitor.
76+
func (tpm *testPoolMonitor) IsPoolCleared() bool {
77+
poolClearedEvents := tpm.Events(func(evt *event.PoolEvent) bool {
78+
return evt.Type == event.PoolCleared
79+
})
80+
return len(poolClearedEvents) > 0
81+
}
82+
2683
var poolChan = make(chan *event.PoolEvent, 100)
84+
85+
// TODO(GODRIVER-2068): Replace all uses of poolMonitor with individual instances of testPoolMonitor.
2786
var poolMonitor = &event.PoolMonitor{
2887
Event: func(event *event.PoolEvent) {
2988
poolChan <- event

mongo/integration/sdam_error_handling_test.go

Lines changed: 106 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,12 @@ func TestSDAMErrorHandling(t *testing.T) {
2828
return options.Client().
2929
ApplyURI(mtest.ClusterURI()).
3030
SetRetryWrites(false).
31-
SetPoolMonitor(poolMonitor).
3231
SetWriteConcern(mtest.MajorityWc)
3332
}
3433
baseMtOpts := func() *mtest.Options {
3534
mtOpts := mtest.NewOptions().
36-
Topologies(mtest.ReplicaSet). // Don't run on sharded clusters to avoid complexity of sharded failpoints.
37-
MinServerVersion("4.0"). // 4.0+ is required to use failpoints on replica sets.
35+
Topologies(mtest.ReplicaSet, mtest.Single). // Don't run on sharded clusters to avoid complexity of sharded failpoints.
36+
MinServerVersion("4.0"). // 4.0+ is required to use failpoints on replica sets.
3837
ClientOptions(baseClientOpts())
3938

4039
if mtest.ClusterTopologyKind() == mtest.Sharded {
@@ -48,13 +47,14 @@ func TestSDAMErrorHandling(t *testing.T) {
4847
// blockConnection and appName.
4948
mt.RunOpts("before handshake completes", baseMtOpts().Auth(true).MinServerVersion("4.4"), func(mt *mtest.T) {
5049
mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
51-
mt.Run("pool cleared on network timeout", func(mt *mtest.T) {
52-
// Assert that the pool is cleared when a connection created by an application operation thread
53-
// encounters a network timeout during handshaking. Unlike the non-timeout test below, we only test
54-
// connections created in the foreground for timeouts because connections created by the pool
55-
// maintenance routine can't be timed out using a context.
56-
57-
appName := "authNetworkTimeoutTest"
50+
mt.Run("pool not cleared on operation-scoped network timeout", func(mt *mtest.T) {
51+
// Assert that the pool is not cleared when a connection created by an application
52+
// operation thread encounters an operation timeout during handshaking. Unlike the
53+
// non-timeout test below, we only test connections created in the foreground for
54+
// timeouts because connections created by the pool maintenance routine can't be
55+
// timed out using a context.
56+
57+
appName := "authOperationTimeoutTest"
5858
// Set failpoint on saslContinue instead of saslStart because saslStart isn't done when using
5959
// speculative auth.
6060
mt.SetFailPoint(mtest.FailPoint{
@@ -70,24 +70,61 @@ func TestSDAMErrorHandling(t *testing.T) {
7070
},
7171
})
7272

73-
// Reset the client with the appName specified in the failpoint.
74-
clientOpts := options.Client().
75-
SetAppName(appName).
76-
SetRetryWrites(false).
77-
SetPoolMonitor(poolMonitor)
78-
mt.ResetClient(clientOpts)
79-
clearPoolChan()
73+
// Reset the client with the appName specified in the failpoint and the pool monitor.
74+
tpm := newTestPoolMonitor()
75+
mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
8076

81-
// The saslContinue blocks for 150ms so run the InsertOne with a 100ms context to cause a network
82-
// timeout during auth and assert that the pool was cleared.
77+
// Use a context with a 100ms timeout so that the saslContinue delay of 150ms causes
78+
// an operation-scoped context timeout (i.e. a timeout not caused by a client timeout
79+
// like connectTimeoutMS or socketTimeoutMS).
8380
timeoutCtx, cancel := context.WithTimeout(mtest.Background, 100*time.Millisecond)
8481
defer cancel()
8582
_, err := mt.Coll.InsertOne(timeoutCtx, bson.D{{"test", 1}})
8683
assert.NotNil(mt, err, "expected InsertOne error, got nil")
8784
assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
8885
assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
89-
assert.True(mt, isPoolCleared(), "expected pool to be cleared but was not")
86+
assert.False(mt, tpm.IsPoolCleared(), "expected pool not to be cleared but was cleared")
9087
})
88+
89+
mt.Run("pool cleared on non-operation-scoped network timeout", func(mt *mtest.T) {
90+
// Assert that the pool is cleared when a connection created by an application
91+
// operation thread encounters a client-configured timeout (socketTimeoutMS in this test) during
92+
// handshaking.
93+
94+
appName := "authConnectTimeoutTest"
95+
// Set failpoint on saslContinue instead of saslStart because saslStart isn't done when using
96+
// speculative auth.
97+
mt.SetFailPoint(mtest.FailPoint{
98+
ConfigureFailPoint: "failCommand",
99+
Mode: mtest.FailPointMode{
100+
Times: 1,
101+
},
102+
Data: mtest.FailPointData{
103+
FailCommands: []string{"saslContinue"},
104+
BlockConnection: true,
105+
BlockTimeMS: 150,
106+
AppName: appName,
107+
},
108+
})
109+
110+
// Reset the client with the appName specified in the failpoint and the pool monitor.
111+
tpm := newTestPoolMonitor()
112+
mt.ResetClient(baseClientOpts().
113+
SetAppName(appName).
114+
SetPoolMonitor(tpm.PoolMonitor).
115+
// Set a 100ms socket timeout so that the saslContinue delay of 150ms causes a
116+
// timeout during socket read (i.e. a timeout not caused by the InsertOne context).
117+
SetSocketTimeout(100 * time.Millisecond))
118+
119+
// Use context.Background() so that the new connection will not time out due to an
120+
// operation-scoped timeout.
121+
_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"test", 1}})
122+
assert.NotNil(mt, err, "expected InsertOne error, got nil")
123+
assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
124+
assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
125+
assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
126+
})
127+
91128
mt.RunOpts("pool cleared on non-timeout network error", noClientOpts, func(mt *mtest.T) {
92129
mt.Run("background", func(mt *mtest.T) {
93130
// Assert that the pool is cleared when a connection created by the background pool maintenance
@@ -106,16 +143,19 @@ func TestSDAMErrorHandling(t *testing.T) {
106143
},
107144
})
108145

109-
clientOpts := options.Client().
146+
// Reset the client with the appName specified in the failpoint.
147+
tpm := newTestPoolMonitor()
148+
mt.ResetClient(baseClientOpts().
110149
SetAppName(appName).
111-
SetMinPoolSize(5).
112-
SetPoolMonitor(poolMonitor)
113-
mt.ResetClient(clientOpts)
114-
clearPoolChan()
150+
SetPoolMonitor(tpm.PoolMonitor).
151+
// Set minPoolSize to enable the background pool maintenance goroutine.
152+
SetMinPoolSize(5))
115153

116154
time.Sleep(200 * time.Millisecond)
117-
assert.True(mt, isPoolCleared(), "expected pool to be cleared but was not")
155+
156+
assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
118157
})
158+
119159
mt.Run("foreground", func(mt *mtest.T) {
120160
// Assert that the pool is cleared when a connection created by an application thread connection
121161
// checkout encounters a non-timeout network error during handshaking.
@@ -133,24 +173,23 @@ func TestSDAMErrorHandling(t *testing.T) {
133173
},
134174
})
135175

136-
clientOpts := options.Client().
137-
SetAppName(appName).
138-
SetPoolMonitor(poolMonitor)
139-
mt.ResetClient(clientOpts)
140-
clearPoolChan()
176+
// Reset the client with the appName specified in the failpoint.
177+
tpm := newTestPoolMonitor()
178+
mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
141179

142180
_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
143181
assert.NotNil(mt, err, "expected InsertOne error, got nil")
144182
assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
145-
assert.True(mt, isPoolCleared(), "expected pool to be cleared but was not")
183+
assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
146184
})
147185
})
148186
})
149187
})
150188
mt.RunOpts("after handshake completes", baseMtOpts(), func(mt *mtest.T) {
151189
mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
152190
mt.Run("pool cleared on non-timeout network error", func(mt *mtest.T) {
153-
clearPoolChan()
191+
appName := "afterHandshakeNetworkError"
192+
154193
mt.SetFailPoint(mtest.FailPoint{
155194
ConfigureFailPoint: "failCommand",
156195
Mode: mtest.FailPointMode{
@@ -159,16 +198,22 @@ func TestSDAMErrorHandling(t *testing.T) {
159198
Data: mtest.FailPointData{
160199
FailCommands: []string{"insert"},
161200
CloseConnection: true,
201+
AppName: appName,
162202
},
163203
})
164204

205+
// Reset the client with the appName specified in the failpoint.
206+
tpm := newTestPoolMonitor()
207+
mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
208+
165209
_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"test", 1}})
166210
assert.NotNil(mt, err, "expected InsertOne error, got nil")
167211
assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
168-
assert.True(mt, isPoolCleared(), "expected pool to be cleared but was not")
212+
assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
169213
})
170214
mt.Run("pool not cleared on timeout network error", func(mt *mtest.T) {
171-
clearPoolChan()
215+
tpm := newTestPoolMonitor()
216+
mt.ResetClient(baseClientOpts().SetPoolMonitor(tpm.PoolMonitor))
172217

173218
_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
174219
assert.Nil(mt, err, "InsertOne error: %v", err)
@@ -181,11 +226,11 @@ func TestSDAMErrorHandling(t *testing.T) {
181226
_, err = mt.Coll.Find(timeoutCtx, filter)
182227
assert.NotNil(mt, err, "expected Find error, got %v", err)
183228
assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
184-
185-
assert.False(mt, isPoolCleared(), "expected pool to not be cleared but was")
229+
assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
186230
})
187231
mt.Run("pool not cleared on context cancellation", func(mt *mtest.T) {
188-
clearPoolChan()
232+
tpm := newTestPoolMonitor()
233+
mt.ResetClient(baseClientOpts().SetPoolMonitor(tpm.PoolMonitor))
189234

190235
_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
191236
assert.Nil(mt, err, "InsertOne error: %v", err)
@@ -204,8 +249,7 @@ func TestSDAMErrorHandling(t *testing.T) {
204249
assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
205250
assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
206251
assert.True(mt, errors.Is(err, context.Canceled), "expected error %v to be context.Canceled", err)
207-
208-
assert.False(mt, isPoolCleared(), "expected pool to not be cleared but was")
252+
assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
209253
})
210254
})
211255
mt.RunOpts("server errors", noClientOpts, func(mt *mtest.T) {
@@ -242,28 +286,32 @@ func TestSDAMErrorHandling(t *testing.T) {
242286
}
243287
for _, tc := range testCases {
244288
mt.RunOpts(fmt.Sprintf("command error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
245-
clearPoolChan()
289+
appName := fmt.Sprintf("command_error_%s", tc.name)
246290

247291
// Cause the next insert to fail with an ok:0 response.
248-
fp := mtest.FailPoint{
292+
mt.SetFailPoint(mtest.FailPoint{
249293
ConfigureFailPoint: "failCommand",
250294
Mode: mtest.FailPointMode{
251295
Times: 1,
252296
},
253297
Data: mtest.FailPointData{
254298
FailCommands: []string{"insert"},
255299
ErrorCode: tc.errorCode,
300+
AppName: appName,
256301
},
257-
}
258-
mt.SetFailPoint(fp)
302+
})
303+
304+
// Reset the client with the appName specified in the failpoint.
305+
tpm := newTestPoolMonitor()
306+
mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
259307

260-
runServerErrorsTest(mt, tc.isShutdownError)
308+
runServerErrorsTest(mt, tc.isShutdownError, tpm)
261309
})
262310
mt.RunOpts(fmt.Sprintf("write concern error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
263-
clearPoolChan()
311+
appName := fmt.Sprintf("write_concern_error_%s", tc.name)
264312

265313
// Cause the next insert to fail with a write concern error.
266-
fp := mtest.FailPoint{
314+
mt.SetFailPoint(mtest.FailPoint{
267315
ConfigureFailPoint: "failCommand",
268316
Mode: mtest.FailPointMode{
269317
Times: 1,
@@ -273,32 +321,36 @@ func TestSDAMErrorHandling(t *testing.T) {
273321
WriteConcernError: &mtest.WriteConcernErrorData{
274322
Code: tc.errorCode,
275323
},
324+
AppName: appName,
276325
},
277-
}
278-
mt.SetFailPoint(fp)
326+
})
327+
328+
// Reset the client with the appName specified in the failpoint.
329+
tpm := newTestPoolMonitor()
330+
mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
279331

280-
runServerErrorsTest(mt, tc.isShutdownError)
332+
runServerErrorsTest(mt, tc.isShutdownError, tpm)
281333
})
282334
}
283335
})
284336
})
285337
}
286338

287-
func runServerErrorsTest(mt *mtest.T, isShutdownError bool) {
339+
func runServerErrorsTest(mt *mtest.T, isShutdownError bool, tpm *testPoolMonitor) {
288340
mt.Helper()
289341

290342
_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
291343
assert.NotNil(mt, err, "expected InsertOne error, got nil")
292344

293345
// The pool should always be cleared for shutdown errors, regardless of server version.
294346
if isShutdownError {
295-
assert.True(mt, isPoolCleared(), "expected pool to be cleared, but was not")
347+
assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared, but was not")
296348
return
297349
}
298350

299351
// For non-shutdown errors, the pool is only cleared if the error is from a pre-4.2 server.
300352
wantCleared := mtest.CompareServerVersions(mtest.ServerVersion(), "4.2") < 0
301-
gotCleared := isPoolCleared()
302-
assert.Equal(mt, wantCleared, gotCleared, "expected pool to be cleared: %v; pool was cleared: %v",
353+
gotCleared := tpm.IsPoolCleared()
354+
assert.Equal(mt, wantCleared, gotCleared, "expected pool to be cleared: %t; pool was cleared: %t",
303355
wantCleared, gotCleared)
304356
}

0 commit comments

Comments
 (0)