Skip to content
33 changes: 28 additions & 5 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"errors"
"fmt"
"log/slog"
"math"
"math/rand"
"net/url"
"strings"
"sync"
Expand Down Expand Up @@ -134,8 +136,11 @@ const (
_DBOS_WORKFLOW_EVENTS_CHANNEL = "dbos_workflow_events_channel"

// Database retry timeouts
_DB_CONNECTION_RETRY_DELAY = 500 * time.Millisecond
_DB_RETRY_INTERVAL = 1 * time.Second
_DB_CONNECTION_RETRY_BASE_DELAY = 500 * time.Millisecond
_DB_CONNECTION_RETRY_FACTOR = 2
_DB_CONNECTION_RETRY_MAX_RETRIES = 3
_DB_CONNECTION_MAX_DELAY = 10000 * time.Millisecond
_DB_RETRY_INTERVAL = 1 * time.Second
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can be a bit more lenient on these:
Max delay: 2 minutes
Max retries: 10
Also the base retry delay should be = to the base interval

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the base interval do you mean the _DB_RETRY_INTERVAL?
The issue #95 showed the variable that was being used for this, which in the code was _DB_CONNECTION_RETRY_DELAY = 500 * time.Millisecond, which I took as default.

Changed the other values, updating tests to match the same.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make _DB_CONNECTION_RETRY_BASE_DELAY equal 1 second. Thanks!

)

func runMigrations(databaseURL string) error {
Expand Down Expand Up @@ -281,7 +286,7 @@ func (s *sysDB) shutdown(ctx context.Context, timeout time.Duration) {
// Allow pgx health checks to complete
// https://github.com/jackc/pgx/blob/15bca4a4e14e0049777c1245dba4c16300fe4fd0/pgxpool/pool.go#L417
// These trigger go-leak alerts
time.Sleep(_DB_CONNECTION_RETRY_DELAY)
time.Sleep(backoffWithJitter(0))

s.launched = false
}
Expand Down Expand Up @@ -1496,6 +1501,7 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) {
}
}

retryAttempt := 0
for {
// Block until a notification is received. OnNotification will be called when a notification is received.
// WaitForNotification handles context cancellation: https://github.com/jackc/pgx/blob/15bca4a4e14e0049777c1245dba4c16300fe4fd0/pgconn/pgconn.go#L1050
Expand All @@ -1514,9 +1520,9 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) {
}

// Other errors - log and retry.
// TODO add exponential backoff + jitter
s.logger.Error("Error waiting for notification", "error", err)
time.Sleep(_DB_CONNECTION_RETRY_DELAY)
time.Sleep(backoffWithJitter(retryAttempt))
retryAttempt += 1
continue
}
}
Expand Down Expand Up @@ -2317,3 +2323,20 @@ func (qb *queryBuilder) addWhereLessEqual(column string, value any) {
qb.whereClauses = append(qb.whereClauses, fmt.Sprintf("%s <= $%d", column, qb.argCounter))
qb.args = append(qb.args, value)
}

func backoffWithJitter(retryAttempt int) time.Duration {
// cap backoff to max number of retries, then do a fixed time delay
// expected retryAttempt to initially be 0, so >= used
if retryAttempt >= _DB_CONNECTION_RETRY_MAX_RETRIES {
return _DB_CONNECTION_RETRY_BASE_DELAY
}
exp := float64(_DB_CONNECTION_RETRY_BASE_DELAY) * math.Pow(_DB_CONNECTION_RETRY_FACTOR, float64(retryAttempt))
// cap delay to maximum of _DB_CONNECTION_MAX_DELAY milliseconds
if exp > float64(_DB_CONNECTION_MAX_DELAY) {
exp = float64(_DB_CONNECTION_MAX_DELAY)
}

// want randomization between +-25% of exp
jitter := 0.75 + rand.Float64()*0.5
return time.Duration(exp * jitter)
}