From d078ed21e62e41bf48def3cedfc6a13e84ff2181 Mon Sep 17 00:00:00 2001 From: pranshu-raj-211 Date: Sat, 30 Aug 2025 15:36:33 +0530 Subject: [PATCH 1/6] init exp backoff + jitter for system_database.go Signed-off-by: pranshu-raj-211 --- dbos/system_database.go | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 661bb909..b73553ce 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -6,7 +6,9 @@ import ( "errors" "fmt" "log/slog" + "math" "net/url" + "rand" "strings" "sync" "time" @@ -134,7 +136,10 @@ const ( _DBOS_WORKFLOW_EVENTS_CHANNEL = "dbos_workflow_events_channel" // Database retry timeouts - _DB_CONNECTION_RETRY_DELAY = 500 * time.Millisecond + _DB_CONNECTION_RETRY_BASE_DELAY = 500 * time.Millisecond + _DB_CONNECTION_RETRY_FACTOR = 2 + _DB_CONNECTION_RETRY_MAX_RETRIES = 3 + _DB_CONNECTION_MAX_DELAY = 10 * time.Second _DB_RETRY_INTERVAL = 1 * time.Second ) @@ -281,7 +286,7 @@ func (s *sysDB) shutdown(ctx context.Context, timeout time.Duration) { // Allow pgx health checks to complete // https://github.com/jackc/pgx/blob/15bca4a4e14e0049777c1245dba4c16300fe4fd0/pgxpool/pool.go#L417 // These trigger go-leak alerts - time.Sleep(_DB_CONNECTION_RETRY_DELAY) + time.Sleep(backoffWithJitter(0)) s.launched = false } @@ -1496,6 +1501,7 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) { } } + retryAttempt := 0 for { // Block until a notification is received. OnNotification will be called when a notification is received. // WaitForNotification handles context cancellation: https://github.com/jackc/pgx/blob/15bca4a4e14e0049777c1245dba4c16300fe4fd0/pgconn/pgconn.go#L1050 @@ -1516,7 +1522,8 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) { // Other errors - log and retry. // TODO add exponential backoff + jitter s.logger.Error("Error waiting for notification", "error", err) - time.Sleep(_DB_CONNECTION_RETRY_DELAY) + time.Sleep(backoffWithJitter(retryAttempt)) + retryAttempt += 1 continue } } @@ -2317,3 +2324,17 @@ func (qb *queryBuilder) addWhereLessEqual(column string, value any) { qb.whereClauses = append(qb.whereClauses, fmt.Sprintf("%s <= $%d", column, qb.argCounter)) qb.args = append(qb.args, value) } + + +func backoffWithJitter(retryAttempt int) time.Duration, error { + exp := float64(base) * math.Pow(_DB_CONNECTION_RETRY_FACTOR, float64(retryAttempt)) + if retryAttempt > _DB_CONNECTION_RETRY_MAX_RETRIES { + return nil, errors.New("Too many retry attempts") + } + if exp > float64(_DB_CONNECTION_MAX_DELAY){ + exp = float64(_DB_CONNECTION_MAX_DELAY) + } + + jitter := 0.3 + rand.Float64() + return time.Duration(exp + jitter), nil +} \ No newline at end of file From 4c0e7000f1446a17778488d97077f408a4559445 Mon Sep 17 00:00:00 2001 From: pranshu-raj-211 Date: Sat, 30 Aug 2025 19:18:00 +0530 Subject: [PATCH 2/6] fixed errors, changed function return values not returning errors anymore, if retries exceed max retries then return base fixed delay Signed-off-by: pranshu-raj-211 --- dbos/system_database.go | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index b73553ce..4183de0d 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -7,8 +7,8 @@ import ( "fmt" "log/slog" "math" + "math/rand" "net/url" - "rand" "strings" "sync" "time" @@ -136,11 +136,11 @@ const ( _DBOS_WORKFLOW_EVENTS_CHANNEL = "dbos_workflow_events_channel" // Database retry timeouts - _DB_CONNECTION_RETRY_BASE_DELAY = 500 * time.Millisecond - _DB_CONNECTION_RETRY_FACTOR = 2 + _DB_CONNECTION_RETRY_BASE_DELAY = 500 * time.Millisecond + _DB_CONNECTION_RETRY_FACTOR = 2 _DB_CONNECTION_RETRY_MAX_RETRIES = 3 - _DB_CONNECTION_MAX_DELAY = 10 * time.Second - _DB_RETRY_INTERVAL = 1 * time.Second + _DB_CONNECTION_MAX_DELAY = 10000 * time.Millisecond + _DB_RETRY_INTERVAL = 1 * time.Second ) func runMigrations(databaseURL string) error { @@ -1520,7 +1520,6 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) { } // Other errors - log and retry. - // TODO add exponential backoff + jitter s.logger.Error("Error waiting for notification", "error", err) time.Sleep(backoffWithJitter(retryAttempt)) retryAttempt += 1 @@ -2325,16 +2324,18 @@ func (qb *queryBuilder) addWhereLessEqual(column string, value any) { qb.args = append(qb.args, value) } - -func backoffWithJitter(retryAttempt int) time.Duration, error { - exp := float64(base) * math.Pow(_DB_CONNECTION_RETRY_FACTOR, float64(retryAttempt)) - if retryAttempt > _DB_CONNECTION_RETRY_MAX_RETRIES { - return nil, errors.New("Too many retry attempts") +func backoffWithJitter(retryAttempt int) (time.Duration) { + // cap backoff to max number of retries, then do a fixed time delay + // expected retryAttempt to initially be 0, so >= used + if retryAttempt >= _DB_CONNECTION_RETRY_MAX_RETRIES { + return _DB_CONNECTION_RETRY_BASE_DELAY } - if exp > float64(_DB_CONNECTION_MAX_DELAY){ + exp := float64(_DB_CONNECTION_RETRY_BASE_DELAY) * math.Pow(_DB_CONNECTION_RETRY_FACTOR, float64(retryAttempt)) + // cap delay to maximum of _DB_CONNECTION_MAX_DELAY milliseconds + if exp > float64(_DB_CONNECTION_MAX_DELAY) { exp = float64(_DB_CONNECTION_MAX_DELAY) } - - jitter := 0.3 + rand.Float64() - return time.Duration(exp + jitter), nil -} \ No newline at end of file + + jitter := 0.5 + rand.Float64() + return time.Duration(exp + jitter) +} From 87157f57e51543b2260c6fc39d31ae022d98a503 Mon Sep 17 00:00:00 2001 From: pranshu-raj-211 Date: Mon, 1 Sep 2025 08:07:07 +0530 Subject: [PATCH 3/6] fixes to jitter calculation use percentage of exp rather than small delay Signed-off-by: pranshu-raj-211 --- dbos/system_database.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 4183de0d..c45341f5 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -2324,7 +2324,7 @@ func (qb *queryBuilder) addWhereLessEqual(column string, value any) { qb.args = append(qb.args, value) } -func backoffWithJitter(retryAttempt int) (time.Duration) { +func backoffWithJitter(retryAttempt int) time.Duration { // cap backoff to max number of retries, then do a fixed time delay // expected retryAttempt to initially be 0, so >= used if retryAttempt >= _DB_CONNECTION_RETRY_MAX_RETRIES { @@ -2336,6 +2336,7 @@ func backoffWithJitter(retryAttempt int) (time.Duration) { exp = float64(_DB_CONNECTION_MAX_DELAY) } - jitter := 0.5 + rand.Float64() - return time.Duration(exp + jitter) + // want randomization between +-25% of exp + jitter := 0.75 + rand.Float64()*0.5 + return time.Duration(exp * jitter) } From 8efc971daf3673c81bc39ce0cebdfb05f4ad0a6d Mon Sep 17 00:00:00 2001 From: pranshu-raj-211 Date: Thu, 4 Sep 2025 16:38:17 +0530 Subject: [PATCH 4/6] changes as per comments, add tests make base delay constant 1 second, decrement retryAttempt in notificationListenerLoop, hardcode sleep duration in shutdown, slight change in logic - cap delays to max of 120 seconds, but allow for jitter in that too, add tests according to calculations Signed-off-by: pranshu-raj-211 --- dbos/system_database.go | 19 +++++++------- dbos/system_database_test.go | 51 ++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 9 deletions(-) create mode 100644 dbos/system_database_test.go diff --git a/dbos/system_database.go b/dbos/system_database.go index 140de440..15b45bab 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -136,10 +136,10 @@ const ( _DBOS_WORKFLOW_EVENTS_CHANNEL = "dbos_workflow_events_channel" // Database retry timeouts - _DB_CONNECTION_RETRY_BASE_DELAY = 500 * time.Millisecond + _DB_CONNECTION_RETRY_BASE_DELAY = 1 * time.Second _DB_CONNECTION_RETRY_FACTOR = 2 - _DB_CONNECTION_RETRY_MAX_RETRIES = 3 - _DB_CONNECTION_MAX_DELAY = 10000 * time.Millisecond + _DB_CONNECTION_RETRY_MAX_RETRIES = 10 + _DB_CONNECTION_MAX_DELAY = 120 * time.Second _DB_RETRY_INTERVAL = 1 * time.Second ) @@ -293,7 +293,7 @@ func (s *sysDB) shutdown(ctx context.Context, timeout time.Duration) { // Allow pgx health checks to complete // https://github.com/jackc/pgx/blob/15bca4a4e14e0049777c1245dba4c16300fe4fd0/pgxpool/pool.go#L417 // These trigger go-leak alerts - time.Sleep(backoffWithJitter(0)) + time.Sleep(500 * time.Second) s.launched = false } @@ -1532,6 +1532,10 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) { time.Sleep(backoffWithJitter(retryAttempt)) retryAttempt += 1 continue + } else { + if retryAttempt > 0 { + retryAttempt -= 1 + } } } } @@ -2337,14 +2341,11 @@ func (qb *queryBuilder) addWhereLessEqual(column string, value any) { } func backoffWithJitter(retryAttempt int) time.Duration { + exp := float64(_DB_CONNECTION_RETRY_BASE_DELAY) * math.Pow(_DB_CONNECTION_RETRY_FACTOR, float64(retryAttempt)) // cap backoff to max number of retries, then do a fixed time delay // expected retryAttempt to initially be 0, so >= used - if retryAttempt >= _DB_CONNECTION_RETRY_MAX_RETRIES { - return _DB_CONNECTION_RETRY_BASE_DELAY - } - exp := float64(_DB_CONNECTION_RETRY_BASE_DELAY) * math.Pow(_DB_CONNECTION_RETRY_FACTOR, float64(retryAttempt)) // cap delay to maximum of _DB_CONNECTION_MAX_DELAY milliseconds - if exp > float64(_DB_CONNECTION_MAX_DELAY) { + if retryAttempt >= _DB_CONNECTION_RETRY_MAX_RETRIES || exp > float64(_DB_CONNECTION_MAX_DELAY) { exp = float64(_DB_CONNECTION_MAX_DELAY) } diff --git a/dbos/system_database_test.go b/dbos/system_database_test.go new file mode 100644 index 00000000..cc783414 --- /dev/null +++ b/dbos/system_database_test.go @@ -0,0 +1,51 @@ +package dbos + +import ( + "testing" + "time" +) + +var backoffWithJitterTestcases = []struct { + name string + retryAttempt int + wantMin time.Duration + wantMax time.Duration +}{ + { + name: "first retry attempt (0)", + retryAttempt: 0, + wantMin: 750 * time.Millisecond, + wantMax: 1250 * time.Millisecond, + }, + { + name: "second retry attempt (1)", + retryAttempt: 1, + wantMin: 1500 * time.Millisecond, + wantMax: 2500 * time.Millisecond, + }, + { + name: "ninth retry attempt (8)", + retryAttempt: 8, + wantMin: 90 * time.Second, + wantMax: 150 * time.Second, + }, + { + name: "exceeds max retries", + retryAttempt: 10, + wantMin: 90 * time.Second, + wantMax: 150 * time.Second, + }, +} + +func TestBackoffWithJitter(t *testing.T) { + for _, testcase := range backoffWithJitterTestcases { + t.Run(testcase.name, func(t *testing.T) { + got := backoffWithJitter(testcase.retryAttempt) + + if got < testcase.wantMin || got > testcase.wantMax { + t.Errorf("Should be between %v and %v, got=%v, attempt=%v", + testcase.wantMin, testcase.wantMax, got, testcase.retryAttempt) + } + }) + } +} From 73fa46059283ebf09577ff1497c35dfb8ca935e8 Mon Sep 17 00:00:00 2001 From: pranshu-raj-211 Date: Thu, 4 Sep 2025 22:19:08 +0530 Subject: [PATCH 5/6] fix test error - use millisecond, ignore math/rand security issue Signed-off-by: pranshu-raj-211 --- dbos/system_database.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 15b45bab..5bfa373f 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -293,7 +293,7 @@ func (s *sysDB) shutdown(ctx context.Context, timeout time.Duration) { // Allow pgx health checks to complete // https://github.com/jackc/pgx/blob/15bca4a4e14e0049777c1245dba4c16300fe4fd0/pgxpool/pool.go#L417 // These trigger go-leak alerts - time.Sleep(500 * time.Second) + time.Sleep(500 * time.Millisecond) s.launched = false } @@ -2350,6 +2350,6 @@ func backoffWithJitter(retryAttempt int) time.Duration { } // want randomization between +-25% of exp - jitter := 0.75 + rand.Float64()*0.5 + jitter := 0.75 + rand.Float64()*0.5 //nosec G404 -- trivial use of math/rand return time.Duration(exp * jitter) } From cd16d6d2ef25108e9c11d78c7a32e7a721e680f5 Mon Sep 17 00:00:00 2001 From: pranshu-raj-211 Date: Thu, 4 Sep 2025 23:03:34 +0530 Subject: [PATCH 6/6] missed # --- dbos/system_database.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 5bfa373f..4500e6ec 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -2350,6 +2350,6 @@ func backoffWithJitter(retryAttempt int) time.Duration { } // want randomization between +-25% of exp - jitter := 0.75 + rand.Float64()*0.5 //nosec G404 -- trivial use of math/rand + jitter := 0.75 + rand.Float64()*0.5 // #nosec G404 -- trivial use of math/rand return time.Duration(exp * jitter) }