Skip to content

Commit f69b856

Browse files
craig[bot]stevendanna
andcommitted
Merge #156722
156722: batcheval: return unmodified staging txn in IndeterminateCommitError r=arulajmani a=stevendanna When an EndTxn(abort) encounters a transaction record in STAGING, it returns an IndeterminateCommitError so that the transaction recovery process can be started. When constructing this error, we previously returned a copy of the staging transaction that had its write timestamp forwarded to that of the txn found on the request header. According to this comment, this is rather important. // We're using existingTxn on the reply, although it can be stale // compared to the Transaction in the request (e.g. the Sequence, // and various timestamps). We must be careful to update it with the // supplied ba.Txn if we return it with an error which might be // retried, as for example to avoid client-side serializable restart. However, a side-effect is that the transaction recovery performed in response to an IndeterminateCommitError could erroneously find that the transaction was committed because it was checking for the implicit commit criteria using a higher timestamp. The actual recovery of the transaction would then hit the following assertion: programming error: timestamp change by implicitly committed transaction Here, we solve this by returning the unmodified txn in an IndeterminateCommitError. While the comment warns us against this, I think in this case the transaction has nothing (other than the EndTxn) to retry since we are rolling back. Further, the error is handled server-side immediately and then the user receives either the error from the recovery process or the error from the retried request. Fixes #156698 Release note (bug fix): Fix a bug that could result in a transaction encountering the following assertion failure: failed indeterminate commit recovery: programming error: timestamp change by implicitly committed transaction Co-authored-by: Steven Danna <[email protected]>
2 parents ad4a910 + 3d1337f commit f69b856

File tree

3 files changed

+185
-7
lines changed

3 files changed

+185
-7
lines changed

