Commit a97e76d

craig[bot] and stevendanna committed
Merge #152010

152010: kvcoord: prevent unexpected parallel commit of weak isolation transactions r=miraradeva a=stevendanna

The span refresher may retry a batch with an EndTxn request. When retrying such a request, it splits the EndTxn out into its own batch to prevent the transaction from becoming implicitly committed in the middle of the retry.

For serializable transactions this is sufficient because serializable transactions are not allowed to write a transaction record if WriteTimestamp != ReadTimestamp. All of the errors we retry in the span refresher imply that the write timestamp is moving forward, so any old staging record is at a timestamp less than the timestamp we will be writing at during the retry.

For weak isolation transactions, however, this is not sufficient. Consider a weak isolation transaction issuing the following batch, with ReadTimestamp == WriteTimestamp == t1:

    Put(a)
    Put(b)
    EndTxn

Assume the Puts and the EndTxn go to different ranges in parallel. The following can happen:

    Put(a)   -> encounters a WriteTooOld error that requires a refresh to t2
    Put(b)   -> writes intent@t1
    EndTxn() -> WriteTimestamp pushed via the timestamp cache to t2, writes staging record@t2

In an SSI transaction the EndTxn would fail because WriteTimestamp != ReadTimestamp, but in a weak isolation transaction we write a staging record at t2. If we successfully refresh to t2 and start our retry, that existing STAGING transaction record meets the implicit commit criteria as soon as Put(a) succeeds on retry. We would like to avoid this because it can result in a number of different errors.

Here, we avoid this hazard by refreshing weak isolation transactions that are in STAGING to a timestamp just past the largest timestamp at which the staging record could have been written. As a result, any subsequent writes do not satisfy the implicit commit criteria of the existing record. This required a small change to the transaction record returned from EndTxn(abort) to avoid situations where we mistake this future transaction record for the existing transaction record.

Fixes #156698
Fixes #154510

Release note (bug fix): Fix a bug in which a Read Committed or Snapshot isolation transaction could be committed despite returning a non-ambiguous error.

Co-authored-by: Steven Danna <[email protected]>
2 parents 6816c43 + f28a1db commit a97e76d
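
To make the hazard described above concrete, here is a minimal, self-contained Go sketch of the parallel-commit implicit-commit criterion. This is not CockroachDB's actual implementation: the Timestamp type, the write struct, and the implicitlyCommitted helper are all invented for illustration, and the criterion is simplified to "every in-flight write succeeded at or below the staging record's timestamp".

    package main

    import "fmt"

    // Timestamp is a simplified stand-in for hlc.Timestamp (illustration only).
    type Timestamp struct{ Wall int64 }

    func (t Timestamp) LessEq(o Timestamp) bool { return t.Wall <= o.Wall }
    func (t Timestamp) Next() Timestamp         { return Timestamp{t.Wall + 1} }

    // write records the timestamp at which one of the txn's in-flight writes
    // succeeded.
    type write struct {
        key string
        ts  Timestamp
    }

    // implicitlyCommitted sketches the parallel-commit criterion: a STAGING
    // record written at stagingTS is implicitly committed once every in-flight
    // write listed in it has succeeded at or below stagingTS.
    func implicitlyCommitted(stagingTS Timestamp, writes []write) bool {
        for _, w := range writes {
            if !w.ts.LessEq(stagingTS) {
                return false
            }
        }
        return true
    }

    func main() {
        t1, t2 := Timestamp{1}, Timestamp{2}

        // First attempt: Put(b) wrote its intent at t1, Put(a) failed with
        // WriteTooOld, and the EndTxn wrote a staging record pushed to t2.
        stagingTS := t2

        // Buggy retry after refreshing to t2: Put(a) succeeds at t2, and the
        // stale staging record's criterion is suddenly satisfied.
        fmt.Println(implicitlyCommitted(stagingTS, []write{{"a", t2}, {"b", t1}})) // true

        // With the fix, the txn refreshes just past the staging record, so the
        // retried writes land above it and can never satisfy the old record.
        t3 := t2.Next()
        fmt.Println(implicitlyCommitted(stagingTS, []write{{"a", t3}, {"b", t3}})) // false
    }

The two calls mirror the before/after of this commit: the retry at the same timestamp as the stale staging record silently completes the parallel commit, while a retry even one logical tick above it cannot.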

File tree: 2 files changed (+188, −6 lines)


pkg/kv/kvclient/kvcoord/dist_sender_server_test.go

Lines changed: 169 additions & 1 deletion
@@ -4410,6 +4410,7 @@ func TestUnexpectedCommitOnTxnRecovery(t *testing.T) {
 	targetTxnIDString.Store("")
 	ctx := context.Background()
 	st := cluster.MakeTestingClusterSettings()
+
 	// This test relies on unreplicated locks to be replicated on lease transfers.
 	concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, true)
 	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
@@ -4625,7 +4626,7 @@ func TestMaxSpanRequestKeysWithMixedReadWriteBatches(t *testing.T) {
 //
 // Setup:
 //
-// Create a split at key=8 to force batch splitting.
+// Create a split at key=6 to force batch splitting.
 // Write to key=8 to be able to observe if the subsequent delete was effective.
 //
 // Txn 1:
@@ -4757,3 +4758,170 @@ func TestRollbackAfterRefreshAndFailedCommit(t *testing.T) {
 	recording := collectAndFinish()
 	require.NotContains(t, recording.String(), "failed indeterminate commit recovery: programming error")
 }
+
+// TestUnexpectedCommitOnTxnAbortAfterRefresh is a regression test for #151864.
+// It is similar to TestRollbackAfterRefreshAndFailedCommit but tests a much
+// worse outcome that can be experienced by weak-isolation transactions.
+//
+// This test issues requests across two transactions. In the presence of the
+// bug, Txn 1 ends up being committed even though an error is returned to the
+// user.
+//
+// Setup:
+//
+// Create a split at key=6 to force batch splitting.
+//
+// Txn 1:
+//
+// Read key=1 to set the timestamp and prevent a server-side refresh.
+//
+// Txn 2:
+//
+// Read key=1 to bump the timestamp cache on key 1.
+// Write key=8.
+//
+// Txn 1:
+//
+// Batch{
+//   Del key=1
+//   Del key=2
+//   Del key=8
+//   Del key=9
+//   EndTxn
+// }
+//
+// This batch should encounter a write too old error on key=8.
+//
+// After a successful refresh, our testing filters have arranged for the second
+// EndTxn to fail. In the absence of the bug, we expect this EndTxn failure to
+// result in the transaction never committing and an error being returned to
+// the client. When the bug existed, transaction recovery (either initiated by
+// another transaction or initiated by Txn 1 itself during a rollback issued
+// after the injected error) could result in the transaction being erroneously
+// committed despite the injected error being returned to the client.
+func TestUnexpectedCommitOnTxnAbortAfterRefresh(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	var (
+		targetTxnIDString atomic.Value
+		firstEndTxnSeen   atomic.Bool
+		cmdID             atomic.Value
+	)
+	cmdID.Store(kvserverbase.CmdIDKey(""))
+	targetTxnIDString.Store("")
+	ctx := context.Background()
+	st := cluster.MakeClusterSettings()
+	kvcoord.RandomizedTxnAnchorKeyEnabled.Override(ctx, &st.SV, false)
+
+	s, _, db := serverutils.StartServer(t, base.TestServerArgs{
+		Settings: st,
+		Knobs: base.TestingKnobs{
+			Store: &kvserver.StoreTestingKnobs{
+				TestingProposalFilter: func(fArgs kvserverbase.ProposalFilterArgs) *kvpb.Error {
+					if fArgs.Req.Header.Txn == nil ||
+						fArgs.Req.Header.Txn.ID.String() != targetTxnIDString.Load().(string) {
+						return nil // not our txn
+					}
+
+					t.Logf("proposal from our txn: %s", fArgs.Req)
+					endTxnReq := fArgs.Req.Requests[len(fArgs.Req.Requests)-1].GetEndTxn()
+					if endTxnReq != nil {
+						if !firstEndTxnSeen.Load() {
+							firstEndTxnSeen.Store(true)
+						} else {
+							epoch := fArgs.Req.Header.Txn.Epoch
+							t.Logf("will fail application for txn %s@epoch=%d; req: %+v; raft cmdID: %s",
+								fArgs.Req.Header.Txn.ID.String(), epoch, endTxnReq, fArgs.CmdID)
+							cmdID.Store(fArgs.CmdID)
+						}
+					}
+					return nil
+				},
+				TestingApplyCalledTwiceFilter: func(fArgs kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
+					if fArgs.CmdID == cmdID.Load().(kvserverbase.CmdIDKey) {
+						t.Logf("failing application for raft cmdID: %s", fArgs.CmdID)
+						return 0, kvpb.NewErrorf("test injected error")
+					}
+					return 0, nil
+				},
+			},
+		},
+	})
+	defer s.Stopper().Stop(ctx)
+
+	scratchStart, err := s.ScratchRange()
+	require.NoError(t, err)
+
+	scratchKey := func(idx int) roachpb.Key {
+		key := scratchStart.Clone()
+		key = append(key, []byte(fmt.Sprintf("key-%03d", idx))...)
+		return key
+	}
+
+	_, _, err = s.SplitRange(scratchKey(6))
+	require.NoError(t, err)
+
+	tracer := s.TracerI().(*tracing.Tracer)
+	txn1Ctx, collectAndFinish := tracing.ContextWithRecordingSpan(context.Background(), tracer, "txn1")
+
+	txn1Err := db.Txn(txn1Ctx, func(txn1Ctx context.Context, txn1 *kv.Txn) error {
+		txn1.SetDebugName("txn1")
+		targetTxnIDString.Store(txn1.ID().String())
+		// Txn1 must be at either SNAPSHOT or READ COMMITTED with Stepping enabled
+		// for this bug because it both needs to be in an isolation level that
+		// allows committing when wts != rts and the read needs to produce a read
+		// span that requires a refresh such that we can't do a server-side refresh.
+		_ = txn1.ConfigureStepping(txn1Ctx, kv.SteppingEnabled)
+		if err := txn1.SetIsoLevel(isolation.ReadCommitted); err != nil {
+			return err
+		}
+		if _, err = txn1.Get(txn1Ctx, scratchKey(1)); err != nil {
+			return err
+		}
+
+		// Txn2 now executes, arranging for the WriteTooOld error and the timestamp
+		// cache bump on the txn's anchor key.
+		if txn1.Epoch() == 0 {
+			txn2Ctx := context.Background()
+			txn2 := db.NewTxn(txn2Ctx, "txn2")
+			require.NoError(t, err)
+			_, err = txn2.Get(txn2Ctx, scratchKey(1))
+			require.NoError(t, err)
+
+			err = txn2.Put(txn2Ctx, scratchKey(8), "hello")
+			require.NoError(t, err)
+			err = txn2.Commit(txn2Ctx)
+			require.NoError(t, err)
+		}
+
+		b := txn1.NewBatch()
+		b.Del(scratchKey(1))
+		b.Del(scratchKey(2))
+		b.Del(scratchKey(8))
+		b.Del(scratchKey(9))
+		return txn1.CommitInBatch(txn1Ctx, b)
+	})
+	val8, err := db.Get(ctx, scratchKey(8))
+	require.NoError(t, err)
+
+	defer func() {
+		recording := collectAndFinish()
+		if t.Failed() {
+			t.Logf("TXN 1 TRACE: %s", recording)
+		}
+	}()
+
+	if val8.Exists() {
+		// If val8 _exists_ then it means our transaction did not commit. So really
+		// any error is correct.
+		require.Error(t, txn1Err)
+	} else {
+		// If val8 doesn't exist, then txn1 was committed so we shouldn't have
+		// gotten an error or we should have gotten an ambiguous result error.
+		if txn1Err != nil {
+			ambigErr := &kvpb.AmbiguousResultError{}
+			require.ErrorAs(t, txn1Err, &ambigErr, "transaction committed but non-ambiguous error returned")
+		}
+	}
+}
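
The invariant the test's final assertions enforce is also the contract client code relies on: a plain (non-ambiguous) error from a commit must mean the transaction did not commit, and only an AmbiguousResultError leaves the outcome unknown. Below is a hedged sketch of how a caller might classify a commit error the same way; classifyCommitErr is a hypothetical helper invented here, not part of this commit, though kvpb.AmbiguousResultError and the cockroachdb/errors package are real.

    package main

    import (
        "fmt"

        "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
        "github.com/cockroachdb/errors"
    )

    // classifyCommitErr mirrors the test's assertions: a nil error means the
    // transaction committed, an AmbiguousResultError means the outcome is
    // unknown and must be checked, and any other error must mean the
    // transaction did not commit (the property this commit restores for weak
    // isolation transactions).
    func classifyCommitErr(err error) string {
        if err == nil {
            return "committed"
        }
        var ambig *kvpb.AmbiguousResultError
        if errors.As(err, &ambig) {
            return "ambiguous: may or may not have committed"
        }
        return "not committed: safe to retry"
    }

    func main() {
        fmt.Println(classifyCommitErr(nil)) // committed
    }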

pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go

Lines changed: 19 additions & 5 deletions
@@ -259,16 +259,31 @@ func (sr *txnSpanRefresher) maybeRefreshAndRetrySend(
 	if !ok {
 		return nil, pErr
 	}
+
+	// If we are in a weak isolation transaction and we had an EndTxn in our
+	// batch, then this retry faces another hazard: The STAGING record written in
+	// the first attempt may possibly be satisfied by our future writes.
+	//
+	// TODO(ssd): If we were guaranteed to have a BatchResponse if a request was
+	// evaluated, then we could narrow this further and only bump the refresh
+	// timestamp if the StagingTimestamp == RefreshTS.
+	endTxnArg, hasET := ba.GetArg(kvpb.EndTxn)
+	bumpedRefreshTimestampRequired := hasET && txn.Status == roachpb.STAGING && txn.IsoLevel.ToleratesWriteSkew()
+	if bumpedRefreshTimestampRequired {
+		refreshTS = refreshTS.Next()
+		log.Eventf(ctx, "bumping refresh timestamp to avoid unexpected parallel commit: %s", refreshTS)
+	}
+
 	refreshFrom := txn.ReadTimestamp
 	refreshToTxn := txn.Clone()
 	refreshToTxn.BumpReadTimestamp(refreshTS)
 	switch refreshToTxn.Status {
 	case roachpb.PENDING:
 	case roachpb.STAGING:
 		// If the batch resulted in an error but the EndTxn request succeeded,
-		// staging the transaction record in the process, downgrade the status
-		// back to PENDING. Even though the transaction record may have a status
-		// of STAGING, we know that the transaction failed to implicitly commit.
+		// staging the transaction record in the process, downgrade the status back
+		// to PENDING. Even though the transaction record may have a status of
+		// STAGING, we know that the transaction failed to implicitly commit.
 		refreshToTxn.Status = roachpb.PENDING
 	default:
 		return nil, kvpb.NewError(errors.AssertionFailedf(
@@ -293,8 +308,7 @@ func (sr *txnSpanRefresher) maybeRefreshAndRetrySend(
 
 	// To prevent starvation of batches and to ensure the correctness of parallel
 	// commits, split off the EndTxn request into its own batch on auto-retries.
-	args, hasET := ba.GetArg(kvpb.EndTxn)
-	if len(ba.Requests) > 1 && hasET && !args.(*kvpb.EndTxnRequest).Require1PC {
+	if len(ba.Requests) > 1 && hasET && !endTxnArg.(*kvpb.EndTxnRequest).Require1PC {
 		log.Eventf(ctx, "sending EndTxn separately from rest of batch on retry")
 		return sr.splitEndTxnAndRetrySend(ctx, ba)
 	}
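
A brief note on why a single refreshTS.Next() is sufficient in the change above: the staging record from the first attempt was written at the transaction's pushed write timestamp, which is at most the timestamp being refreshed to, so one logical tick above refreshTS is strictly above any timestamp the stale record could carry. A runnable toy illustration follows, with hlc.Timestamp collapsed to a single integer purely as an assumption for illustration (the real type has wall and logical components):

    package main

    import "fmt"

    // ts stands in for hlc.Timestamp, collapsed to one integer for illustration.
    type ts int

    func (t ts) next() ts { return t + 1 }

    func main() {
        // Worst case: the first attempt's staging record was written exactly at
        // the timestamp the refresher is about to refresh to.
        refreshTS := ts(2)
        stagingTS := refreshTS

        // The fix: for weak-isolation txns in STAGING, refresh one logical tick
        // past the largest timestamp the staging record could have been written at.
        bumped := refreshTS.next()

        // Every retried write happens at or above the bumped read timestamp, so
        // none of them can be proven at or below stagingTS, and the stale
        // record's implicit-commit criterion is unsatisfiable.
        fmt.Println(bumped > stagingTS) // true
    }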
