Skip to content

Commit 165b569

Browse files
Chore: add init exponential backoff + jitter for system_database.go (#103)
Refer to PR #101 (it was closed while the branch was being renamed).

## What this PR does
Adds exponential backoff with jitter to replace the fixed retry delays in system_database.go.

## How it is implemented
The single delay-interval constant is replaced by these constants:
```
_DB_CONNECTION_RETRY_BASE_DELAY = 500 * time.Millisecond
_DB_CONNECTION_RETRY_FACTOR = 2
_DB_CONNECTION_RETRY_MAX_RETRIES = 3
_DB_CONNECTION_MAX_DELAY = 10 * time.Second
```
The function backoffWithJitter takes one argument, attempt: the number of backoff delays already performed. It calculates the delay as:
```
exp = baseDelay * math.Pow(retryFactor, attempts)
jitter = 0.75 + rand.Float64()*0.5 // jitter factor within +-25% of exp
delay = exp * jitter
```
The delay is capped so that the intervals cannot grow too large. If the number of retries exceeds a certain limit, it falls back to a fixed base delay (500ms). Jitter is still applied, separately, to the fixed intervals. The constants may need to be changed.

## Tests
I have a basic table-driven test for this, but I am unsure which file it should be added to.

---------

Signed-off-by: pranshu-raj-211 <[email protected]>
1 parent a84a07e commit 165b569

File tree

2 files changed

+80
-5
lines changed

2 files changed

+80
-5
lines changed

dbos/system_database.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"errors"
77
"fmt"
88
"log/slog"
9+
"math"
10+
"math/rand"
911
"strings"
1012
"sync"
1113
"time"
@@ -140,8 +142,11 @@ const (
140142
_DBOS_WORKFLOW_EVENTS_CHANNEL = "dbos_workflow_events_channel"
141143

142144
// Database retry timeouts
143-
_DB_CONNECTION_RETRY_DELAY = 500 * time.Millisecond
144-
_DB_RETRY_INTERVAL = 1 * time.Second
145+
_DB_CONNECTION_RETRY_BASE_DELAY = 1 * time.Second
146+
_DB_CONNECTION_RETRY_FACTOR = 2
147+
_DB_CONNECTION_RETRY_MAX_RETRIES = 10
148+
_DB_CONNECTION_MAX_DELAY = 120 * time.Second
149+
_DB_RETRY_INTERVAL = 1 * time.Second
145150
)
146151

147152
func runMigrations(databaseURL string) error {
@@ -326,7 +331,7 @@ func (s *sysDB) shutdown(ctx context.Context, timeout time.Duration) {
326331
// Allow pgx health checks to complete
327332
// https://github.com/jackc/pgx/blob/15bca4a4e14e0049777c1245dba4c16300fe4fd0/pgxpool/pool.go#L417
328333
// These trigger go-leak alerts
329-
time.Sleep(_DB_CONNECTION_RETRY_DELAY)
334+
time.Sleep(500 * time.Millisecond)
330335

331336
s.launched = false
332337
}
@@ -1542,6 +1547,7 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) {
15421547
}
15431548
}
15441549

1550+
retryAttempt := 0
15451551
for {
15461552
// Block until a notification is received. OnNotification will be called when a notification is received.
15471553
// WaitForNotification handles context cancellation: https://github.com/jackc/pgx/blob/15bca4a4e14e0049777c1245dba4c16300fe4fd0/pgconn/pgconn.go#L1050
@@ -1560,10 +1566,14 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) {
15601566
}
15611567

15621568
// Other errors - log and retry.
1563-
// TODO add exponential backoff + jitter
15641569
s.logger.Error("Error waiting for notification", "error", err)
1565-
time.Sleep(_DB_CONNECTION_RETRY_DELAY)
1570+
time.Sleep(backoffWithJitter(retryAttempt))
1571+
retryAttempt += 1
15661572
continue
1573+
} else {
1574+
if retryAttempt > 0 {
1575+
retryAttempt -= 1
1576+
}
15671577
}
15681578
}
15691579
}
@@ -2367,3 +2377,17 @@ func (qb *queryBuilder) addWhereLessEqual(column string, value any) {
23672377
qb.whereClauses = append(qb.whereClauses, fmt.Sprintf("%s <= $%d", column, qb.argCounter))
23682378
qb.args = append(qb.args, value)
23692379
}
2380+
2381+
func backoffWithJitter(retryAttempt int) time.Duration {
2382+
exp := float64(_DB_CONNECTION_RETRY_BASE_DELAY) * math.Pow(_DB_CONNECTION_RETRY_FACTOR, float64(retryAttempt))
2383+
// cap backoff to max number of retries, then do a fixed time delay
2384+
// expected retryAttempt to initially be 0, so >= used
2385+
// cap delay to maximum of _DB_CONNECTION_MAX_DELAY milliseconds
2386+
if retryAttempt >= _DB_CONNECTION_RETRY_MAX_RETRIES || exp > float64(_DB_CONNECTION_MAX_DELAY) {
2387+
exp = float64(_DB_CONNECTION_MAX_DELAY)
2388+
}
2389+
2390+
// want randomization between +-25% of exp
2391+
jitter := 0.75 + rand.Float64()*0.5 // #nosec G404 -- trivial use of math/rand
2392+
return time.Duration(exp * jitter)
2393+
}

dbos/system_database_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package dbos
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
// backoffWithJitterTestcases is the table driving TestBackoffWithJitter. Each
// row names a retry attempt and the inclusive range the returned delay must
// fall in; every range is a nominal delay scaled by 0.75 and 1.25.
var backoffWithJitterTestcases = []struct {
	name             string
	retryAttempt     int
	wantMin, wantMax time.Duration
}{
	// Nominal 1s.
	{name: "first retry attempt (0)", retryAttempt: 0, wantMin: 750 * time.Millisecond, wantMax: 1250 * time.Millisecond},
	// Nominal 2s.
	{name: "second retry attempt (1)", retryAttempt: 1, wantMin: 1500 * time.Millisecond, wantMax: 2500 * time.Millisecond},
	// Nominal 120s.
	{name: "ninth retry attempt (8)", retryAttempt: 8, wantMin: 90 * time.Second, wantMax: 150 * time.Second},
	// Nominal 120s.
	{name: "exceeds max retries", retryAttempt: 10, wantMin: 90 * time.Second, wantMax: 150 * time.Second},
}
39+
40+
func TestBackoffWithJitter(t *testing.T) {
41+
for _, testcase := range backoffWithJitterTestcases {
42+
t.Run(testcase.name, func(t *testing.T) {
43+
got := backoffWithJitter(testcase.retryAttempt)
44+
45+
if got < testcase.wantMin || got > testcase.wantMax {
46+
t.Errorf("Should be between %v and %v, got=%v, attempt=%v",
47+
testcase.wantMin, testcase.wantMax, got, testcase.retryAttempt)
48+
}
49+
})
50+
}
51+
}

0 commit comments

Comments
 (0)