pkg/kv/kvclient/kvcoord/dist_sender_server_test.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/cockroachdb/cockroach/pkg/util/log"
4747
"github.com/cockroachdb/cockroach/pkg/util/metric"
4848
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
49+
"github.com/cockroachdb/cockroach/pkg/util/tracing"
4950
"github.com/cockroachdb/errors"
5051
"github.com/stretchr/testify/require"
5152
)
@@ -4616,3 +4617,143 @@ func TestMaxSpanRequestKeysWithMixedReadWriteBatches(t *testing.T) {
46164617
})
46174618
require.NoError(t, err)
46184619
}
4620+
4621+
// TestRollbackAfterRefreshAndFailedCommit is a regression test for #156698.
4622+
//
4623+
// This test forces the self-recovery of a transaction after a failed parallel
4624+
// commit. It does this via the following.
4625+
//
4626+
// Setup:
4627+
//
4628+
// Create a split at key=8 to force batch splitting.
4629+
// Write to key=8 to be able to observe if the subsequent delete was effective.
4630+
//
4631+
// Txn 1:
4632+
//
4633+
// Read key=1 to set timestamp and prevent server side refresh.
4634+
//
4635+
// Txn 2:
4636+
//
4637+
// Read key=8 to bump timestamp cache on key=8.
4638+
//
4639+
// Txn 1:
4640+
//
4641+
// Batch{
4642+
// Del key=1
4643+
// Del key=8
4644+
// EndTxn
4645+
// }
4646+
//
4647+
// This batch fails its parallel commit because Del(key=8) has its timestamp
4648+
// pushed so its write doesn't satisfy the STAGING record.
4649+
//
4650+
// After a successful refresh, our testing filters have arranged for the
4651+
// second EndTxn to fail. This then results in a Rollback().
4652+
//
4653+
// In the presence of the bug, the Rollback() would also fail. Without the bug,
4654+
// it succeeds. In either case, Txn1 fails so we observe the rollback failure
4655+
// via the trace.
4656+
func TestRollbackAfterRefreshAndFailedCommit(t *testing.T) {
4657+
defer leaktest.AfterTest(t)()
4658+
defer log.Scope(t).Close(t)
4659+
4660+
var (
4661+
targetTxnIDString atomic.Value
4662+
firstEndTxnSeen atomic.Bool
4663+
cmdID atomic.Value
4664+
)
4665+
cmdID.Store(kvserverbase.CmdIDKey(""))
4666+
targetTxnIDString.Store("")
4667+
ctx := context.Background()
4668+
st := cluster.MakeClusterSettings()
4669+
kvcoord.RandomizedTxnAnchorKeyEnabled.Override(ctx, &st.SV, false)
4670+
4671+
s, _, db := serverutils.StartServer(t, base.TestServerArgs{
4672+
Settings: st,
4673+
Knobs: base.TestingKnobs{
4674+
Store: &kvserver.StoreTestingKnobs{
4675+
TestingProposalFilter: func(fArgs kvserverbase.ProposalFilterArgs) *kvpb.Error {
4676+
if fArgs.Req.Header.Txn == nil ||
4677+
fArgs.Req.Header.Txn.ID.String() != targetTxnIDString.Load().(string) {
4678+
return nil // not our txn
4679+
}
4680+
if cmdID.Load().(kvserverbase.CmdIDKey) != "" {
4681+
// We already have a command to fail.
4682+
return nil
4683+
}
4684+
4685+
t.Logf("proposal from our txn: %s", fArgs.Req)
4686+
endTxnReq := fArgs.Req.Requests[len(fArgs.Req.Requests)-1].GetEndTxn()
4687+
if endTxnReq != nil {
4688+
if !firstEndTxnSeen.Load() {
4689+
firstEndTxnSeen.Store(true)
4690+
} else {
4691+
epoch := fArgs.Req.Header.Txn.Epoch
4692+
t.Logf("will fail application for txn %s@epoch=%d; req: %+v; raft cmdID: %s",
4693+
fArgs.Req.Header.Txn.ID.String(), epoch, endTxnReq, fArgs.CmdID)
4694+
cmdID.Store(fArgs.CmdID)
4695+
}
4696+
}
4697+
return nil
4698+
},
4699+
TestingApplyCalledTwiceFilter: func(fArgs kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
4700+
if fArgs.CmdID == cmdID.Load().(kvserverbase.CmdIDKey) {
4701+
t.Logf("failing application for raft cmdID: %s", fArgs.CmdID)
4702+
return 0, kvpb.NewErrorf("test injected error")
4703+
}
4704+
return 0, nil
4705+
},
4706+
},
4707+
},
4708+
})
4709+
defer s.Stopper().Stop(ctx)
4710+
4711+
scratchStart, err := s.ScratchRange()
4712+
require.NoError(t, err)
4713+
4714+
scratchKey := func(idx int) roachpb.Key {
4715+
key := scratchStart.Clone()
4716+
key = append(key, []byte(fmt.Sprintf("key-%03d", idx))...)
4717+
return key
4718+
}
4719+
4720+
_, _, err = s.SplitRange(scratchKey(6))
4721+
require.NoError(t, err)
4722+
require.NoError(t, db.Put(ctx, scratchKey(8), "hello"))
4723+
4724+
tracer := s.TracerI().(*tracing.Tracer)
4725+
txn1Ctx, collectAndFinish := tracing.ContextWithRecordingSpan(context.Background(), tracer, "txn1")
4726+
4727+
txn1Err := db.Txn(txn1Ctx, func(txn1Ctx context.Context, txn1 *kv.Txn) error {
4728+
txn1.SetDebugName("txn1")
4729+
targetTxnIDString.Store(txn1.ID().String())
4730+
_ = txn1.ConfigureStepping(txn1Ctx, kv.SteppingEnabled)
4731+
if _, err = txn1.Get(txn1Ctx, scratchKey(1)); err != nil {
4732+
return err
4733+
}
4734+
4735+
// Txn2 now executes, bumping the timestamp on key8.
4736+
if txn1.Epoch() == 0 {
4737+
txn2Ctx := context.Background()
4738+
txn2 := db.NewTxn(txn2Ctx, "txn2")
4739+
require.NoError(t, err)
4740+
_, err = txn2.Get(txn2Ctx, scratchKey(8))
4741+
require.NoError(t, err)
4742+
}
4743+
4744+
b := txn1.NewBatch()
4745+
b.Del(scratchKey(1))
4746+
b.Del(scratchKey(8)) // Won't satisfy implicit commit.
4747+
return txn1.CommitInBatch(txn1Ctx, b)
4748+
})
4749+
4750+
require.Error(t, txn1Err)
4751+
val8, err := db.Get(ctx, scratchKey(8))
4752+
require.NoError(t, err)
4753+
require.True(t, val8.Exists())
4754+
// The actual error returned to the user is the injected error. However, we
4755+
// want to verify that the transaction's self-recovery did not encounter a
4756+
// programming error. So, we check the trace for it.
4757+
recording := collectAndFinish()
4758+
require.NotContains(t, recording.String(), "failed indeterminate commit recovery: programming error")
4759+
}

