Skip to content

Commit 6146edc

Browse files
committed
kv: add unit test to reproduce cockroachdb#105330
Informs cockroachdb#105330. This commit adds a new unit test named `TestTxnRecoveryFromStagingWithoutHighPriority`, which reproduces the error described in cockroachdb#105330. Release note: None
1 parent f6c4d9d commit 6146edc

File tree

1 file changed

+137
-0
lines changed

1 file changed

+137
-0
lines changed

pkg/kv/kvserver/txn_recovery_integration_test.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ import (
1818

1919
"github.com/cockroachdb/cockroach/pkg/kv"
2020
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
21+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
2122
"github.com/cockroachdb/cockroach/pkg/roachpb"
2223
"github.com/cockroachdb/cockroach/pkg/testutils"
2324
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2425
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2526
"github.com/cockroachdb/cockroach/pkg/util/log"
2627
"github.com/cockroachdb/cockroach/pkg/util/stop"
2728
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
29+
"github.com/cockroachdb/errors"
2830
"github.com/stretchr/testify/require"
2931
)
3032

@@ -288,6 +290,141 @@ func TestTxnRecoveryFromStagingWithHighPriority(t *testing.T) {
288290
})
289291
}
290292

293+
// TestTxnRecoveryFromStagingWithoutHighPriority tests that the transaction
294+
// recovery process is NOT initiated by a normal-priority operation which
295+
// encounters a staging transaction. Instead, the normal-priority operation
296+
// waits for the committing transaction to complete. The test contains a subtest
297+
// for each of the combinations of the following options:
298+
//
299+
// - pusheeIsoLevel: configures the isolation level of the pushee (committing)
300+
// transaction. Isolation levels affect the behavior of pushes of pending
301+
// transactions, but not of staging transactions.
302+
//
303+
// - pusheeCommits: configures whether or not the staging transaction is
304+
// implicitly and, eventually, explicitly committed or not.
305+
//
306+
// - pusherWriting: configures whether or not the conflicting operation is a
307+
// read (false) or a write (true), which dictates the kind of push operation
308+
// dispatched against the staging transaction.
309+
func TestTxnRecoveryFromStagingWithoutHighPriority(t *testing.T) {
310+
defer leaktest.AfterTest(t)()
311+
defer log.Scope(t).Close(t)
312+
ctx := context.Background()
313+
314+
run := func(t *testing.T, pusheeIsoLevel isolation.Level, pusheeCommits, pusherWriting bool) {
315+
stopper := stop.NewStopper()
316+
defer stopper.Stop(ctx)
317+
manual := timeutil.NewManualTime(timeutil.Unix(0, 123))
318+
cfg := TestStoreConfig(hlc.NewClockForTesting(manual))
319+
store := createTestStoreWithConfig(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg)
320+
321+
// Create a transaction that will get stuck performing a parallel
322+
// commit.
323+
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
324+
txn := newTransaction("txn", keyA, 1, store.Clock())
325+
txn.IsoLevel = pusheeIsoLevel
326+
327+
// Issue two writes, which will be considered in-flight at the time of
328+
// the transaction's EndTxn request.
329+
keyAVal := []byte("value")
330+
pArgs := putArgs(keyA, keyAVal)
331+
pArgs.Sequence = 1
332+
h := kvpb.Header{Txn: txn}
333+
_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), h, &pArgs)
334+
require.Nil(t, pErr, "error: %s", pErr)
335+
336+
pArgs = putArgs(keyB, []byte("value2"))
337+
pArgs.Sequence = 2
338+
h2 := kvpb.Header{Txn: txn.Clone()}
339+
if !pusheeCommits {
340+
// If we're not going to have the pushee commit, make sure it never enters
341+
// the implicit commit state by bumping the timestamp of one of its writes.
342+
manual.Advance(100)
343+
h2.Txn.WriteTimestamp = store.Clock().Now()
344+
}
345+
_, pErr = kv.SendWrappedWith(ctx, store.TestSender(), h2, &pArgs)
346+
require.Nil(t, pErr, "error: %s", pErr)
347+
348+
// Issue a parallel commit, which will put the transaction into a
349+
// STAGING state. Include both writes as the EndTxn's in-flight writes.
350+
et, etH := endTxnArgs(txn, true)
351+
et.InFlightWrites = []roachpb.SequencedWrite{
352+
{Key: keyA, Sequence: 1},
353+
{Key: keyB, Sequence: 2},
354+
}
355+
etReply, pErr := kv.SendWrappedWith(ctx, store.TestSender(), etH, &et)
356+
require.Nil(t, pErr, "error: %s", pErr)
357+
require.Equal(t, roachpb.STAGING, etReply.Header().Txn.Status)
358+
359+
// Issue a conflicting, normal-priority operation.
360+
var conflictArgs kvpb.Request
361+
if pusherWriting {
362+
pArgs = putArgs(keyB, []byte("value3"))
363+
conflictArgs = &pArgs
364+
} else {
365+
gArgs := getArgs(keyB)
366+
conflictArgs = &gArgs
367+
}
368+
manual.Advance(100)
369+
pErrC := make(chan *kvpb.Error, 1)
370+
require.NoError(t, stopper.RunAsyncTask(ctx, "conflict", func(ctx context.Context) {
371+
_, pErr := kv.SendWrapped(ctx, store.TestSender(), conflictArgs)
372+
pErrC <- pErr
373+
}))
374+
375+
// If the pushee is not serializable and the pusher is reading, we
376+
// currently expect the conflicting operation to immediately succeed,
377+
// sometimes with an error, sometimes not. See #105330.
378+
if pusheeIsoLevel.ToleratesWriteSkew() && !pusherWriting {
379+
pErr = <-pErrC
380+
if pusheeCommits {
381+
require.Nil(t, pErr, "error: %s", pErr)
382+
} else {
383+
require.NotNil(t, pErr)
384+
require.Regexp(t, "failed to push", pErr)
385+
}
386+
return
387+
}
388+
389+
// Wait for the conflict to push and be queued in the txn wait queue.
390+
testutils.SucceedsSoon(t, func() error {
391+
select {
392+
case pErr := <-pErrC:
393+
t.Fatalf("conflicting operation unexpectedly completed: pErr=%s", pErr)
394+
default:
395+
}
396+
if v := store.txnWaitMetrics.PusherWaiting.Value(); v != 1 {
397+
return errors.Errorf("expected 1 pusher waiting, found %d", v)
398+
}
399+
return nil
400+
})
401+
402+
// Finalize the STAGING txn, either by committing it or by aborting it.
403+
et2, et2H := endTxnArgs(txn, pusheeCommits)
404+
etReply, pErr = kv.SendWrappedWith(ctx, store.TestSender(), et2H, &et2)
405+
require.Nil(t, pErr, "error: %s", pErr)
406+
expStatus := roachpb.COMMITTED
407+
if !pusheeCommits {
408+
expStatus = roachpb.ABORTED
409+
}
410+
require.Equal(t, expStatus, etReply.Header().Txn.Status)
411+
412+
// This will unblock the conflicting operation, which should succeed.
413+
pErr = <-pErrC
414+
require.Nil(t, pErr, "error: %s", pErr)
415+
}
416+
417+
for _, pusheeIsoLevel := range isolation.Levels() {
418+
t.Run("pushee_iso_level="+pusheeIsoLevel.String(), func(t *testing.T) {
419+
testutils.RunTrueAndFalse(t, "pushee_commits", func(t *testing.T, pusheeCommits bool) {
420+
testutils.RunTrueAndFalse(t, "pusher_writing", func(t *testing.T, pusherWriting bool) {
421+
run(t, pusheeIsoLevel, pusheeCommits, pusherWriting)
422+
})
423+
})
424+
})
425+
}
426+
}
427+
291428
// TestTxnClearRangeIntents tests whether a ClearRange call blindly removes
292429
// write intents. This can cause it to remove an intent from an implicitly
293430
// committed STAGING txn. When txn recovery kicks in, it will fail to find the

0 commit comments

Comments
 (0)