Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions flow/alerting/classifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ var (
ErrorNotifyPostgresSlotMemalloc = ErrorClass{
Class: "NOTIFY_POSTGRES_SLOT_MEMALLOC", action: NotifyUser,
}
ErrNotifyPostgresCreatingSlotOnReader = ErrorClass{
Class: "NOTIFY_POSTGRES_CREATING_SLOT_ON_READER", action: NotifyUser,
}
// Mongo specific, equivalent to slot invalidation in Postgres
ErrorNotifyChangeStreamHistoryLost = ErrorClass{
Class: "NOTIFY_CHANGE_STREAM_HISTORY_LOST", action: NotifyUser,
Expand Down Expand Up @@ -497,6 +500,9 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) {
return ErrorNotifyPostgresSlotMemalloc, pgErrorInfo
}

if strings.Contains(pgErr.Message, "Create the replication slot from the writer node instead") {
return ErrNotifyPostgresCreatingSlotOnReader, pgErrorInfo
}
// Fall through for other internal errors
return ErrorOther, pgErrorInfo

Expand Down
16 changes: 16 additions & 0 deletions flow/alerting/classifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,22 @@ func TestPostgresInvalidValueForSynchronizedStandbySlots(t *testing.T) {
}, errInfo, "Unexpected error info")
}

func TestPostgresCreatingSlotOnReader(t *testing.T) {
err := &pgconn.PgError{
Severity: "ERROR",
Code: pgerrcode.InternalError,
Message: `ERROR: Creating logical replication slot peerflow_slot_mirror_1cd7f87b__d143__4cea__a247__a2acc5f5b746
is not supported on the Multi-AZ DB cluster reader node.
Create the replication slot from the writer node instead. (SQLSTATE XX000)`,
}
errorClass, errInfo := GetErrorClass(t.Context(), fmt.Errorf("slot error: [slot] error creating replication slot: %w", err))
assert.Equal(t, ErrNotifyPostgresCreatingSlotOnReader, errorClass, "Unexpected error class")
assert.Equal(t, ErrorInfo{
Source: ErrorSourcePostgres,
Code: pgerrcode.InternalError,
}, errInfo, "Unexpected error info")
}

func TestPostgresStaleFileHandleErrorShouldBeRecoverable(t *testing.T) {
// Simulate a stale file handle error
err := &exceptions.PostgresWalError{
Expand Down
Loading