Skip to content

Commit 47a09d8

Browse files
author
Divjot Arora
authored
GODRIVER-1785 Add a status return to the ProcessError function (#547)
1 parent adbaaed commit 47a09d8

File tree

8 files changed

+219
-64
lines changed

8 files changed

+219
-64
lines changed

mongo/change_stream_deployment.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ func (c *changeStreamDeployment) Connection(context.Context) (driver.Connection,
3535
return c.conn, nil
3636
}
3737

38-
func (c *changeStreamDeployment) ProcessError(err error, conn driver.Connection) {
38+
func (c *changeStreamDeployment) ProcessError(err error, conn driver.Connection) driver.ProcessErrorResult {
3939
ep, ok := c.server.(driver.ErrorProcessor)
4040
if !ok {
41-
return
41+
return driver.NoChange
4242
}
4343

44-
ep.ProcessError(err, conn)
44+
return ep.ProcessError(err, conn)
4545
}

x/mongo/driver/driver.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,32 @@ type Compressor interface {
8888
CompressWireMessage(src, dst []byte) ([]byte, error)
8989
}
9090

91+
// ProcessErrorResult represents the result of a ErrorProcessor.ProcessError() call. Exact values for this type can be
92+
// checked directly (e.g. res == ServerMarkedUnknown), but it is recommended that applications use the ServerChanged()
93+
// function instead.
94+
type ProcessErrorResult int
95+
96+
const (
97+
// NoChange indicates that the error did not affect the state of the server.
98+
NoChange ProcessErrorResult = iota
99+
// ServerMarkedUnknown indicates that the error only resulted in the server being marked as Unknown.
100+
ServerMarkedUnknown
101+
// ConnectionPoolCleared indicates that the error resulted in the server being marked as Unknown and its connection
102+
// pool being cleared.
103+
ConnectionPoolCleared
104+
)
105+
106+
// ServerChanged returns true if the ProcessErrorResult indicates that the server changed from an SDAM perspective
107+
// during a ProcessError() call.
108+
func (p ProcessErrorResult) ServerChanged() bool {
109+
return p != NoChange
110+
}
111+
91112
// ErrorProcessor implementations can handle processing errors, which may modify their internal state.
92113
// If this type is implemented by a Server, then Operation.Execute will call it's ProcessError
93114
// method after it decodes a wire message.
94115
type ErrorProcessor interface {
95-
ProcessError(err error, conn Connection)
116+
ProcessError(err error, conn Connection) ProcessErrorResult
96117
}
97118

98119
// HandshakeInformation contains information extracted from a MongoDB connection handshake. This is a helper type that

x/mongo/driver/operation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
367367
}
368368
res, err = roundTrip(ctx, conn, wm)
369369
if ep, ok := srvr.(ErrorProcessor); ok {
370-
ep.ProcessError(err, conn)
370+
_ = ep.ProcessError(err, conn)
371371
}
372372

373373
finishedInfo.response = res

x/mongo/driver/operation_legacy.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ func (op Operation) legacyKillCursors(ctx context.Context, dst []byte, srvr Serv
297297
if err != nil {
298298
err = Error{Message: err.Error(), Labels: []string{TransientTransactionError, NetworkError}}
299299
if ep, ok := srvr.(ErrorProcessor); ok {
300-
ep.ProcessError(err, conn)
300+
_ = ep.ProcessError(err, conn)
301301
}
302302

303303
finishedInfo.cmdErr = err
@@ -635,7 +635,7 @@ func (op Operation) appendLegacyQueryDocument(dst []byte, filter bsoncore.Docume
635635
func (op Operation) roundTripLegacyCursor(ctx context.Context, wm []byte, srvr Server, conn Connection, collName, identifier string) (bsoncore.Document, error) {
636636
wm, err := op.roundTripLegacy(ctx, conn, wm)
637637
if ep, ok := srvr.(ErrorProcessor); ok {
638-
ep.ProcessError(err, conn)
638+
_ = ep.ProcessError(err, conn)
639639
}
640640
if err != nil {
641641
return nil, err

x/mongo/driver/topology/sdam_spec_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ func applyErrors(t *testing.T, topo *Topology, errors []applicationError) {
481481
case "beforeHandshakeCompletes":
482482
server.ProcessHandshakeError(currError, generation)
483483
case "afterHandshakeCompletes":
484-
server.ProcessError(currError, &conn)
484+
_ = server.ProcessError(currError, &conn)
485485
default:
486486
t.Fatalf("unrecognized applicationError.When value: %v", appErr.When)
487487
}

x/mongo/driver/topology/server.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -356,66 +356,70 @@ func getWriteConcernErrorForProcessing(err error) (*driver.WriteConcernError, bo
356356
}
357357

358358
// ProcessError handles SDAM error handling and implements driver.ErrorProcessor.
359-
func (s *Server) ProcessError(err error, conn driver.Connection) {
359+
func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessErrorResult {
360360
// ignore nil error
361361
if err == nil {
362-
return
362+
return driver.NoChange
363363
}
364364

365365
s.processErrorLock.Lock()
366366
defer s.processErrorLock.Unlock()
367367

368368
// ignore stale error
369369
if conn.Stale() {
370-
return
370+
return driver.NoChange
371371
}
372372
// Invalidate server description if not master or node recovering error occurs.
373373
// These errors can be reported as a command error or a write concern error.
374374
desc := conn.Description()
375375
if cerr, ok := err.(driver.Error); ok && (cerr.NodeIsRecovering() || cerr.NotMaster()) {
376376
// ignore stale error
377377
if desc.TopologyVersion.CompareToIncoming(cerr.TopologyVersion) >= 0 {
378-
return
378+
return driver.NoChange
379379
}
380380

381381
// updates description to unknown
382382
s.updateDescription(description.NewServerFromError(s.address, err, cerr.TopologyVersion))
383383
s.RequestImmediateCheck()
384384

385+
res := driver.ServerMarkedUnknown
385386
// If the node is shutting down or is older than 4.2, we synchronously clear the pool
386387
if cerr.NodeIsShuttingDown() || desc.WireVersion == nil || desc.WireVersion.Max < 8 {
388+
res = driver.ConnectionPoolCleared
387389
s.pool.clear()
388390
}
389-
return
391+
return res
390392
}
391393
if wcerr, ok := getWriteConcernErrorForProcessing(err); ok {
392394
// ignore stale error
393395
if desc.TopologyVersion.CompareToIncoming(wcerr.TopologyVersion) >= 0 {
394-
return
396+
return driver.NoChange
395397
}
396398

397399
// updates description to unknown
398400
s.updateDescription(description.NewServerFromError(s.address, err, wcerr.TopologyVersion))
399401
s.RequestImmediateCheck()
400402

403+
res := driver.ServerMarkedUnknown
401404
// If the node is shutting down or is older than 4.2, we synchronously clear the pool
402405
if wcerr.NodeIsShuttingDown() || desc.WireVersion == nil || desc.WireVersion.Max < 8 {
406+
res = driver.ConnectionPoolCleared
403407
s.pool.clear()
404408
}
405-
return
409+
return res
406410
}
407411

408412
wrappedConnErr := unwrapConnectionError(err)
409413
if wrappedConnErr == nil {
410-
return
414+
return driver.NoChange
411415
}
412416

413417
// Ignore transient timeout errors.
414418
if netErr, ok := wrappedConnErr.(net.Error); ok && netErr.Timeout() {
415-
return
419+
return driver.NoChange
416420
}
417421
if wrappedConnErr == context.Canceled || wrappedConnErr == context.DeadlineExceeded {
418-
return
422+
return driver.NoChange
419423
}
420424

421425
// For a non-timeout network error, we clear the pool, set the description to Unknown, and cancel the in-progress
@@ -424,6 +428,7 @@ func (s *Server) ProcessError(err error, conn driver.Connection) {
424428
s.updateDescription(description.NewServerFromError(s.address, err, nil))
425429
s.pool.clear()
426430
s.cancelCheck()
431+
return driver.ConnectionPoolCleared
427432
}
428433

429434
// update handles performing heartbeats and updating any subscribers of the

0 commit comments

Comments
 (0)