Skip to content

Commit 59a246d

Browse files
committed
kvnemesis: add follower reads
This commit adds follower reads support to kvnemesis. The target closed timestamp is set to 1s, and select read-only non-locking operations run in batches with a timestamp of now - 3s. Multi-request batches and transactions are also eligible for follower reads as long as they consist only of eligible operations. Fixes: #59061 Release note: None
1 parent cc5d7c7 commit 59a246d

File tree

7 files changed

+154
-15
lines changed

7 files changed

+154
-15
lines changed

pkg/kv/db.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,15 @@ func (s *CrossRangeTxnWrapperSender) Send(
226226
return br, pErr
227227
}
228228

229+
// Before retrying the batch in a transaction, strip the header's timestamp.
230+
// It may have been set to try a follower read, but it's not allowed in a txn
231+
// (see comment near Header.Timestamp). Currently, this non-transactional API
232+
// is used for follower reads only by KVNemesis.
233+
//
234+
// We can end up here if the batch contains transactional requests and spans
235+
// multiple ranges.
236+
ba.Header.Timestamp = hlc.Timestamp{}
237+
229238
err := s.db.Txn(ctx, func(ctx context.Context, txn *Txn) error {
230239
txn.SetDebugName("auto-wrap")
231240
b := txn.NewBatch()

pkg/kv/kvnemesis/applier.go

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,16 @@ func exceptContextCanceled(err error) bool {
122122
strings.Contains(err.Error(), "query execution canceled")
123123
}
124124

125+
const followerReadsOffset = 2 * time.Second
126+
127+
func makeFollowerReadTimestamp(ts hlc.Timestamp) hlc.Timestamp {
128+
return ts.Add(-followerReadsOffset.Nanoseconds(), 0)
129+
}
130+
func configureBatchForFollowerReads(b *kv.Batch, ts hlc.Timestamp) {
131+
b.Header.Timestamp = makeFollowerReadTimestamp(ts)
132+
b.Header.RoutingPolicy = kvpb.RoutingPolicy_NEAREST
133+
}
134+
125135
func (a *Applier) applyOp(ctx context.Context, db *kv.DB, op *Operation) {
126136
switch o := op.GetValue().(type) {
127137
case *GetOperation,
@@ -188,6 +198,15 @@ func (a *Applier) applyOp(ctx context.Context, db *kv.DB, op *Operation) {
188198
})
189199
var savedTxn *kv.Txn
190200
txnErr := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
201+
// Attempt to set a follower reads timestamp only on the first iteration.
202+
// Setting it again and again on retries can lead to starvation.
203+
if o.FollowerReadEligible && savedTxn == nil {
204+
followerReadTs := makeFollowerReadTimestamp(db.Clock().Now())
205+
err := txn.SetFixedTimestamp(ctx, followerReadTs)
206+
if err != nil {
207+
panic(err)
208+
}
209+
}
191210
if err := txn.SetIsoLevel(o.IsoLevel); err != nil {
192211
panic(err)
193212
}
@@ -251,7 +270,7 @@ func (a *Applier) applyOp(ctx context.Context, db *kv.DB, op *Operation) {
251270
}
252271
if o.CommitInBatch != nil {
253272
b := txn.NewBatch()
254-
applyBatchOp(ctx, b, txn.CommitInBatch, o.CommitInBatch)
273+
applyBatchOp(ctx, b, txn.CommitInBatch, o.CommitInBatch, nil /* clock */)
255274
// The KV api disallows use of a txn after an operation on it errors.
256275
if r := o.CommitInBatch.Result; r.Type == ResultType_Error {
257276
return errors.DecodeError(ctx, *r.Err)
@@ -349,6 +368,13 @@ func applyClientOp(
349368
} else {
350369
b.Get(o.Key)
351370
}
371+
if !inTxn && o.FollowerReadEligible {
372+
kvDB, ok := db.(*kv.DB)
373+
if !ok {
374+
panic(errors.AssertionFailedf("unexpected transactional interface"))
375+
}
376+
configureBatchForFollowerReads(b, kvDB.Clock().Now())
377+
}
352378
})
353379
o.Result = resultInit(ctx, err)
354380
if err != nil {
@@ -422,6 +448,13 @@ func applyClientOp(
422448
b.Scan(o.Key, o.EndKey)
423449
}
424450
}
451+
if !inTxn && o.FollowerReadEligible {
452+
kvDB, ok := db.(*kv.DB)
453+
if !ok {
454+
panic(errors.AssertionFailedf("unexpected transactional interface"))
455+
}
456+
configureBatchForFollowerReads(b, kvDB.Clock().Now())
457+
}
425458
})
426459
o.Result = resultInit(ctx, err)
427460
if err != nil {
@@ -531,7 +564,15 @@ func applyClientOp(
531564
o.Result = resultInit(ctx, err)
532565
case *BatchOperation:
533566
b := &kv.Batch{}
534-
applyBatchOp(ctx, b, db.Run, o)
567+
if inTxn {
568+
applyBatchOp(ctx, b, db.Run, o, nil /* clock */)
569+
} else {
570+
kvDB, ok := db.(*kv.DB)
571+
if !ok {
572+
panic(errors.AssertionFailedf("unexpected transactional interface"))
573+
}
574+
applyBatchOp(ctx, b, db.Run, o, kvDB.Clock())
575+
}
535576
case *SavepointCreateOperation:
536577
txn, ok := db.(*kv.Txn) // savepoints are only allowed with transactions
537578
if !ok {
@@ -589,7 +630,11 @@ func setLastReqSeq(b *kv.Batch, seq kvnemesisutil.Seq) {
589630
}
590631

591632
func applyBatchOp(
592-
ctx context.Context, b *kv.Batch, run func(context.Context, *kv.Batch) error, o *BatchOperation,
633+
ctx context.Context,
634+
b *kv.Batch,
635+
run func(context.Context, *kv.Batch) error,
636+
o *BatchOperation,
637+
clock *hlc.Clock,
593638
) {
594639
for i := range o.Ops {
595640
switch subO := o.Ops[i].GetValue().(type) {
@@ -674,6 +719,9 @@ func applyBatchOp(
674719
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
675720
}
676721
}
722+
if clock != nil && o.FollowerReadEligible {
723+
configureBatchForFollowerReads(b, clock.Now())
724+
}
677725
ts, err := batchRun(ctx, run, b)
678726
o.Result = resultInit(ctx, err)
679727
// NB: we intentionally fall through; the batch propagates the error

pkg/kv/kvnemesis/generator.go

Lines changed: 77 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ type ClosureTxnConfig struct {
111111
type ClientOperationConfig struct {
112112
// GetMissing is an operation that Gets a key that definitely doesn't exist.
113113
GetMissing int
114+
// GetMissingFollowerRead is an operation that Gets a key that definitely
115+
// doesn't exist, and is marked eligible to be served as a follower read.
116+
GetMissingFollowerRead int
114117
// GetMissingForUpdate is an operation that Gets a key that definitely doesn't
115118
// exist using a locking read with strength lock.Exclusive.
116119
GetMissingForUpdate int
@@ -146,6 +149,9 @@ type ClientOperationConfig struct {
146149
GetMissingForShareSkipLockedGuaranteedDurability int
147150
// GetExisting is an operation that Gets a key that likely exists.
148151
GetExisting int
152+
// GetExistingFollowerRead is an operation that Gets a key that likely exists,
153+
// and is marked eligible to be served as a follower read.
154+
GetExistingFollowerRead int
149155
// GetExistingForUpdate is an operation that Gets a key that likely exists
150156
// using a locking read with strength lock.Exclusive.
151157
GetExistingForUpdate int
@@ -203,6 +209,9 @@ type ClientOperationConfig struct {
203209
CPutAllowIfDoesNotExist int
204210
// Scan is an operation that Scans a key range that may contain values.
205211
Scan int
212+
// ScanFollowerRead is an operation that Scans a key range that may contain
213+
// values, and is marked eligible to be served as a follower read.
214+
ScanFollowerRead int
206215
// ScanForUpdate is an operation that Scans a key range that may contain
207216
// values using a per-key locking scan with strength lock.Exclusive.
208217
ScanForUpdate int
@@ -240,6 +249,10 @@ type ClientOperationConfig struct {
240249
// ReverseScan is an operation that Scans a key range that may contain
241250
// values in reverse key order.
242251
ReverseScan int
252+
// ReverseScanFollowerRead is an operation that Scans a key range that may
253+
// contain values in reverse key order, and is marked eligible to be served as
254+
// a follower read.
255+
ReverseScanFollowerRead int
243256
// ReverseScanForUpdate is an operation that Scans a key range that may
244257
// contain values using a per-key locking scan with strength lock.Exclusive in
245258
// reverse key order.
@@ -419,16 +432,18 @@ type FaultConfig struct {
419432
// yet pass (for example, if the new operation finds a kv bug or edge case).
420433
func newAllOperationsConfig() GeneratorConfig {
421434
clientOpConfig := ClientOperationConfig{
422-
GetMissing: 1,
423-
GetMissingForUpdate: 1,
424-
GetMissingForUpdateGuaranteedDurability: 1,
425-
GetMissingForUpdateSkipLocked: 1,
426-
GetMissingForUpdateSkipLockedGuaranteedDurability: 1,
427-
GetMissingForShare: 1,
428-
GetMissingForShareGuaranteedDurability: 1,
429-
GetMissingForShareSkipLocked: 1,
430-
GetMissingForShareSkipLockedGuaranteedDurability: 1,
435+
GetMissing: 1,
436+
GetMissingFollowerRead: 1,
437+
GetMissingForUpdate: 1,
438+
GetMissingForUpdateGuaranteedDurability: 1,
439+
GetMissingForUpdateSkipLocked: 1,
440+
GetMissingForUpdateSkipLockedGuaranteedDurability: 1,
441+
GetMissingForShare: 1,
442+
GetMissingForShareGuaranteedDurability: 1,
443+
GetMissingForShareSkipLocked: 1,
444+
GetMissingForShareSkipLockedGuaranteedDurability: 1,
431445
GetExisting: 1,
446+
GetExistingFollowerRead: 1,
432447
GetExistingForUpdate: 1,
433448
GetExistingForUpdateGuaranteedDurability: 1,
434449
GetExistingForShare: 1,
@@ -447,6 +462,7 @@ func newAllOperationsConfig() GeneratorConfig {
447462
CPutNoMatch: 1,
448463
CPutAllowIfDoesNotExist: 1,
449464
Scan: 1,
465+
ScanFollowerRead: 1,
450466
ScanForUpdate: 1,
451467
ScanForUpdateGuaranteedDurability: 1,
452468
ScanForShare: 1,
@@ -457,6 +473,7 @@ func newAllOperationsConfig() GeneratorConfig {
457473
ScanForShareSkipLocked: 1,
458474
ScanForShareSkipLockedGuaranteedDurability: 1,
459475
ReverseScan: 1,
476+
ReverseScanFollowerRead: 1,
460477
ReverseScanForUpdate: 1,
461478
ReverseScanForUpdateGuaranteedDurability: 1,
462479
ReverseScanForShare: 1,
@@ -953,6 +970,7 @@ func (g *generator) selectOp(rng *rand.Rand, contextuallyValid []opGen) Operatio
953970

954971
func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig) {
955972
addOpGen(allowed, randGetMissing, c.GetMissing)
973+
addOpGen(allowed, randGetMissingFollowerRead, c.GetMissingFollowerRead)
956974
addOpGen(allowed, randGetMissingForUpdate, c.GetMissingForUpdate)
957975
addOpGen(
958976
allowed, randGetMissingForUpdateGuaranteedDurability, c.GetMissingForUpdateGuaranteedDurability,
@@ -984,6 +1002,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig
9841002

9851003
if len(g.keys) > 0 {
9861004
addOpGen(allowed, randGetExisting, c.GetExisting)
1005+
addOpGen(allowed, randGetExistingFollowerRead, c.GetExistingFollowerRead)
9871006
addOpGen(allowed, randGetExistingForUpdate, c.GetExistingForUpdate)
9881007
addOpGen(
9891008
allowed,
@@ -1016,6 +1035,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig
10161035
addOpGen(allowed, randDelMustAcquireExclusiveLockExisting, c.DeleteMustAcquireExclusiveLockExisting)
10171036
}
10181037
addOpGen(allowed, randScan, c.Scan)
1038+
addOpGen(allowed, randScanFollowerRead, c.ScanFollowerRead)
10191039
addOpGen(allowed, randScanForUpdate, c.ScanForUpdate)
10201040
addOpGen(allowed, randScanForUpdateGuaranteedDurability, c.ScanForUpdateGuaranteedDurability)
10211041
addOpGen(allowed, randScanForShare, c.ScanForShare)
@@ -1034,6 +1054,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig
10341054
c.ScanForShareSkipLockedGuaranteedDurability,
10351055
)
10361056
addOpGen(allowed, randReverseScan, c.ReverseScan)
1057+
addOpGen(allowed, randReverseScanFollowerRead, c.ReverseScanFollowerRead)
10371058
addOpGen(allowed, randReverseScanForUpdate, c.ReverseScanForUpdate)
10381059
addOpGen(
10391060
allowed,
@@ -1073,6 +1094,12 @@ func randGetMissing(_ *generator, rng *rand.Rand) Operation {
10731094
return get(randKey(rng))
10741095
}
10751096

1097+
func randGetMissingFollowerRead(_ *generator, rng *rand.Rand) Operation {
1098+
op := get(randKey(rng))
1099+
op.Get.FollowerReadEligible = true
1100+
return op
1101+
}
1102+
10761103
func randGetMissingForUpdate(g *generator, rng *rand.Rand) Operation {
10771104
op := randGetMissing(g, rng)
10781105
op.Get.ForUpdate = true
@@ -1136,6 +1163,13 @@ func randGetExisting(g *generator, rng *rand.Rand) Operation {
11361163
return get(key)
11371164
}
11381165

1166+
func randGetExistingFollowerRead(g *generator, rng *rand.Rand) Operation {
1167+
key := randSliceKey(rng, maps.Keys(g.keys))
1168+
op := get(key)
1169+
op.Get.FollowerReadEligible = true
1170+
return op
1171+
}
1172+
11391173
func randGetExistingForUpdate(g *generator, rng *rand.Rand) Operation {
11401174
op := randGetExisting(g, rng)
11411175
op.Get.ForUpdate = true
@@ -1436,6 +1470,13 @@ func randScan(g *generator, rng *rand.Rand) Operation {
14361470
return scan(key, endKey)
14371471
}
14381472

1473+
func randScanFollowerRead(g *generator, rng *rand.Rand) Operation {
1474+
key, endKey := randSpan(rng)
1475+
op := scan(key, endKey)
1476+
op.Scan.FollowerReadEligible = true
1477+
return op
1478+
}
1479+
14391480
func randScanForUpdate(g *generator, rng *rand.Rand) Operation {
14401481
op := randScan(g, rng)
14411482
op.Scan.ForUpdate = true
@@ -1500,6 +1541,13 @@ func randReverseScan(g *generator, rng *rand.Rand) Operation {
15001541
return op
15011542
}
15021543

1544+
func randReverseScanFollowerRead(g *generator, rng *rand.Rand) Operation {
1545+
op := randScan(g, rng)
1546+
op.Scan.Reverse = true
1547+
op.Scan.FollowerReadEligible = true
1548+
return op
1549+
}
1550+
15031551
func randReverseScanForUpdate(g *generator, rng *rand.Rand) Operation {
15041552
op := randReverseScan(g, rng)
15051553
op.Scan.ForUpdate = true
@@ -1833,12 +1881,23 @@ func restartRandNode(g *generator, rng *rand.Rand) Operation {
18331881
return restartNode(randNode)
18341882
}
18351883

1884+
func isFollowerReadEligibleOp(op Operation) bool {
1885+
if op.Get != nil && op.Get.FollowerReadEligible {
1886+
return true
1887+
}
1888+
if op.Scan != nil && op.Scan.FollowerReadEligible {
1889+
return true
1890+
}
1891+
return false
1892+
}
1893+
18361894
func makeRandBatch(c *ClientOperationConfig) opGenFunc {
18371895
return func(g *generator, rng *rand.Rand) Operation {
18381896
var allowed []opGen
18391897
g.registerClientOps(&allowed, c)
18401898
numOps := rng.Intn(4)
18411899
ops := make([]Operation, numOps)
1900+
followerReadEligible := true
18421901
var addedForwardScan, addedReverseScan bool
18431902

18441903
// TODO(ssd): MutateBatchHeader is disallowed with Puts because of
@@ -1849,6 +1908,8 @@ func makeRandBatch(c *ClientOperationConfig) opGenFunc {
18491908

18501909
for i := 0; i < numOps; i++ {
18511910
ops[i] = g.selectOp(rng, allowed)
1911+
// The batch is eligible for follower reads only if all its ops are.
1912+
followerReadEligible = followerReadEligible && isFollowerReadEligibleOp(ops[i])
18521913
if ops[i].Scan != nil {
18531914
if !ops[i].Scan.Reverse {
18541915
if addedReverseScan {
@@ -1883,7 +1944,9 @@ func makeRandBatch(c *ClientOperationConfig) opGenFunc {
18831944
addedBatchHeaderMutation = true
18841945
}
18851946
}
1886-
return batch(ops...)
1947+
op := batch(ops...)
1948+
op.Batch.FollowerReadEligible = followerReadEligible
1949+
return op
18871950
}
18881951
}
18891952

@@ -1933,6 +1996,7 @@ func makeClosureTxn(
19331996
// Stack of savepoint indexes/ids.
19341997
// The last element of the slice is the top of the stack.
19351998
var spIDs []int
1999+
followerReadEligible := true
19362000
for i := range ops {
19372001
// In each iteration, we start with the allowed non-savepoint ops,
19382002
// and we add the valid savepoint ops in registerSavepointOps.
@@ -1943,6 +2007,8 @@ func makeClosureTxn(
19432007
// allowed savepoint ops changes. See registerSavepointOps.
19442008
g.registerSavepointOps(&allowedIncludingSavepointOps, savepointOps, spIDs, i)
19452009
ops[i] = g.selectOp(rng, allowedIncludingSavepointOps)
2010+
// The transaction is eligible for follower reads only if all its ops are.
2011+
followerReadEligible = followerReadEligible && isFollowerReadEligibleOp(ops[i])
19462012
// Now that a random op is selected, we may need to update the stack of
19472013
// existing savepoints. See maybeUpdateSavepoints.
19482014
maybeUpdateSavepoints(&spIDs, ops[i])
@@ -1958,6 +2024,7 @@ func makeClosureTxn(
19582024
}
19592025
op.ClosureTxn.CommitInBatch = makeRandBatch(commitInBatch)(g, rng).Batch
19602026
}
2027+
op.ClosureTxn.FollowerReadEligible = followerReadEligible
19612028
return op
19622029
}
19632030
}

0 commit comments

Comments
 (0)