diff --git a/dbos/system_database.go b/dbos/system_database.go index c79f214d..2a7b8a9d 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -6,6 +6,8 @@ import ( "errors" "fmt" "log/slog" + "math" + "math/rand" "strings" "sync" "time" @@ -140,8 +142,11 @@ const ( _DBOS_WORKFLOW_EVENTS_CHANNEL = "dbos_workflow_events_channel" // Database retry timeouts - _DB_CONNECTION_RETRY_DELAY = 500 * time.Millisecond - _DB_RETRY_INTERVAL = 1 * time.Second + _DB_CONNECTION_RETRY_BASE_DELAY = 1 * time.Second + _DB_CONNECTION_RETRY_FACTOR = 2 + _DB_CONNECTION_RETRY_MAX_RETRIES = 10 + _DB_CONNECTION_MAX_DELAY = 120 * time.Second + _DB_RETRY_INTERVAL = 1 * time.Second ) func runMigrations(databaseURL string) error { @@ -326,7 +331,7 @@ func (s *sysDB) shutdown(ctx context.Context, timeout time.Duration) { // Allow pgx health checks to complete // https://github.com/jackc/pgx/blob/15bca4a4e14e0049777c1245dba4c16300fe4fd0/pgxpool/pool.go#L417 // These trigger go-leak alerts - time.Sleep(_DB_CONNECTION_RETRY_DELAY) + time.Sleep(500 * time.Millisecond) s.launched = false } @@ -1542,6 +1547,7 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) { } } + retryAttempt := 0 for { // Block until a notification is received. OnNotification will be called when a notification is received. // WaitForNotification handles context cancellation: https://github.com/jackc/pgx/blob/15bca4a4e14e0049777c1245dba4c16300fe4fd0/pgconn/pgconn.go#L1050 @@ -1560,10 +1566,14 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) { } // Other errors - log and retry. - // TODO add exponential backoff + jitter s.logger.Error("Error waiting for notification", "error", err) - time.Sleep(_DB_CONNECTION_RETRY_DELAY) + time.Sleep(backoffWithJitter(retryAttempt)) + retryAttempt += 1 continue + } else { + if retryAttempt > 0 { + retryAttempt -= 1 + } } } } @@ -2367,3 +2377,17 @@ func (qb *queryBuilder) addWhereLessEqual(column string, value any) { qb.whereClauses = append(qb.whereClauses, fmt.Sprintf("%s <= $%d", column, qb.argCounter)) qb.args = append(qb.args, value) } + +func backoffWithJitter(retryAttempt int) time.Duration { + exp := float64(_DB_CONNECTION_RETRY_BASE_DELAY) * math.Pow(_DB_CONNECTION_RETRY_FACTOR, float64(retryAttempt)) + // cap backoff to max number of retries, then do a fixed time delay + // expected retryAttempt to initially be 0, so >= used + // cap delay to maximum of _DB_CONNECTION_MAX_DELAY milliseconds + if retryAttempt >= _DB_CONNECTION_RETRY_MAX_RETRIES || exp > float64(_DB_CONNECTION_MAX_DELAY) { + exp = float64(_DB_CONNECTION_MAX_DELAY) + } + + // want randomization between +-25% of exp + jitter := 0.75 + rand.Float64()*0.5 // #nosec G404 -- trivial use of math/rand + return time.Duration(exp * jitter) +} diff --git a/dbos/system_database_test.go b/dbos/system_database_test.go new file mode 100644 index 00000000..cc783414 --- /dev/null +++ b/dbos/system_database_test.go @@ -0,0 +1,51 @@ +package dbos + +import ( + "testing" + "time" +) + +var backoffWithJitterTestcases = []struct { + name string + retryAttempt int + wantMin time.Duration + wantMax time.Duration +}{ + { + name: "first retry attempt (0)", + retryAttempt: 0, + wantMin: 750 * time.Millisecond, + wantMax: 1250 * time.Millisecond, + }, + { + name: "second retry attempt (1)", + retryAttempt: 1, + wantMin: 1500 * time.Millisecond, + wantMax: 2500 * time.Millisecond, + }, + { + name: "ninth retry attempt (8)", + retryAttempt: 8, + wantMin: 90 * time.Second, + wantMax: 150 * time.Second, + }, + { + name: "exceeds max retries", + retryAttempt: 10, + wantMin: 90 * time.Second, + wantMax: 150 * time.Second, + }, +} + +func TestBackoffWithJitter(t *testing.T) { + for _, testcase := range backoffWithJitterTestcases { + t.Run(testcase.name, func(t *testing.T) { + got := backoffWithJitter(testcase.retryAttempt) + + if got < testcase.wantMin || got > testcase.wantMax { + t.Errorf("Should be between %v and %v, got=%v, attempt=%v", + testcase.wantMin, testcase.wantMax, got, testcase.retryAttempt) + } + }) + } +}