Skip to content

Commit 319b7f9

Browse files
MattWhelanMatt Whelansravotto
authored
Add flexible retry backoff policies with configurable delays (#182)
Introduce a new retry policy system that allows configurable backoff strategies for transaction retries. Add RetryPolicy interface and RetryFunc type to support pluggable retry strategies with customizable delays between attempts. Implement LimitBackoffRetryPolicy for fixed-delay retries and ExpBackoffRetryPolicy for exponential backoff with optional maximum delay caps. Add ExternalBackoffPolicy to integrate third-party backoff libraries (e.g., github.com/sethvargo/go-retry) without creating direct dependencies. Define UnlimitedRetries (0) and NoRetries (-1) sentinel constants for clearer retry limit semantics. Both retry policies consistently handle RetryLimit: positive values limit retries, 0 enables unlimited retries, and negative values disable retries entirely. Add context-based retry policy configuration via WithRetryPolicy, WithMaxRetries, and WithNoRetries. Optimize transaction retry implementation to avoid unnecessary SAVEPOINTs when using backoff policies. --------- Co-authored-by: Matt Whelan <matt.whelan@cockroachlabs.com> Co-authored-by: Silvano Ravotto <silvano@cockroachlabs.com>
1 parent 379e970 commit 319b7f9

File tree

7 files changed

+868
-79
lines changed

7 files changed

+868
-79
lines changed

crdb/common.go

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414

1515
package crdb
1616

17-
import "context"
17+
import (
18+
"context"
19+
"time"
20+
)
1821

1922
// Tx abstracts the operations needed by ExecuteInTx so that different
2023
// frameworks (e.g. go's sql package, pgx, gorm) can be used with ExecuteInTx.
@@ -60,8 +63,10 @@ func ExecuteInTx(ctx context.Context, tx Tx, fn func() error) (err error) {
6063
return err
6164
}
6265

63-
maxRetries := numRetriesFromContext(ctx)
64-
retryCount := 0
66+
// establish the retry policy
67+
retryPolicy := getRetryPolicy(ctx)
68+
// set up the retry policy state
69+
retryFunc := retryPolicy.NewRetry()
6570
for {
6671
releaseFailed := false
6772
err = fn()
@@ -82,13 +87,48 @@ func ExecuteInTx(ctx context.Context, tx Tx, fn func() error) (err error) {
8287
return err
8388
}
8489

85-
if rollbackErr := tx.Exec(ctx, "ROLLBACK TO SAVEPOINT cockroach_restart"); rollbackErr != nil {
86-
return newTxnRestartError(rollbackErr, err)
90+
// We have a retryable error. Check the retry policy.
91+
delay, retryErr := retryFunc(err)
92+
// Check if the context has been cancelled
93+
if ctxErr := ctx.Err(); ctxErr != nil {
94+
return ctxErr
95+
}
96+
if delay > 0 && retryErr == nil {
97+
// When backoff is needed, we don't want to hold locks while waiting for a backoff,
98+
// so restart the entire transaction:
99+
// - tx.Exec(ctx, "ROLLBACK") sends SQL to the server:
100+
// it doesn't call tx.Rollback() (which would close the Go sql.Tx object)
101+
// - The underlying connection remains open: the *sql.Tx wrapper maintains the database connection.
102+
// Only the server-side transaction is rolled back.
103+
// - tx.Exec(ctx, "BEGIN") starts a new server-side transaction on the same connection wrapped by the
104+
// same *sql.Tx object
105+
// - The defer handles cleanup - It calls tx.Rollback() (the Go method) only on errors,
106+
// which closes the Go object and returns the connection to the pool
107+
if restartErr := tx.Exec(ctx, "ROLLBACK"); restartErr != nil {
108+
return newTxnRestartError(restartErr, err, "ROLLBACK")
109+
}
110+
if restartErr := tx.Exec(ctx, "BEGIN"); restartErr != nil {
111+
return newTxnRestartError(restartErr, err, "BEGIN")
112+
}
113+
if restartErr := tx.Exec(ctx, "SAVEPOINT cockroach_restart"); restartErr != nil {
114+
return newTxnRestartError(restartErr, err, "SAVEPOINT cockroach_restart")
115+
}
116+
} else {
117+
if rollbackErr := tx.Exec(ctx, "ROLLBACK TO SAVEPOINT cockroach_restart"); rollbackErr != nil {
118+
return newTxnRestartError(rollbackErr, err, "ROLLBACK TO SAVEPOINT cockroach_restart")
119+
}
120+
}
121+
122+
if retryErr != nil {
123+
return retryErr
87124
}
88125

89-
retryCount++
90-
if maxRetries > 0 && retryCount > maxRetries {
91-
return newMaxRetriesExceededError(err, maxRetries)
126+
if delay > 0 {
127+
select {
128+
case <-time.After(delay):
129+
case <-ctx.Done():
130+
return ctx.Err()
131+
}
92132
}
93133
}
94134
}

crdb/error.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,13 @@ type TxnRestartError struct {
6666
msg string
6767
}
6868

69-
func newTxnRestartError(err error, retryErr error) *TxnRestartError {
70-
const msgPattern = "restarting txn failed. ROLLBACK TO SAVEPOINT " +
69+
func newTxnRestartError(err error, retryErr error, op string) *TxnRestartError {
70+
const msgPattern = "restarting txn failed. %s " +
7171
"encountered error: %s. Original error: %s."
7272
return &TxnRestartError{
7373
txError: txError{cause: err},
7474
retryCause: retryErr,
75-
msg: fmt.Sprintf(msgPattern, err, retryErr),
75+
msg: fmt.Sprintf(msgPattern, op, err, retryErr),
7676
}
7777
}
7878

crdb/retry.go

Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
// implied. See the License for the specific language governing
13+
// permissions and limitations under the License.
14+
15+
package crdb
16+
17+
import (
18+
"time"
19+
)
20+
21+
// RetryFunc owns the state for a transaction retry operation. Usually, this is
22+
// just the retry count. RetryFunc is not assumed to be safe for concurrent use.
23+
//
24+
// The function is called after each retryable error to determine whether to
25+
// retry and how long to wait. It receives the retryable error that triggered
26+
// the retry attempt.
27+
//
28+
// Return values:
29+
// - duration: The delay to wait before the next retry attempt. If 0, retry
30+
// immediately without delay.
31+
// - error: If non-nil, stops retrying and returns this error to the caller
32+
// (typically a MaxRetriesExceededError). If nil, the retry will proceed
33+
// after the specified duration.
34+
//
35+
// Example behavior:
36+
// - (100ms, nil): Wait 100ms, then retry
37+
// - (0, nil): Retry immediately
38+
// - (0, err): Stop retrying, return err to caller
39+
type RetryFunc func(err error) (time.Duration, error)
40+
41+
// RetryPolicy constructs a new instance of a RetryFunc for each transaction
42+
// it is used with. Instances of RetryPolicy can likely be immutable and
43+
// should be safe for concurrent calls to NewRetry.
44+
type RetryPolicy interface {
45+
NewRetry() RetryFunc
46+
}
47+
48+
const (
49+
// NoRetries is a sentinel value for the RetryLimit field of a retry policy,
50+
// indicating that no retries should be attempted. When a policy has
51+
// RetryLimit set to NoRetries, the transaction will be attempted only
52+
// once, and any retryable error will immediately return a
53+
// MaxRetriesExceededError.
54+
//
55+
// Use WithNoRetries(ctx) to create a context with this behavior.
56+
NoRetries = -1
57+
58+
// UnlimitedRetries indicates that retries should continue indefinitely
59+
// until the transaction succeeds or a non-retryable error occurs. This
60+
// is represented by setting RetryLimit to 0.
61+
//
62+
// Use WithMaxRetries(ctx, 0) to create a context with unlimited retries,
63+
// though this is generally not recommended in production as it can lead
64+
// to infinite retry loops.
65+
UnlimitedRetries = 0
66+
)
67+
68+
// LimitBackoffRetryPolicy implements RetryPolicy with a configurable retry limit
69+
// and optional constant delay between retries.
70+
//
71+
// The RetryLimit field controls retry behavior:
72+
// - Positive value (e.g., 10): Retry up to that many times before failing
73+
// - UnlimitedRetries (0): Retry indefinitely until success or non-retryable error
74+
// - NoRetries (-1) or any negative value: Do not retry; fail immediately on first retryable error
75+
//
76+
// If Delay is greater than zero, the policy will wait for the specified duration
77+
// between retry attempts.
78+
//
79+
// Example usage with limited retries and no delay:
80+
//
81+
// policy := &LimitBackoffRetryPolicy{
82+
// RetryLimit: 10,
83+
// Delay: 0,
84+
// }
85+
// ctx := crdb.WithRetryPolicy(context.Background(), policy)
86+
// err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
87+
// // transaction logic
88+
// })
89+
//
90+
// Example usage with fixed delay between retries:
91+
//
92+
// policy := &LimitBackoffRetryPolicy{
93+
// RetryLimit: 5,
94+
// Delay: 100 * time.Millisecond,
95+
// }
96+
// ctx := crdb.WithRetryPolicy(context.Background(), policy)
97+
//
98+
// Example usage with unlimited retries:
99+
//
100+
// policy := &LimitBackoffRetryPolicy{
101+
// RetryLimit: UnlimitedRetries, // or 0
102+
// Delay: 50 * time.Millisecond,
103+
// }
104+
//
105+
// Note: Convenience functions are available:
106+
// - WithMaxRetries(ctx, n) creates a LimitBackoffRetryPolicy with RetryLimit=n and Delay=0
107+
// - WithNoRetries(ctx) creates a LimitBackoffRetryPolicy with RetryLimit=NoRetries
108+
type LimitBackoffRetryPolicy struct {
109+
// RetryLimit controls the retry behavior:
110+
// - Positive value: Maximum number of retries before returning MaxRetriesExceededError
111+
// - UnlimitedRetries (0): Retry indefinitely
112+
// - NoRetries (-1) or any negative value: Do not retry, fail immediately
113+
RetryLimit int
114+
115+
// Delay is the fixed duration to wait between retry attempts. If 0,
116+
// retries happen immediately without delay.
117+
Delay time.Duration
118+
}
119+
120+
// NewRetry implements RetryPolicy.
121+
func (l *LimitBackoffRetryPolicy) NewRetry() RetryFunc {
122+
tryCount := 0
123+
return func(err error) (time.Duration, error) {
124+
tryCount++
125+
// Any negative value (including NoRetries) means fail immediately
126+
if l.RetryLimit < UnlimitedRetries {
127+
return 0, newMaxRetriesExceededError(err, 0)
128+
}
129+
// UnlimitedRetries (0) means retry indefinitely, so skip the limit check
130+
// Any positive value enforces the retry limit
131+
if l.RetryLimit > UnlimitedRetries && tryCount > l.RetryLimit {
132+
return 0, newMaxRetriesExceededError(err, l.RetryLimit)
133+
}
134+
return l.Delay, nil
135+
}
136+
}
137+
138+
// ExpBackoffRetryPolicy implements RetryPolicy using an exponential backoff strategy
139+
// where delays double with each retry attempt, with an optional maximum delay cap.
140+
//
141+
// The delay between retries doubles with each attempt, starting from BaseDelay:
142+
// - Retry 1: BaseDelay
143+
// - Retry 2: BaseDelay * 2
144+
// - Retry 3: BaseDelay * 4
145+
// - Retry N: BaseDelay * 2^(N-1)
146+
//
147+
// If MaxDelay is set (> 0), the delay is capped at that value once reached.
148+
// This prevents excessive wait times during high retry counts and provides a
149+
// predictable upper bound for backoff duration.
150+
//
151+
// The RetryLimit field controls retry behavior:
152+
// - Positive value (e.g., 10): Retry up to that many times before failing
153+
// - UnlimitedRetries (0): Retry indefinitely until success or non-retryable error
154+
// - NoRetries (-1) or any negative value: Do not retry; fail immediately on first retryable error
155+
//
156+
// When the limit is exceeded or if the delay calculation overflows without a
157+
// MaxDelay set, it returns a MaxRetriesExceededError.
158+
//
159+
// Example usage with capped exponential backoff:
160+
//
161+
// policy := &ExpBackoffRetryPolicy{
162+
// RetryLimit: 10,
163+
// BaseDelay: 100 * time.Millisecond,
164+
// MaxDelay: 5 * time.Second,
165+
// }
166+
// ctx := crdb.WithRetryPolicy(context.Background(), policy)
167+
// err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
168+
// // transaction logic that may encounter retryable errors
169+
// _, err := tx.ExecContext(ctx, "UPDATE ...")
// return err
170+
// })
171+
//
172+
// This configuration produces delays: 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s,
173+
// then stays at 5s for all subsequent retries.
174+
//
175+
// Example usage with unbounded exponential backoff:
176+
//
177+
// policy := &ExpBackoffRetryPolicy{
178+
// RetryLimit: 5,
179+
// BaseDelay: 1 * time.Second,
180+
// MaxDelay: 0, // no cap
181+
// }
182+
//
183+
// This configuration produces delays: 1s, 2s, 4s, 8s, 16s.
184+
// Note: Setting MaxDelay to 0 means no cap, but be aware that delay overflow
185+
// will cause the policy to fail early.
186+
type ExpBackoffRetryPolicy struct {
187+
// RetryLimit controls the retry behavior:
188+
// - Positive value: Maximum number of retries before returning MaxRetriesExceededError
189+
// - UnlimitedRetries (0): Retry indefinitely
190+
// - NoRetries (-1) or any negative value: Do not retry, fail immediately
191+
RetryLimit int
192+
193+
// BaseDelay is the initial delay before the first retry. Each subsequent
194+
// retry doubles this value: delay = BaseDelay * 2^(attempt-1).
195+
BaseDelay time.Duration
196+
197+
// MaxDelay is the maximum delay cap. If > 0, delays are capped at this
198+
// value once reached. If 0, delays grow unbounded (until overflow, which
199+
// causes early termination).
200+
MaxDelay time.Duration
201+
}
202+
203+
// NewRetry implements RetryPolicy.
204+
func (l *ExpBackoffRetryPolicy) NewRetry() RetryFunc {
205+
tryCount := 0
206+
return func(err error) (time.Duration, error) {
207+
tryCount++
208+
// Any negative value (including NoRetries) means fail immediately
209+
if l.RetryLimit < UnlimitedRetries {
210+
return 0, newMaxRetriesExceededError(err, 0)
211+
}
212+
// UnlimitedRetries (0) means retry indefinitely, so skip the limit check
213+
// Any positive value enforces the retry limit
214+
if l.RetryLimit > UnlimitedRetries && tryCount > l.RetryLimit {
215+
return 0, newMaxRetriesExceededError(err, l.RetryLimit)
216+
}
217+
delay := l.BaseDelay << (tryCount - 1)
218+
if l.MaxDelay > 0 && delay > l.MaxDelay {
219+
return l.MaxDelay, nil
220+
}
221+
if delay < l.BaseDelay {
222+
// We've overflowed.
223+
if l.MaxDelay > 0 {
224+
return l.MaxDelay, nil
225+
}
226+
// There's no max delay. Giving up is probably better in
227+
// practice than using a 290-year MAX_INT delay.
228+
return 0, newMaxRetriesExceededError(err, tryCount)
229+
}
230+
return delay, nil
231+
}
232+
}
233+
234+
// ExternalBackoffPolicy adapts third-party backoff strategies
235+
// (like those from github.com/sethvargo/go-retry)
236+
// into a RetryPolicy without creating a direct dependency on those libraries.
237+
//
238+
// This function allows you to use any backoff implementation that conforms to the
239+
// ExternalBackoff interface, providing flexibility to integrate external retry strategies
240+
// with CockroachDB transaction retries.
241+
//
242+
// Example usage with a hypothetical external backoff library:
243+
//
244+
// import retry "github.com/sethvargo/go-retry"
245+
//
246+
// // Create a retry policy using an external backoff strategy
247+
// policy := crdb.ExternalBackoffPolicy(func() crdb.ExternalBackoff {
248+
// // Fibonacci backoff: 1s, 1s, 2s, 3s, 5s, 8s...
249+
// return retry.NewFibonacci(1 * time.Second)
250+
// })
251+
// ctx := crdb.WithRetryPolicy(context.Background(), policy)
252+
// err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
253+
// // transaction logic
254+
// })
255+
//
256+
// The function parameter should return a fresh ExternalBackoff instance for each
257+
// transaction, as backoff state is not safe for concurrent use.
258+
func ExternalBackoffPolicy(fn func() ExternalBackoff) RetryPolicy {
259+
return &externalBackoffAdapter{
260+
DelegateFactory: fn,
261+
}
262+
}
263+
264+
// ExternalBackoff is an interface for external backoff strategies that provide
265+
// delays through a Next() method. This allows adaptation of backoff policies
266+
// from libraries like github.com/sethvargo/go-retry without creating a direct
267+
// dependency.
268+
//
269+
// Next returns the next backoff duration and a boolean indicating whether to
270+
// stop retrying. When stop is true, the retry loop terminates with a
271+
// MaxRetriesExceededError.
272+
type ExternalBackoff interface {
273+
// Next returns the next delay duration and whether to stop retrying.
274+
// When stop is true, no more retries will be attempted.
275+
Next() (next time.Duration, stop bool)
276+
}
277+
278+
// externalBackoffAdapter adapts backoff policies in the style of github.com/sethvargo/go-retry.
279+
type externalBackoffAdapter struct {
280+
DelegateFactory func() ExternalBackoff
281+
}
282+
283+
// NewRetry implements RetryPolicy by delegating to the external backoff strategy.
284+
// It creates a fresh backoff instance using DelegateFactory and wraps its Next()
285+
// method to conform to the RetryFunc signature.
286+
func (b *externalBackoffAdapter) NewRetry() RetryFunc {
287+
delegate := b.DelegateFactory()
288+
count := 0
289+
return func(err error) (time.Duration, error) {
290+
count++
291+
d, stop := delegate.Next()
292+
if stop {
293+
return 0, newMaxRetriesExceededError(err, count)
294+
}
295+
return d, nil
296+
}
297+
}

0 commit comments

Comments
 (0)