Skip to content

Commit 2b4bb44

Browse files
craig[bot]stevendanna
andcommitted
Merge #141821
141821: kvserver: add basic FlushLockTable request r=arulajmani a=stevendanna This adds a new request FlushLockTableRequest. When issues, any unreplicated locks in the request header's span will be written as replicated locks to disk and removed from the in-memory lock table.s The goal of this request is two fold: 1) By modelling this as a request, we can easily issue flush operations during KVNemesis at random. This will help us uncover problems with our current flush semantics. 2) We may use this flush request from a new queue that will periodically ensure that the size of the lock table is under control. Epic: [CRDB-42764](https://cockroachlabs.atlassian.net/browse/CRDB-42764) Release note: None Co-authored-by: Steven Danna <[email protected]>
2 parents debf27f + 23bd412 commit 2b4bb44

File tree

23 files changed

+2346
-9
lines changed

23 files changed

+2346
-9
lines changed

docs/generated/metrics/metrics.html

Lines changed: 2001 additions & 0 deletions
Large diffs are not rendered by default.

docs/generated/metrics/metrics.yaml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2980,6 +2980,18 @@ layers:
29802980
unit: COUNT
29812981
aggregation: AVG
29822982
derivative: NON_NEGATIVE_DERIVATIVE
2983+
- name: distsender.rpc.flushlocktable.sent
2984+
exported_name: distsender_rpc_flushlocktable_sent
2985+
description: |-
2986+
Number of FlushLockTable requests processed.
2987+
2988+
This counts the requests in batches handed to DistSender, not the RPCs
2989+
sent to individual Ranges as a result.
2990+
y_axis_label: RPCs
2991+
type: COUNTER
2992+
unit: COUNT
2993+
aggregation: AVG
2994+
derivative: NON_NEGATIVE_DERIVATIVE
29832995
- name: distsender.rpc.gc.sent
29842996
exported_name: distsender_rpc_gc_sent
29852997
description: |-
@@ -15676,6 +15688,14 @@ layers:
1567615688
unit: COUNT
1567715689
aggregation: AVG
1567815690
derivative: NON_NEGATIVE_DERIVATIVE
15691+
- name: rpc.method.flushlocktable.recv
15692+
exported_name: rpc_method_flushlocktable_recv
15693+
description: Number of FlushLockTable requests processed
15694+
y_axis_label: RPCs
15695+
type: COUNTER
15696+
unit: COUNT
15697+
aggregation: AVG
15698+
derivative: NON_NEGATIVE_DERIVATIVE
1567915699
- name: rpc.method.gc.recv
1568015700
exported_name: rpc_method_gc_recv
1568115701
description: Number of GC requests processed

