Skip to content

Commit 789d7e3

Browse files
craig[bot]stevendanna
andcommitted
Merge #150846
150846: kvnemesis: add user priority r=miraradeva a=stevendanna This allows KVNemesis to randomly set user priority to either Min, Normal, or Max priority. Epic: none Release note: None Co-authored-by: Steven Danna <[email protected]>
2 parents a666b16 + a4efe6a commit 789d7e3

File tree

5 files changed

+138
-144
lines changed

5 files changed

+138
-144
lines changed

pkg/kv/kvnemesis/applier.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,11 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
157157
if err := txn.SetIsoLevel(o.IsoLevel); err != nil {
158158
panic(err)
159159
}
160+
if o.UserPriority > 0 {
161+
if err := txn.SetUserPriority(o.UserPriority); err != nil {
162+
panic(err)
163+
}
164+
}
160165
txn.SetBufferedWritesEnabled(o.BufferedWrites)
161166
if savedTxn != nil && txn.TestingCloneTxn().Epoch == 0 {
162167
// If the txn's current epoch is 0 and we've run at least one prior

pkg/kv/kvnemesis/generator.go

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ import (
3333

3434
// GeneratorConfig contains all the tunable knobs necessary to run a Generator.
3535
type GeneratorConfig struct {
36-
Ops OperationConfig
37-
NumNodes, NumReplicas int
36+
Ops OperationConfig
37+
TxnConfig TxnConfig
3838

39-
BufferedWritesProb float64
39+
NumNodes, NumReplicas int
4040

4141
SeedForLogging int64
4242
RandSourceCounterForLogging counter
@@ -282,6 +282,11 @@ type BatchOperationConfig struct {
282282
Ops ClientOperationConfig
283283
}
284284

285+
type TxnConfig struct {
286+
BufferedWritesProb float64
287+
RandomUserPriority bool
288+
}
289+
285290
// SplitConfig configures the relative probability of generating a Split
286291
// operation.
287292
type SplitConfig struct {
@@ -1523,31 +1528,31 @@ func (g *generator) registerClosureTxnOps(allowed *[]opGen, c *ClosureTxnConfig)
15231528
const Commit, Rollback = ClosureTxnType_Commit, ClosureTxnType_Rollback
15241529
const SSI, SI, RC = isolation.Serializable, isolation.Snapshot, isolation.ReadCommitted
15251530
addOpGen(allowed,
1526-
makeClosureTxn(Commit, SSI, g.Config.BufferedWritesProb, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSerializable)
1531+
makeClosureTxn(Commit, SSI, g.Config.TxnConfig, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSerializable)
15271532
addOpGen(allowed,
1528-
makeClosureTxn(Commit, SI, g.Config.BufferedWritesProb, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSnapshot)
1533+
makeClosureTxn(Commit, SI, g.Config.TxnConfig, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSnapshot)
15291534
addOpGen(allowed,
1530-
makeClosureTxn(Commit, RC, g.Config.BufferedWritesProb, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitReadCommitted)
1535+
makeClosureTxn(Commit, RC, g.Config.TxnConfig, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitReadCommitted)
15311536

15321537
addOpGen(allowed,
1533-
makeClosureTxn(Rollback, SSI, g.Config.BufferedWritesProb, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSerializable)
1538+
makeClosureTxn(Rollback, SSI, g.Config.TxnConfig, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSerializable)
15341539
addOpGen(allowed,
1535-
makeClosureTxn(Rollback, SI, g.Config.BufferedWritesProb, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSnapshot)
1540+
makeClosureTxn(Rollback, SI, g.Config.TxnConfig, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSnapshot)
15361541
addOpGen(allowed,
1537-
makeClosureTxn(Rollback, RC, g.Config.BufferedWritesProb, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackReadCommitted)
1542+
makeClosureTxn(Rollback, RC, g.Config.TxnConfig, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackReadCommitted)
15381543

15391544
addOpGen(allowed,
1540-
makeClosureTxn(Commit, SSI, g.Config.BufferedWritesProb, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSerializableInBatch)
1545+
makeClosureTxn(Commit, SSI, g.Config.TxnConfig, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSerializableInBatch)
15411546
addOpGen(allowed,
1542-
makeClosureTxn(Commit, SI, g.Config.BufferedWritesProb, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSnapshotInBatch)
1547+
makeClosureTxn(Commit, SI, g.Config.TxnConfig, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSnapshotInBatch)
15431548
addOpGen(allowed,
1544-
makeClosureTxn(Commit, RC, g.Config.BufferedWritesProb, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitReadCommittedInBatch)
1549+
makeClosureTxn(Commit, RC, g.Config.TxnConfig, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitReadCommittedInBatch)
15451550
}
15461551

15471552
func makeClosureTxn(
15481553
txnType ClosureTxnType,
15491554
iso isolation.Level,
1550-
bufferedWritesProb float64,
1555+
txnConfig TxnConfig,
15511556
txnClientOps *ClientOperationConfig,
15521557
txnBatchOps *BatchOperationConfig,
15531558
commitInBatch *ClientOperationConfig,
@@ -1580,7 +1585,10 @@ func makeClosureTxn(
15801585
maybeUpdateSavepoints(&spIDs, ops[i])
15811586
}
15821587
op := closureTxn(txnType, iso, ops...)
1583-
op.ClosureTxn.BufferedWrites = rng.Float64() < bufferedWritesProb
1588+
if txnConfig.RandomUserPriority {
1589+
op.ClosureTxn.UserPriority = randomUserPriority(rng)
1590+
}
1591+
op.ClosureTxn.BufferedWrites = rng.Float64() < txnConfig.BufferedWritesProb
15841592
if commitInBatch != nil {
15851593
if txnType != ClosureTxnType_Commit {
15861594
panic(errors.AssertionFailedf(`CommitInBatch must commit got: %s`, txnType))
@@ -1591,6 +1599,16 @@ func makeClosureTxn(
15911599
}
15921600
}
15931601

1602+
var userPriorities = [3]roachpb.UserPriority{
1603+
roachpb.MinUserPriority,
1604+
roachpb.NormalUserPriority,
1605+
roachpb.MaxUserPriority,
1606+
}
1607+
1608+
func randomUserPriority(rng *rand.Rand) roachpb.UserPriority {
1609+
return userPriorities[rng.Intn(len(userPriorities))]
1610+
}
1611+
15941612
// registerSavepointOps assumes existingSp is the current stack of savepoints
15951613
// and uses it to register only valid savepoint ops. I.e. releasing or rolling
15961614
// back a savepoint that hasn't been created or has already been released or

pkg/kv/kvnemesis/kvnemesis_test.go

Lines changed: 83 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,9 @@ type kvnemesisTestCfg struct {
290290
// transactions, this will apply to all transactions.
291291
bufferedWriteProb float64 // [0,1)
292292

293+
// If enabled, set the user priority of transactions to a random value.
294+
randomUserPriority bool
295+
293296
// If enabled, track Raft proposals and command application, and assert
294297
// invariants (in particular that we don't double-apply a request or
295298
// proposal).
@@ -310,34 +313,37 @@ type kvnemesisTestCfg struct {
310313
testGeneratorConfig func(*GeneratorConfig)
311314
}
312315

313-
func TestKVNemesisSingleNode(t *testing.T) {
314-
defer leaktest.AfterTest(t)()
315-
defer log.Scope(t).Close(t)
316-
317-
testKVNemesisImpl(t, kvnemesisTestCfg{
318-
numNodes: 1,
316+
func defaultTestConfiguration(numNodes int) kvnemesisTestCfg {
317+
return kvnemesisTestCfg{
318+
numNodes: numNodes,
319319
numSteps: defaultNumSteps,
320320
concurrency: 5,
321321
seedOverride: 0,
322322
invalidLeaseAppliedIndexProb: 0.2,
323323
injectReproposalErrorProb: 0.2,
324324
assertRaftApply: true,
325-
})
325+
randomUserPriority: true,
326+
}
327+
}
328+
329+
func TestKVNemesisSingleNode(t *testing.T) {
330+
defer leaktest.AfterTest(t)()
331+
defer log.Scope(t).Close(t)
332+
cfg := defaultTestConfiguration(1)
333+
cfg.seedOverride = 0
334+
testKVNemesisImpl(t, cfg)
326335
}
327336

328337
func TestKVNemesisSingleNode_ReproposalChaos(t *testing.T) {
329338
defer leaktest.AfterTest(t)()
330339
defer log.Scope(t).Close(t)
331340

332-
testKVNemesisImpl(t, kvnemesisTestCfg{
333-
numNodes: 1,
334-
numSteps: defaultNumSteps,
335-
concurrency: 5,
336-
seedOverride: 0,
337-
invalidLeaseAppliedIndexProb: 0.9,
338-
injectReproposalErrorProb: 0.5,
339-
assertRaftApply: true,
340-
})
341+
cfg := defaultTestConfiguration(1)
342+
cfg.seedOverride = 0
343+
cfg.invalidLeaseAppliedIndexProb = 0.9
344+
cfg.injectReproposalErrorProb = 0.5
345+
346+
testKVNemesisImpl(t, cfg)
341347
}
342348

343349
// TestKVNemesisMultiNode_BufferedWritesNoLockDurabilityUpgrades runs KVNemesis
@@ -346,21 +352,15 @@ func TestKVNemesisSingleNode_ReproposalChaos(t *testing.T) {
346352
func TestKVNemesisMultiNode_BufferedWritesNoLockDurabilityUpgrades(t *testing.T) {
347353
defer leaktest.AfterTest(t)()
348354
defer log.Scope(t).Close(t)
349-
350-
testKVNemesisImpl(t, kvnemesisTestCfg{
351-
numNodes: 3,
352-
numSteps: defaultNumSteps,
353-
concurrency: 5,
354-
seedOverride: 0,
355-
invalidLeaseAppliedIndexProb: 0.2,
356-
injectReproposalErrorProb: 0.2,
357-
assertRaftApply: true,
358-
bufferedWriteProb: 0.70,
359-
testSettings: func(ctx context.Context, st *cluster.Settings) {
360-
concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, false)
361-
concurrency.UnreplicatedLockReliabilityMerge.Override(ctx, &st.SV, false)
362-
kvcoord.BufferedWritesEnabled.Override(ctx, &st.SV, true)
363-
}})
355+
cfg := defaultTestConfiguration(3)
356+
cfg.seedOverride = 0
357+
cfg.bufferedWriteProb = 0.7
358+
cfg.testSettings = func(ctx context.Context, st *cluster.Settings) {
359+
concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, false)
360+
concurrency.UnreplicatedLockReliabilityMerge.Override(ctx, &st.SV, false)
361+
kvcoord.BufferedWritesEnabled.Override(ctx, &st.SV, true)
362+
}
363+
testKVNemesisImpl(t, cfg)
364364
}
365365

366366
// TestKVNemesisMultiNode_BufferedWritesLockDurabilityUpgrades tests buffered
@@ -369,22 +369,17 @@ func TestKVNemesisMultiNode_BufferedWritesLockDurabilityUpgrades(t *testing.T) {
369369
defer leaktest.AfterTest(t)()
370370
defer log.Scope(t).Close(t)
371371

372-
testKVNemesisImpl(t, kvnemesisTestCfg{
373-
numNodes: 3,
374-
numSteps: defaultNumSteps,
375-
concurrency: 5,
376-
seedOverride: 0,
377-
invalidLeaseAppliedIndexProb: 0.2,
378-
injectReproposalErrorProb: 0.2,
379-
assertRaftApply: true,
380-
bufferedWriteProb: 0.70,
381-
testSettings: func(ctx context.Context, st *cluster.Settings) {
382-
kvcoord.BufferedWritesEnabled.Override(ctx, &st.SV, true)
383-
concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, true)
384-
concurrency.UnreplicatedLockReliabilityMerge.Override(ctx, &st.SV, true)
385-
concurrency.UnreplicatedLockReliabilitySplit.Override(ctx, &st.SV, true)
386-
},
387-
})
372+
cfg := defaultTestConfiguration(3)
373+
cfg.seedOverride = 0
374+
cfg.bufferedWriteProb = 0.7
375+
cfg.testSettings = func(ctx context.Context, st *cluster.Settings) {
376+
kvcoord.BufferedWritesEnabled.Override(ctx, &st.SV, true)
377+
concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, true)
378+
concurrency.UnreplicatedLockReliabilityMerge.Override(ctx, &st.SV, true)
379+
concurrency.UnreplicatedLockReliabilitySplit.Override(ctx, &st.SV, true)
380+
}
381+
382+
testKVNemesisImpl(t, cfg)
388383
}
389384

390385
// TestKVNemesisMultiNode_BufferedWritesNoPipelining turns on buffered
@@ -393,38 +388,36 @@ func TestKVNemesisMultiNode_BufferedWritesNoPipelining(t *testing.T) {
393388
defer leaktest.AfterTest(t)()
394389
defer log.Scope(t).Close(t)
395390

396-
testKVNemesisImpl(t, kvnemesisTestCfg{
397-
numNodes: 3,
398-
numSteps: defaultNumSteps,
399-
concurrency: 5,
400-
seedOverride: 0,
401-
invalidLeaseAppliedIndexProb: 0.2,
402-
injectReproposalErrorProb: 0.2,
403-
assertRaftApply: true,
404-
bufferedWriteProb: 0.70,
405-
testSettings: func(ctx context.Context, st *cluster.Settings) {
406-
kvcoord.BufferedWritesEnabled.Override(ctx, &st.SV, true)
407-
kvcoord.PipelinedWritesEnabled.Override(ctx, &st.SV, false)
408-
concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, true)
409-
concurrency.UnreplicatedLockReliabilityMerge.Override(ctx, &st.SV, true)
410-
concurrency.UnreplicatedLockReliabilitySplit.Override(ctx, &st.SV, true)
411-
},
412-
})
391+
cfg := defaultTestConfiguration(3)
392+
cfg.seedOverride = 0
393+
cfg.bufferedWriteProb = 0.7
394+
cfg.testSettings = func(ctx context.Context, st *cluster.Settings) {
395+
kvcoord.BufferedWritesEnabled.Override(ctx, &st.SV, true)
396+
kvcoord.PipelinedWritesEnabled.Override(ctx, &st.SV, false)
397+
concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, true)
398+
concurrency.UnreplicatedLockReliabilityMerge.Override(ctx, &st.SV, true)
399+
concurrency.UnreplicatedLockReliabilitySplit.Override(ctx, &st.SV, true)
400+
}
401+
testKVNemesisImpl(t, cfg)
413402
}
414403

415404
func TestKVNemesisMultiNode(t *testing.T) {
416405
defer leaktest.AfterTest(t)()
417406
defer log.Scope(t).Close(t)
407+
cfg := defaultTestConfiguration(4)
408+
cfg.seedOverride = 0
409+
testKVNemesisImpl(t, cfg)
410+
}
418411

419-
testKVNemesisImpl(t, kvnemesisTestCfg{
420-
numNodes: 4,
421-
numSteps: defaultNumSteps,
422-
concurrency: 5,
423-
seedOverride: 0,
424-
invalidLeaseAppliedIndexProb: 0.2,
425-
injectReproposalErrorProb: 0.2,
426-
assertRaftApply: true,
427-
})
412+
func TestKVNemesisMultiNode_LeaderLeases(t *testing.T) {
413+
defer leaktest.AfterTest(t)()
414+
defer log.Scope(t).Close(t)
415+
416+
cfg := defaultTestConfiguration(4)
417+
cfg.seedOverride = 0
418+
cfg.leaseTypeOverride = roachpb.LeaseLeader
419+
420+
testKVNemesisImpl(t, cfg)
428421
}
429422

430423
// FuzzKVNemesisSingleNode is an attempt ot make it possible to run KVNemesis
@@ -434,56 +427,27 @@ func FuzzKVNemesisSingleNode(f *testing.F) {
434427
defer leaktest.AfterTest(f)()
435428
defer log.Scope(f).Close(f)
436429

437-
const (
438-
// Set to > 0 to pre-generate corpus data.
439-
corpusSize = 0
440-
// I've set these to low values for now to at least get things running
441-
// reliably. With all default settings the test runner fails without
442-
// printing any useful info. I _think_ it might be the result of a
443-
// hard-coded 10s timeout in the go-fuzz test worker.
444-
numStep = 10
445-
concurrency = 1
446-
)
430+
// Set to > 0 to pre-generate corpus data.
431+
const corpusSize = 0
432+
433+
cfg := defaultTestConfiguration(1)
434+
// I've set these to low values for now to at least get things running
435+
// reliably. With all default settings the test runner fails without
436+
// printing any useful info. I _think_ it might be the result of a
437+
// hard-coded 10s timeout in the go-fuzz test worker.
438+
cfg.numSteps = 10
439+
cfg.concurrency = 1
440+
447441
for range corpusSize {
448442
rndSource := randutil.NewRecordingRandSource(rand.NewSource(randutil.NewPseudoSeed()).(rand.Source64))
449-
testKVNemesisImpl(f, kvnemesisTestCfg{
450-
numNodes: 1,
451-
numSteps: numStep,
452-
concurrency: concurrency,
453-
randSource: rndSource,
454-
invalidLeaseAppliedIndexProb: 0.2,
455-
injectReproposalErrorProb: 0.2,
456-
assertRaftApply: true,
457-
})
443+
cfg.randSource = rndSource
444+
testKVNemesisImpl(f, cfg)
458445
f.Add(rndSource.Output())
459446
}
460447

461448
f.Fuzz(func(t *testing.T, data []byte) {
462-
testKVNemesisImpl(t, kvnemesisTestCfg{
463-
numNodes: 1,
464-
numSteps: numStep,
465-
concurrency: concurrency,
466-
randSource: randutil.NewFuzzRandSource(t, data),
467-
invalidLeaseAppliedIndexProb: 0.2,
468-
injectReproposalErrorProb: 0.2,
469-
assertRaftApply: true,
470-
})
471-
})
472-
}
473-
474-
func TestKVNemesisMultiNode_LeaderLeases(t *testing.T) {
475-
defer leaktest.AfterTest(t)()
476-
defer log.Scope(t).Close(t)
477-
478-
testKVNemesisImpl(t, kvnemesisTestCfg{
479-
numNodes: 4,
480-
numSteps: defaultNumSteps,
481-
concurrency: 5,
482-
seedOverride: 0,
483-
invalidLeaseAppliedIndexProb: 0.2,
484-
injectReproposalErrorProb: 0.2,
485-
assertRaftApply: true,
486-
leaseTypeOverride: roachpb.LeaseLeader,
449+
cfg.randSource = randutil.NewFuzzRandSource(t, data)
450+
testKVNemesisImpl(t, cfg)
487451
})
488452
}
489453

@@ -519,7 +483,8 @@ func testKVNemesisImpl(t testing.TB, cfg kvnemesisTestCfg) {
519483
config := NewDefaultConfig()
520484
config.NumNodes = cfg.numNodes
521485
config.NumReplicas = 3
522-
config.BufferedWritesProb = cfg.bufferedWriteProb
486+
config.TxnConfig.BufferedWritesProb = cfg.bufferedWriteProb
487+
config.TxnConfig.RandomUserPriority = cfg.randomUserPriority
523488

524489
config.SeedForLogging = seed
525490
config.RandSourceCounterForLogging = countingSource

0 commit comments

Comments
 (0)