Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions crdb/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package crdb

import "context"
import (
"context"
"time"
)

// Tx abstracts the operations needed by ExecuteInTx so that different
// frameworks (e.g. go's sql package, pgx, gorm) can be used with ExecuteInTx.
Expand Down Expand Up @@ -60,8 +63,10 @@ func ExecuteInTx(ctx context.Context, tx Tx, fn func() error) (err error) {
return err
}

maxRetries := numRetriesFromContext(ctx)
retryCount := 0
// establish the retry policy
retryPolicy := getRetryPolicy(ctx)
// set up the retry policy state
retryFunc := retryPolicy.NewRetry()
for {
releaseFailed := false
err = fn()
Expand All @@ -82,13 +87,35 @@ func ExecuteInTx(ctx context.Context, tx Tx, fn func() error) (err error) {
return err
}

if rollbackErr := tx.Exec(ctx, "ROLLBACK TO SAVEPOINT cockroach_restart"); rollbackErr != nil {
return newTxnRestartError(rollbackErr, err)
// We have a retryable error. Check the retry policy.
delay, retryErr := retryFunc(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before we enter the retry logic, we should check for context cancellation:

if err := ctx.Err(); err != nil {
  return err
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if delay > 0 && retryErr == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow this. Why are we doing a retry if retryErr is nil?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added documentation on the RetryFunc function definition to clarify the semantics. Implementation of RetryFunc may return errors (typically MaxRetriesExceededError) to stop the retry process.

// We don't want to hold locks while waiting for a backoff, so restart the entire transaction
if restartErr := tx.Exec(ctx, "ROLLBACK"); restartErr != nil {
return newTxnRestartError(restartErr, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the wrong error to return here -- as per this line, when this error is displayed, it will say that there was an issue with ROLLBACK TO SAVEPOINT, which is not what this line is doing:

const msgPattern = "restarting txn failed. ROLLBACK TO SAVEPOINT " +

(This is also the wrong error to return on lines 98 and 101.)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Passing the operation to newTxnRestartError.

}
if restartErr := tx.Exec(ctx, "BEGIN"); restartErr != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it could have issues. Some libraries may not like it if we use the transaction after rolling it back. For example, I see this in the pgx code where it returns an error if you try to use a transaction that was already rolled back: https://github.com/jackc/pgx/blob/ecc9203ef42fbba50507e773901b5aead75288ef/tx.go#L205-L224

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It returns an error if you use a transaction object that has the closed flag set. Since we're not doing that, it ought to be ok, I would expect.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still doesn't seem valid to me -- if we reuse the same tx object here, it will not succeed when sending BEGIN right after a ROLLBACK. (see https://github.com/jackc/pgx/blob/ecc9203ef42fbba50507e773901b5aead75288ef/tx.go#L205-L224)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that we not calling tx.Rollback, but rather tx.Exec(ctx, "ROLLBACK"). Please check the comments in the code, and let me know if that makes sense.

return newTxnRestartError(restartErr, err)
}
if restartErr := tx.Exec(ctx, "SAVEPOINT cockroach_restart"); restartErr != nil {
return newTxnRestartError(restartErr, err)
}
} else {
if rollbackErr := tx.Exec(ctx, "ROLLBACK TO SAVEPOINT cockroach_restart"); rollbackErr != nil {
return newTxnRestartError(rollbackErr, err)
}
}

retryCount++
if maxRetries > 0 && retryCount > maxRetries {
return newMaxRetriesExceededError(err, maxRetries)
if retryErr != nil {
return retryErr
}

if delay > 0 {
select {
case <-time.After(delay):
case <-ctx.Done():
return ctx.Err()
}
}
}
}
110 changes: 110 additions & 0 deletions crdb/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2025 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package crdb

import (
"time"
)

// RetryFunc owns the state for a transaction retry operation. Usually, this is
// just the retry count. RetryFunc is not assumed to be safe for concurrent use.
type RetryFunc func(err error) (time.Duration, error)

// RetryPolicy constructs a new instance of a RetryFunc for each transaction
// it is used with. Instances of RetryPolicy can likely be immutable and
// should be safe for concurrent calls to NewRetry.
type RetryPolicy interface {
NewRetry() RetryFunc
}

type LimitBackoffRetryPolicy struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is public-facing so should have a comment explaining what it is and how to use it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

RetryLimit int
Delay time.Duration
}

func (l *LimitBackoffRetryPolicy) NewRetry() RetryFunc {
tryCount := 0
return func(err error) (time.Duration, error) {
tryCount++
if tryCount > l.RetryLimit {
return 0, newMaxRetriesExceededError(err, l.RetryLimit)
}
return l.Delay, nil
}
}

// ExpBackoffRetryPolicy implements RetryPolicy using an exponential backoff with optional
// saturation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow what "saturation" means here. Can we write this comment to explain more how to use this?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, reworded comment.

type ExpBackoffRetryPolicy struct {
RetryLimit int
BaseDelay time.Duration
MaxDelay time.Duration
}

// NewRetry implements RetryPolicy
func (l *ExpBackoffRetryPolicy) NewRetry() RetryFunc {
tryCount := 0
return func(err error) (time.Duration, error) {
tryCount++
if tryCount > l.RetryLimit {
return 0, newMaxRetriesExceededError(err, l.RetryLimit)
}
delay := l.BaseDelay << (tryCount - 1)
if l.MaxDelay > 0 && delay > l.MaxDelay {
return l.MaxDelay, nil
}
if delay < l.BaseDelay {
// We've overflowed.
if l.MaxDelay > 0 {
return l.MaxDelay, nil
}
// There's no max delay. Giving up is probably better in
// practice than using a 290-year MAX_INT delay.
return 0, newMaxRetriesExceededError(err, tryCount)
}
return delay, nil
}
}

// Vargo converts a go-retry style Delay provider into a RetryPolicy
func Vargo(fn func() VargoBackoff) RetryPolicy {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we could think of a different name to go with here. Users of cockroach-go who don't already know about the sethvargo/go-retry library may be confused to see this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to suggestions. I was trying to build an explicit integration without adding a transitive dependency.

return &vargoAdapter{
DelegateFactory: fn,
}
}

// VargoBackoff allow us to adapt sethvargo/go-retry Backoff policies
// without also creating a transitive dependency on that library.
type VargoBackoff interface {
Next() (next time.Duration, stop bool)
}

// vargoAdapter adapts backoff policies in the style of sethvargo/go-retry
type vargoAdapter struct {
DelegateFactory func() VargoBackoff
}

func (b *vargoAdapter) NewRetry() RetryFunc {
delegate := b.DelegateFactory()
count := 0
return func(err error) (time.Duration, error) {
count++
d, stop := delegate.Next()
if stop {
return 0, newMaxRetriesExceededError(err, count)
}
return d, nil
}
}
72 changes: 72 additions & 0 deletions crdb/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2025 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package crdb

import (
"testing"
"time"
)

func assertDelays(t *testing.T, policy RetryPolicy, expectedDelays []time.Duration) {
actualDelays := make([]time.Duration, 0, len(expectedDelays))
rf := policy.NewRetry()
for {
delay, err := rf(nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're always passing a nil error here -- can we add tests that check the behavior with a non-nil error?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if err != nil {
break
}

actualDelays = append(actualDelays, delay)
if len(actualDelays) > len(expectedDelays) {
t.Fatalf("too many retries: expected %d", len(expectedDelays))
}
}
if len(actualDelays) != len(expectedDelays) {
t.Errorf("wrong number of retries: expected %d, got %d", len(expectedDelays), len(actualDelays))
}
for i, delay := range actualDelays {
expected := expectedDelays[i]
if delay != expected {
t.Errorf("wrong delay at index %d: expected %d, got %d", i, expected, delay)
}
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also have tests for edge cases like zero BaseDelay, negative RetryLimit, and MaxDelay < BaseDelay.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

func TestLimitBackoffRetryPolicy(t *testing.T) {
policy := &LimitBackoffRetryPolicy{
RetryLimit: 3,
Delay: 1 * time.Second,
}
assertDelays(t, policy, []time.Duration{
1 * time.Second,
1 * time.Second,
1 * time.Second,
})
}

func TestExpBackoffRetryPolicy(t *testing.T) {
policy := &ExpBackoffRetryPolicy{
RetryLimit: 5,
BaseDelay: 1 * time.Second,
MaxDelay: 5 * time.Second,
}
assertDelays(t, policy, []time.Duration{
1 * time.Second,
2 * time.Second,
4 * time.Second,
5 * time.Second,
5 * time.Second,
})
}
Loading