Skip to content

Commit 90516db

Browse files
committed
kv: don't let pusher win when pushing STAGING txn with equal priority
Fixes cockroachdb#105330. Fixes cockroachdb#101721. This commit updates the transaction push logic to stop pushes from completing successfully when the pushee is STAGING and the pusher has equal priority. Prior to this change, the pusher would win in these cases when using a PUSH_TIMESTAMP if at least one of the two transactions involved used a weak isolation level. This had two undesirable effects: - if the pushee was still possibly committable and requiring recovery (`!(knownHigherTimestamp || knownHigherEpoch)` in the code) then the pusher would immediately begin parallel commit recovery, attempting to disrupt the commit and abort the pushee. This is undesirable because the pushee may still be in progress and in cases of equal priority, we'd like to wait for the parallel commit to complete before kicking off recovery, deferring recovery to only the cases where the pushee/committers's heartbeat has expired. - if the pushee was known to be uncommittable (`knownHigherTimestamp || knownHigherEpoch` in the code), then txn recovery was not kicked off but the pusher still could not perform the PUSH_TIMESTAMP (see e40c1b4), so it would return a `TransactionPushError`. This confused logic in `handleTransactionPushError`, allowing the error to escape to the client. This commit resolves both issues by considering the pushee's transaction status in `txnwait.ShouldPushImmediately`. Release note: None
1 parent 6146edc commit 90516db

File tree

8 files changed

+146
-90
lines changed

8 files changed

+146
-90
lines changed

