Commit c5527eb

kvserver: don't generate large writes during lock durability upgrade
Since this is still a best-effort feature for the time being, we want to
protect against writing too many locks during transfer or merge requests.

Epic: none
Release note: None
1 parent b48c8ef commit c5527eb
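
In short, the change gates the durability upgrade on an approximate size check: estimate how many bytes the exported locks would write, and skip the flush with a warning when that estimate exceeds a configurable limit. The Go sketch below is a minimal, self-contained illustration of that pattern under simplified assumptions; lockAcquisition, approxSize, and flushLocks are stand-ins invented for this example, not the actual CockroachDB types and helpers (those appear in the diffs that follow).

package main

import "log"

// lockAcquisition is a simplified stand-in for roachpb.LockAcquisition.
type lockAcquisition struct {
	key      []byte
	metaSize int64 // approximate per-lock metadata size in bytes
}

// approxSize mirrors the idea behind ApproximateLockTableSize: key bytes plus
// an estimate of the metadata that would be written for each lock.
func approxSize(acqs []lockAcquisition) int64 {
	var total int64
	for _, a := range acqs {
		total += int64(len(a.key)) + a.metaSize
	}
	return total
}

// flushLocks writes the locks only if their approximate total size stays under
// maxFlushSize; otherwise it warns and skips the flush, since the durability
// upgrade is best-effort.
func flushLocks(acqs []lockAcquisition, maxFlushSize int64, write func(lockAcquisition) error) error {
	if size := approxSize(acqs); size > maxFlushSize {
		log.Printf("refusing to upgrade lock durability of %d locks: %d bytes exceeds %d bytes",
			len(acqs), size, maxFlushSize)
		return nil
	}
	for _, a := range acqs {
		if err := write(a); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	acqs := []lockAcquisition{
		{key: []byte("a"), metaSize: 64},
		{key: []byte("b"), metaSize: 64},
	}
	_ = flushLocks(acqs, 1<<20, func(lockAcquisition) error { return nil })
}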

9 files changed: +252 -36 lines changed


pkg/kv/kvserver/batcheval/cmd_lease.go

Lines changed: 16 additions & 6 deletions

@@ -9,6 +9,7 @@ import (
 	"context"

 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
@@ -69,17 +70,26 @@ func evalNewLease(
 		localReadSum := rec.GetCurrentReadSummary(ctx)
 		priorReadSum = &localReadSum

+		durabilityUpgradeLimit := concurrency.GetMaxLockFlushSize(&rec.ClusterSettings().SV)
 		// If this is a lease transfer, we write out all unreplicated leases to
 		// storage, so that any waiters will discover them on the new leaseholder.
-		acquisitions := rec.GetConcurrencyManager().OnRangeLeaseTransferEval()
+		acquisitions, approxSize := rec.GetConcurrencyManager().OnRangeLeaseTransferEval()
 		log.VEventf(ctx, 2, "upgrading durability of %d locks", len(acquisitions))
-		for _, acq := range acquisitions {
-			if err := storage.MVCCAcquireLock(ctx, readWriter,
-				&acq.Txn, acq.IgnoredSeqNums, acq.Strength, acq.Key, ms, 0, 0); err != nil {
-				return newFailedLeaseTrigger(isTransfer), err
+		if approxSize > durabilityUpgradeLimit {
+			log.Warningf(ctx,
+				"refusing to upgrade lock durability of %d locks since approximate lock size of %d byte exceeds %d bytes",
+				len(acquisitions),
+				approxSize,
+				durabilityUpgradeLimit)
+		} else {
+			for _, acq := range acquisitions {
+				if err := storage.MVCCAcquireLock(ctx, readWriter,
+					&acq.Txn, acq.IgnoredSeqNums, acq.Strength, acq.Key, ms, 0, 0); err != nil {
+					return newFailedLeaseTrigger(isTransfer), err
+				}
 			}
+			locksWritten = len(acquisitions)
 		}
-		locksWritten = len(acquisitions)
 	} else {
 		// If the new lease is not equivalent to the old lease (i.e. either the
 		// lease is changing hands or the leaseholder restarted), construct a

pkg/kv/kvserver/batcheval/cmd_subsume.go

Lines changed: 21 additions & 11 deletions

@@ -14,6 +14,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
 	"github.com/cockroachdb/cockroach/pkg/storage"
@@ -185,19 +186,28 @@ func Subsume(
 	// (*Store).MergeRange.
 	stats := cArgs.EvalCtx.GetMVCCStats()
 	if args.PreserveUnreplicatedLocks {
-		acquisitions := cArgs.EvalCtx.GetConcurrencyManager().OnRangeSubsumeEval()
-		log.VEventf(ctx, 2, "upgrading durability of %d locks", len(acquisitions))
-		statsDelta := enginepb.MVCCStats{}
-		for _, acq := range acquisitions {
-			if err := storage.MVCCAcquireLock(ctx, readWriter,
-				&acq.Txn, acq.IgnoredSeqNums, acq.Strength, acq.Key, &statsDelta, 0, 0); err != nil {
-				return result.Result{}, err
+		durabilityUpgradeLimit := concurrency.GetMaxLockFlushSize(&cArgs.EvalCtx.ClusterSettings().SV)
+		acquisitions, approxSize := cArgs.EvalCtx.GetConcurrencyManager().OnRangeSubsumeEval()
+		if approxSize > durabilityUpgradeLimit {
+			log.Warningf(ctx,
+				"refusing to upgrade lock durability of %d locks since approximate lock size of %d byte exceeds %d bytes",
+				len(acquisitions),
+				approxSize,
+				durabilityUpgradeLimit)
+		} else {
+			log.VEventf(ctx, 2, "upgrading durability of %d locks", len(acquisitions))
+			statsDelta := enginepb.MVCCStats{}
+			for _, acq := range acquisitions {
+				if err := storage.MVCCAcquireLock(ctx, readWriter,
+					&acq.Txn, acq.IgnoredSeqNums, acq.Strength, acq.Key, &statsDelta, 0, 0); err != nil {
+					return result.Result{}, err
+				}
 			}
+			// Apply the stats delta to both the stats snapshot we are sending in the
+			// response and to the stats update we expect as part of this proposal.
+			stats.Add(statsDelta)
+			cArgs.Stats.Add(statsDelta)
 		}
-		// Apply the stats delta to both the stats snapshot we are sending in the
-		// response and to the stats update we expect as part of this proposal.
-		stats.Add(statsDelta)
-		cArgs.Stats.Add(statsDelta)
 	}

 	// Now that the range is frozen, collect some information to ship to the LHS

pkg/kv/kvserver/client_replica_test.go

Lines changed: 109 additions & 0 deletions

@@ -5911,6 +5911,115 @@ func TestLeaseTransferReplicatesLocks(t *testing.T) {
 	require.NoError(t, g.Wait())
 }

+func TestLeaseTransferDropsLocksIfLargerThanCommandSize(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("cmd_lease=2"))
+
+	// Test Plan:
+	//
+	// - Reduce MaxRaftCommandSize
+	// - Move scratch range to known location.
+	// - Take out a large number of unreplicated locks
+	// - Transfer lease without an error
+	//
+	ctx := context.Background()
+	st := cluster.MakeClusterSettings()
+	concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, true)
+	kvserverbase.MaxCommandSize.Override(ctx, &st.SV, 1<<20)
+	// To see the test fail:
+	// concurrency.MaxLockFlushSize.Override(ctx, &st.SV, 2<<20)
+
+	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{Settings: st},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	scratch := tc.ScratchRange(t)
+	desc, err := tc.LookupRange(scratch)
+	require.NoError(t, err)
+
+	// Start with the lease on store 1.
+	t.Logf("transfering to s1")
+	require.NoError(t, tc.TransferRangeLease(desc, tc.Target(0)))
+	t.Logf("done transfering to s1")
+
+	mkRandomScratchKey := func() roachpb.Key {
+		return append(scratch.Clone(), uuid.MakeV4().String()...)
+	}
+
+	numLocks := 9000
+	txn := tc.Server(1).DB().NewTxn(ctx, "test-lots-o-locks")
+	b := txn.NewBatch()
+	for range numLocks {
+		b.GetForUpdate(mkRandomScratchKey(), kvpb.BestEffort)
+		b.Requests()[len(b.Requests())-1].GetGet().LockNonExisting = true
+	}
+	require.NoError(t, txn.Run(ctx, b))
+
+	t.Log("transfering lease from s1 -> s2")
+	require.NoError(t, tc.TransferRangeLease(desc, tc.Target(1)))
+}
+
+func TestMergeDropsLocksIfLargerThanMax(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("cmd_subsume=2"))
+
+	// Test Plan:
+	//
+	// - Reduce MaxRaftCommandSize
+	// - Move scratch range to known location.
+	// - Take out a large number of unreplicated locks
+	// - Merge range without an error
+	//
+	var (
+		splitPoint = "b"
+		ctx        = context.Background()
+		st         = cluster.MakeClusterSettings()
+	)
+
+	concurrency.UnreplicatedLockReliabilityMerge.Override(ctx, &st.SV, true)
+	kvserverbase.MaxCommandSize.Override(ctx, &st.SV, 1<<20)
+	// To see the test fail:
+	// concurrency.MaxLockFlushSize.Override(ctx, &st.SV, 2<<20)
+
+	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{Settings: st},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	scratch := tc.ScratchRange(t)
+
+	mkKey := func(s string) roachpb.Key {
+		prefix := scratch.Clone()
+		return append(prefix[:len(prefix):len(prefix)], s...)
+	}
+
+	splitKey := mkKey(splitPoint)
+	tc.SplitRangeOrFatal(t, splitKey)
+
+	mkRandomScratchKey := func() roachpb.Key {
+		return append(mkKey(splitPoint), uuid.MakeV4().String()...)
+	}
+
+	numLocks := 6000
+	txn := tc.Server(0).DB().NewTxn(ctx, "test-lots-o-locks")
+	b := txn.NewBatch()
+	for range numLocks {
+		b.GetForUpdate(mkRandomScratchKey(), kvpb.BestEffort)
+		b.Requests()[len(b.Requests())-1].GetGet().LockNonExisting = true
+	}
+	require.NoError(t, txn.Run(ctx, b))
+
+	// Merge Range
+	t.Logf("merging range %s", scratch)
+	_, err := tc.MergeRanges(scratch)
+	require.NoError(t, err)
+}
+
 func TestMergeReplicatesLocks(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

pkg/kv/kvserver/concurrency/BUILD.bazel

Lines changed: 2 additions & 0 deletions

@@ -23,13 +23,15 @@ go_library(
         "//pkg/kv/kvserver/concurrency/lock",
         "//pkg/kv/kvserver/concurrency/poison",
         "//pkg/kv/kvserver/intentresolver",
+        "//pkg/kv/kvserver/kvserverbase",
         "//pkg/kv/kvserver/lockspanset",
         "//pkg/kv/kvserver/spanlatch",
         "//pkg/kv/kvserver/spanset",
         "//pkg/kv/kvserver/txnwait",
         "//pkg/roachpb",
         "//pkg/settings",
         "//pkg/settings/cluster",
+        "//pkg/storage",
         "//pkg/storage/enginepb",
         "//pkg/util/buildutil",
         "//pkg/util/container/list",

pkg/kv/kvserver/concurrency/concurrency_control.go

Lines changed: 2 additions & 2 deletions

@@ -290,11 +290,11 @@ type RangeStateListener interface {
 	// may want to flush to disk as replicated. Since lease transfers declare
 	// latches that conflict with all requests, the caller knows that nothing is
 	// going to modify the lock table as its evaluating.
-	OnRangeLeaseTransferEval() []*roachpb.LockAcquisition
+	OnRangeLeaseTransferEval() ([]*roachpb.LockAcquisition, int64)

 	// OnRangeSubsumeEval informs the concurrency manager that the range is
 	// evaluating a merge.
-	OnRangeSubsumeEval() []*roachpb.LockAcquisition
+	OnRangeSubsumeEval() ([]*roachpb.LockAcquisition, int64)

 	// OnRangeLeaseUpdated informs the concurrency manager that its range's
 	// lease has been updated. The argument indicates whether this manager's

pkg/kv/kvserver/concurrency/concurrency_manager.go

Lines changed: 37 additions & 16 deletions

@@ -14,13 +14,15 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util/debugutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -141,6 +143,24 @@ var UnreplicatedLockReliabilityMerge = settings.RegisterBoolSetting(
 	metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.merge.enabled", true),
 )

+var MaxLockFlushSize = settings.RegisterByteSizeSetting(
+	settings.SystemOnly,
+	"kv.lock_table.unreplicated_lock_reliability.max_flush_size",
+	"maximum size of locks that will be flushed during merge and transfer operations (if 0, defaults to half of the MaxCommandSizeDefault)",
+	0,
+)
+
+// MaxLockFlushSize is the maximum number of lock bytes that we will attempt to
+// flush during merge and transfer operations.
+func GetMaxLockFlushSize(sv *settings.Values) int64 {
+	s := MaxLockFlushSize.Get(sv)
+	if s > 0 {
+		return s
+	} else {
+		return kvserverbase.MaxCommandSize.Get(sv) / 2
+	}
+}
+
 // managerImpl implements the Manager interface.
 type managerImpl struct {
 	st *cluster.Settings
@@ -607,33 +627,23 @@ func (m *managerImpl) OnRangeDescUpdated(desc *roachpb.RangeDescriptor) {
 var allKeysSpan = roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}

 // OnRangeLeaseTransferEval implements the RangeStateListener interface.
-func (m *managerImpl) OnRangeLeaseTransferEval() []*roachpb.LockAcquisition {
+func (m *managerImpl) OnRangeLeaseTransferEval() ([]*roachpb.LockAcquisition, int64) {
 	if !UnreplicatedLockReliabilityLeaseTransfer.Get(&m.st.SV) {
-		return nil
+		return nil, 0
 	}

-	// TODO(ssd): Expose a function that allows us to pre-allocate this a bit better.
-	acquistions := make([]*roachpb.LockAcquisition, 0)
-	m.lt.ExportUnreplicatedLocks(allKeysSpan, func(acq *roachpb.LockAcquisition) {
-		acquistions = append(acquistions, acq)
-	})
-	return acquistions
+	return m.exportUnreplicatedLocks()
 }

 // OnRangeSubumeEval implements the RangeStateListener interface. It is called
 // during evalutation of Subsume. The returned LockAcquisition structs represent
 // held locks that we may want to flush to disk as replicated.
-func (m *managerImpl) OnRangeSubsumeEval() []*roachpb.LockAcquisition {
+func (m *managerImpl) OnRangeSubsumeEval() ([]*roachpb.LockAcquisition, int64) {
 	if !UnreplicatedLockReliabilityMerge.Get(&m.st.SV) {
-		return nil
+		return nil, 0
 	}

-	// TODO(ssd): Expose a function that allows us to pre-allocate this a bit better.
-	acquistions := make([]*roachpb.LockAcquisition, 0)
-	m.lt.ExportUnreplicatedLocks(allKeysSpan, func(acq *roachpb.LockAcquisition) {
-		acquistions = append(acquistions, acq)
-	})
-	return acquistions
+	return m.exportUnreplicatedLocks()
 }

 // OnRangeLeaseUpdated implements the RangeStateListener interface.
@@ -706,6 +716,17 @@ func (m *managerImpl) LockTableMetrics() LockTableMetrics {
 	return m.lt.Metrics()
 }

+func (m *managerImpl) exportUnreplicatedLocks() ([]*roachpb.LockAcquisition, int64) {
+	// TODO(ssd): Expose a function that allows us to pre-allocate this a bit better.
+	approximateBatchSize := int64(0)
+	acquistions := make([]*roachpb.LockAcquisition, 0)
+	m.lt.ExportUnreplicatedLocks(allKeysSpan, func(acq *roachpb.LockAcquisition) {
+		approximateBatchSize += storage.ApproximateLockTableSize(acq)
+		acquistions = append(acquistions, acq)
+	})
+	return acquistions, approximateBatchSize
+}
+
 // TestingLockTableString implements the MetricExporter interface.
 func (m *managerImpl) TestingLockTableString() string {
 	return m.lt.String()

pkg/kv/kvserver/concurrency/concurrency_manager_test.go

Lines changed: 1 addition & 1 deletion

@@ -545,7 +545,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {

 		case "on-lease-transfer-eval":
 			mon.runSync("eval transfer lease", func(ctx context.Context) {
-				locksToWrite := m.OnRangeLeaseTransferEval()
+				locksToWrite, _ := m.OnRangeLeaseTransferEval()
 				if len(locksToWrite) > 0 {
 					log.Eventf(ctx, "locks to propose as replicated: %d", len(locksToWrite))
 				}

pkg/storage/mvcc.go

Lines changed: 12 additions & 0 deletions

@@ -6319,6 +6319,18 @@ func MVCCVerifyLock(
 	return false, nil
 }

+var didUpdate bool
+
+func ApproximateLockTableSize(acq *roachpb.LockAcquisition) int64 {
+	keySize := int64(len(acq.Key)) + engineKeyVersionLockTableLen
+	metaSize := int64((&enginepb.MVCCMetadata{
+		Txn:                 &acq.Txn,
+		Timestamp:           acq.Txn.WriteTimestamp.ToLegacyTimestamp(),
+		TxnDidNotUpdateMeta: &didUpdate,
+	}).Size())
+	return keySize + metaSize
+}
+
 // mvccReleaseLockInternal releases a lock at the specified key and strength and
 // by the specified transaction. The function accepts the instructions for how
 // to release the lock (encoded in the LockUpdate), and the current value of the