pkg/kv/kvserver/batcheval/cmd_end_transaction.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ func EndTxn(
305305
// and various timestamps). We must be careful to update it with the
306306
// supplied ba.Txn if we return it with an error which might be
307307
// retried, as for example to avoid client-side serializable restart.
308-
reply.Txn = &existingTxn
308+
reply.Txn = existingTxn.Clone()
309309

310310
// Verify that we can either commit it or abort it (according
311311
// to args.Commit), and also that the Timestamp and Epoch have
@@ -498,7 +498,19 @@ func EndTxn(
498498
// committed. Doing so is only possible if we can guarantee that under no
499499
// circumstances can an implicitly committed transaction be rolled back.
500500
if reply.Txn.Status == roachpb.STAGING {
501-
err := kvpb.NewIndeterminateCommitError(*reply.Txn)
501+
// Note that reply.Txn has been updated with the Txn from the request
502+
// header. But, the transaction might have been pushed since it was
503+
// written. In fact, the transaction from the request header might
504+
// actually be in a state that _would have_ been implicitly committed IF
505+
// it had been able to write a transaction record with this new state. We
506+
// use the transaction record from disk to avoid erroneously attempting to
507+
// commit this transaction during recovery. Attempting to commit the
508+
// transaction based on the pushed timestamp would result in an assertion
509+
// failure.
510+
if !recordAlreadyExisted {
511+
return result.Result{}, errors.AssertionFailedf("programming error: transaction in STAGING without transaction record")
512+
}
513+
err := kvpb.NewIndeterminateCommitError(existingTxn)
502514
log.VEventf(ctx, 1, "%v", err)
503515
return result.Result{}, err
504516
}

pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/cockroachdb/cockroach/pkg/util/log"
3131
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
3232
"github.com/cockroachdb/cockroach/pkg/util/uuid"
33+
"github.com/cockroachdb/errors"
3334
"github.com/stretchr/testify/require"
3435
)
3536

@@ -185,8 +186,9 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
185186
inFlightWrites []roachpb.SequencedWrite
186187
deadline hlc.Timestamp
187188
// Expected result.
188-
expError string
189-
expTxn *roachpb.TransactionRecord
189+
expError string
190+
expTxn *roachpb.TransactionRecord
191+
validateError func(t *testing.T, err error)
190192
}{
191193
{
192194
// Standard case where a transaction is rolled back when
@@ -1049,6 +1051,28 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
10491051
return &record
10501052
}(),
10511053
},
1054+
{
1055+
// Non-standard case where the transaction is being rolled back after a
1056+
// successful refresh. In this case we want to be sure that the staging
1057+
// record is returned so that we don't attempt transaction recovery using
1058+
// the refreshed transaction.
1059+
name: "record staging, rollback after refresh.",
1060+
// Replica state.
1061+
existingTxn: stagingRecord,
1062+
// Request state.
1063+
headerTxn: refreshedHeaderTxn,
1064+
commit: false,
1065+
// Expected result.
1066+
expError: "found txn in indeterminate STAGING state",
1067+
expTxn: stagingRecord,
1068+
validateError: func(t *testing.T, err error) {
1069+
var icErr *kvpb.IndeterminateCommitError
1070+
errors.As(err, &icErr)
1071+
require.NotNil(t, icErr)
1072+
require.Equal(t, stagingRecord.WriteTimestamp, icErr.StagingTxn.WriteTimestamp)
1073+
},
1074+
},
1075+
10521076
{
10531077
// Non-standard case where a transaction record is re-staged during
10541078
// a parallel commit. The record already exists because of a failed
@@ -1597,10 +1621,11 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
15971621
if !testutils.IsError(err, regexp.QuoteMeta(c.expError)) {
15981622
t.Fatalf("expected error %q; found %v", c.expError, err)
15991623
}
1600-
} else {
1601-
if err != nil {
1602-
t.Fatal(err)
1624+
if c.validateError != nil {
1625+
c.validateError(t, err)
16031626
}
1627+
} else {
1628+
require.NoError(t, err)
16041629

16051630
// Assert that the txn record is written as expected.
16061631
var resTxnRecord roachpb.TransactionRecord

0 commit comments

Comments
 (0)