Skip to content

Commit b9f3f15

Browse files
craig[bot]nvbrafissYevgeniy Miretskiyyuzefovich
committed
107882: kv: don't let pusher win when pushing STAGING txn with equal priority r=miraradeva a=nvanbenschoten Fixes cockroachdb#105330. Fixes cockroachdb#101721. This commit updates the transaction push logic to stop pushes from completing successfully when the pushee is STAGING and the pusher has equal priority. Prior to this change, the pusher would win in these cases when using a PUSH_TIMESTAMP if at least one of the two transactions involved used a weak isolation level. This had two undesirable effects: - if the pushee was still possibly committable and requiring recovery (`!(knownHigherTimestamp || knownHigherEpoch)` in the code) then the pusher would immediately begin parallel commit recovery, attempting to disrupt the commit and abort the pushee. This is undesirable because the pushee may still be in progress and in cases of equal priority, we'd like to wait for the parallel commit to complete before kicking off recovery, deferring recovery to only the cases where the pushee/committer's heartbeat has expired. - if the pushee was known to be uncommittable (`knownHigherTimestamp || knownHigherEpoch` in the code), then txn recovery was not kicked off but the pusher still could not perform the PUSH_TIMESTAMP (see e40c1b4), so it would return a `TransactionPushError`. This confused logic in `handleTransactionPushError`, allowing the error to escape to the client. This commit resolves both issues by considering the pushee's transaction status in `txnwait.ShouldPushImmediately`. Release note: None 107962: docs: do not generate docs for crdb_internal functions r=knz a=rafiss Epic: None Release note: None 108062: server: Add rangefeed metrics r=miretskiy a=miretskiy Add rangefeed-related metrics to keep track of the number of actively running rangefeeds on the server. 
Epic: CRDB-26372 Release note: None 108071: stmtdiagnostics: use background context when building the bundle r=yuzefovich a=yuzefovich When the context is canceled, we still want to build the bundle as best as possible. Over in 532274b we introduced the usage of the background context in order to insert the bundle into the system tables, but we still built the bundle with the canceled context. This commit fixes that oversight - in particular, we should now get `env.sql` correctly. Informs: https://github.com/cockroachlabs/support/issues/2494. Epic: None Release note: None 108085: builtins: improve docs and refactor code for unordered_unique_id r=rafiss,rharding6373 a=andyyang890 **builtins: improve documentation for unordered_unique_rowid** This patch clarifies that `unordered_unique_rowid` generates unique IDs that are statistically likely to be unordered because it bit-reverses the insert timestamp for use as the ID's timestamp bit segment. Release note: None ---- **builtinconstants: define constants for unique int bit segments** This patch defines constants for the sizes and bitmasks for each bit segment in a unique int generated for row IDs. Release note: None ---- **builtins: refactor bit shifting logic in mapToUnorderedUniqueInt** This patch refactors the bit shifting logic in `mapToUnorderedUniqueInt` to use constants instead of magic numbers. Release note: None ---- Epic: None Co-authored-by: Nathan VanBenschoten <[email protected]> Co-authored-by: Rafi Shamim <[email protected]> Co-authored-by: Yevgeniy Miretskiy <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]> Co-authored-by: Andy Yang <[email protected]>
6 parents dc28e19 + 90516db + 846c348 + 0287bb7 + 7d9a30f + a7fe6cf commit b9f3f15

File tree

18 files changed

+423
-446
lines changed

18 files changed

+423
-446
lines changed

docs/generated/sql/functions.md

Lines changed: 1 addition & 320 deletions
Large diffs are not rendered by default.

