Skip to content

Commit 37fec6a

Browse files
committed
kvserver: add basic FlushLockTable request
This adds a new request FlushLockTableRequest. When issued, any unreplicated locks in the request header's span will be written as replicated locks to disk and removed from the in-memory lock table. This is an unsplittable request that can't span range boundaries. The goal of this request is two fold: 1) By modelling this as a request, we can easily issue flush operations during KVNemesis at random. This will help us uncover problems with our current flush semantics. 2) We may use this flush request from a new queue that will periodically ensure that the size of the lock table is under control. Epic: CRDB-42764 Release note: None
1 parent 21c5277 commit 37fec6a

File tree

15 files changed

+2234
-8
lines changed

15 files changed

+2234
-8
lines changed

docs/generated/metrics/metrics.html

Lines changed: 2001 additions & 0 deletions
Large diffs are not rendered by default.

docs/generated/metrics/metrics.yaml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2980,6 +2980,18 @@ layers:
29802980
unit: COUNT
29812981
aggregation: AVG
29822982
derivative: NON_NEGATIVE_DERIVATIVE
2983+
- name: distsender.rpc.flushlocktable.sent
2984+
exported_name: distsender_rpc_flushlocktable_sent
2985+
description: |-
2986+
Number of FlushLockTable requests processed.
2987+
2988+
This counts the requests in batches handed to DistSender, not the RPCs
2989+
sent to individual Ranges as a result.
2990+
y_axis_label: RPCs
2991+
type: COUNTER
2992+
unit: COUNT
2993+
aggregation: AVG
2994+
derivative: NON_NEGATIVE_DERIVATIVE
29832995
- name: distsender.rpc.gc.sent
29842996
exported_name: distsender_rpc_gc_sent
29852997
description: |-
@@ -15568,6 +15580,14 @@ layers:
1556815580
unit: COUNT
1556915581
aggregation: AVG
1557015582
derivative: NON_NEGATIVE_DERIVATIVE
15583+
- name: rpc.method.flushlocktable.recv
15584+
exported_name: rpc_method_flushlocktable_recv
15585+
description: Number of FlushLockTable requests processed
15586+
y_axis_label: RPCs
15587+
type: COUNTER
15588+
unit: COUNT
15589+
aggregation: AVG
15590+
derivative: NON_NEGATIVE_DERIVATIVE
1557115591
- name: rpc.method.gc.recv
1557215592
exported_name: rpc_method_gc_recv
1557315593
description: Number of GC requests processed

pkg/kv/kvpb/api.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -985,6 +985,9 @@ func (*BarrierRequest) Method() Method { return Barrier }
985985
// Method implements the Request interface.
986986
func (*IsSpanEmptyRequest) Method() Method { return IsSpanEmpty }
987987

988+
// Method implements the Request interface.
989+
func (*FlushLockTableRequest) Method() Method { return FlushLockTable }
990+
988991
// ShallowCopy implements the Request interface.
989992
func (gr *GetRequest) ShallowCopy() Request {
990993
shallowCopy := *gr
@@ -1273,6 +1276,12 @@ func (r *IsSpanEmptyRequest) ShallowCopy() Request {
12731276
return &shallowCopy
12741277
}
12751278

1279+
// ShallowCopy implements the Request interface.
1280+
func (r *FlushLockTableRequest) ShallowCopy() Request {
1281+
shallowCopy := *r
1282+
return &shallowCopy
1283+
}
1284+
12761285
// ShallowCopy implements the Response interface.
12771286
func (gr *GetResponse) ShallowCopy() Response {
12781287
shallowCopy := *gr
@@ -1559,6 +1568,12 @@ func (r *IsSpanEmptyResponse) ShallowCopy() Response {
15591568
return &shallowCopy
15601569
}
15611570

1571+
// ShallowCopy implements the Response interface.
1572+
func (r *FlushLockTableResponse) ShallowCopy() Response {
1573+
shallowCopy := *r
1574+
return &shallowCopy
1575+
}
1576+
15621577
// NewLockingGet returns a Request initialized to get the value at key. A lock
15631578
// corresponding to the supplied lock strength and durability is acquired on the
15641579
// key, if it exists.
@@ -2087,6 +2102,9 @@ func (r *BarrierRequest) flags() flag {
20872102
return flags
20882103
}
20892104
func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange }
2105+
func (*FlushLockTableRequest) flags() flag {
2106+
return isWrite | isRange | isAlone | isUnsplittable
2107+
}
20902108

20912109
// IsParallelCommit returns whether the EndTxn request is attempting to perform
20922110
// a parallel commit. See txn_interceptor_committer.go for a discussion about

pkg/kv/kvpb/api.proto

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2501,6 +2501,22 @@ message BarrierResponse {
25012501
RangeDescriptor range_desc = 4 [(gogoproto.nullable) = false];
25022502
}
25032503

2504+
// FlushLockTableRequest instructs the replica, if it is the leaseholder, to
2505+
// flush any in-memory unreplicated locks to the replicated lock table contained
2506+
// in the request header's span.
2507+
message FlushLockTableRequest {
2508+
RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
2509+
}
2510+
2511+
// FlushLockTableResponse is the response for a FlushLockTable operation.
2512+
message FlushLockTableResponse {
2513+
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
2514+
2515+
// LocksWritten is the number of locks written to the replicated lock table as
2516+
// a result of this request.
2517+
uint64 locks_written = 2;
2518+
}
2519+
25042520
// A RequestUnion contains exactly one of the requests.
25052521
// The values added here must match those in ResponseUnion.
25062522
//
@@ -2556,6 +2572,7 @@ message RequestUnion {
25562572
IsSpanEmptyRequest is_span_empty = 56;
25572573
LinkExternalSSTableRequest link_external_sstable = 57;
25582574
ExciseRequest excise = 58;
2575+
FlushLockTableRequest flush_lock_table = 59;
25592576
}
25602577
reserved 8, 15, 23, 25, 26, 27, 31, 34, 49, 52;
25612578
}
@@ -2611,6 +2628,7 @@ message ResponseUnion {
26112628
IsSpanEmptyResponse is_span_empty = 56;
26122629
LinkExternalSSTableResponse link_external_sstable = 57;
26132630
ExciseResponse excise = 58;
2631+
FlushLockTableResponse flush_lock_table = 59;
26142632
}
26152633
reserved 8, 15, 23, 25, 26, 27, 28, 31, 34, 49, 52;
26162634
}

