Skip to content

Commit 1c347c0

Browse files
craig[bot]stevendanna
andcommitted
Merge #145532
145532: concurrency: mark some unreplicated locks as ineligible for export r=miraradeva a=stevendanna Once a lock has been reported as missing via QueryIntent, it is important that it stays missing. As described in #145458, if a lock that has been previously reported as missing is somehow then found via a subsequent QueryIntent request, this can result in a transaction being erroneously committed by the transaction recovery process. When unreplicated lock reliability is enabled, Lease and Merge use ExportUnreplicatedLocks to move unreplicated locks from the lock table into durable storage. However, these locks may include unreplicated locks that "cover" a replicated lock that was previously reported as missing, exactly what we must avoid. Here, we solve that by extending QueryIntent's evaluation to mark any locks it reports as missing as ineligible for export. Fixes #145458 Release note: None Co-authored-by: Steven Danna <[email protected]>
2 parents eae6695 + e390c70 commit 1c347c0

File tree

10 files changed

+331
-26
lines changed

10 files changed

+331
-26
lines changed

pkg/kv/kvclient/kvcoord/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ go_test(
188188
"//pkg/kv/kvpb/kvpbmock",
189189
"//pkg/kv/kvserver",
190190
"//pkg/kv/kvserver/closedts",
191+
"//pkg/kv/kvserver/concurrency",
191192
"//pkg/kv/kvserver/concurrency/isolation",
192193
"//pkg/kv/kvserver/concurrency/lock",
193194
"//pkg/kv/kvserver/kvserverbase",

pkg/kv/kvclient/kvcoord/dist_sender_server_test.go

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"reflect"
1414
"sort"
1515
"strings"
16+
"sync"
1617
"sync/atomic"
1718
"testing"
1819
"time"
@@ -25,7 +26,9 @@ import (
2526
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
2627
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
2728
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
29+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
2830
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
31+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
2932
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
3033
"github.com/cockroachdb/cockroach/pkg/roachpb"
3134
"github.com/cockroachdb/cockroach/pkg/rpc"
@@ -4643,3 +4646,173 @@ func TestProxyTracing(t *testing.T) {
46434646
t.Logf("found trace event; msg=%s, tag=%s, loc=%s", msg, tag, loc)
46444647
})
46454648
}
4649+
4650+
// TestUnexpectedCommitOnTxnRecovery constructs a scenario where transaction
4651+
// recovery could incorrectly determine that a transaction is committed. The
4652+
// scenario is as follows:
4653+
//
4654+
// Txn1:
4655+
// - Writes to keyA.
4656+
// - Acquires an unreplicated exclusive lock on keyB.
4657+
// - Acquires a replicated shared lock on keyB. This lock is pipelined, and
4658+
// replication for it fails.
4659+
// - Attempts to commit, but fails because of the lost replicated Shared lock.
4660+
//
4661+
// Lease is then transferred to n3. This causes the unreplicated exclusive lock
4662+
// on keyB to be replicated.
4663+
//
4664+
// Txn2:
4665+
// - Attempts to read keyA, which kicks off transaction recovery for Txn1.
4666+
// - Txn2 (incorrectly) concludes that Txn1 is committed at epoch=1 because it
4667+
// finds a (stronger than Shared) replicated lock on keyB.
4668+
//
4669+
// Txn1:
4670+
// - Back here, we do a stateful retry. We should learn that someone (Txn2)
4671+
// aborted us when we go and try to commit. At the time of writing, we
4672+
// incorrectly learn that we've been (unexpectedly) committed.
4673+
func TestUnexpectedCommitOnTxnRecovery(t *testing.T) {
4674+
defer leaktest.AfterTest(t)()
4675+
defer log.Scope(t).Close(t)
4676+
4677+
keyA := roachpb.Key("a")
4678+
keyB := roachpb.Key("b")
4679+
4680+
var (
4681+
targetTxnIDString atomic.Value
4682+
cmdID atomic.Value
4683+
)
4684+
cmdID.Store(kvserverbase.CmdIDKey(""))
4685+
targetTxnIDString.Store("")
4686+
ctx := context.Background()
4687+
st := cluster.MakeTestingClusterSettings()
4688+
// This test relies on unreplicated locks to be replicated on lease transfers.
4689+
concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, true)
4690+
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
4691+
ServerArgs: base.TestServerArgs{
4692+
Settings: st,
4693+
Knobs: base.TestingKnobs{
4694+
Store: &kvserver.StoreTestingKnobs{
4695+
TestingProposalFilter: func(fArgs kvserverbase.ProposalFilterArgs) *kvpb.Error {
4696+
if fArgs.Req.Header.Txn == nil ||
4697+
fArgs.Req.Header.Txn.ID.String() != targetTxnIDString.Load().(string) {
4698+
return nil // not our txn
4699+
}
4700+
if !fArgs.Req.IsSingleRequest() {
4701+
// Not the request we care about.
4702+
return nil
4703+
}
4704+
getReq, ok := fArgs.Req.Requests[0].GetInner().(*kvpb.GetRequest)
4705+
// Only fail replication on the first retry.
4706+
epoch := fArgs.Req.Header.Txn.Epoch
4707+
if ok && getReq.KeyLockingDurability == lock.Replicated && epoch == 0 {
4708+
t.Logf("will fail application for txn %s@epoch=%d; req: %+v; raft cmdID: %s",
4709+
fArgs.Req.Header.Txn.ID.String(), epoch, getReq, fArgs.CmdID)
4710+
cmdID.Store(fArgs.CmdID)
4711+
}
4712+
return nil
4713+
},
4714+
TestingApplyCalledTwiceFilter: func(fArgs kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
4715+
if fArgs.CmdID == cmdID.Load().(kvserverbase.CmdIDKey) {
4716+
t.Logf("failing application for raft cmdID: %s", cmdID)
4717+
4718+
return 0, kvpb.NewErrorf("test injected error")
4719+
}
4720+
return 0, nil
4721+
},
4722+
},
4723+
},
4724+
},
4725+
})
4726+
defer tc.Stopper().Stop(ctx)
4727+
4728+
transferLease := func(idx int) {
4729+
desc := tc.LookupRangeOrFatal(t, keyB)
4730+
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(idx))
4731+
}
4732+
// Make a db with transaction heartbeating disabled. This ensures that we
4733+
// don't mark Txn1 as PENDING after its first failed parallel commit attempt,
4734+
// which would otherwise prevent Txn2 from recovering Txn1.
4735+
s := tc.Server(0)
4736+
ambient := s.AmbientCtx()
4737+
tsf := kvcoord.NewTxnCoordSenderFactory(
4738+
kvcoord.TxnCoordSenderFactoryConfig{
4739+
AmbientCtx: ambient,
4740+
HeartbeatInterval: -1, // disabled
4741+
Settings: s.ClusterSettings(),
4742+
Clock: s.Clock(),
4743+
Stopper: s.Stopper(),
4744+
},
4745+
s.DistSenderI().(*kvcoord.DistSender),
4746+
)
4747+
db := kv.NewDB(ambient, tsf, s.Clock(), s.Stopper())
4748+
4749+
startTxn2 := make(chan struct{})
4750+
blockCh := make(chan struct{})
4751+
var wg sync.WaitGroup
4752+
wg.Add(1)
4753+
4754+
// Write to keyB so that we can later get a lock on it.
4755+
txn := db.NewTxn(ctx, "txn")
4756+
err := txn.Put(ctx, keyB, "valueB")
4757+
require.NoError(t, err)
4758+
require.NoError(t, txn.Commit(ctx))
4759+
4760+
go func() {
4761+
defer wg.Done()
4762+
4763+
err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
4764+
if txnID := targetTxnIDString.Load(); txnID == "" {
4765+
// Store the txnID for the testing knobs.
4766+
targetTxnIDString.Store(txn.ID().String())
4767+
t.Logf("txn1 ID is: %s", txn.ID())
4768+
} else if txnID != txn.ID() {
4769+
// Since txn recovery aborted us, we get retried again but with an
4770+
// entirely new txnID. This time we just return. Writing nothing.
4771+
return nil
4772+
}
4773+
4774+
switch txn.Epoch() {
4775+
case 0:
4776+
err := txn.Put(ctx, keyA, "value")
4777+
require.NoError(t, err)
4778+
res, err := txn.GetForUpdate(ctx, keyB, kvpb.BestEffort)
4779+
require.NoError(t, err)
4780+
require.Equal(t, res.ValueBytes(), []byte("valueB"))
4781+
res, err = txn.GetForShare(ctx, keyB, kvpb.GuaranteedDurability)
4782+
require.NoError(t, err)
4783+
require.Equal(t, res.ValueBytes(), []byte("valueB"))
4784+
err = txn.Commit(ctx)
4785+
require.Error(t, err)
4786+
require.ErrorContains(t, err, "RETRY_ASYNC_WRITE_FAILURE")
4787+
// Transfer the lease to n3.
4788+
transferLease(2)
4789+
close(startTxn2)
4790+
// Block until Txn2 recovers us.
4791+
<-blockCh
4792+
return err
4793+
case 1:
4794+
// When retrying the write failure we should discover that txn recovery
4795+
// has aborted this transaction.
4796+
err := txn.Put(ctx, keyA, "value")
4797+
require.Error(t, err)
4798+
require.ErrorContains(t, err, "ABORT_REASON_ABORT_SPAN")
4799+
return err
4800+
default:
4801+
t.Errorf("unexpected epoch: %d", txn.Epoch())
4802+
}
4803+
return nil
4804+
})
4805+
require.NoError(t, err)
4806+
}()
4807+
<-startTxn2
4808+
4809+
txn2 := db.NewTxn(ctx, "txn2")
4810+
res, err := txn2.Get(ctx, keyA)
4811+
require.NoError(t, err)
4812+
// NB: Nothing should exist on keyA, because txn1 didn't commit at epoch 1 (or
4813+
// any epoch, for that matter).
4814+
require.False(t, res.Exists())
4815+
4816+
close(blockCh)
4817+
wg.Wait()
4818+
}

