Skip to content

Commit 616e549

Browse files
committed
kvnemesis: add MutateBatchHeaderOperation
This adds a new operation that sets TargetBytes or MaxSpanRequestKeys on the header of a batch operation. These header options substantially change how requests are processed in the dist sender and can lead to early termination of a batch. Because of that early termination, there is a trade-off: this operation may result in other operations being no-ops. Fixes #152668. Release note: None
1 parent 22b0b5b commit 616e549

File tree

6 files changed

+151
-17
lines changed

6 files changed

+151
-17
lines changed

pkg/kv/kvnemesis/applier.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,9 @@ func applyBatchOp(
605605
panic(errors.AssertionFailedf(`Barrier cannot be used in batches`))
606606
case *FlushLockTableOperation:
607607
panic(errors.AssertionFailedf(`FlushLockOperation cannot be used in batches`))
608+
case *MutateBatchHeaderOperation:
609+
b.Header.MaxSpanRequestKeys = subO.MaxSpanRequestKeys
610+
b.Header.TargetBytes = subO.TargetBytes
608611
default:
609612
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
610613
}
@@ -615,25 +618,34 @@ func applyBatchOp(
615618
// to each result.
616619
err = nil
617620
o.Result.OptionalTimestamp = ts
621+
resultIdx := 0
618622
for i := range o.Ops {
619623
switch subO := o.Ops[i].GetValue().(type) {
620624
case *GetOperation:
621-
if b.Results[i].Err != nil {
622-
subO.Result = resultInit(ctx, b.Results[i].Err)
625+
res := b.Results[resultIdx]
626+
if res.Err != nil {
627+
subO.Result = resultInit(ctx, res.Err)
623628
} else {
624-
subO.Result.Type = ResultType_Value
625-
result := b.Results[i].Rows[0]
626-
if result.Value != nil {
627-
subO.Result.Value = result.Value.RawBytes
629+
if res.ResumeSpan != nil {
630+
subO.Result.Type = ResultType_NoError
628631
} else {
629-
subO.Result.Value = nil
632+
subO.Result.Type = ResultType_Value
633+
result := res.Rows[0]
634+
if result.Value != nil {
635+
subO.Result.Value = result.Value.RawBytes
636+
} else {
637+
subO.Result.Value = nil
638+
}
630639
}
631640
}
641+
subO.Result.ResumeSpan = res.ResumeSpan
632642
case *PutOperation:
633-
err := b.Results[i].Err
643+
err := b.Results[resultIdx].Err
634644
subO.Result = resultInit(ctx, err)
645+
subO.Result.ResumeSpan = b.Results[resultIdx].ResumeSpan
635646
case *ScanOperation:
636-
kvs, err := b.Results[i].Rows, b.Results[i].Err
647+
res := b.Results[resultIdx]
648+
kvs, err := res.Rows, res.Err
637649
if err != nil {
638650
subO.Result = resultInit(ctx, err)
639651
} else {
@@ -646,11 +658,14 @@ func applyBatchOp(
646658
}
647659
}
648660
}
661+
subO.Result.ResumeSpan = res.ResumeSpan
649662
case *DeleteOperation:
650-
err := b.Results[i].Err
663+
err := b.Results[resultIdx].Err
651664
subO.Result = resultInit(ctx, err)
665+
subO.Result.ResumeSpan = b.Results[resultIdx].ResumeSpan
652666
case *DeleteRangeOperation:
653-
keys, err := b.Results[i].Keys, b.Results[i].Err
667+
res := b.Results[resultIdx]
668+
keys, err := res.Keys, res.Err
654669
if err != nil {
655670
subO.Result = resultInit(ctx, err)
656671
} else {
@@ -660,13 +675,20 @@ func applyBatchOp(
660675
subO.Result.Keys[j] = key
661676
}
662677
}
678+
subO.Result.ResumeSpan = res.ResumeSpan
663679
case *DeleteRangeUsingTombstoneOperation:
664680
subO.Result = resultInit(ctx, err)
681+
subO.Result.ResumeSpan = b.Results[resultIdx].ResumeSpan
682+
case *MutateBatchHeaderOperation:
683+
subO.Result = resultInit(ctx, nil)
665684
case *AddSSTableOperation:
666685
panic(errors.AssertionFailedf(`AddSSTable cannot be used in batches`))
667686
default:
668687
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
669688
}
689+
if o.Ops[i].OperationHasResultInBatch() {
690+
resultIdx++
691+
}
670692
}
671693
}
672694

pkg/kv/kvnemesis/generator.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,10 @@ type ClientOperationConfig struct {
283283
// FlushLockTable is an operation that moves unreplicated locks in the
284284
// in-memory lock table into the
285285
FlushLockTable int
286+
287+
// MutateBatchHeader mutates elements of the batch Header that may influence
288+
// batch evaluation. Only relevant for BatchOperations.
289+
MutateBatchHeader int
286290
}
287291

288292
// BatchOperationConfig configures the relative probability of generating a
@@ -433,6 +437,7 @@ func newAllOperationsConfig() GeneratorConfig {
433437
AddSSTable: 1,
434438
Barrier: 1,
435439
FlushLockTable: 1,
440+
MutateBatchHeader: 1,
436441
}
437442
batchOpConfig := BatchOperationConfig{
438443
Batch: 4,
@@ -498,6 +503,14 @@ func newAllOperationsConfig() GeneratorConfig {
498503
// operations/make some operations more likely.
499504
func NewDefaultConfig() GeneratorConfig {
500505
config := newAllOperationsConfig()
506+
507+
// MutateBatchHeader is only valid in batches.
508+
config.Ops.DB.MutateBatchHeader = 0
509+
config.Ops.ClosureTxn.TxnClientOps.MutateBatchHeader = 0
510+
511+
// TODO(#153446): Header mutations with EndTransaction are not currently safe.
512+
config.Ops.ClosureTxn.CommitBatchOps.MutateBatchHeader = 0
513+
501514
// DeleteRangeUsingTombstone does not support transactions.
502515
config.Ops.ClosureTxn.TxnClientOps.DeleteRangeUsingTombstone = 0
503516
config.Ops.ClosureTxn.TxnBatchOps.Ops.DeleteRangeUsingTombstone = 0
@@ -876,6 +889,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig
876889
addOpGen(allowed, randAddSSTable, c.AddSSTable)
877890
addOpGen(allowed, randBarrier, c.Barrier)
878891
addOpGen(allowed, randFlushLockTable, c.FlushLockTable)
892+
addOpGen(allowed, randBatchMutation, c.MutateBatchHeader)
879893
}
880894

881895
func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) {
@@ -1452,6 +1466,19 @@ func randMergeIsSplit(g *generator, rng *rand.Rand) Operation {
14521466
return merge(key)
14531467
}
14541468

1469+
func randBatchMutation(g *generator, rng *rand.Rand) Operation {
1470+
op := &MutateBatchHeaderOperation{}
1471+
// We currently only support two header option mutations, both of which can
1472+
// lead to early termination. Half the time we choose a value very likely to
1473+
// lead to early termination.
1474+
if rng.Float64() > 0.5 {
1475+
op.MaxSpanRequestKeys = randItem(rng, []int64{1, 100})
1476+
} else {
1477+
op.TargetBytes = randItem(rng, []int64{1, 1 << 20 /* 1MiB */})
1478+
}
1479+
return Operation{MutateBatchHeader: op}
1480+
}
1481+
14551482
func makeRemoveReplicaFn(key string, current []roachpb.ReplicationTarget, voter bool) opGenFunc {
14561483
return func(g *generator, rng *rand.Rand) Operation {
14571484
var changeType roachpb.ReplicaChangeType
@@ -1565,6 +1592,13 @@ func makeRandBatch(c *ClientOperationConfig) opGenFunc {
15651592
numOps := rng.Intn(4)
15661593
ops := make([]Operation, numOps)
15671594
var addedForwardScan, addedReverseScan bool
1595+
1596+
// TODO(ssd): MutateBatchHeader is disallowed with Puts because of
1597+
// validation in the txnWriteBuffer that disallows such requests. We could
1598+
// relax this restriction for many batches if we had information about the
1599+
// enclosing transaction here.
1600+
var addedPutOrCPut, addedBatchHeaderMutation bool
1601+
15681602
for i := 0; i < numOps; i++ {
15691603
ops[i] = g.selectOp(rng, allowed)
15701604
if ops[i].Scan != nil {
@@ -1585,6 +1619,20 @@ func makeRandBatch(c *ClientOperationConfig) opGenFunc {
15851619
}
15861620
addedReverseScan = true
15871621
}
1622+
} else if ops[i].Put != nil {
1623+
if addedBatchHeaderMutation {
1624+
i--
1625+
continue
1626+
}
1627+
addedPutOrCPut = true
1628+
} else if ops[i].MutateBatchHeader != nil {
1629+
// In addition to avoiding batch mutations when we have Puts or CPuts,
1630+
// we also skip adding mutations if one is already added.
1631+
if addedPutOrCPut || addedBatchHeaderMutation {
1632+
i--
1633+
continue
1634+
}
1635+
addedBatchHeaderMutation = true
15881636
}
15891637
}
15901638
return batch(ops...)
@@ -1803,6 +1851,10 @@ func keysBetween(keys map[string]struct{}, start, end string) []string {
18031851
return between
18041852
}
18051853

1854+
// randItem returns one element of l, selected by drawing a single index from
// rng with rng.Intn(len(l)). Exactly one value is consumed from rng per call.
//
// It panics if l is empty, since there is nothing to pick; callers are
// expected to pass a non-empty list of candidates.
func randItem[T any](rng *rand.Rand, l []T) T {
	if len(l) == 0 {
		// rng.Intn(0) would panic here anyway; fail with a message that names
		// the actual mistake instead of a generic "invalid argument".
		panic("randItem: cannot pick from an empty slice")
	}
	return l[rng.Intn(len(l))]
}
1857+
18061858
func randKey(rng *rand.Rand) string {
18071859
// Avoid the endpoints because having point writes at the
18081860
// endpoints complicates randRangeSpan.

pkg/kv/kvnemesis/generator_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,8 @@ func TestRandStep(t *testing.T) {
266266
client.Barrier++
267267
case *FlushLockTableOperation:
268268
client.FlushLockTable++
269+
case *MutateBatchHeaderOperation:
270+
client.MutateBatchHeader++
269271
case *BatchOperation:
270272
batch.Batch++
271273
countClientOps(&batch.Ops, nil, o.Ops...)
@@ -303,7 +305,8 @@ func TestRandStep(t *testing.T) {
303305
*DeleteRangeUsingTombstoneOperation,
304306
*AddSSTableOperation,
305307
*BarrierOperation,
306-
*FlushLockTableOperation:
308+
*FlushLockTableOperation,
309+
*MutateBatchHeaderOperation:
307310
countClientOps(&counts.DB, &counts.Batch, step.Op)
308311
case *ClosureTxnOperation:
309312
countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...)

pkg/kv/kvnemesis/operations.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ func (op Operation) Result() *Result {
6262
return &o.Result
6363
case *SavepointRollbackOperation:
6464
return &o.Result
65+
case *MutateBatchHeaderOperation:
66+
return &o.Result
6567
default:
6668
panic(errors.AssertionFailedf(`unknown operation: %T %v`, o, o))
6769
}
@@ -115,6 +117,14 @@ func formatOps(w *strings.Builder, fctx formatCtx, ops []Operation) {
115117
}
116118
}
117119

120+
func (op Operation) OperationHasResultInBatch() bool {
121+
if op.MutateBatchHeader != nil {
122+
return false
123+
} else {
124+
return true
125+
}
126+
}
127+
118128
func (op Operation) String() string {
119129
fctx := formatCtx{receiver: `x`, indent: ``}
120130
var buf strings.Builder
@@ -222,6 +232,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
222232
o.format(w, fctx)
223233
case *SavepointRollbackOperation:
224234
o.format(w, fctx)
235+
case *MutateBatchHeaderOperation:
236+
o.format(w, fctx)
225237
default:
226238
fmt.Fprintf(w, "%v", op.GetValue())
227239
}
@@ -453,6 +465,15 @@ func (op SavepointRollbackOperation) format(w *strings.Builder, fctx formatCtx)
453465
op.Result.format(w)
454466
}
455467

468+
func (op MutateBatchHeaderOperation) format(w *strings.Builder, fctx formatCtx) {
469+
if op.TargetBytes > 0 {
470+
fmt.Fprintf(w, `%s.Header.TargetBytes = %d // MutateBatchHeaderOperation`, fctx.receiver, op.TargetBytes)
471+
}
472+
if op.MaxSpanRequestKeys > 0 {
473+
fmt.Fprintf(w, `%s.Header.MaxSpanRequestKeys = %d // MutateBatchHeaderOperation`, fctx.receiver, op.MaxSpanRequestKeys)
474+
}
475+
}
476+
456477
func (r Result) format(w *strings.Builder) {
457478
if r.Type == ResultType_Unknown {
458479
return

pkg/kv/kvnemesis/operations.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,13 @@ message SavepointRollbackOperation {
165165
Result result = 2 [(gogoproto.nullable) = false];
166166
}
167167

168+
// MutateBatchHeaderOperation is a batch-only pseudo-operation that adjusts
// limits on the header of the batch it appears in. When the batch is applied,
// max_span_request_keys and target_bytes are copied onto the batch header's
// MaxSpanRequestKeys and TargetBytes fields respectively. The generator sets
// only one of the two per operation, leaving the other at zero. The operation
// has no entry of its own in the batch's results; its result is initialised
// without an error.
message MutateBatchHeaderOperation {
169+
int64 max_span_request_keys = 1;
170+
int64 target_bytes = 2;
171+
172+
Result result = 3 [(gogoproto.nullable) = false];
173+
}
174+
168175
message Operation {
169176
option (gogoproto.goproto_stringer) = false;
170177
option (gogoproto.onlyone) = true;
@@ -195,6 +202,7 @@ message Operation {
195202
SavepointRollbackOperation savepoint_rollback = 22;
196203
BarrierOperation barrier = 23;
197204
FlushLockTableOperation flush_lock_table = 24;
205+
MutateBatchHeaderOperation mutate_batch_header = 25;
198206
}
199207

200208
enum ResultType {
@@ -223,6 +231,7 @@ message Result {
223231
// Only set if Type is ResultType_Values. The RawBytes of a roachpb.Value.
224232
repeated KeyValue values = 5 [(gogoproto.nullable) = false];
225233
util.hlc.Timestamp optional_timestamp = 6 [(gogoproto.nullable) = false];
234+
roachpb.Span resume_span = 7;
226235
}
227236

228237
message Step {

pkg/kv/kvnemesis/validator.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,9 @@ func (v *validator) processOp(op Operation) {
433433
if _, isErr := v.checkError(op, t.Result); isErr {
434434
break
435435
}
436+
if t.Result.ResumeSpan != nil {
437+
break
438+
}
436439
read := &observedRead{
437440
Key: t.Key,
438441
SkipLocked: t.SkipLocked,
@@ -464,6 +467,9 @@ func (v *validator) processOp(op Operation) {
464467
if v.checkNonAmbError(op, t.Result) {
465468
break
466469
}
470+
if t.Result.ResumeSpan != nil {
471+
break
472+
}
467473
// Accumulate all the writes for this transaction.
468474
write := &observedWrite{
469475
Key: t.Key,
@@ -482,6 +488,9 @@ func (v *validator) processOp(op Operation) {
482488
if v.checkNonAmbError(op, t.Result) {
483489
break
484490
}
491+
if t.Result.ResumeSpan != nil {
492+
break
493+
}
485494
sv, _ := v.tryConsumeWrite(t.Key, t.Seq)
486495
write := &observedWrite{
487496
Key: t.Key,
@@ -532,10 +541,14 @@ func (v *validator) processOp(op Operation) {
532541
// not prevent new keys from being inserted in the deletion span between the
533542
// transaction's read and write timestamps.
534543
if v.observationFilter != observeLocking {
544+
endKey := t.EndKey
545+
if t.Result.ResumeSpan != nil {
546+
endKey = t.Result.ResumeSpan.Key
547+
}
535548
v.curObservations = append(v.curObservations, &observedScan{
536549
Span: roachpb.Span{
537550
Key: t.Key,
538-
EndKey: t.EndKey,
551+
EndKey: endKey,
539552
},
540553
IsDeleteRange: true, // just for printing
541554
KVs: nil,
@@ -747,13 +760,24 @@ func (v *validator) processOp(op Operation) {
747760
if _, isErr := v.checkError(op, t.Result); isErr {
748761
break
749762
}
763+
readSpan := roachpb.Span{Key: t.Key, EndKey: t.EndKey}
764+
// If the ResumeSpan equals the original request span, the scan wasn't
765+
// processed at all.
766+
if t.Result.ResumeSpan != nil && t.Result.ResumeSpan.Equal(readSpan) {
767+
break
768+
}
769+
750770
switch v.observationFilter {
751771
case observeAll:
772+
if t.Result.ResumeSpan != nil {
773+
if op.Scan.Reverse {
774+
readSpan.Key = t.Result.ResumeSpan.EndKey
775+
} else {
776+
readSpan.EndKey = t.Result.ResumeSpan.Key
777+
}
778+
}
752779
scan := &observedScan{
753-
Span: roachpb.Span{
754-
Key: t.Key,
755-
EndKey: t.EndKey,
756-
},
780+
Span: readSpan,
757781
Reverse: t.Reverse,
758782
SkipLocked: t.SkipLocked,
759783
KVs: make([]roachpb.KeyValue, len(t.Result.Values)),
@@ -926,6 +950,9 @@ func (v *validator) processOp(op Operation) {
926950
// Don't fail on all errors because savepoints can be labeled with
927951
// errOmitted if a previous op in the txn failed.
928952
v.checkError(op, t.Result)
953+
case *MutateBatchHeaderOperation:
954+
execTimestampStrictlyOptional = true
955+
v.checkError(op, t.Result)
929956
default:
930957
panic(errors.AssertionFailedf(`unknown operation type: %T %v`, t, t))
931958
}

0 commit comments

Comments
 (0)