Skip to content

Commit a3f2cf9

Browse files
committed
kv: add error return from Txn.PrepareForRetry
Instead of using log.Fatal, return assertion errors. Release note: None
1 parent f194f92 commit a3f2cf9

File tree

5 files changed

+37
-22
lines changed

5 files changed

+37
-22
lines changed

pkg/kv/db_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -872,7 +872,7 @@ func TestPreservingSteppingOnSenderReplacement(t *testing.T) {
872872
require.NotEqual(t, pErr.TxnID, pErr.Transaction.ID)
873873

874874
// Reset the handle in order to get a new sender.
875-
txn.PrepareForRetry(ctx)
875+
require.NoError(t, txn.PrepareForRetry(ctx))
876876

877877
// Make sure we have a new txn ID.
878878
require.NotEqual(t, pErr.TxnID, txn.ID())

pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,9 @@ func TestSavepoints(t *testing.T) {
127127

128128
case "reset":
129129
prevID := txn.ID()
130-
txn.PrepareForRetry(ctx)
130+
if err := txn.PrepareForRetry(ctx); err != nil {
131+
t.Fatal(err)
132+
}
131133
changed := "changed"
132134
if prevID == txn.ID() {
133135
changed = "not changed"

pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,9 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
912912
stopper.Stop(ctx)
913913

914914
if test.callPrepareForRetry {
915-
txn.PrepareForRetry(ctx)
915+
if err := txn.PrepareForRetry(ctx); err != nil {
916+
t.Fatal(err)
917+
}
916918
}
917919
if test.name != "nil" && err == nil {
918920
t.Fatalf("expected an error")

pkg/kv/txn.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -994,15 +994,17 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
994994
break
995995
}
996996

997-
txn.PrepareForRetry(ctx)
997+
if err := txn.PrepareForRetry(ctx); err != nil {
998+
return err
999+
}
9981000
}
9991001

10001002
return err
10011003
}
10021004

10031005
// PrepareForRetry needs to be called before a retry to perform some
10041006
// book-keeping and clear errors when possible.
1005-
func (txn *Txn) PrepareForRetry(ctx context.Context) {
1007+
func (txn *Txn) PrepareForRetry(ctx context.Context) error {
10061008
// Reset commit triggers. These must be reconfigured by the client during the
10071009
// next retry.
10081010
txn.commitTriggers = nil
@@ -1012,11 +1014,11 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) {
10121014

10131015
retryErr := txn.mu.sender.GetTxnRetryableErr(ctx)
10141016
if retryErr == nil {
1015-
return
1017+
return nil
10161018
}
10171019
if txn.typ != RootTxn {
1018-
panic(errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf(
1019-
retryErr, "PrepareForRetry() called on leaf txn"), ctx))
1020+
return errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf(
1021+
retryErr, "PrepareForRetry() called on leaf txn"), ctx)
10201022
}
10211023
log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s",
10221024
txn.debugNameLocked(), retryErr)
@@ -1029,23 +1031,24 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) {
10291031
// txn IDs. However, at no point can both the old and new incarnation of a
10301032
// transaction be active at the same time -- this would constitute a
10311033
// programming error.
1032-
log.Fatalf(
1033-
ctx,
1034-
"unexpected retryable error for old incarnation of the transaction %s; current incarnation %s",
1035-
retryErr.TxnID,
1036-
txn.mu.ID,
1037-
)
1034+
return errors.WithContextTags(
1035+
errors.NewAssertionErrorWithWrappedErrf(
1036+
retryErr,
1037+
"unexpected retryable error for old incarnation of the transaction %s; current incarnation %s",
1038+
retryErr.TxnID,
1039+
txn.mu.ID,
1040+
), ctx)
10381041
}
10391042

10401043
if !retryErr.PrevTxnAborted() {
10411044
// If the retryable error doesn't correspond to an aborted transaction,
10421045
// there's no need to switch out the transaction. We simply clear the
10431046
// retryable error and proceed.
10441047
txn.mu.sender.ClearTxnRetryableErr(ctx)
1045-
return
1048+
return nil
10461049
}
10471050

1048-
txn.handleTransactionAbortedErrorLocked(ctx, retryErr)
1051+
return txn.handleTransactionAbortedErrorLocked(ctx, retryErr)
10491052
}
10501053

10511054
// Send runs the specified calls synchronously in a single batch and
@@ -1354,10 +1357,11 @@ func (txn *Txn) UpdateStateOnRemoteRetryableErr(ctx context.Context, pErr *kvpb.
13541357
// the current one with it.
13551358
func (txn *Txn) handleTransactionAbortedErrorLocked(
13561359
ctx context.Context, retryErr *kvpb.TransactionRetryWithProtoRefreshError,
1357-
) {
1360+
) error {
13581361
if !retryErr.PrevTxnAborted() {
13591362
// Sanity check we're dealing with a TransactionAbortedError.
1360-
log.Fatalf(ctx, "cannot replace root sender if txn not aborted: %v", retryErr)
1363+
return errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf(
1364+
retryErr, "cannot replace root sender if txn not aborted"), ctx)
13611365
}
13621366

13631367
// The transaction we had been using thus far has been aborted. The proto
@@ -1368,6 +1372,7 @@ func (txn *Txn) handleTransactionAbortedErrorLocked(
13681372
prevSteppingMode := txn.mu.sender.GetSteppingMode(ctx)
13691373
txn.mu.sender = txn.db.factory.RootTransactionalSender(newTxn, txn.mu.userPriority)
13701374
txn.mu.sender.ConfigureStepping(ctx, prevSteppingMode)
1375+
return nil
13711376
}
13721377

13731378
// SetFixedTimestamp makes the transaction run in an unusual way, at a "fixed

pkg/sql/conn_fsm.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -552,11 +552,14 @@ func cleanupAndFinishOnError(args fsm.Args) error {
552552

553553
func prepareTxnForRetry(args fsm.Args) error {
554554
ts := args.Extended.(*txnState)
555-
func() {
555+
err := func() error {
556556
ts.mu.Lock()
557557
defer ts.mu.Unlock()
558-
ts.mu.txn.PrepareForRetry(ts.Ctx)
558+
return ts.mu.txn.PrepareForRetry(ts.Ctx)
559559
}()
560+
if err != nil {
561+
return err
562+
}
560563
ts.setAdvanceInfo(
561564
advanceOne,
562565
noRewind,
@@ -587,13 +590,16 @@ func moveToCommitWaitAfterInternalCommit(args fsm.Args) error {
587590
func prepareTxnForRetryWithRewind(args fsm.Args) error {
588591
pl := args.Payload.(eventRetriableErrPayload)
589592
ts := args.Extended.(*txnState)
590-
func() {
593+
err := func() error {
591594
ts.mu.Lock()
592595
defer ts.mu.Unlock()
593-
ts.mu.txn.PrepareForRetry(ts.Ctx)
594596
ts.mu.autoRetryReason = pl.err
595597
ts.mu.autoRetryCounter++
598+
return ts.mu.txn.PrepareForRetry(ts.Ctx)
596599
}()
600+
if err != nil {
601+
return err
602+
}
597603
// The caller will call rewCap.rewindAndUnlock().
598604
ts.setAdvanceInfo(
599605
rewind,

0 commit comments

Comments
 (0)