Skip to content

Commit 30c0ad2

Browse files
craig[bot]stevendanna
andcommitted
Merge #153799
153799: batcheval: support MaxSpanRequestKeys and TargetBytes for FlushLockTable r=miraradeva a=stevendanna This allows the caller to limit how many keys might be flushed from the lock table. Informs #146356 Release note: None Co-authored-by: Steven Danna <[email protected]>
2 parents 3317b2c + 60eb10b commit 30c0ad2

File tree

9 files changed

+208
-16
lines changed

9 files changed

+208
-16
lines changed

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1101,7 +1101,9 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
11011101
*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest, *kvpb.PutRequest:
11021102
// Accepted point requests that can be in batches with limit. No
11031103
// need to set disallowedReq.
1104-
1104+
case *kvpb.FlushLockTableRequest:
1105+
// FlushLockTableRequest handles limits itself. It is also an isAlone request so
1106+
// most of these checks should not be relevant.
11051107
default:
11061108
disallowedReq = inner.Method().String()
11071109
}

pkg/kv/kvpb/api.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2802,6 +2802,7 @@ message Header {
28022802
// - ResolveIntentRangeRequest
28032803
// - QueryLocksRequest
28042804
// - IsSpanEmptyRequest
2805+
// - FlushLockTableRequest
28052806
//
28062807
// The following request types are also allowed in the batch. These requests
28072808
// do not consume any keys from the limit; however, they may fail to be

pkg/kv/kvserver/batcheval/cmd_flush_lock_table.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,34 @@ func declareKeysFlushLockTable(
4444
func FlushLockTable(
4545
ctx context.Context, rw storage.ReadWriter, cArgs CommandArgs, response kvpb.Response,
4646
) (result.Result, error) {
47+
header := cArgs.Header
4748
args := cArgs.Args.(*kvpb.FlushLockTableRequest)
4849
resp := response.(*kvpb.FlushLockTableResponse)
4950

50-
// TODO(ssd): Allow the caller to limit how many locks we write out.
51-
locksToFlush := make([]roachpb.LockAcquisition, 0)
52-
cArgs.EvalCtx.GetConcurrencyManager().ExportUnreplicatedLocks(args.Span(), func(l *roachpb.LockAcquisition) {
51+
var (
52+
resumeSpan *roachpb.Span
53+
resumeReason kvpb.ResumeReason
54+
currentSize int64
55+
56+
locksToFlush = make([]roachpb.LockAcquisition, 0)
57+
)
58+
cArgs.EvalCtx.GetConcurrencyManager().ExportUnreplicatedLocks(args.Span(), func(l *roachpb.LockAcquisition) bool {
59+
if max := header.MaxSpanRequestKeys; max > 0 && int64(len(locksToFlush)) >= max {
60+
resumeReason = kvpb.RESUME_KEY_LIMIT
61+
resumeSpan = &roachpb.Span{Key: args.Key, EndKey: l.Key}
62+
return false
63+
}
64+
if maxSize := header.TargetBytes; maxSize > 0 {
65+
sz := storage.ApproximateLockTableSize(l)
66+
if currentSize+sz > maxSize {
67+
resumeReason = kvpb.RESUME_KEY_LIMIT
68+
resumeSpan = &roachpb.Span{Key: args.Key, EndKey: l.Key}
69+
return false
70+
}
71+
currentSize += sz
72+
}
5373
locksToFlush = append(locksToFlush, *l)
74+
return true
5475
})
5576

5677
for i, l := range locksToFlush {
@@ -61,6 +82,9 @@ func FlushLockTable(
6182
return result.Result{}, err
6283
}
6384
}
85+
86+
resp.ResumeReason = resumeReason
87+
resp.ResumeSpan = resumeSpan
6488
resp.LocksWritten = uint64(len(locksToFlush))
6589

6690
// NOTE: The locks still exist in the in-memory lock table. They are not

pkg/kv/kvserver/client_lock_table_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ func TestClientLockTableDataDriven(t *testing.T) {
163163
startKey := evalCtx.getNamedKey("start", d)
164164
endKey := evalCtx.getNamedKey("end", d)
165165
b := db.NewBatch()
166+
b.Header.MaxSpanRequestKeys = int64(evalCtx.getInt(d, "max-keys"))
167+
b.Header.TargetBytes = int64(evalCtx.getInt(d, "target-bytes"))
166168
b.AddRawRequest(&kvpb.FlushLockTableRequest{
167169
RequestHeader: kvpb.RequestHeader{
168170
Key: startKey,
@@ -304,6 +306,15 @@ func (e *evalCtx) getLockWaitPolicy(d *datadriven.TestData) lock.WaitPolicy {
304306
}
305307
}
306308

309+
func (e *evalCtx) getInt(d *datadriven.TestData, name string) int {
310+
if d.HasArg(name) {
311+
var i int
312+
d.ScanArgs(e.t, name, &i)
313+
return i
314+
}
315+
return 0
316+
}
317+
307318
func (e *evalCtx) cmdInBatch(cmdStr string, b *kv.Batch) error {
308319
d := datadriven.TestData{}
309320
var err error

pkg/kv/kvserver/concurrency/concurrency_control.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,8 @@ type LockManager interface {
269269
QueryLockTableState(ctx context.Context, span roachpb.Span, opts QueryLockTableOptions) ([]roachpb.LockStateInfo, QueryLockTableResumeState)
270270

271271
// ExportUnreplicatedLocks runs exporter on each held, unreplicated lock
272-
// in the given span.
273-
ExportUnreplicatedLocks(span roachpb.Span, exporter func(*roachpb.LockAcquisition))
272+
// in the given span until the exporter returns false.
273+
ExportUnreplicatedLocks(span roachpb.Span, exporter func(*roachpb.LockAcquisition) bool)
274274
}
275275

276276
// TransactionManager is concerned with tracking transactions that have their
@@ -776,12 +776,12 @@ type lockTable interface {
776776
QueryLockTableState(span roachpb.Span, opts QueryLockTableOptions) ([]roachpb.LockStateInfo, QueryLockTableResumeState)
777777

778778
// ExportUnreplicatedLocks runs exporter on each held, unreplicated lock
779-
// in the given span.
779+
// in the given span until the exporter returns false.
780780
//
781781
// Note that the caller is responsible for acquiring latches across the span
782782
// it is exporting if it needs to be sure that the exported locks won't be
783783
// updated in the lock table while it is still referencing them.
784-
ExportUnreplicatedLocks(span roachpb.Span, exporter func(*roachpb.LockAcquisition))
784+
ExportUnreplicatedLocks(span roachpb.Span, exporter func(*roachpb.LockAcquisition) bool)
785785

786786
// Metrics returns information about the state of the lockTable.
787787
Metrics() LockTableMetrics

pkg/kv/kvserver/concurrency/concurrency_manager.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -619,7 +619,9 @@ func (m *managerImpl) QueryLockTableState(
619619
}
620620

621621
// ExportUnreplicatedLocks implements the LockManager interface.
622-
func (m *managerImpl) ExportUnreplicatedLocks(span roachpb.Span, f func(*roachpb.LockAcquisition)) {
622+
func (m *managerImpl) ExportUnreplicatedLocks(
623+
span roachpb.Span, f func(*roachpb.LockAcquisition) bool,
624+
) {
623625
m.lt.ExportUnreplicatedLocks(span, f)
624626
}
625627

@@ -734,9 +736,10 @@ func (m *managerImpl) exportUnreplicatedLocks() ([]*roachpb.LockAcquisition, int
734736
// TODO(ssd): Expose a function that allows us to pre-allocate this a bit better.
735737
approximateBatchSize := int64(0)
736738
acquistions := make([]*roachpb.LockAcquisition, 0)
737-
m.lt.ExportUnreplicatedLocks(allKeysSpan, func(acq *roachpb.LockAcquisition) {
739+
m.lt.ExportUnreplicatedLocks(allKeysSpan, func(acq *roachpb.LockAcquisition) bool {
738740
approximateBatchSize += storage.ApproximateLockTableSize(acq)
739741
acquistions = append(acquistions, acq)
742+
return true
740743
})
741744
return acquistions, approximateBatchSize
742745
}

pkg/kv/kvserver/concurrency/lock_table.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4746,7 +4746,7 @@ func (t *lockTableImpl) ClearGE(key roachpb.Key) []roachpb.LockAcquisition {
47464746
// TODO(ssd): Once we have the full set of functions we need for lock flushing, we
47474747
// should do a refactoring pass to reduce some of the duplication.
47484748
func (t *lockTableImpl) ExportUnreplicatedLocks(
4749-
span roachpb.Span, exporter func(*roachpb.LockAcquisition),
4749+
span roachpb.Span, exporter func(*roachpb.LockAcquisition) bool,
47504750
) {
47514751
t.enabledMu.RLock()
47524752
defer t.enabledMu.RUnlock()
@@ -4757,12 +4757,12 @@ func (t *lockTableImpl) ExportUnreplicatedLocks(
47574757
t.locks.mu.RLock()
47584758
defer t.locks.mu.RUnlock()
47594759

4760-
exportKeyLocks := func(l *keyLocks) {
4760+
exportKeyLocks := func(l *keyLocks) bool {
47614761
l.mu.Lock()
47624762
defer l.mu.Unlock()
47634763

47644764
if !l.key.Less(span.EndKey) {
4765-
return
4765+
return false
47664766
}
47674767

47684768
for hl := l.holders.Front(); hl != nil; hl = hl.Next() {
@@ -4777,7 +4777,7 @@ func (t *lockTableImpl) ExportUnreplicatedLocks(
47774777

47784778
for _, str := range unreplicatedHolderStrengths {
47794779
if tl.unreplicatedInfo.held(str) {
4780-
exporter(&roachpb.LockAcquisition{
4780+
keepGoing := exporter(&roachpb.LockAcquisition{
47814781
Span: roachpb.Span{
47824782
Key: l.key,
47834783
},
@@ -4786,14 +4786,20 @@ func (t *lockTableImpl) ExportUnreplicatedLocks(
47864786
Strength: str,
47874787
IgnoredSeqNums: tl.unreplicatedInfo.ignoredSeqNums,
47884788
})
4789+
if !keepGoing {
4790+
return false
4791+
}
47894792
}
47904793
}
47914794
}
4795+
return true
47924796
}
47934797

47944798
iter := t.locks.MakeIter()
47954799
for iter.SeekGE(&keyLocks{key: span.Key}); iter.Valid(); iter.Next() {
4796-
exportKeyLocks(iter.Cur())
4800+
if !exportKeyLocks(iter.Cur()) {
4801+
break
4802+
}
47974803
}
47984804
}
47994805

pkg/kv/kvserver/concurrency/verifiable_lock_table.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (v verifyingLockTable) QueryLockTableState(
118118
}
119119

120120
func (v verifyingLockTable) ExportUnreplicatedLocks(
121-
span roachpb.Span, exporter func(*roachpb.LockAcquisition),
121+
span roachpb.Span, exporter func(*roachpb.LockAcquisition) bool,
122122
) {
123123
v.lt.ExportUnreplicatedLocks(span, exporter)
124124
}

pkg/kv/kvserver/testdata/lock_table/flush

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ new-txn txn=t1
55
put txn=t1 k=a v=bar
66
----
77

8+
put txn=t1 k=b v=baz
9+
----
10+
811
commit txn=t1
912
----
1013

@@ -34,3 +37,145 @@ key: "\xfaa", str: Exclusive, txn: t2
3437

3538
commit txn=t2
3639
----
40+
41+
# MaxSpanRequestKeys Limit
42+
new-txn txn=t3
43+
----
44+
45+
get txn=t3 k=a lock=Exclusive dur=Unreplicated
46+
----
47+
get: "\xfaa"="bar"
48+
49+
get txn=t3 k=b lock=Exclusive dur=Unreplicated
50+
----
51+
get: "\xfab"="baz"
52+
53+
print-in-memory-lock-table
54+
----
55+
num=2
56+
lock: "\xfaa"
57+
holder: txn: t3 epoch: 0, iso: Serializable, ts: <stripped>, info: unrepl [(str: Exclusive seq: 0)]
58+
lock: "\xfab"
59+
holder: txn: t3 epoch: 0, iso: Serializable, ts: <stripped>, info: unrepl [(str: Exclusive seq: 0)]
60+
61+
flush-lock-table start=a end=z max-keys=1
62+
----
63+
64+
print-in-memory-lock-table
65+
----
66+
num=1
67+
lock: "\xfab"
68+
holder: txn: t3 epoch: 0, iso: Serializable, ts: <stripped>, info: unrepl [(str: Exclusive seq: 0)]
69+
70+
print-replicated-lock-table start=a end=z
71+
----
72+
key: "\xfaa", str: Exclusive, txn: t3
73+
74+
commit txn=t3
75+
----
76+
77+
# TargetBytes Limit
78+
new-txn txn=t4
79+
----
80+
81+
get txn=t4 k=a lock=Exclusive dur=Unreplicated
82+
----
83+
get: "\xfaa"="bar"
84+
85+
get txn=t4 k=b lock=Exclusive dur=Unreplicated
86+
----
87+
get: "\xfab"="baz"
88+
89+
print-in-memory-lock-table
90+
----
91+
num=2
92+
lock: "\xfaa"
93+
holder: txn: t4 epoch: 0, iso: Serializable, ts: <stripped>, info: unrepl [(str: Exclusive seq: 0)]
94+
lock: "\xfab"
95+
holder: txn: t4 epoch: 0, iso: Serializable, ts: <stripped>, info: unrepl [(str: Exclusive seq: 0)]
96+
97+
flush-lock-table start=a end=z target-bytes=128
98+
----
99+
100+
print-in-memory-lock-table
101+
----
102+
num=1
103+
lock: "\xfab"
104+
holder: txn: t4 epoch: 0, iso: Serializable, ts: <stripped>, info: unrepl [(str: Exclusive seq: 0)]
105+
106+
print-replicated-lock-table start=a end=z
107+
----
108+
key: "\xfaa", str: Exclusive, txn: t4
109+
110+
commit txn=t4
111+
----
112+
113+
# MaxSpanRequestKeys ineffective limit
114+
new-txn txn=t5
115+
----
116+
117+
get txn=t5 k=a lock=Exclusive dur=Unreplicated
118+
----
119+
get: "\xfaa"="bar"
120+
121+
get txn=t5 k=b lock=Exclusive dur=Unreplicated
122+
----
123+
get: "\xfab"="baz"
124+
125+
print-in-memory-lock-table
126+
----
127+
num=2
128+
lock: "\xfaa"
129+
holder: txn: t5 epoch: 0, iso: Serializable, ts: <stripped>, info: unrepl [(str: Exclusive seq: 0)]
130+
lock: "\xfab"
131+
holder: txn: t5 epoch: 0, iso: Serializable, ts: <stripped>, info: unrepl [(str: Exclusive seq: 0)]
132+
133+
flush-lock-table start=a end=z max-keys=8
134+
----
135+
136+
print-in-memory-lock-table
137+
----
138+
num=0
139+
140+
print-replicated-lock-table start=a end=z
141+
----
142+
key: "\xfaa", str: Exclusive, txn: t5
143+
key: "\xfab", str: Exclusive, txn: t5
144+
145+
commit txn=t5
146+
----
147+
148+
# TargetBytes ineffective limit
149+
new-txn txn=t6
150+
----
151+
152+
get txn=t6 k=a lock=Exclusive dur=Unreplicated
153+
----
154+
get: "\xfaa"="bar"
155+
156+
get txn=t6 k=b lock=Exclusive dur=Unreplicated
157+
----
158+
get: "\xfab"="baz"
159+
160+
print-in-memory-lock-table
161+
----
162+
num=2
163+
lock: "\xfaa"
164+
holder: txn: t6 epoch: 0, iso: Serializable, ts: <stripped>, info: unrepl [(str: Exclusive seq: 0)]
165+
lock: "\xfab"
166+
holder: txn: t6 epoch: 0, iso: Serializable, ts: <stripped>, info: unrepl [(str: Exclusive seq: 0)]
167+
168+
flush-lock-table start=a end=z target-bytes=4096
169+
----
170+
171+
print-in-memory-lock-table
172+
----
173+
num=0
174+
175+
print-replicated-lock-table start=a end=z
176+
----
177+
key: "\xfaa", str: Exclusive, txn: t6
178+
key: "\xfab", str: Exclusive, txn: t6
179+
180+
commit txn=t6
181+
----

0 commit comments

Comments
 (0)