pkg/kv/kvnemesis/kvnemesis_test.go

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -337,21 +337,14 @@ func TestKVNemesisMultiNode_BufferedWrites(t *testing.T) {
337337
defer log.Scope(t).Close(t)
338338

339339
testKVNemesisImpl(t, kvnemesisTestCfg{
340-
numNodes: 3,
341-
numSteps: defaultNumSteps,
342-
concurrency: 5,
343-
seedOverride: 0,
344-
// TODO(#145458): Until #145458 is fixed reduce the
345-
// rate of lost writes by avoiding lease transfers and
346-
// merges and also turning off error injection.
347-
invalidLeaseAppliedIndexProb: 0.0,
348-
injectReproposalErrorProb: 0.0,
340+
numNodes: 3,
341+
numSteps: defaultNumSteps,
342+
concurrency: 5,
343+
seedOverride: 0,
344+
invalidLeaseAppliedIndexProb: 0.2,
345+
injectReproposalErrorProb: 0.2,
349346
assertRaftApply: true,
350347
bufferedWriteProb: 0.70,
351-
testGeneratorConfig: func(g *GeneratorConfig) {
352-
g.Ops.ChangeLease = ChangeLeaseConfig{}
353-
g.Ops.Merge = MergeConfig{}
354-
},
355348
testSettings: func(ctx context.Context, st *cluster.Settings) {
356349
kvcoord.BufferedWritesEnabled.Override(ctx, &st.SV, true)
357350
concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, true)
@@ -362,9 +355,7 @@ func TestKVNemesisMultiNode_BufferedWrites(t *testing.T) {
362355
}
363356

364357
// TestKVNemesisMultiNode_BufferedWritesNoPipelining turns on buffered
365-
// writes and turns off write pipelining. Turning off write pipelining
366-
// allows us to test the lock reliability features even without a fix
367-
// for #145458.
358+
// writes and turns off write pipelining.
368359
func TestKVNemesisMultiNode_BufferedWritesNoPipelining(t *testing.T) {
369360
defer leaktest.AfterTest(t)()
370361
defer log.Scope(t).Close(t)

pkg/kv/kvserver/batcheval/cmd_query_intent.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,11 @@ func QueryIntent(
157157
}
158158
}
159159

160+
res := result.Result{}
160161
if !reply.FoundIntent && args.ErrorIfMissing {
161-
return result.Result{}, kvpb.NewIntentMissingError(args.Key, intent)
162+
l := roachpb.MakeLockAcquisition(args.Txn, args.Key, lock.Replicated, args.Strength, args.IgnoredSeqNums)
163+
res.Local.ReportedMissingLocks = []roachpb.LockAcquisition{l}
164+
return res, kvpb.NewIntentMissingError(args.Key, intent)
162165
}
163-
return result.Result{}, nil
166+
return res, nil
164167
}

pkg/kv/kvserver/batcheval/result/result.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ type LocalResult struct {
3838
// UpdatedTxns stores transaction records that have been updated by
3939
// calls to EndTxn, PushTxn, and RecoverTxn.
4040
UpdatedTxns []*roachpb.Transaction
41+
// ReportedMissingLocks stores lock acquisition structs that represent locks
42+
// that have been reported as missing via QueryIntent. Such locks must be
43+
// reported to the concurrency manager.
44+
ReportedMissingLocks []roachpb.LockAcquisition
4145
// EndTxns stores completed transactions. If the transaction
4246
// contains unresolved intents, they should be handed off for
4347
// asynchronous intent resolution. A bool in each EndTxnIntents
@@ -131,6 +135,17 @@ func (lResult *LocalResult) DetachEncounteredIntents() []roachpb.Intent {
131135
return r
132136
}
133137

138+
// DetachMissingLocks returns (and removes) those locks that have been reported
139+
// missing during an QueryIntentRequest and must be handled.
140+
func (lResult *LocalResult) DetachMissingLocks() []roachpb.LockAcquisition {
141+
if lResult == nil {
142+
return nil
143+
}
144+
r := lResult.ReportedMissingLocks
145+
lResult.ReportedMissingLocks = nil
146+
return r
147+
}
148+
134149
// DetachEndTxns returns (and removes) the EndTxnIntent objects from
135150
// the local result. If alwaysOnly is true, the slice is filtered to
136151
// include only those which have specified returnAlways=true, meaning
@@ -418,6 +433,13 @@ func (p *Result) MergeAndDestroy(q Result) error {
418433
}
419434
q.Local.ResolvedLocks = nil
420435

436+
if p.Local.ReportedMissingLocks == nil {
437+
p.Local.ReportedMissingLocks = q.Local.ReportedMissingLocks
438+
} else {
439+
p.Local.ReportedMissingLocks = append(p.Local.ReportedMissingLocks, q.Local.ReportedMissingLocks...)
440+
}
441+
q.Local.ReportedMissingLocks = nil
442+
421443
if p.Local.UpdatedTxns == nil {
422444
p.Local.UpdatedTxns = q.Local.UpdatedTxns
423445
} else {

pkg/kv/kvserver/concurrency/concurrency_control.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,11 @@ type LockManager interface {
254254
// acquired a new lock or re-acquired an existing lock that it already held.
255255
OnLockAcquired(context.Context, *roachpb.LockAcquisition)
256256

257+
// OnLockMissing informs the concurrency manager that a lock has been reported
258+
// missing to a client via QueryIntent. Such locks cannot later be
259+
// materialized via a lock table flush.
260+
OnLockMissing(context.Context, *roachpb.LockAcquisition)
261+
257262
// OnLockUpdated informs the concurrency manager that a transaction has
258263
// updated or released a lock or range of locks that it previously held.
259264
// The Durability field of the lock update struct is ignored.
@@ -697,6 +702,11 @@ type lockTable interface {
697702
// intent has been applied to the replicated state machine.
698703
AcquireLock(*roachpb.LockAcquisition) error
699704

705+
// MarkIneligibleForExport marks any locks held by this transaction on the
706+
// same key as ineligible for export from the lock table for replication since
707+
// doing so could result in a transaction being erroneously committed.
708+
MarkIneligibleForExport(*roachpb.LockAcquisition) error
709+
700710
// UpdateLocks informs the lockTable that an existing lock or range of locks
701711
// was either updated or released.
702712
//

pkg/kv/kvserver/concurrency/concurrency_manager.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,7 @@ var UnreplicatedLockReliabilityLeaseTransfer = settings.RegisterBoolSetting(
131131
settings.SystemOnly,
132132
"kv.lock_table.unreplicated_lock_reliability.lease_transfer.enabled",
133133
"whether the replica should attempt to keep unreplicated locks during lease transfers",
134-
// TODO(#145458): We've disabled this by default to avoid flakes until the underlying bug is fixed.
135-
// metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.lease_transfer.enabled", true),
136-
false,
134+
metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.lease_transfer.enabled", false),
137135
)
138136

139137
// UnreplicatedLockReliabilityMerge controls whether the replica will
@@ -142,9 +140,7 @@ var UnreplicatedLockReliabilityMerge = settings.RegisterBoolSetting(
142140
settings.SystemOnly,
143141
"kv.lock_table.unreplicated_lock_reliability.merge.enabled",
144142
"whether the replica should attempt to keep unreplicated locks during range merges",
145-
// TODO(#145458): We've disabled this by default to avoid flakes until the underlying bug is fixed.
146-
// metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.merge.enabled", true),
147-
false,
143+
metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.merge.enabled", false),
148144
)
149145

150146
var MaxLockFlushSize = settings.RegisterByteSizeSetting(
@@ -599,6 +595,15 @@ func (m *managerImpl) OnLockAcquired(ctx context.Context, acq *roachpb.LockAcqui
599595
}
600596
}
601597

598+
// OnLockMissing implements the Lockmanager interface.
599+
func (m *managerImpl) OnLockMissing(ctx context.Context, acq *roachpb.LockAcquisition) {
600+
if err := m.lt.MarkIneligibleForExport(acq); err != nil {
601+
// We don't currently expect any errors other than assertion failures that represent
602+
// programming errors from this method.
603+
log.Fatalf(ctx, "%v", err)
604+
}
605+
}
606+
602607
// OnLockUpdated implements the LockManager interface.
603608
func (m *managerImpl) OnLockUpdated(ctx context.Context, up *roachpb.LockUpdate) {
604609
if err := m.lt.UpdateLocks(up); err != nil {

0 commit comments

Comments
 (0)