pkg/cmd/docgen/funcs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func generateFunctions(from []string, categorize bool) []byte {
177177
// NB: funcs can appear more than once i.e. upper/lowercase variants for
178178
// faster lookups, so normalize to lowercase and de-dupe using a set.
179179
name = strings.ToLower(name)
180-
if _, ok := seen[name]; ok {
180+
if _, ok := seen[name]; ok || strings.HasPrefix(name, "crdb_internal.") {
181181
continue
182182
}
183183
seen[name] = struct{}{}

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,9 @@ func makeDistSenderRangeFeedMetrics() DistSenderRangeFeedMetrics {
368368
}
369369
}
370370

371+
// MetricStruct implements metrics.Struct interface.
372+
func (DistSenderRangeFeedMetrics) MetricStruct() {}
373+
371374
// updateCrossLocalityMetricsOnReplicaAddressedBatchRequest updates
372375
// DistSenderMetrics for batch requests that have been divided and are currently
373376
// forwarding to a specific replica for the corresponding range. The metrics

pkg/kv/kvnemesis/validator.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,16 +1232,6 @@ func (v *validator) failIfError(
12321232
) (ambiguous, hasError bool) {
12331233
exceptions = append(exceptions[:len(exceptions):len(exceptions)], func(err error) bool {
12341234
return errors.Is(err, errInjected)
1235-
}, func(err error) bool {
1236-
// Work-around for [1].
1237-
//
1238-
// TODO(arul): find out why we (as of [2]) sometimes leaking
1239-
// *TransactionPushError (wrapped in `UnhandledRetryableError`) from
1240-
// `db.Get`, `db.Scan`, etc.
1241-
//
1242-
// [1]: https://github.com/cockroachdb/cockroach/issues/105330
1243-
// [2]: https://github.com/cockroachdb/cockroach/pull/97779
1244-
return errors.HasType(err, (*kvpb.UnhandledRetryableError)(nil))
12451235
})
12461236
switch r.Type {
12471237
case ResultType_Unknown:

pkg/kv/kvserver/batcheval/cmd_push_txn.go

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/cockroachdb/cockroach/pkg/storage"
2727
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2828
"github.com/cockroachdb/cockroach/pkg/util/log"
29+
"github.com/cockroachdb/cockroach/pkg/util/must"
2930
"github.com/cockroachdb/errors"
3031
"github.com/cockroachdb/redact"
3132
)
@@ -213,38 +214,9 @@ func PushTxn(
213214
}
214215
reply.PusheeTxn.UpgradePriority(args.PusheeTxn.Priority)
215216

216-
// If the pusher is aware that the pushee's currently recorded attempt at a
217-
// parallel commit failed, either because it found intents at a higher
218-
// timestamp than the parallel commit attempt or because it found intents at
219-
// a higher epoch than the parallel commit attempt, it should not consider
220-
// the pushee to be performing a parallel commit. Its commit status is not
221-
// indeterminate.
222-
if (knownHigherTimestamp || knownHigherEpoch) && reply.PusheeTxn.Status == roachpb.STAGING {
223-
reply.PusheeTxn.Status = roachpb.PENDING
224-
reply.PusheeTxn.InFlightWrites = nil
225-
// If the pusher is aware that the pushee's currently recorded attempt
226-
// at a parallel commit failed, upgrade PUSH_TIMESTAMPs to PUSH_ABORTs.
227-
// We don't want to move the transaction back to PENDING, as this is not
228-
// (currently) allowed by the recovery protocol. We also don't want to
229-
// move the transaction to a new timestamp while retaining the STAGING
230-
// status, as this could allow the transaction to enter an implicit
231-
// commit state without its knowledge, leading to atomicity violations.
232-
//
233-
// This has no effect on pushes that fail with a TransactionPushError.
234-
// Such pushes will still wait on the pushee to retry its commit and
235-
// eventually commit or abort. It also has no effect on expired pushees,
236-
// as they would have been aborted anyway. This only impacts pushes
237-
// which would have succeeded due to priority mismatches. In these
238-
// cases, the push acts the same as a short-circuited transaction
239-
// recovery process, because the transaction recovery procedure always
240-
// finalizes target transactions, even if initiated by a PUSH_TIMESTAMP.
241-
if pushType == kvpb.PUSH_TIMESTAMP {
242-
pushType = kvpb.PUSH_ABORT
243-
}
244-
}
245-
246217
pusherIso, pusheeIso := args.PusherTxn.IsoLevel, reply.PusheeTxn.IsoLevel
247218
pusherPri, pusheePri := args.PusherTxn.Priority, reply.PusheeTxn.Priority
219+
pusheeStatus := reply.PusheeTxn.Status
248220
var pusherWins bool
249221
var reason string
250222
switch {
@@ -258,7 +230,7 @@ func PushTxn(
258230
// If just attempting to cleanup old or already-committed txns,
259231
// pusher always fails.
260232
pusherWins = false
261-
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri):
233+
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus):
262234
reason = "pusher has priority"
263235
pusherWins = true
264236
case args.Force:
@@ -282,13 +254,42 @@ func PushTxn(
282254
// If the pushed transaction is in the staging state, we can't change its
283255
// record without first going through the transaction recovery process and
284256
// attempting to finalize it.
257+
pusheeStaging := pusheeStatus == roachpb.STAGING
258+
// However, if the pusher is aware that the pushee's currently recorded
259+
// attempt at a parallel commit failed, either because it found intents at a
260+
// higher timestamp than the parallel commit attempt or because it found
261+
// intents at a higher epoch than the parallel commit attempt, it should not
262+
// consider the pushee to be performing a parallel commit. Its commit status
263+
// is not indeterminate.
264+
pusheeStagingFailed := pusheeStaging && (knownHigherTimestamp || knownHigherEpoch)
285265
recoverOnFailedPush := cArgs.EvalCtx.EvalKnobs().RecoverIndeterminateCommitsOnFailedPushes
286-
if reply.PusheeTxn.Status == roachpb.STAGING && (pusherWins || recoverOnFailedPush) {
266+
if pusheeStaging && !pusheeStagingFailed && (pusherWins || recoverOnFailedPush) {
287267
err := kvpb.NewIndeterminateCommitError(reply.PusheeTxn)
288268
log.VEventf(ctx, 1, "%v", err)
289269
return result.Result{}, err
290270
}
291271

272+
// If the pusher is aware that the pushee's currently recorded attempt at a
273+
// parallel commit failed, upgrade PUSH_TIMESTAMPs to PUSH_ABORTs. We don't
274+
// want to move the transaction back to PENDING, as this is not (currently)
275+
// allowed by the recovery protocol. We also don't want to move the
276+
// transaction to a new timestamp while retaining the STAGING status, as this
277+
// could allow the transaction to enter an implicit commit state without its
278+
// knowledge, leading to atomicity violations.
279+
//
280+
// This has no effect on pushes that fail with a TransactionPushError. Such
281+
// pushes will still wait on the pushee to retry its commit and eventually
282+
// commit or abort. It also has no effect on expired pushees, as they would
283+
// have been aborted anyway. This only impacts pushes which would have
284+
// succeeded due to priority mismatches. In these cases, the push acts the
285+
// same as a short-circuited transaction recovery process, because the
286+
// transaction recovery procedure always finalizes target transactions, even
287+
// if initiated by a PUSH_TIMESTAMP.
288+
if pusheeStaging && pusherWins && pushType == kvpb.PUSH_TIMESTAMP {
289+
_ = must.True(ctx, pusheeStagingFailed, "parallel commit must be known to have failed for push to succeed")
290+
pushType = kvpb.PUSH_ABORT
291+
}
292+
292293
if !pusherWins {
293294
err := kvpb.NewTransactionPushError(reply.PusheeTxn)
294295
log.VEventf(ctx, 1, "%v", err)
@@ -306,6 +307,8 @@ func PushTxn(
306307
// Forward the timestamp to accommodate AbortSpan GC. See method comment for
307308
// details.
308309
reply.PusheeTxn.WriteTimestamp.Forward(reply.PusheeTxn.LastActive())
310+
// If the transaction was previously staging, clear its in-flight writes.
311+
reply.PusheeTxn.InFlightWrites = nil
309312
// If the transaction record was already present, persist the updates to it.
310313
// If not, then we don't want to create it. This could allow for finalized
311314
// transactions to be revived. Instead, we obey the invariant that only the

pkg/kv/kvserver/concurrency/concurrency_manager_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -747,18 +747,19 @@ func (c *cluster) PushTransaction(
747747
pusheeTxn, pusheeRecordSig := pusheeRecord.asTxn()
748748
pusheeIso := pusheeTxn.IsoLevel
749749
pusheePri := pusheeTxn.Priority
750+
pusheeStatus := pusheeTxn.Status
750751
// NOTE: this logic is adapted from cmd_push_txn.go.
751752
var pusherWins bool
752753
switch {
753-
case pusheeTxn.Status.IsFinalized():
754+
case pusheeStatus.IsFinalized():
754755
// Already finalized.
755756
return pusheeTxn, nil
756757
case pushType == kvpb.PUSH_TIMESTAMP && pushTo.LessEq(pusheeTxn.WriteTimestamp):
757758
// Already pushed.
758759
return pusheeTxn, nil
759760
case pushType == kvpb.PUSH_TOUCH:
760761
pusherWins = false
761-
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri):
762+
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus):
762763
pusherWins = true
763764
default:
764765
pusherWins = false

pkg/kv/kvserver/concurrency/lock_table_waiter.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1298,7 +1298,11 @@ func canPushWithPriority(req Request, s waitingState) bool {
12981298
}
12991299
pusheeIso = s.txn.IsoLevel
13001300
pusheePri = s.txn.Priority
1301-
return txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri)
1301+
// We assume that the pushee is in the PENDING state when deciding whether
1302+
// to push. A push may determine that the pushee is STAGING or has already
1303+
// been finalized.
1304+
pusheeStatus := roachpb.PENDING
1305+
return txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus)
13021306
}
13031307

13041308
func logResolveIntent(ctx context.Context, intent roachpb.LockUpdate) {

pkg/kv/kvserver/replica_send.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,7 @@ func (r *Replica) handleTransactionPushError(
773773
dontRetry := r.store.cfg.TestingKnobs.DontRetryPushTxnFailures
774774
if !dontRetry && ba.IsSinglePushTxnRequest() {
775775
pushReq := ba.Requests[0].GetInner().(*kvpb.PushTxnRequest)
776-
dontRetry = txnwait.ShouldPushImmediately(pushReq)
776+
dontRetry = txnwait.ShouldPushImmediately(pushReq, t.PusheeTxn.Status)
777777
}
778778
if dontRetry {
779779
return g, pErr

pkg/kv/kvserver/txn_recovery_integration_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ import (
1818

1919
"github.com/cockroachdb/cockroach/pkg/kv"
2020
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
21+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
2122
"github.com/cockroachdb/cockroach/pkg/roachpb"
2223
"github.com/cockroachdb/cockroach/pkg/testutils"
2324
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2425
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2526
"github.com/cockroachdb/cockroach/pkg/util/log"
2627
"github.com/cockroachdb/cockroach/pkg/util/stop"
2728
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
29+
"github.com/cockroachdb/errors"
2830
"github.com/stretchr/testify/require"
2931
)
3032

@@ -288,6 +290,127 @@ func TestTxnRecoveryFromStagingWithHighPriority(t *testing.T) {
288290
})
289291
}
290292

293+
// TestTxnRecoveryFromStagingWithoutHighPriority tests that the transaction
294+
// recovery process is NOT initiated by a normal-priority operation which
295+
// encounters a staging transaction. Instead, the normal-priority operation
296+
// waits for the committing transaction to complete. The test contains a subtest
297+
// for each of the combinations of the following options:
298+
//
299+
// - pusheeIsoLevel: configures the isolation level of the pushee (committing)
300+
// transaction. Isolation levels affect the behavior of pushes of pending
301+
// transactions, but not of staging transactions.
302+
//
303+
// - pusheeCommits: configures whether or not the staging transaction is
304+
implicitly and, eventually, explicitly committed.
305+
//
306+
// - pusherWriting: configures whether the conflicting operation is a
307+
// read (false) or a write (true), which dictates the kind of push operation
308+
// dispatched against the staging transaction.
309+
func TestTxnRecoveryFromStagingWithoutHighPriority(t *testing.T) {
310+
defer leaktest.AfterTest(t)()
311+
defer log.Scope(t).Close(t)
312+
ctx := context.Background()
313+
314+
run := func(t *testing.T, pusheeIsoLevel isolation.Level, pusheeCommits, pusherWriting bool) {
315+
stopper := stop.NewStopper()
316+
defer stopper.Stop(ctx)
317+
manual := timeutil.NewManualTime(timeutil.Unix(0, 123))
318+
cfg := TestStoreConfig(hlc.NewClockForTesting(manual))
319+
store := createTestStoreWithConfig(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg)
320+
321+
// Create a transaction that will get stuck performing a parallel
322+
// commit.
323+
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
324+
txn := newTransaction("txn", keyA, 1, store.Clock())
325+
txn.IsoLevel = pusheeIsoLevel
326+
327+
// Issue two writes, which will be considered in-flight at the time of
328+
// the transaction's EndTxn request.
329+
keyAVal := []byte("value")
330+
pArgs := putArgs(keyA, keyAVal)
331+
pArgs.Sequence = 1
332+
h := kvpb.Header{Txn: txn}
333+
_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), h, &pArgs)
334+
require.Nil(t, pErr, "error: %s", pErr)
335+
336+
pArgs = putArgs(keyB, []byte("value2"))
337+
pArgs.Sequence = 2
338+
h2 := kvpb.Header{Txn: txn.Clone()}
339+
if !pusheeCommits {
340+
// If we're not going to have the pushee commit, make sure it never enters
341+
// the implicit commit state by bumping the timestamp of one of its writes.
342+
manual.Advance(100)
343+
h2.Txn.WriteTimestamp = store.Clock().Now()
344+
}
345+
_, pErr = kv.SendWrappedWith(ctx, store.TestSender(), h2, &pArgs)
346+
require.Nil(t, pErr, "error: %s", pErr)
347+
348+
// Issue a parallel commit, which will put the transaction into a
349+
// STAGING state. Include both writes as the EndTxn's in-flight writes.
350+
et, etH := endTxnArgs(txn, true)
351+
et.InFlightWrites = []roachpb.SequencedWrite{
352+
{Key: keyA, Sequence: 1},
353+
{Key: keyB, Sequence: 2},
354+
}
355+
etReply, pErr := kv.SendWrappedWith(ctx, store.TestSender(), etH, &et)
356+
require.Nil(t, pErr, "error: %s", pErr)
357+
require.Equal(t, roachpb.STAGING, etReply.Header().Txn.Status)
358+
359+
// Issue a conflicting, normal-priority operation.
360+
var conflictArgs kvpb.Request
361+
if pusherWriting {
362+
pArgs = putArgs(keyB, []byte("value3"))
363+
conflictArgs = &pArgs
364+
} else {
365+
gArgs := getArgs(keyB)
366+
conflictArgs = &gArgs
367+
}
368+
manual.Advance(100)
369+
pErrC := make(chan *kvpb.Error, 1)
370+
require.NoError(t, stopper.RunAsyncTask(ctx, "conflict", func(ctx context.Context) {
371+
_, pErr := kv.SendWrapped(ctx, store.TestSender(), conflictArgs)
372+
pErrC <- pErr
373+
}))
374+
375+
// Wait for the conflict to push and be queued in the txn wait queue.
376+
testutils.SucceedsSoon(t, func() error {
377+
select {
378+
case pErr := <-pErrC:
379+
t.Fatalf("conflicting operation unexpectedly completed: pErr=%s", pErr)
380+
default:
381+
}
382+
if v := store.txnWaitMetrics.PusherWaiting.Value(); v != 1 {
383+
return errors.Errorf("expected 1 pusher waiting, found %d", v)
384+
}
385+
return nil
386+
})
387+
388+
// Finalize the STAGING txn, either by committing it or by aborting it.
389+
et2, et2H := endTxnArgs(txn, pusheeCommits)
390+
etReply, pErr = kv.SendWrappedWith(ctx, store.TestSender(), et2H, &et2)
391+
require.Nil(t, pErr, "error: %s", pErr)
392+
expStatus := roachpb.COMMITTED
393+
if !pusheeCommits {
394+
expStatus = roachpb.ABORTED
395+
}
396+
require.Equal(t, expStatus, etReply.Header().Txn.Status)
397+
398+
// This will unblock the conflicting operation, which should succeed.
399+
pErr = <-pErrC
400+
require.Nil(t, pErr, "error: %s", pErr)
401+
}
402+
403+
for _, pusheeIsoLevel := range isolation.Levels() {
404+
t.Run("pushee_iso_level="+pusheeIsoLevel.String(), func(t *testing.T) {
405+
testutils.RunTrueAndFalse(t, "pushee_commits", func(t *testing.T, pusheeCommits bool) {
406+
testutils.RunTrueAndFalse(t, "pusher_writing", func(t *testing.T, pusherWriting bool) {
407+
run(t, pusheeIsoLevel, pusheeCommits, pusherWriting)
408+
})
409+
})
410+
})
411+
}
412+
}
413+
291414
// TestTxnClearRangeIntents tests whether a ClearRange call blindly removes
292415
// write intents. This can cause it to remove an intent from an implicitly
293416
// committed STAGING txn. When txn recovery kicks in, it will fail to find the

pkg/kv/kvserver/txnwait/queue.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,23 +70,26 @@ func TestingOverrideTxnLivenessThreshold(t time.Duration) func() {
7070
// proceed without queueing. This is true for pushes which are neither
7171
// ABORT nor TIMESTAMP, but also for ABORT and TIMESTAMP pushes where
7272
// the pushee has min priority or pusher has max priority.
73-
func ShouldPushImmediately(req *kvpb.PushTxnRequest) bool {
73+
func ShouldPushImmediately(req *kvpb.PushTxnRequest, pusheeStatus roachpb.TransactionStatus) bool {
7474
if req.Force {
7575
return true
7676
}
7777
return CanPushWithPriority(
7878
req.PushType,
7979
req.PusherTxn.IsoLevel, req.PusheeTxn.IsoLevel,
8080
req.PusherTxn.Priority, req.PusheeTxn.Priority,
81+
pusheeStatus,
8182
)
8283
}
8384

8485
// CanPushWithPriority returns true if the pusher can perform the specified push
85-
// type on the pushee, based on the two txns' isolation levels and priorities.
86+
// type on the pushee, based on the two txns' isolation levels, their priorities,
87+
// and the pushee's status.
8688
func CanPushWithPriority(
8789
pushType kvpb.PushTxnType,
8890
pusherIso, pusheeIso isolation.Level,
8991
pusherPri, pusheePri enginepb.TxnPriority,
92+
pusheeStatus roachpb.TransactionStatus,
9093
) bool {
9194
// Normalize the transaction priorities so that normal user priorities are
9295
// considered equal for the purposes of pushing.
@@ -103,6 +106,15 @@ func CanPushWithPriority(
103106
case kvpb.PUSH_ABORT:
104107
return pusherPri > pusheePri
105108
case kvpb.PUSH_TIMESTAMP:
109+
// If the pushee transaction is STAGING, only let the PUSH_TIMESTAMP through
110+
// to disrupt the transaction commit if the pusher has a higher priority. If
111+
// the priorities are equal, the PUSH_TIMESTAMP should wait for the commit
112+
// to complete.
113+
if pusheeStatus == roachpb.STAGING {
114+
return pusherPri > pusheePri
115+
}
116+
// Otherwise, the pushee has not yet started committing...
117+
106118
// If the pushee transaction tolerates write skew, the PUSH_TIMESTAMP is
107119
// harmless, so let it through.
108120
return pusheeIso.ToleratesWriteSkew() ||
@@ -476,10 +488,6 @@ func (q *Queue) releaseWaitingQueriesLocked(ctx context.Context, txnID uuid.UUID
476488
func (q *Queue) MaybeWaitForPush(
477489
ctx context.Context, req *kvpb.PushTxnRequest,
478490
) (*kvpb.PushTxnResponse, *kvpb.Error) {
479-
if ShouldPushImmediately(req) {
480-
return nil, nil
481-
}
482-
483491
q.mu.Lock()
484492
// If the txn wait queue is not enabled or if the request is not
485493
// contained within the replica, do nothing. The request can fall
@@ -501,6 +509,9 @@ func (q *Queue) MaybeWaitForPush(
501509
if txn := pending.getTxn(); isPushed(req, txn) {
502510
q.mu.Unlock()
503511
return createPushTxnResponse(txn), nil
512+
} else if ShouldPushImmediately(req, txn.Status) {
513+
q.mu.Unlock()
514+
return nil, nil
504515
}
505516

506517
push := &waitingPush{

0 commit comments

Comments
 (0)