pkg/kv/batch.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ func (b *Batch) fillResults(ctx context.Context) {
294294
case *kvpb.MigrateRequest:
295295
case *kvpb.QueryResolvedTimestampRequest:
296296
case *kvpb.BarrierRequest:
297+
case *kvpb.FlushLockTableRequest:
297298
case *kvpb.LinkExternalSSTableRequest:
298299
case *kvpb.ExciseRequest:
299300
default:
@@ -1141,6 +1142,27 @@ func (b *Batch) barrier(s, e interface{}, withLAI bool) {
11411142
b.initResult(1, 0, notRaw, nil)
11421143
}
11431144

1145+
func (b *Batch) flushLockTable(s, e interface{}) {
1146+
begin, err := marshalKey(s)
1147+
if err != nil {
1148+
b.initResult(0, 0, notRaw, err)
1149+
return
1150+
}
1151+
end, err := marshalKey(e)
1152+
if err != nil {
1153+
b.initResult(0, 0, notRaw, err)
1154+
return
1155+
}
1156+
req := &kvpb.FlushLockTableRequest{
1157+
RequestHeader: kvpb.RequestHeader{
1158+
Key: begin,
1159+
EndKey: end,
1160+
},
1161+
}
1162+
b.appendReqs(req)
1163+
b.initResult(1, 0, notRaw, nil)
1164+
}
1165+
11441166
func (b *Batch) bulkRequest(
11451167
numKeys int, requestFactory func() (req kvpb.RequestUnion, kvSize int),
11461168
) {

pkg/kv/db.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,23 @@ func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestam
899899
return resp.Timestamp, nil
900900
}
901901

902+
func (db *DB) FlushLockTable(ctx context.Context, begin, end interface{}) error {
903+
b := &Batch{}
904+
b.flushLockTable(begin, end)
905+
if err := getOneErr(db.Run(ctx, b), b); err != nil {
906+
return err
907+
}
908+
if l := len(b.response.Responses); l != 1 {
909+
return errors.Errorf("got %d responses for FlushLockTable", l)
910+
}
911+
resp := b.response.Responses[0].GetFlushLockTable()
912+
if resp == nil {
913+
return errors.Errorf("unexpected response %T for FlushLockTable",
914+
b.response.Responses[0].GetInner())
915+
}
916+
return nil
917+
}
918+
902919
// BarrierWithLAI is like Barrier, but also returns the lease applied index and
903920
// range descriptor at which the barrier was applied. In this case, the barrier
904921
// can't span multiple ranges, otherwise a RangeKeyMismatchError is returned.

pkg/kv/kvnemesis/applier.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
143143
_, err = db.Barrier(ctx, o.Key, o.EndKey)
144144
}
145145
o.Result = resultInit(ctx, err)
146+
case *FlushLockTableOperation:
147+
o.Result = resultInit(ctx, db.FlushLockTable(ctx, o.Key, o.EndKey))
146148
case *ClosureTxnOperation:
147149
// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
148150
// epochs of the same transaction to avoid waiting while holding locks.
@@ -450,6 +452,16 @@ func applyClientOp(
450452
})
451453
})
452454
o.Result = resultInit(ctx, err)
455+
case *FlushLockTableOperation:
456+
_, _, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
457+
b.AddRawRequest(&kvpb.FlushLockTableRequest{
458+
RequestHeader: kvpb.RequestHeader{
459+
Key: o.Key,
460+
EndKey: o.EndKey,
461+
},
462+
})
463+
})
464+
o.Result = resultInit(ctx, err)
453465
case *BatchOperation:
454466
b := &kv.Batch{}
455467
applyBatchOp(ctx, b, db.Run, o)
@@ -570,6 +582,8 @@ func applyBatchOp(
570582
panic(errors.AssertionFailedf(`AddSSTable cannot be used in batches`))
571583
case *BarrierOperation:
572584
panic(errors.AssertionFailedf(`Barrier cannot be used in batches`))
585+
case *FlushLockTableOperation:
586+
panic(errors.AssertionFailedf(`FlushLockOperation cannot be used in batches`))
573587
default:
574588
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
575589
}

pkg/kv/kvnemesis/generator.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,10 @@ type ClientOperationConfig struct {
267267
AddSSTable int
268268
// Barrier is an operation that waits for in-flight writes to complete.
269269
Barrier int
270+
271+
// FlushLockTable is an operation that moves unreplicated locks in the
272+
// in-memory lock table into the
273+
FlushLockTable int
270274
}
271275

272276
// BatchOperationConfig configures the relative probability of generating a
@@ -407,6 +411,7 @@ func newAllOperationsConfig() GeneratorConfig {
407411
DeleteRangeUsingTombstone: 1,
408412
AddSSTable: 1,
409413
Barrier: 1,
414+
FlushLockTable: 1,
410415
}
411416
batchOpConfig := BatchOperationConfig{
412417
Batch: 4,
@@ -542,6 +547,11 @@ func NewDefaultConfig() GeneratorConfig {
542547
config.Ops.ClosureTxn.CommitBatchOps.Barrier = 0
543548
config.Ops.ClosureTxn.TxnClientOps.Barrier = 0
544549
config.Ops.ClosureTxn.TxnBatchOps.Ops.Barrier = 0
550+
551+
config.Ops.Batch.Ops.FlushLockTable = 0
552+
config.Ops.ClosureTxn.CommitBatchOps.FlushLockTable = 0
553+
config.Ops.ClosureTxn.TxnClientOps.FlushLockTable = 0
554+
config.Ops.ClosureTxn.TxnBatchOps.Ops.FlushLockTable = 0
545555
return config
546556
}
547557

@@ -839,6 +849,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig
839849
addOpGen(allowed, randDelRangeUsingTombstone, c.DeleteRangeUsingTombstone)
840850
addOpGen(allowed, randAddSSTable, c.AddSSTable)
841851
addOpGen(allowed, randBarrier, c.Barrier)
852+
addOpGen(allowed, randFlushLockTable, c.FlushLockTable)
842853
}
843854

844855
func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) {
@@ -1144,6 +1155,19 @@ func randBarrier(g *generator, rng *rand.Rand) Operation {
11441155
return barrier(key, endKey, withLAI)
11451156
}
11461157

1158+
func randFlushLockTable(g *generator, rng *rand.Rand) Operation {
1159+
// FlushLockTable can't span multiple ranges. We want to test a combination of
1160+
// requests that span the entire range and those that span part of a range.
1161+
key, endKey := randRangeSpan(rng, g.currentSplits)
1162+
1163+
wholeRange := rng.Float64() < 0.5
1164+
if !wholeRange {
1165+
key = randKeyBetween(rng, key, endKey)
1166+
}
1167+
1168+
return flushLockTable(key, endKey)
1169+
}
1170+
11471171
func randScan(g *generator, rng *rand.Rand) Operation {
11481172
key, endKey := randSpan(rng)
11491173
return scan(key, endKey)
@@ -1986,6 +2010,13 @@ func barrier(key, endKey string, withLAI bool) Operation {
19862010
}}
19872011
}
19882012

