Skip to content

Commit f58daeb

Browse files
authored
mongo error clean up (#3591)
`changestream.Err()` calls [`wrapErrors`](https://github.com/mongodb/mongo-go-driver/blob/1e7cea0fc96b08059915a3d1b0903be2c7c1ed0c/mongo/change_stream.go#L569-L579) which [_attempts_ to wraps errors in one of CommandError, MarshalError, MongocryptError, etc.](https://github.com/mongodb/mongo-go-driver/blob/1e7cea0fc96b08059915a3d1b0903be2c7c1ed0c/mongo/errors.go#L90-L106), but errors can fall thru if it doesn't match one of the known error targets, and the original error is returned. This PR updates driver.Error matching with the wrapped errors. There's also been a few iterations of this error handling such that this section no longer makes sense (comment says we retry, but code has since been changed to notify, but notify to all retryable errors seems too aggressive): ``` // some error codes are defined to be retryable by the driver, so we retry them var retryableError RetryableError if errors.As(mongoErr, &retryableError) && retryableError.Retryable() { return ErrorNotifyConnectivity, mongoErrorInfo } ``` So instead notify on the errors we've seen so far: ShutdownInProgress (which has code=91). Would be better if we are more explicit about which one to retry and which one to notify, as they come up in practice. Finally, fixes https://linear.app/clickhouse/issue/DBI-111/classify-connection-pool-error-with-mongodb-cluster by making it retryable when "connection reset by peers" is detected.
1 parent 3712b33 commit f58daeb

File tree

2 files changed

+50
-39
lines changed

2 files changed

+50
-39
lines changed

flow/alerting/classifier.go

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/go-mysql-org/go-mysql/mysql"
1717
"github.com/jackc/pgerrcode"
1818
"github.com/jackc/pgx/v5/pgconn"
19+
"go.mongodb.org/mongo-driver/v2/mongo"
1920
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
2021
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/topology"
2122
"go.temporal.io/sdk/temporal"
@@ -92,11 +93,6 @@ type ErrorClass struct {
9293
action ErrorAction
9394
}
9495

95-
type RetryableError interface {
96-
error
97-
Retryable() bool
98-
}
99-
10096
var (
10197
ErrorNotifyDestinationModified = ErrorClass{
10298
Class: "NOTIFY_DESTINATION_MODIFIED", action: NotifyUser,
@@ -610,41 +606,57 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) {
610606
}
611607
}
612608

613-
var mongoErr driver.Error
614-
if errors.As(err, &mongoErr) {
615-
// https://www.mongodb.com/docs/manual/reference/error-codes/
609+
var mongoCmdErr mongo.CommandError
610+
if errors.As(err, &mongoCmdErr) {
616611
mongoErrorInfo := ErrorInfo{
617612
Source: ErrorSourceMongoDB,
618-
Code: strconv.Itoa(int(mongoErr.Code)),
613+
Code: strconv.Itoa(int(mongoCmdErr.Code)),
619614
}
620615

621-
if mongoErr.RetryableRead() {
622-
return ErrorNotifyConnectivity, mongoErrorInfo
616+
if mongoCmdErr.HasErrorMessage("connection reset by peer") {
617+
return ErrorRetryRecoverable, mongoErrorInfo
623618
}
624619

625-
// some driver errors do not provide error code, such as `poolClearedError`, so we check label instead
626-
for _, label := range mongoErr.Labels {
627-
if label == driver.TransientTransactionError {
628-
return ErrorNotifyConnectivity, mongoErrorInfo
629-
}
620+
// this often happens on Mongo Atlas as part of maintenance, and should recover, but we notify if exceed default threshold
621+
// (ShutdownInProgress code should be 91, but we have observed 0 in the past, so string match to be safe)
622+
if mongoCmdErr.HasErrorMessage("(ShutdownInProgress) The server is in quiesce mode and will shut down") {
623+
return ErrorNotifyConnectivity, mongoErrorInfo
630624
}
631625

632-
// some error codes are defined to be retryable by the driver, so we retry them
633-
var retryableError RetryableError
634-
if errors.As(mongoErr, &retryableError) && retryableError.Retryable() {
626+
// This should recover, but we notify if exceed default threshold
627+
if mongoCmdErr.HasErrorLabel(driver.TransientTransactionError) {
635628
return ErrorNotifyConnectivity, mongoErrorInfo
636629
}
637630

638-
switch mongoErr.Code {
631+
// https://www.mongodb.com/docs/manual/reference/error-codes/
632+
switch mongoCmdErr.Code {
639633
case 13: // Unauthorized
640634
return ErrorNotifyConnectivity, mongoErrorInfo
635+
case 91: // ShutdownInProgress
636+
return ErrorNotifyConnectivity, mongoErrorInfo
641637
case 286: // ChangeStreamHistoryLost
642638
return ErrorNotifyChangeStreamHistoryLost, mongoErrorInfo
643639
default:
644640
return ErrorOther, mongoErrorInfo
645641
}
646642
}
647643

644+
var mongoMarshalErr mongo.MarshalError
645+
if errors.As(err, &mongoMarshalErr) {
646+
return ErrorOther, ErrorInfo{
647+
Source: ErrorSourceMongoDB,
648+
Code: "MARSHAL_ERROR",
649+
}
650+
}
651+
652+
var mongoEncryptError mongo.MongocryptError
653+
if errors.As(err, &mongoEncryptError) {
654+
return ErrorOther, ErrorInfo{
655+
Source: ErrorSourceMongoDB,
656+
Code: "MONGOCRYPT_ERROR",
657+
}
658+
}
659+
648660
var mongoServerError topology.ServerSelectionError
649661
if errors.As(err, &mongoServerError) {
650662
return ErrorNotifyConnectivity, ErrorInfo{
@@ -653,6 +665,14 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) {
653665
}
654666
}
655667

668+
var mongoConnError topology.ConnectionError
669+
if errors.As(err, &mongoConnError) {
670+
return ErrorNotifyConnectivity, ErrorInfo{
671+
Source: ErrorSourceMongoDB,
672+
Code: "CONNECTION_ERROR",
673+
}
674+
}
675+
656676
var chException *clickhouse.Exception
657677
if errors.As(err, &chException) {
658678
chErrorInfo := ErrorInfo{

flow/alerting/classifier_test.go

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/jackc/pgx/v5/pgproto3"
1717
"github.com/stretchr/testify/assert"
1818
"github.com/stretchr/testify/require"
19+
"go.mongodb.org/mongo-driver/v2/mongo"
1920
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
2021
"go.temporal.io/sdk/temporal"
2122

@@ -574,30 +575,20 @@ func TestTemporalUnknownErrorShouldBeOther(t *testing.T) {
574575

575576
func TestMongoShutdownInProgressErrorShouldBeRecoverable(t *testing.T) {
576577
// Simulate a MongoDB shutdown in progress error (quiesce mode)
577-
err := driver.Error{
578-
Message: "connection pool for <host>:<port> was cleared because another operation failed with",
579-
Labels: []string{driver.TransientTransactionError},
580-
Wrapped: errors.New("the server is in quiesce mode and will shut down"),
578+
de := driver.Error{
579+
Code: 0,
580+
//nolint:lll
581+
Message: "connection pool for <host>:<port> was cleared because another operation failed with: (ShutdownInProgress) The server is in quiesce mode and will shut down",
581582
}
582-
errorClass, errInfo := GetErrorClass(t.Context(), fmt.Errorf("change stream error: %w", err))
583-
assert.Equal(t, ErrorNotifyConnectivity, errorClass, "Unexpected error class")
584-
assert.Equal(t, ErrorInfo{
585-
Source: ErrorSourceMongoDB,
586-
Code: "0",
587-
}, errInfo, "Unexpected error info")
588-
}
589-
590-
func TestMongoUnauthorizedErrorShouldBeConnectivity(t *testing.T) {
591-
// Simulate a MongoDB unauthorized error
592-
err := driver.Error{
593-
Code: 13,
594-
Message: "Command getMore requires authentication",
595-
Name: "Unauthorized",
583+
err := mongo.CommandError{
584+
Message: de.Message,
585+
Code: de.Code,
586+
Wrapped: de,
596587
}
597588
errorClass, errInfo := GetErrorClass(t.Context(), fmt.Errorf("change stream error: %w", err))
598589
assert.Equal(t, ErrorNotifyConnectivity, errorClass, "Unexpected error class")
599590
assert.Equal(t, ErrorInfo{
600591
Source: ErrorSourceMongoDB,
601-
Code: "13",
592+
Code: "0",
602593
}, errInfo, "Unexpected error info")
603594
}

0 commit comments

Comments
 (0)