pkg/kv/kvnemesis/validator.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,16 +1232,6 @@ func (v *validator) failIfError(
12321232
) (ambiguous, hasError bool) {
12331233
exceptions = append(exceptions[:len(exceptions):len(exceptions)], func(err error) bool {
12341234
return errors.Is(err, errInjected)
1235-
}, func(err error) bool {
1236-
// Work-around for [1].
1237-
//
1238-
// TODO(arul): find out why we (as of [2]) sometimes leaking
1239-
// *TransactionPushError (wrapped in `UnhandledRetryableError`) from
1240-
// `db.Get`, `db.Scan`, etc.
1241-
//
1242-
// [1]: https://github.com/cockroachdb/cockroach/issues/105330
1243-
// [2]: https://github.com/cockroachdb/cockroach/pull/97779
1244-
return errors.HasType(err, (*kvpb.UnhandledRetryableError)(nil))
12451235
})
12461236
switch r.Type {
12471237
case ResultType_Unknown:

pkg/kv/kvserver/batcheval/cmd_push_txn.go

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/cockroachdb/cockroach/pkg/storage"
2727
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2828
"github.com/cockroachdb/cockroach/pkg/util/log"
29+
"github.com/cockroachdb/cockroach/pkg/util/must"
2930
"github.com/cockroachdb/errors"
3031
"github.com/cockroachdb/redact"
3132
)
@@ -213,38 +214,9 @@ func PushTxn(
213214
}
214215
reply.PusheeTxn.UpgradePriority(args.PusheeTxn.Priority)
215216

216-
// If the pusher is aware that the pushee's currently recorded attempt at a
217-
// parallel commit failed, either because it found intents at a higher
218-
// timestamp than the parallel commit attempt or because it found intents at
219-
// a higher epoch than the parallel commit attempt, it should not consider
220-
// the pushee to be performing a parallel commit. Its commit status is not
221-
// indeterminate.
222-
if (knownHigherTimestamp || knownHigherEpoch) && reply.PusheeTxn.Status == roachpb.STAGING {
223-
reply.PusheeTxn.Status = roachpb.PENDING
224-
reply.PusheeTxn.InFlightWrites = nil
225-
// If the pusher is aware that the pushee's currently recorded attempt
226-
// at a parallel commit failed, upgrade PUSH_TIMESTAMPs to PUSH_ABORTs.
227-
// We don't want to move the transaction back to PENDING, as this is not
228-
// (currently) allowed by the recovery protocol. We also don't want to
229-
// move the transaction to a new timestamp while retaining the STAGING
230-
// status, as this could allow the transaction to enter an implicit
231-
// commit state without its knowledge, leading to atomicity violations.
232-
//
233-
// This has no effect on pushes that fail with a TransactionPushError.
234-
// Such pushes will still wait on the pushee to retry its commit and
235-
// eventually commit or abort. It also has no effect on expired pushees,
236-
// as they would have been aborted anyway. This only impacts pushes
237-
// which would have succeeded due to priority mismatches. In these
238-
// cases, the push acts the same as a short-circuited transaction
239-
// recovery process, because the transaction recovery procedure always
240-
// finalizes target transactions, even if initiated by a PUSH_TIMESTAMP.
241-
if pushType == kvpb.PUSH_TIMESTAMP {
242-
pushType = kvpb.PUSH_ABORT
243-
}
244-
}
245-
246217
pusherIso, pusheeIso := args.PusherTxn.IsoLevel, reply.PusheeTxn.IsoLevel
247218
pusherPri, pusheePri := args.PusherTxn.Priority, reply.PusheeTxn.Priority
219+
pusheeStatus := reply.PusheeTxn.Status
248220
var pusherWins bool
249221
var reason string
250222
switch {
@@ -258,7 +230,7 @@ func PushTxn(
258230
// If just attempting to cleanup old or already-committed txns,
259231
// pusher always fails.
260232
pusherWins = false
261-
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri):
233+
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus):
262234
reason = "pusher has priority"
263235
pusherWins = true
264236
case args.Force:
@@ -282,13 +254,42 @@ func PushTxn(
282254
// If the pushed transaction is in the staging state, we can't change its
283255
// record without first going through the transaction recovery process and
284256
// attempting to finalize it.
257+
pusheeStaging := pusheeStatus == roachpb.STAGING
258+
// However, if the pusher is aware that the pushee's currently recorded
259+
// attempt at a parallel commit failed, either because it found intents at a
260+
// higher timestamp than the parallel commit attempt or because it found
261+
// intents at a higher epoch than the parallel commit attempt, it should not
262+
// consider the pushee to be performing a parallel commit. Its commit status
263+
// is not indeterminate.
264+
pusheeStagingFailed := pusheeStaging && (knownHigherTimestamp || knownHigherEpoch)
285265
recoverOnFailedPush := cArgs.EvalCtx.EvalKnobs().RecoverIndeterminateCommitsOnFailedPushes
286-
if reply.PusheeTxn.Status == roachpb.STAGING && (pusherWins || recoverOnFailedPush) {
266+
if pusheeStaging && !pusheeStagingFailed && (pusherWins || recoverOnFailedPush) {
287267
err := kvpb.NewIndeterminateCommitError(reply.PusheeTxn)
288268
log.VEventf(ctx, 1, "%v", err)
289269
return result.Result{}, err
290270
}
291271

272+
// If the pusher is aware that the pushee's currently recorded attempt at a
273+
// parallel commit failed, upgrade PUSH_TIMESTAMPs to PUSH_ABORTs. We don't
274+
// want to move the transaction back to PENDING, as this is not (currently)
275+
// allowed by the recovery protocol. We also don't want to move the
276+
// transaction to a new timestamp while retaining the STAGING status, as this
277+
// could allow the transaction to enter an implicit commit state without its
278+
// knowledge, leading to atomicity violations.
279+
//
280+
// This has no effect on pushes that fail with a TransactionPushError. Such
281+
// pushes will still wait on the pushee to retry its commit and eventually
282+
// commit or abort. It also has no effect on expired pushees, as they would
283+
// have been aborted anyway. This only impacts pushes which would have
284+
// succeeded due to priority mismatches. In these cases, the push acts the
285+
// same as a short-circuited transaction recovery process, because the
286+
// transaction recovery procedure always finalizes target transactions, even
287+
// if initiated by a PUSH_TIMESTAMP.
288+
if pusheeStaging && pusherWins && pushType == kvpb.PUSH_TIMESTAMP {
289+
_ = must.True(ctx, pusheeStagingFailed, "parallel commit must be known to have failed for push to succeed")
290+
pushType = kvpb.PUSH_ABORT
291+
}
292+
292293
if !pusherWins {
293294
err := kvpb.NewTransactionPushError(reply.PusheeTxn)
294295
log.VEventf(ctx, 1, "%v", err)
@@ -306,6 +307,8 @@ func PushTxn(
306307
// Forward the timestamp to accommodate AbortSpan GC. See method comment for
307308
// details.
308309
reply.PusheeTxn.WriteTimestamp.Forward(reply.PusheeTxn.LastActive())
310+
// If the transaction was previously staging, clear its in-flight writes.
311+
reply.PusheeTxn.InFlightWrites = nil
309312
// If the transaction record was already present, persist the updates to it.
310313
// If not, then we don't want to create it. This could allow for finalized
311314
// transactions to be revived. Instead, we obey the invariant that only the

pkg/kv/kvserver/concurrency/concurrency_manager_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -747,18 +747,19 @@ func (c *cluster) PushTransaction(
747747
pusheeTxn, pusheeRecordSig := pusheeRecord.asTxn()
748748
pusheeIso := pusheeTxn.IsoLevel
749749
pusheePri := pusheeTxn.Priority
750+
pusheeStatus := pusheeTxn.Status
750751
// NOTE: this logic is adapted from cmd_push_txn.go.
751752
var pusherWins bool
752753
switch {
753-
case pusheeTxn.Status.IsFinalized():
754+
case pusheeStatus.IsFinalized():
754755
// Already finalized.
755756
return pusheeTxn, nil
756757
case pushType == kvpb.PUSH_TIMESTAMP && pushTo.LessEq(pusheeTxn.WriteTimestamp):
757758
// Already pushed.
758759
return pusheeTxn, nil
759760
case pushType == kvpb.PUSH_TOUCH:
760761
pusherWins = false
761-
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri):
762+
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus):
762763
pusherWins = true
763764
default:
764765
pusherWins = false

pkg/kv/kvserver/concurrency/lock_table_waiter.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1298,7 +1298,11 @@ func canPushWithPriority(req Request, s waitingState) bool {
12981298
}
12991299
pusheeIso = s.txn.IsoLevel
13001300
pusheePri = s.txn.Priority
1301-
return txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri)
1301+
// We assume that the pushee is in the PENDING state when deciding whether
1302+
// to push. A push may determine that the pushee is STAGING or has already
1303+
// been finalized.
1304+
pusheeStatus := roachpb.PENDING
1305+
return txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus)
13021306
}
13031307

13041308
func logResolveIntent(ctx context.Context, intent roachpb.LockUpdate) {

pkg/kv/kvserver/replica_send.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,7 @@ func (r *Replica) handleTransactionPushError(
773773
dontRetry := r.store.cfg.TestingKnobs.DontRetryPushTxnFailures
774774
if !dontRetry && ba.IsSinglePushTxnRequest() {
775775
pushReq := ba.Requests[0].GetInner().(*kvpb.PushTxnRequest)
776-
dontRetry = txnwait.ShouldPushImmediately(pushReq)
776+
dontRetry = txnwait.ShouldPushImmediately(pushReq, t.PusheeTxn.Status)
777777
}
778778
if dontRetry {
779779
return g, pErr

pkg/kv/kvserver/txn_recovery_integration_test.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -372,20 +372,6 @@ func TestTxnRecoveryFromStagingWithoutHighPriority(t *testing.T) {
372372
pErrC <- pErr
373373
}))
374374

375-
// If the pushee is not serializable and the pusher is reading, we
376-
// currently expect the conflicting operation to immediately succeed,
377-
// sometimes with an error, sometimes not. See #105330.
378-
if pusheeIsoLevel.ToleratesWriteSkew() && !pusherWriting {
379-
pErr = <-pErrC
380-
if pusheeCommits {
381-
require.Nil(t, pErr, "error: %s", pErr)
382-
} else {
383-
require.NotNil(t, pErr)
384-
require.Regexp(t, "failed to push", pErr)
385-
}
386-
return
387-
}
388-
389375
// Wait for the conflict to push and be queued in the txn wait queue.
390376
testutils.SucceedsSoon(t, func() error {
391377
select {

pkg/kv/kvserver/txnwait/queue.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,23 +70,26 @@ func TestingOverrideTxnLivenessThreshold(t time.Duration) func() {
7070
// proceed without queueing. This is true for pushes which are neither
7171
// ABORT nor TIMESTAMP, but also for ABORT and TIMESTAMP pushes where
7272
// the pushee has min priority or pusher has max priority.
73-
func ShouldPushImmediately(req *kvpb.PushTxnRequest) bool {
73+
func ShouldPushImmediately(req *kvpb.PushTxnRequest, pusheeStatus roachpb.TransactionStatus) bool {
7474
if req.Force {
7575
return true
7676
}
7777
return CanPushWithPriority(
7878
req.PushType,
7979
req.PusherTxn.IsoLevel, req.PusheeTxn.IsoLevel,
8080
req.PusherTxn.Priority, req.PusheeTxn.Priority,
81+
pusheeStatus,
8182
)
8283
}
8384

8485
// CanPushWithPriority returns true if the pusher can perform the specified push
85-
// type on the pushee, based on the two txns' isolation levels and priorities.
86+
// type on the pushee, based on the two txns' isolation levels, their priorities,
87+
// and the pushee's status.
8688
func CanPushWithPriority(
8789
pushType kvpb.PushTxnType,
8890
pusherIso, pusheeIso isolation.Level,
8991
pusherPri, pusheePri enginepb.TxnPriority,
92+
pusheeStatus roachpb.TransactionStatus,
9093
) bool {
9194
// Normalize the transaction priorities so that normal user priorities are
9295
// considered equal for the purposes of pushing.
@@ -103,6 +106,15 @@ func CanPushWithPriority(
103106
case kvpb.PUSH_ABORT:
104107
return pusherPri > pusheePri
105108
case kvpb.PUSH_TIMESTAMP:
109+
// If the pushee transaction is STAGING, only let the PUSH_TIMESTAMP through
110+
// to disrupt the transaction commit if the pusher has a higher priority. If
111+
// the priorities are equal, the PUSH_TIMESTAMP should wait for the commit
112+
// to complete.
113+
if pusheeStatus == roachpb.STAGING {
114+
return pusherPri > pusheePri
115+
}
116+
// Otherwise, the pushee has not yet started committing...
117+
106118
// If the pushee transaction tolerates write skew, the PUSH_TIMESTAMP is
107119
// harmless, so let it through.
108120
return pusheeIso.ToleratesWriteSkew() ||
@@ -476,10 +488,6 @@ func (q *Queue) releaseWaitingQueriesLocked(ctx context.Context, txnID uuid.UUID
476488
func (q *Queue) MaybeWaitForPush(
477489
ctx context.Context, req *kvpb.PushTxnRequest,
478490
) (*kvpb.PushTxnResponse, *kvpb.Error) {
479-
if ShouldPushImmediately(req) {
480-
return nil, nil
481-
}
482-
483491
q.mu.Lock()
484492
// If the txn wait queue is not enabled or if the request is not
485493
// contained within the replica, do nothing. The request can fall
@@ -501,6 +509,9 @@ func (q *Queue) MaybeWaitForPush(
501509
if txn := pending.getTxn(); isPushed(req, txn) {
502510
q.mu.Unlock()
503511
return createPushTxnResponse(txn), nil
512+
} else if ShouldPushImmediately(req, txn.Status) {
513+
q.mu.Unlock()
514+
return nil, nil
504515
}
505516

506517
push := &waitingPush{

0 commit comments

Comments
 (0)