2013+
func flushLockTable(key, endKey string) Operation {
2014+
return Operation{FlushLockTable: &FlushLockTableOperation{
2015+
Key: []byte(key),
2016+
EndKey: []byte(endKey),
2017+
}}
2018+
}
2019+
19892020
func createSavepoint(id int) Operation {
19902021
return Operation{SavepointCreate: &SavepointCreateOperation{ID: int32(id)}}
19912022
}

pkg/kv/kvnemesis/generator_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,8 @@ func TestRandStep(t *testing.T) {
248248
client.AddSSTable++
249249
case *BarrierOperation:
250250
client.Barrier++
251+
case *FlushLockTableOperation:
252+
client.FlushLockTable++
251253
case *BatchOperation:
252254
batch.Batch++
253255
countClientOps(&batch.Ops, nil, o.Ops...)
@@ -284,7 +286,8 @@ func TestRandStep(t *testing.T) {
284286
*DeleteRangeOperation,
285287
*DeleteRangeUsingTombstoneOperation,
286288
*AddSSTableOperation,
287-
*BarrierOperation:
289+
*BarrierOperation,
290+
*FlushLockTableOperation:
288291
countClientOps(&counts.DB, &counts.Batch, step.Op)
289292
case *ClosureTxnOperation:
290293
countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...)

pkg/kv/kvnemesis/operations.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ func (op Operation) Result() *Result {
3838
return &o.Result
3939
case *BarrierOperation:
4040
return &o.Result
41+
case *FlushLockTableOperation:
42+
return &o.Result
4143
case *SplitOperation:
4244
return &o.Result
4345
case *MergeOperation:
@@ -138,6 +140,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
138140
o.format(w, fctx)
139141
case *BarrierOperation:
140142
o.format(w, fctx)
143+
case *FlushLockTableOperation:
144+
o.format(w, fctx)
141145
case *SplitOperation:
142146
o.format(w, fctx)
143147
case *MergeOperation:
@@ -368,6 +372,11 @@ func (op BarrierOperation) format(w *strings.Builder, fctx formatCtx) {
368372
op.Result.format(w)
369373
}
370374

375+
func (op FlushLockTableOperation) format(w *strings.Builder, fctx formatCtx) {
376+
fmt.Fprintf(w, `%s.FlushLockTable(ctx, %s, %s)`, fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey))
377+
op.Result.format(w)
378+
}
379+
371380
func (op SplitOperation) format(w *strings.Builder, fctx formatCtx) {
372381
fmt.Fprintf(w, `%s.AdminSplit(ctx, %s, hlc.MaxTimestamp)`, fctx.receiver, fmtKey(op.Key))
373382
op.Result.format(w)

pkg/kv/kvnemesis/operations.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ message BarrierOperation {
9898
Result result = 4 [(gogoproto.nullable) = false];
9999
}
100100

101+
message FlushLockTableOperation {
102+
bytes key = 1;
103+
bytes end_key = 2;
104+
Result result = 3 [(gogoproto.nullable) = false];
105+
}
106+
101107
message SplitOperation {
102108
bytes key = 1;
103109
Result result = 2 [(gogoproto.nullable) = false];
@@ -183,6 +189,7 @@ message Operation {
183189
SavepointReleaseOperation savepoint_release = 21;
184190
SavepointRollbackOperation savepoint_rollback = 22;
185191
BarrierOperation barrier = 23;
192+
FlushLockTableOperation flush_lock_table = 24;
186193
}
187194

188195
enum ResultType {

pkg/kv/kvnemesis/validator.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -735,6 +735,14 @@ func (v *validator) processOp(op Operation) {
735735
}
736736
// We don't yet actually check the barrier guarantees here, i.e. that all
737737
// concurrent writes are applied by the time it completes. Maybe later.
738+
case *FlushLockTableOperation:
739+
execTimestampStrictlyOptional = true
740+
if resultHasErrorType(t.Result, &kvpb.RangeKeyMismatchError{}) {
741+
// FlushLockTableOperation may race with a split.
742+
} else {
743+
// Fail or retry on other errors, depending on type.
744+
v.checkNonAmbError(op, t.Result, exceptUnhandledRetry)
745+
}
738746
case *ScanOperation:
739747
if _, isErr := v.checkError(op, t.Result); isErr {
740748
break

0 commit comments

Comments
 (0)