pkg/kv/kvpb/batch_generated.go

Lines changed: 26 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/kv/kvpb/method.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,11 @@ const (
167167
// Excise is a non-MVCC command that destroys all data in a user MVCC key
168168
// span. See ExciseRequest for details.
169169
Excise
170+
171+
// FlushLockTable is an operation writes unreplicated locks in the
172+
// in-memory lock table to the replicated lock table.
173+
FlushLockTable
174+
170175
// MaxMethod is the maximum method.
171176
MaxMethod Method = iota - 1
172177
// NumMethods represents the total number of API methods.

pkg/kv/kvpb/method_string.go

Lines changed: 8 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/kv/kvserver/batcheval/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ go_library(
1313
"cmd_end_transaction.go",
1414
"cmd_excise.go",
1515
"cmd_export.go",
16+
"cmd_flush_lock_table.go",
1617
"cmd_gc.go",
1718
"cmd_get.go",
1819
"cmd_heartbeat_txn.go",
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package batcheval
7+
8+
import (
9+
"context"
10+
"time"
11+
12+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
13+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
14+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
15+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
16+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
17+
"github.com/cockroachdb/cockroach/pkg/roachpb"
18+
"github.com/cockroachdb/cockroach/pkg/storage"
19+
)
20+
21+
func init() {
22+
RegisterReadWriteCommand(kvpb.FlushLockTable, declareKeysFlushLockTable, FlushLockTable)
23+
}
24+
25+
func declareKeysFlushLockTable(
26+
_ ImmutableRangeState,
27+
_ *kvpb.Header,
28+
req kvpb.Request,
29+
latchSpans *spanset.SpanSet,
30+
_ *lockspanset.LockSpanSet,
31+
_ time.Duration,
32+
) error {
33+
// We declare non-MVCC read-write latches over the entire span we are
34+
// exporting. This is similar to the latches LeaseTransfer and Merge will
35+
// take. This is perhaps more aggressive than needed.
36+
//
37+
// TODO(ssd): Consider moving this back to normal MVCC write latches.
38+
latchSpans.AddNonMVCC(spanset.SpanReadWrite, req.Header().Span())
39+
return nil
40+
}
41+
42+
// FlushLockTable scans the in-memory lock tbale for unreplicated locks and
43+
// writes them as replicated locks.
44+
func FlushLockTable(
45+
ctx context.Context, rw storage.ReadWriter, cArgs CommandArgs, response kvpb.Response,
46+
) (result.Result, error) {
47+
args := cArgs.Args.(*kvpb.FlushLockTableRequest)
48+
resp := response.(*kvpb.FlushLockTableResponse)
49+
50+
// TODO(ssd): Allow the caller to limit how many locks we write out.
51+
locksToFlush := make([]roachpb.LockAcquisition, 0)
52+
cArgs.EvalCtx.GetConcurrencyManager().ExportUnreplicatedLocks(args.Span(), func(l *roachpb.LockAcquisition) {
53+
locksToFlush = append(locksToFlush, *l)
54+
})
55+
56+
for i, l := range locksToFlush {
57+
locksToFlush[i].Durability = lock.Replicated
58+
if err := storage.MVCCAcquireLock(ctx, rw,
59+
&l.Txn, l.IgnoredSeqNums, l.Strength, l.Key,
60+
cArgs.Stats, 0, 0, true /* allowSequenceNumberRegression */); err != nil {
61+
return result.Result{}, err
62+
}
63+
}
64+
resp.LocksWritten = uint64(len(locksToFlush))
65+
66+
// NOTE: The locks still exist in the in-memory lock table. They are not
67+
// cleared until OnLockAcquired is called by (*replica).handleReadWriteLocalEvalResult.
68+
return result.WithAcquiredLocks(locksToFlush...), nil
69+
}

pkg/kv/kvserver/client_lock_table_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,20 @@ func TestClientLockTableDataDriven(t *testing.T) {
159159
return fmt.Sprintf("error: %s", err.Error())
160160
}
161161
return ""
162+
case "flush-lock-table":
163+
startKey := evalCtx.getNamedKey("start", d)
164+
endKey := evalCtx.getNamedKey("end", d)
165+
b := db.NewBatch()
166+
b.AddRawRequest(&kvpb.FlushLockTableRequest{
167+
RequestHeader: kvpb.RequestHeader{
168+
Key: startKey,
169+
EndKey: endKey,
170+
},
171+
})
172+
if err := db.Run(ctx, b); err != nil {
173+
return fmt.Sprintf("error: %s", err.Error())
174+
}
175+
return ""
162176
default:
163177
d.Fatalf(t, "unknown command: %s", d.Cmd)
164178
return ""

0 commit comments

Comments
 (0)