Skip to content

Commit d810656

Browse files
author
Divjot Arora
committed
GODRIVER-1442 Fix connection error handling in topology.Server (#259)
1 parent 9ec3173 commit d810656

File tree

4 files changed

+52
-13
lines changed

4 files changed

+52
-13
lines changed

mongo/integration/primary_stepdown_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,24 @@ func TestConnectionsSurvivePrimaryStepDown(t *testing.T) {
124124
})
125125
}
126126
})
127+
mt.RunOpts("network errors", mtest.NewOptions().ClientOptions(clientOpts).MinServerVersion("4.0"), func(mt *mtest.T) {
128+
// expect that a server's connection pool will be cleared if a non-timeout network error occurs during an
129+
// operation
130+
131+
clearPoolChan()
132+
mt.SetFailPoint(mtest.FailPoint{
133+
ConfigureFailPoint: "failCommand",
134+
Mode: mtest.FailPointMode{
135+
Times: 1,
136+
},
137+
Data: mtest.FailPointData{
138+
FailCommands: []string{"insert"},
139+
CloseConnection: true,
140+
},
141+
})
142+
143+
_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"test", 1}})
144+
assert.NotNil(mt, err, "expected InsertOne error, got nil")
145+
assert.True(mt, isPoolCleared(), "expected pool to be cleared but was not")
146+
})
127147
}

x/mongo/driver/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ type Error struct {
190190
Message string
191191
Labels []string
192192
Name string
193+
Wrapped error
193194
}
194195

195196
// UnsupportedStorageEngine returns whether e came as a result of an unsupported storage engine

x/mongo/driver/operation.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,7 @@ func (op Operation) roundTrip(ctx context.Context, conn Connection, wm []byte) (
546546
if op.Client != nil && op.Client.Committing {
547547
labels = append(labels, UnknownTransactionCommitResult)
548548
}
549-
return nil, Error{Message: err.Error(), Labels: labels}
549+
return nil, Error{Message: err.Error(), Labels: labels, Wrapped: err}
550550
}
551551

552552
wm, err = conn.ReadWireMessage(ctx, wm[:0])
@@ -561,7 +561,7 @@ func (op Operation) roundTrip(ctx context.Context, conn Connection, wm []byte) (
561561
if op.Client != nil && op.Client.Committing {
562562
labels = append(labels, UnknownTransactionCommitResult)
563563
}
564-
return nil, Error{Message: err.Error(), Labels: labels}
564+
return nil, Error{Message: err.Error(), Labels: labels, Wrapped: err}
565565
}
566566

567567
// decompress wiremessage
@@ -588,7 +588,7 @@ func (op *Operation) moreToComeRoundTrip(ctx context.Context, conn Connection, w
588588
if op.Client != nil {
589589
op.Client.MarkDirty()
590590
}
591-
err = Error{Message: err.Error(), Labels: []string{TransientTransactionError, NetworkError}}
591+
err = Error{Message: err.Error(), Labels: []string{TransientTransactionError, NetworkError}, Wrapped: err}
592592
}
593593
return bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "ok", 1)), err
594594
}

x/mongo/driver/topology/server.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -243,16 +243,16 @@ func (s *Server) Connection(ctx context.Context) (driver.Connection, error) {
243243
conn, err := s.pool.get(ctx)
244244
if err != nil {
245245
s.sem.Release(1)
246-
connErr, ok := err.(ConnectionError)
247-
if !ok {
246+
wrappedConnErr := unwrapConnectionError(err)
247+
if wrappedConnErr == nil {
248248
return nil, err
249249
}
250250

251251
// Since the only kind of ConnectionError we receive from pool.Get will be an initialization
252252
// error, we should set the description.Server appropriately.
253253
desc := description.Server{
254254
Kind: description.Unknown,
255-
LastError: connErr.Wrapped,
255+
LastError: wrappedConnErr,
256256
}
257257
s.updateDescription(desc, false)
258258

@@ -317,8 +317,9 @@ func (s *Server) RequestImmediateCheck() {
317317

318318
// ProcessError handles SDAM error handling and implements driver.ErrorProcessor.
319319
func (s *Server) ProcessError(err error) {
320-
// Invalidate server description if not master or node recovering error occurs
321-
if cerr, ok := err.(driver.Error); ok && (cerr.NetworkError() || cerr.NodeIsRecovering() || cerr.NotMaster()) {
320+
// Invalidate server description if not master or node recovering error occurs.
321+
// These errors can be reported as a command error or a write concern error.
322+
if cerr, ok := err.(driver.Error); ok && (cerr.NodeIsRecovering() || cerr.NotMaster()) {
322323
desc := s.Description()
323324
desc.Kind = description.Unknown
324325
desc.LastError = err
@@ -345,15 +346,16 @@ func (s *Server) ProcessError(err error) {
345346
return
346347
}
347348

348-
ne, ok := err.(ConnectionError)
349-
if !ok {
349+
wrappedConnErr := unwrapConnectionError(err)
350+
if wrappedConnErr == nil {
350351
return
351352
}
352353

353-
if netErr, ok := ne.Wrapped.(net.Error); ok && netErr.Timeout() {
354+
// Ignore transient timeout errors.
355+
if netErr, ok := wrappedConnErr.(net.Error); ok && netErr.Timeout() {
354356
return
355357
}
356-
if ne.Wrapped == context.Canceled || ne.Wrapped == context.DeadlineExceeded {
358+
if wrappedConnErr == context.Canceled || wrappedConnErr == context.DeadlineExceeded {
357359
return
358360
}
359361

@@ -362,6 +364,7 @@ func (s *Server) ProcessError(err error) {
362364
desc.LastError = err
363365
// updates description to unknown
364366
s.updateDescription(desc, false)
367+
s.pool.clear()
365368
}
366369

367370
// update handles performing heartbeats and updating any subscribers of the
@@ -551,7 +554,7 @@ func (s *Server) heartbeat(conn *connection) (description.Server, *connection) {
551554
if err != nil {
552555
saved = err
553556
conn = nil
554-
if _, ok := err.(ConnectionError); ok {
557+
if wrappedConnErr := unwrapConnectionError(err); wrappedConnErr != nil {
555558
s.pool.drain()
556559
// If the server is not connected, give up and exit loop
557560
if s.Description().Kind == description.Unknown {
@@ -637,3 +640,18 @@ func (ss *ServerSubscription) Unsubscribe() error {
637640

638641
return nil
639642
}
643+
644+
// unwrapConnectionError returns the connection error wrapped by err, or nil if err does not wrap a connection error.
645+
func unwrapConnectionError(err error) error {
646+
connErr, ok := err.(ConnectionError)
647+
if ok {
648+
return connErr.Wrapped
649+
}
650+
651+
driverErr, ok := err.(driver.Error)
652+
if ok && driverErr.NetworkError() {
653+
return driverErr.Wrapped
654+
}
655+
656+
return nil
657+
}

0 commit comments

Comments
 (0)