Skip to content

Commit b88fd0d

Browse files
author
iwysiu
committed
GODRIVER-628 connections survive primary stepdown
Change-Id: Iceaae40ffe70b87cf70f3957a44ecd31c227a5f4
1 parent 74cffef commit b88fd0d

File tree

9 files changed

+276
-41
lines changed

9 files changed

+276
-41
lines changed

event/monitoring.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,6 @@ type PoolEvent struct {
8686
}
8787

8888
// PoolMonitor is a function that allows the user to gain access to events occurring in the pool
89-
type PoolMonitor func(PoolEvent)
89+
type PoolMonitor struct {
90+
Event func(*PoolEvent)
91+
}

mongo/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ func (c *Client) configure(opts *options.ClientOptions) error {
375375
if opts.PoolMonitor != nil {
376376
serverOpts = append(
377377
serverOpts,
378-
topology.WithConnectionPoolMonitor(func(event.PoolMonitor) event.PoolMonitor { return *opts.PoolMonitor }),
378+
topology.WithConnectionPoolMonitor(func(*event.PoolMonitor) *event.PoolMonitor { return opts.PoolMonitor }),
379379
)
380380
}
381381
// Monitor

mongo/options/clientoptions_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ func TestClientOptions(t *testing.T) {
4545
}
4646
})
4747
t.Run("Set", func(t *testing.T) {
48-
var poolMonitor event.PoolMonitor = func(_ event.PoolEvent) {}
4948
testCases := []struct {
5049
name string
5150
fn interface{} // method to be run
@@ -64,7 +63,7 @@ func TestClientOptions(t *testing.T) {
6463
{"MaxConnIdleTime", (*ClientOptions).SetMaxConnIdleTime, 5 * time.Second, "MaxConnIdleTime", true},
6564
{"MaxPoolSize", (*ClientOptions).SetMaxPoolSize, uint64(250), "MaxPoolSize", true},
6665
{"MinPoolSize", (*ClientOptions).SetMinPoolSize, uint64(10), "MinPoolSize", true},
67-
{"PoolMonitor", (*ClientOptions).SetPoolMonitor, &poolMonitor, "PoolMonitor", false},
66+
{"PoolMonitor", (*ClientOptions).SetPoolMonitor, &event.PoolMonitor{}, "PoolMonitor", false},
6867
{"Monitor", (*ClientOptions).SetMonitor, &event.CommandMonitor{}, "Monitor", false},
6968
{"ReadConcern", (*ClientOptions).SetReadConcern, readconcern.Majority(), "ReadConcern", false},
7069
{"ReadPreference", (*ClientOptions).SetReadPreference, readpref.SecondaryPreferred(), "ReadPreference", false},

mongo/primary_stepdown_test.go

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
// Copyright (C) MongoDB, Inc. 2017-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package mongo
8+
9+
import (
10+
"context"
11+
"os"
12+
"testing"
13+
14+
"github.com/stretchr/testify/require"
15+
"go.mongodb.org/mongo-driver/bson"
16+
"go.mongodb.org/mongo-driver/event"
17+
"go.mongodb.org/mongo-driver/internal/testutil"
18+
"go.mongodb.org/mongo-driver/mongo/options"
19+
"go.mongodb.org/mongo-driver/mongo/readpref"
20+
"go.mongodb.org/mongo-driver/mongo/writeconcern"
21+
)
22+
23+
type serverStatus struct {
24+
Host string
25+
Connections struct {
26+
TotalCreated int32 `bson:"totalCreated"`
27+
}
28+
}
29+
30+
var poolChan = make(chan *event.PoolEvent, 100)
31+
32+
var poolMonitor = event.PoolMonitor{func(event *event.PoolEvent) { poolChan <- event }}
33+
34+
func isPoolCleared() bool {
35+
for len(poolChan) > 0 {
36+
curr := <-poolChan
37+
if curr.Type == event.PoolCleared {
38+
return true
39+
}
40+
}
41+
return false
42+
}
43+
44+
func TestConnectionsSurvivePrimaryStepDown(t *testing.T) {
45+
if os.Getenv("TOPOLOGY") != "replica_set" {
46+
t.Skip("Needs to run on a replica set")
47+
}
48+
49+
ctx := context.Background()
50+
mongodbURI := testutil.ConnString(t)
51+
client, err := Connect(ctx, options.Client().ApplyURI(mongodbURI.String()).SetRetryWrites(false).
52+
SetPoolMonitor(&poolMonitor))
53+
require.NoError(t, err)
54+
db := client.Database("step-down", options.Database().SetWriteConcern(writeconcern.New(writeconcern.WMajority())))
55+
collName := "step-down"
56+
err = db.Collection(collName).Drop(ctx)
57+
require.NoError(t, err)
58+
59+
err = db.RunCommand(
60+
context.Background(),
61+
bson.D{{"create", collName}},
62+
).Err()
63+
require.NoError(t, err)
64+
coll := db.Collection(collName)
65+
66+
serverVersion, err := getServerVersion(db)
67+
require.NoError(t, err)
68+
adminDB := client.Database("admin")
69+
70+
for len(poolChan) > 0 {
71+
<-poolChan
72+
}
73+
74+
t.Run("getMore_iteration", func(t *testing.T) {
75+
if compareVersions(t, serverVersion, "4.2") < 0 {
76+
t.Skip("Needs server version >= 4.2")
77+
}
78+
initCollection(t, coll)
79+
cur, err := coll.Find(ctx, bson.D{}, options.Find().SetBatchSize(2))
80+
ok := cur.Next(ctx)
81+
require.True(t, ok)
82+
83+
err = adminDB.RunCommand(
84+
context.Background(),
85+
bson.D{{"replSetStepDown", 5}, {"force", true}},
86+
options.RunCmd().SetReadPreference(readpref.Primary()),
87+
).Err()
88+
require.NoError(t, err)
89+
90+
ok = cur.Next(ctx)
91+
require.True(t, ok)
92+
93+
require.False(t, isPoolCleared())
94+
})
95+
t.Run("notMaster_keep_pool", func(t *testing.T) {
96+
if compareVersions(t, serverVersion, "4.2") < 0 {
97+
t.Skip("Needs server version >= 4.2")
98+
}
99+
100+
err = adminDB.RunCommand(
101+
ctx,
102+
bson.D{{"configureFailPoint", "failCommand"},
103+
{"mode", bson.D{{"times", 1}}},
104+
{"data", bson.D{{"failCommands", bson.A{"insert"}}, {"errorCode", 10107}}}},
105+
).Err()
106+
require.NoError(t, err)
107+
defer func() {
108+
require.NoError(t, adminDB.RunCommand(ctx, bson.D{
109+
{"configureFailPoint", "failCommand"},
110+
{"mode", "off"},
111+
}).Err())
112+
}()
113+
114+
_, err = coll.InsertOne(ctx, bson.D{{"test", 1}})
115+
require.Error(t, err)
116+
117+
cerr, ok := err.(CommandError)
118+
require.True(t, ok)
119+
require.Equal(t, int32(10107), cerr.Code)
120+
121+
_, err = coll.InsertOne(ctx, bson.D{{"test", 1}})
122+
require.NoError(t, err)
123+
124+
require.False(t, isPoolCleared())
125+
})
126+
t.Run("notMaster_reset_pool", func(t *testing.T) {
127+
if compareVersions(t, serverVersion, "4.0") != 0 {
128+
t.Skip("Needs server version 4.0")
129+
}
130+
131+
err = adminDB.RunCommand(
132+
ctx,
133+
bson.D{{"configureFailPoint", "failCommand"},
134+
{"mode", bson.D{{"times", 1}}},
135+
{"data", bson.D{{"failCommands", bson.A{"insert"}}, {"errorCode", 10107}}}},
136+
).Err()
137+
require.NoError(t, err)
138+
defer func() {
139+
require.NoError(t, adminDB.RunCommand(ctx, bson.D{
140+
{"configureFailPoint", "failCommand"},
141+
{"mode", "off"},
142+
}).Err())
143+
}()
144+
_, err = coll.InsertOne(ctx, bson.D{{"test", 1}})
145+
require.Error(t, err)
146+
147+
cerr, ok := err.(CommandError)
148+
require.True(t, ok)
149+
require.Equal(t, int32(10107), cerr.Code)
150+
151+
require.True(t, isPoolCleared())
152+
})
153+
t.Run("shutdownInProgress_reset_pool", func(t *testing.T) {
154+
if compareVersions(t, serverVersion, "4.0") < 0 {
155+
t.Skip("Needs server version >= 4.0")
156+
}
157+
158+
err = adminDB.RunCommand(
159+
ctx,
160+
bson.D{{"configureFailPoint", "failCommand"},
161+
{"mode", bson.D{{"times", 1}}},
162+
{"data", bson.D{{"failCommands", bson.A{"insert"}}, {"errorCode", 91}}}},
163+
).Err()
164+
require.NoError(t, err)
165+
defer func() {
166+
require.NoError(t, adminDB.RunCommand(ctx, bson.D{
167+
{"configureFailPoint", "failCommand"},
168+
{"mode", "off"},
169+
}).Err())
170+
}()
171+
172+
_, err = coll.InsertOne(ctx, bson.D{{"test", 1}})
173+
require.Error(t, err)
174+
175+
cerr, ok := err.(CommandError)
176+
require.True(t, ok)
177+
require.Equal(t, int32(91), cerr.Code)
178+
179+
require.True(t, isPoolCleared())
180+
})
181+
t.Run("interruptedAtShutdown_reset_pool", func(t *testing.T) {
182+
if compareVersions(t, serverVersion, "4.0") < 0 {
183+
t.Skip("Needs server version >= 4.0")
184+
}
185+
186+
err = adminDB.RunCommand(
187+
ctx,
188+
bson.D{{"configureFailPoint", "failCommand"},
189+
{"mode", bson.D{{"times", 1}}},
190+
{"data", bson.D{{"failCommands", bson.A{"insert"}}, {"errorCode", 11600}}}},
191+
).Err()
192+
require.NoError(t, err)
193+
defer func() {
194+
require.NoError(t, adminDB.RunCommand(ctx, bson.D{
195+
{"configureFailPoint", "failCommand"},
196+
{"mode", "off"},
197+
}).Err())
198+
}()
199+
200+
_, err = coll.InsertOne(ctx, bson.D{{"test", 1}})
201+
require.Error(t, err)
202+
203+
cerr, ok := err.(CommandError)
204+
require.True(t, ok)
205+
require.Equal(t, int32(11600), cerr.Code)
206+
207+
require.True(t, isPoolCleared())
208+
})
209+
}

x/mongo/driver/errors.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ import (
1111
)
1212

1313
var (
14-
retryableCodes = []int32{11600, 11602, 10107, 13435, 13436, 189, 91, 7, 6, 89, 9001}
15-
nodeIsRecoveringCodes = []int32{11600, 11602, 13436, 189, 91}
16-
notMasterCodes = []int32{10107, 13435}
14+
retryableCodes = []int32{11600, 11602, 10107, 13435, 13436, 189, 91, 7, 6, 89, 9001}
15+
nodeIsRecoveringCodes = []int32{11600, 11602, 13436, 189, 91}
16+
notMasterCodes = []int32{10107, 13435}
17+
nodeIsShuttingDownCodes = []int32{11600, 91}
1718
)
1819

1920
var (
@@ -136,6 +137,16 @@ func (wce WriteConcernError) NodeIsRecovering() bool {
136137
return strings.Contains(wce.Message, "node is recovering")
137138
}
138139

140+
// NodeIsShuttingDown returns true if this error is a node is shutting down error.
141+
func (wce WriteConcernError) NodeIsShuttingDown() bool {
142+
for _, code := range nodeIsShuttingDownCodes {
143+
if wce.Code == int64(code) {
144+
return true
145+
}
146+
}
147+
return strings.Contains(wce.Message, "node is shutting down")
148+
}
149+
139150
// NotMaster returns true if this error is a not master error.
140151
func (wce WriteConcernError) NotMaster() bool {
141152
for _, code := range notMasterCodes {
@@ -245,6 +256,16 @@ func (e Error) NodeIsRecovering() bool {
245256
return strings.Contains(e.Message, "node is recovering")
246257
}
247258

259+
// NodeIsShuttingDown returns true if this error is a node is shutting down error.
260+
func (e Error) NodeIsShuttingDown() bool {
261+
for _, code := range nodeIsShuttingDownCodes {
262+
if e.Code == code {
263+
return true
264+
}
265+
}
266+
return strings.Contains(e.Message, "node is shutting down")
267+
}
268+
248269
// NotMaster returns true if this error is a not master error.
249270
func (e Error) NotMaster() bool {
250271
for _, code := range notMasterCodes {

x/mongo/driver/topology/CMAP_spec_test.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ type simThread struct {
5858

5959
type testInfo struct {
6060
objects map[string]interface{}
61-
originalEventChan chan event.PoolEvent
62-
finalEventChan chan event.PoolEvent
61+
originalEventChan chan *event.PoolEvent
62+
finalEventChan chan *event.PoolEvent
6363
threads map[string]*simThread
6464
backgroundThreadErrors chan error
6565
eventCounts map[string]uint64
@@ -90,8 +90,8 @@ func runCMAPTest(t *testing.T, testFileName string) {
9090

9191
testInfo := &testInfo{
9292
objects: make(map[string]interface{}),
93-
originalEventChan: make(chan event.PoolEvent, 200),
94-
finalEventChan: make(chan event.PoolEvent, 200),
93+
originalEventChan: make(chan *event.PoolEvent, 200),
94+
finalEventChan: make(chan *event.PoolEvent, 200),
9595
threads: make(map[string]*simThread),
9696
eventCounts: make(map[string]uint64),
9797
backgroundThreadErrors: make(chan error, 100),
@@ -110,10 +110,8 @@ func runCMAPTest(t *testing.T, testFileName string) {
110110
WithConnectionPoolMaxIdleTime(func(duration time.Duration) time.Duration {
111111
return time.Duration(test.PoolOptions.MaxIdleTimeMS) * time.Millisecond
112112
}),
113-
WithConnectionPoolMonitor(func(monitor event.PoolMonitor) event.PoolMonitor {
114-
return func(event event.PoolEvent) {
115-
testInfo.originalEventChan <- event
116-
}
113+
WithConnectionPoolMonitor(func(monitor *event.PoolMonitor) *event.PoolMonitor {
114+
return &event.PoolMonitor{func(event *event.PoolEvent) { testInfo.originalEventChan <- event }}
117115
}))
118116
testHelpers.RequireNil(t, err, "error creating server: %v", err)
119117
s.connectionstate = connected
@@ -184,7 +182,7 @@ func runCMAPTest(t *testing.T, testFileName string) {
184182

185183
}
186184

187-
func checkEvents(t *testing.T, expectedEvents []cmapEvent, actualEvents chan event.PoolEvent, ignoreEvents []string) {
185+
func checkEvents(t *testing.T, expectedEvents []cmapEvent, actualEvents chan *event.PoolEvent, ignoreEvents []string) {
188186
for _, expectedEvent := range expectedEvents {
189187
validEvent := nextValidEvent(t, actualEvents, ignoreEvents)
190188

@@ -265,7 +263,7 @@ EventsLeft:
265263
}
266264
}
267265

268-
func nextValidEvent(t *testing.T, events chan event.PoolEvent, ignoreEvents []string) event.PoolEvent {
266+
func nextValidEvent(t *testing.T, events chan *event.PoolEvent, ignoreEvents []string) *event.PoolEvent {
269267
t.Helper()
270268
NextEvent:
271269
for {

0 commit comments

Comments
 (0)