Skip to content

Commit fecca8d

Browse files
committed
kv: remove TxnCoordSender.PrepareRetryableError, rationalize ManualRestart
This commit cleans up the implementation of `GenerateForcedRetryableError` by removing the `TxnCoordSender.PrepareRetryableError` method and having the `TxnCoordSender.ManualRestart` return a retryable error. The cleanup also ensures that the `TxnCoordSender` is left in a `txnRetryableError` state after a `ManualRestart`, so that `Txn.PrepareForRetry` must be called before continuing to use the transaction. The goal with both of these changes is to close the gap between the handling of error-driven txn restarts and manual restarts. It also reworks the code to construct the retry error in the same place that "handles" the error, which is important for future changes that plan to add more context to retry errors. Release note: None Epic: None
1 parent c917a6b commit fecca8d

File tree

5 files changed

+37
-53
lines changed

5 files changed

+37
-53
lines changed

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,27 +1158,32 @@ func (tc *TxnCoordSender) RequiredFrontier() hlc.Timestamp {
11581158

11591159
// ManualRestart is part of the kv.TxnSender interface.
11601160
func (tc *TxnCoordSender) ManualRestart(
1161-
ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp,
1162-
) {
1161+
ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, msg redact.RedactableString,
1162+
) error {
11631163
tc.mu.Lock()
11641164
defer tc.mu.Unlock()
1165-
1166-
if tc.mu.txnState == txnFinalized {
1167-
log.Fatalf(ctx, "ManualRestart called on finalized txn: %s", tc.mu.txn)
1165+
if tc.mu.txnState != txnPending && tc.mu.txnState != txnRetryableError {
1166+
return errors.AssertionFailedf("cannot manually restart, current state: %s", tc.mu.txnState)
11681167
}
11691168

11701169
// Invalidate any writes performed by any workers after the retry updated
11711170
// the txn's proto but before we synchronized (some of these writes might
11721171
// have been performed at the wrong epoch).
11731172
tc.mu.txn.Restart(pri, 0 /* upgradePriority */, ts)
11741173

1174+
pErr := kvpb.NewTransactionRetryWithProtoRefreshError(
1175+
msg, tc.mu.txn.ID, tc.mu.txn)
1176+
1177+
// Move to a retryable error state, where all Send() calls fail until the
1178+
// state is cleared.
1179+
tc.mu.txnState = txnRetryableError
1180+
tc.mu.storedRetryableErr = pErr
1181+
1182+
// Reset state as this manual restart incremented the transaction's epoch.
11751183
for _, reqInt := range tc.interceptorStack {
11761184
reqInt.epochBumpedLocked()
11771185
}
1178-
1179-
// The txn might have entered the txnError state after the epoch was bumped.
1180-
// Reset the state for the retry.
1181-
tc.mu.txnState = txnPending
1186+
return pErr
11821187
}
11831188

11841189
// IsSerializablePushAndRefreshNotPossible is part of the kv.TxnSender interface.
@@ -1339,22 +1344,6 @@ func (tc *TxnCoordSender) TestingCloneTxn() *roachpb.Transaction {
13391344
return tc.mu.txn.Clone()
13401345
}
13411346

1342-
// PrepareRetryableError is part of the kv.TxnSender interface.
1343-
func (tc *TxnCoordSender) PrepareRetryableError(
1344-
ctx context.Context, msg redact.RedactableString,
1345-
) error {
1346-
tc.mu.Lock()
1347-
defer tc.mu.Unlock()
1348-
if tc.mu.txnState != txnPending {
1349-
return errors.AssertionFailedf("cannot set a retryable error. current state: %s", tc.mu.txnState)
1350-
}
1351-
pErr := kvpb.NewTransactionRetryWithProtoRefreshError(
1352-
msg, tc.mu.txn.ID, tc.mu.txn)
1353-
tc.mu.storedRetryableErr = pErr
1354-
tc.mu.txnState = txnRetryableError
1355-
return pErr
1356-
}
1357-
13581347
// Step is part of the TxnSender interface.
13591348
func (tc *TxnCoordSender) Step(ctx context.Context) error {
13601349
// TODO(nvanbenschoten): it should be possible to make this assertion, but

pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,8 @@ func TestTxnCoordSenderCleanupOnCommitAfterRestart(t *testing.T) {
661661
}
662662

663663
// Restart the transaction with a new epoch.
664-
txn.Sender().ManualRestart(ctx, txn.UserPriority(), s.Clock.Now())
664+
require.Error(t, txn.Sender().ManualRestart(ctx, txn.UserPriority(), s.Clock.Now(), "force retry"))
665+
txn.Sender().ClearTxnRetryableErr(ctx)
665666

666667
// Now immediately commit.
667668
require.NoError(t, txn.Commit(ctx))
@@ -2770,14 +2771,16 @@ func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) {
27702771
before: func(t *testing.T, txn *kv.Txn) {
27712772
_, err := txn.Get(ctx, "k")
27722773
require.NoError(t, err)
2773-
txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next())
2774+
require.Error(t, txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next(), "force retry"))
2775+
txn.Sender().ClearTxnRetryableErr(ctx)
27742776
},
27752777
},
27762778
{
27772779
name: "write before, in prior epoch",
27782780
before: func(t *testing.T, txn *kv.Txn) {
27792781
require.NoError(t, txn.Put(ctx, "k", "v"))
2780-
txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next())
2782+
require.Error(t, txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next(), "force retry"))
2783+
txn.Sender().ClearTxnRetryableErr(ctx)
27812784
},
27822785
},
27832786
{
@@ -2786,7 +2789,8 @@ func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) {
27862789
_, err := txn.Get(ctx, "k")
27872790
require.NoError(t, err)
27882791
require.NoError(t, txn.Put(ctx, "k", "v"))
2789-
txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next())
2792+
require.Error(t, txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next(), "force retry"))
2793+
txn.Sender().ClearTxnRetryableErr(ctx)
27902794
},
27912795
},
27922796
} {

pkg/kv/mock_transactional_sender.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,10 @@ func (m *MockTransactionalSender) RequiredFrontier() hlc.Timestamp {
145145

146146
// ManualRestart is part of the TxnSender interface.
147147
func (m *MockTransactionalSender) ManualRestart(
148-
ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp,
149-
) {
148+
ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, msg redact.RedactableString,
149+
) error {
150150
m.txn.Restart(pri, 0 /* upgradePriority */, ts)
151+
return kvpb.NewTransactionRetryWithProtoRefreshError(msg, m.txn.ID, m.txn)
151152
}
152153

153154
// IsSerializablePushAndRefreshNotPossible is part of the TxnSender interface.
@@ -196,13 +197,6 @@ func (m *MockTransactionalSender) UpdateStateOnRemoteRetryableErr(
196197
// DisablePipelining is part of the kv.TxnSender interface.
197198
func (m *MockTransactionalSender) DisablePipelining() error { return nil }
198199

199-
// PrepareRetryableError is part of the kv.TxnSender interface.
200-
func (m *MockTransactionalSender) PrepareRetryableError(
201-
ctx context.Context, msg redact.RedactableString,
202-
) error {
203-
return kvpb.NewTransactionRetryWithProtoRefreshError(msg, m.txn.ID, *m.txn.Clone())
204-
}
205-
206200
// Step is part of the TxnSender interface.
207201
func (m *MockTransactionalSender) Step(_ context.Context) error {
208202
// At least one test (e.g. sql/TestPortalsDestroyedOnTxnFinish) requires

pkg/kv/sender.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -181,14 +181,15 @@ type TxnSender interface {
181181
// timestamp and priority.
182182
// An uninitialized timestamp can be passed to leave the timestamp
183183
// alone.
184+
// Returns a TransactionRetryWithProtoRefreshError with a payload
185+
// initialized from this txn. The error must be cleared by a call to
186+
// ClearTxnRetryableErr before continuing to use the TxnSender.
184187
//
185188
// Used by the SQL layer which sometimes knows that a transaction
186189
// will not be able to commit and prefers to restart early.
187-
// It is also used after synchronizing concurrent actors using a txn
188-
// when a retryable error is seen.
189-
// TODO(andrei): this second use should go away once we move to a
190-
// TxnAttempt model.
191-
ManualRestart(context.Context, roachpb.UserPriority, hlc.Timestamp)
190+
ManualRestart(
191+
ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, msg redact.RedactableString,
192+
) error
192193

193194
// UpdateStateOnRemoteRetryableErr updates the txn in response to an
194195
// error encountered when running a request through the txn.
@@ -263,11 +264,6 @@ type TxnSender interface {
263264
// IsLocking returns whether the transaction has begun acquiring locks.
264265
IsLocking() bool
265266

266-
// PrepareRetryableError generates a
267-
// TransactionRetryWithProtoRefreshError with a payload initialized
268-
// from this txn.
269-
PrepareRetryableError(ctx context.Context, msg redact.RedactableString) error
270-
271267
// TestingCloneTxn returns a clone of the transaction's current
272268
// proto. This is for use by tests only. Use
273269
// GetLeafTxnInitialState() instead when creating leaf transactions.

pkg/kv/txn.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1393,23 +1393,24 @@ func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error {
13931393
return txn.mu.sender.SetFixedTimestamp(ctx, ts)
13941394
}
13951395

1396-
// GenerateForcedRetryableError returns a TransactionRetryWithProtoRefreshError that will
1397-
// cause the txn to be retried.
1396+
// GenerateForcedRetryableError returns a TransactionRetryWithProtoRefreshError
1397+
// that will cause the txn to be retried.
13981398
//
13991399
// The transaction's epoch is bumped, simulating to an extent what the
14001400
// TxnCoordSender does on retriable errors. The transaction's timestamp is only
14011401
// bumped to the extent that txn.ReadTimestamp is ratcheted up to txn.WriteTimestamp.
14021402
// TODO(andrei): This method should take in an up-to-date timestamp, but
14031403
// unfortunately its callers don't currently have that handy.
1404+
//
1405+
// As with other transaction retry errors, the caller must call PrepareForRetry
1406+
// before continuing to use the transaction.
14041407
func (txn *Txn) GenerateForcedRetryableError(
14051408
ctx context.Context, msg redact.RedactableString,
14061409
) error {
14071410
txn.mu.Lock()
14081411
defer txn.mu.Unlock()
14091412
now := txn.db.clock.NowAsClockTimestamp()
1410-
txn.mu.sender.ManualRestart(ctx, txn.mu.userPriority, now.ToTimestamp())
1411-
txn.resetDeadlineLocked()
1412-
return txn.mu.sender.PrepareRetryableError(ctx, msg)
1413+
return txn.mu.sender.ManualRestart(ctx, txn.mu.userPriority, now.ToTimestamp(), msg)
14131414
}
14141415

14151416
// IsSerializablePushAndRefreshNotPossible returns true if the transaction is

0 commit comments

Comments
 (0)