Skip to content
34 changes: 29 additions & 5 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"errors"
"fmt"
"log/slog"
"math"
"math/rand"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -140,8 +142,11 @@ const (
_DBOS_WORKFLOW_EVENTS_CHANNEL = "dbos_workflow_events_channel"

// Database retry timeouts
_DB_CONNECTION_RETRY_DELAY = 500 * time.Millisecond
_DB_RETRY_INTERVAL = 1 * time.Second
_DB_CONNECTION_RETRY_BASE_DELAY = 1 * time.Second
_DB_CONNECTION_RETRY_FACTOR = 2
_DB_CONNECTION_RETRY_MAX_RETRIES = 10
_DB_CONNECTION_MAX_DELAY = 120 * time.Second
_DB_RETRY_INTERVAL = 1 * time.Second
)

func runMigrations(databaseURL string) error {
Expand Down Expand Up @@ -326,7 +331,7 @@ func (s *sysDB) shutdown(ctx context.Context, timeout time.Duration) {
// Allow pgx health checks to complete
// https://github.com/jackc/pgx/blob/15bca4a4e14e0049777c1245dba4c16300fe4fd0/pgxpool/pool.go#L417
// These trigger go-leak alerts
time.Sleep(_DB_CONNECTION_RETRY_DELAY)
time.Sleep(500 * time.Millisecond)

s.launched = false
}
Expand Down Expand Up @@ -1542,6 +1547,7 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) {
}
}

retryAttempt := 0
for {
// Block until a notification is received. OnNotification will be called when a notification is received.
// WaitForNotification handles context cancellation: https://github.com/jackc/pgx/blob/15bca4a4e14e0049777c1245dba4c16300fe4fd0/pgconn/pgconn.go#L1050
Expand All @@ -1560,10 +1566,14 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) {
}

// Other errors - log and retry.
// TODO add exponential backoff + jitter
s.logger.Error("Error waiting for notification", "error", err)
time.Sleep(_DB_CONNECTION_RETRY_DELAY)
time.Sleep(backoffWithJitter(retryAttempt))
retryAttempt += 1
continue
} else {
if retryAttempt > 0 {
retryAttempt -= 1
}
}
}
}
Expand Down Expand Up @@ -2367,3 +2377,17 @@ func (qb *queryBuilder) addWhereLessEqual(column string, value any) {
qb.whereClauses = append(qb.whereClauses, fmt.Sprintf("%s <= $%d", column, qb.argCounter))
qb.args = append(qb.args, value)
}

func backoffWithJitter(retryAttempt int) time.Duration {
exp := float64(_DB_CONNECTION_RETRY_BASE_DELAY) * math.Pow(_DB_CONNECTION_RETRY_FACTOR, float64(retryAttempt))
// cap backoff to max number of retries, then do a fixed time delay
// expected retryAttempt to initially be 0, so >= used
// cap delay to maximum of _DB_CONNECTION_MAX_DELAY milliseconds
if retryAttempt >= _DB_CONNECTION_RETRY_MAX_RETRIES || exp > float64(_DB_CONNECTION_MAX_DELAY) {
exp = float64(_DB_CONNECTION_MAX_DELAY)
}

// want randomization between +-25% of exp
jitter := 0.75 + rand.Float64()*0.5 // #nosec G404 -- trivial use of math/rand
return time.Duration(exp * jitter)
}
51 changes: 51 additions & 0 deletions dbos/system_database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package dbos

import (
"testing"
"time"
)

var backoffWithJitterTestcases = []struct {
name string
retryAttempt int
wantMin time.Duration
wantMax time.Duration
}{
{
name: "first retry attempt (0)",
retryAttempt: 0,
wantMin: 750 * time.Millisecond,
wantMax: 1250 * time.Millisecond,
},
{
name: "second retry attempt (1)",
retryAttempt: 1,
wantMin: 1500 * time.Millisecond,
wantMax: 2500 * time.Millisecond,
},
{
name: "ninth retry attempt (8)",
retryAttempt: 8,
wantMin: 90 * time.Second,
wantMax: 150 * time.Second,
},
{
name: "exceeds max retries",
retryAttempt: 10,
wantMin: 90 * time.Second,
wantMax: 150 * time.Second,
},
}

func TestBackoffWithJitter(t *testing.T) {
for _, testcase := range backoffWithJitterTestcases {
t.Run(testcase.name, func(t *testing.T) {
got := backoffWithJitter(testcase.retryAttempt)

if got < testcase.wantMin || got > testcase.wantMax {
t.Errorf("Should be between %v and %v, got=%v, attempt=%v",
testcase.wantMin, testcase.wantMax, got, testcase.retryAttempt)
}
})
}
}
Loading