Commit a91324d

kvserver: factor out mutate in TestReplicaLifecycleDatadriven
Epic: none
Release note: None
1 parent b4026d8 commit a91324d
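
For context, each datadriven command handler previously opened its own storage batch, performed its writes, decoded the batch contents for the test output, and committed; this commit moves that shared sequence into a single helper on testCtx. A condensed sketch of the pattern, paraphrased from the test-file diff below (the inline comments are mine, not part of the commit):

// mutate opens a batch, hands it to the caller's write closure, renders the
// written KVs for the datadriven output, and commits the batch.
func (tc *testCtx) mutate(t *testing.T, write func(storage.Batch)) string {
	batch := tc.storage.NewBatch()
	defer batch.Close()
	write(batch) // caller-supplied, command-specific writes
	output, err := print.DecodeWriteBatch(batch.Repr())
	require.NoError(t, err)
	require.NoError(t, batch.Commit(false))
	// Strip double newlines for the datadriven driver (see the TODO in the diff).
	return strings.ReplaceAll(output, "\n\n", "\n")
}

Command handlers then reduce to their command-specific writes, for example:

return tc.mutate(t, func(batch storage.Batch) {
	// ... write keys for this command to the batch ...
})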

File tree

2 files changed: +104 −135 lines

pkg/kv/kvserver/batcheval/cmd_end_transaction.go

Lines changed: 3 additions & 0 deletions
@@ -1302,6 +1302,9 @@ type SplitTriggerHelperInput struct {
 // splitTriggerHelper continues the work begun by splitTrigger, but has a
 // reduced scope that has all stats-related concerns bundled into a
 // splitStatsHelper.
+//
+// TODO(arul): consider having this function write keys to the batch in sorted
+// order, much like how destroyReplicaImpl does.
 func splitTriggerHelper(
 	ctx context.Context,
 	rec EvalContext,

pkg/kv/kvserver/replica_lifecycle_datadriven_test.go

Lines changed: 101 additions & 135 deletions
@@ -179,35 +179,22 @@ func TestReplicaLifecycleDataDriven(t *testing.T) {
 		}
 		repl := rs.mustGetReplicaDescriptor(t, roachpb.NodeID(1))
 
-		batch := tc.storage.NewBatch()
-		defer batch.Close()
-
-		if initialized {
-			require.NoError(t, kvstorage.WriteInitialRangeState(
-				ctx, batch, batch,
-				rs.desc, repl.ReplicaID, rs.version,
-			))
-		} else {
-			require.NoError(t, kvstorage.CreateUninitializedReplica(
-				ctx, kvstorage.TODOState(batch), batch, 1, /* StoreID */
-				roachpb.FullReplicaID{RangeID: rs.desc.RangeID, ReplicaID: repl.ReplicaID},
-			))
-		}
-		tc.updatePostReplicaCreateState(t, ctx, rs, batch)
+		output := tc.mutate(t, func(batch storage.Batch) {
+			if initialized {
+				require.NoError(t, kvstorage.WriteInitialRangeState(
+					ctx, batch, batch,
+					rs.desc, repl.ReplicaID, rs.version,
+				))
+			} else {
+				require.NoError(t, kvstorage.CreateUninitializedReplica(
+					ctx, kvstorage.TODOState(batch), batch, 1, /* StoreID */
+					roachpb.FullReplicaID{RangeID: rs.desc.RangeID, ReplicaID: repl.ReplicaID},
+				))
+			}
+			tc.updatePostReplicaCreateState(t, ctx, rs, batch)
+		})
 
-		// Print the descriptor and batch output.
-		var sb strings.Builder
-		output, err := print.DecodeWriteBatch(batch.Repr())
-		require.NoError(t, err, "error decoding batch")
-		sb.WriteString(fmt.Sprintf("created replica: %v", repl))
-		if output != "" {
-			sb.WriteString("\n")
-			sb.WriteString(output)
-		}
-		// Commit the batch.
-		err = batch.Commit(true)
-		require.NoError(t, err, "error committing batch")
-		return sb.String()
+		return fmt.Sprintf("created replica: %v\n%s", repl, output)
 
 	case "create-split":
 		rangeID := dd.ScanArg[roachpb.RangeID](t, d, "range-id")
@@ -272,71 +259,48 @@ func TestReplicaLifecycleDataDriven(t *testing.T) {
 		require.True(t, ok, "split trigger not found for range-id %d", rangeID)
 		rs := tc.mustGetRangeState(t, rangeID)
 		desc := rs.desc
-		batch := tc.storage.NewBatch()
-		defer batch.Close()
-
-		rec := (&batcheval.MockEvalCtx{
-			ClusterSettings: tc.st,
-			Desc: &desc,
-			Clock: tc.clock,
-			AbortSpan: rs.abortspan,
-			LastReplicaGCTimestamp: rs.lastGCTimestamp,
-			RangeLeaseDuration: tc.rangeLeaseDuration,
-		}).EvalContext()
-
-		in := batcheval.SplitTriggerHelperInput{
-			LeftLease: rs.lease,
-			GCThreshold: &rs.gcThreshold,
-			GCHint: &rs.gcHint,
-			ReplicaVersion: rs.version,
-		}
-		// Actually run the split trigger.
-		_, _, err := batcheval.TestingSplitTrigger(
-			ctx, rec, batch /* bothDeltaMS */, enginepb.MVCCStats{}, split, in, hlc.Timestamp{},
-		)
-		require.NoError(t, err)
-
-		// Update the test context's notion of the range state after the
-		// split.
-		tc.updatePostSplitRangeState(t, ctx, batch, rangeID, split)
-		// Print the state of the batch (all keys/values written as part
-		// of the split trigger).
-		output, err := print.DecodeWriteBatch(batch.Repr())
-		require.NoError(t, err)
-		// Commit the batch.
-		err = batch.Commit(true)
-		require.NoError(t, err, "error committing batch")
-		// TODO(arul): There are double lines in the output (see tryTxn
-		// in debug_print.go) that we need to strip out for the benefit
-		// of the datadriven test driver. Until that TODO is addressed,
-		// we manually split things out here.
-		return strings.ReplaceAll(output, "\n\n", "\n")
+
+		return tc.mutate(t, func(batch storage.Batch) {
+			rec := (&batcheval.MockEvalCtx{
+				ClusterSettings: tc.st,
+				Desc: &desc,
+				Clock: tc.clock,
+				AbortSpan: rs.abortspan,
+				LastReplicaGCTimestamp: rs.lastGCTimestamp,
+				RangeLeaseDuration: tc.rangeLeaseDuration,
+			}).EvalContext()
+
+			in := batcheval.SplitTriggerHelperInput{
+				LeftLease: rs.lease,
+				GCThreshold: &rs.gcThreshold,
+				GCHint: &rs.gcHint,
+				ReplicaVersion: rs.version,
+			}
+			_, _, err := batcheval.TestingSplitTrigger(
+				ctx, rec, batch, enginepb.MVCCStats{}, split, in, hlc.Timestamp{},
+			)
+			require.NoError(t, err)
+
+			tc.updatePostSplitRangeState(t, ctx, batch, rangeID, split)
+		})
 
 	case "destroy-replica":
 		rangeID := dd.ScanArg[roachpb.RangeID](t, d, "range-id")
 		rs := tc.mustGetRangeState(t, rangeID)
 		rs.mustGetReplicaDescriptor(t, roachpb.NodeID(1)) // ensure replica exists
 
-		batch := tc.storage.NewBatch()
-		defer batch.Close()
-
-		err := kvstorage.DestroyReplica(
-			ctx,
-			kvstorage.TODOReadWriter(batch),
-			kvstorage.DestroyReplicaInfo{
-				FullReplicaID: rs.replica.FullReplicaID,
-				Keys: rs.desc.RSpan(),
-			},
-			rs.desc.NextReplicaID,
-		)
-		require.NoError(t, err)
-		output, err := print.DecodeWriteBatch(batch.Repr())
-		require.NoError(t, err)
-		err = batch.Commit(true)
-		require.NoError(t, err)
-
-		// Clear the replica from the range state.
-		rs.replica = nil
+		output := tc.mutate(t, func(batch storage.Batch) {
+			require.NoError(t, kvstorage.DestroyReplica(
+				ctx,
+				kvstorage.TODOReadWriter(batch),
+				kvstorage.DestroyReplicaInfo{
+					FullReplicaID: rs.replica.FullReplicaID,
+					Keys: rs.desc.RSpan(),
+				},
+				rs.desc.NextReplicaID,
+			))
+		})
+		rs.replica = nil // clear the replica from the range state
 		return output
 
 	case "append-raft-entries":
@@ -345,29 +309,22 @@ func TestReplicaLifecycleDataDriven(t *testing.T) {
 		rs := tc.mustGetRangeState(t, rangeID)
 		require.NotNil(t, rs.replica, "replica must be created before appending entries")
 
-		batch := tc.storage.NewBatch()
-		defer batch.Close()
-
 		sl := logstore.NewStateLoader(rangeID)
 		lastIndex := rs.replica.lastIdx
 		rs.replica.lastIdx += kvpb.RaftIndex(numEntries)
 		term := rs.replica.hs.Term
 
-		for i := 0; i < numEntries; i++ {
-			entryIndex := lastIndex + 1 + kvpb.RaftIndex(i)
-			require.NoError(t, storage.MVCCBlindPutProto(
-				ctx, batch,
-				sl.RaftLogKey(entryIndex), hlc.Timestamp{},
-				&raftpb.Entry{Index: uint64(entryIndex), Term: term},
-				storage.MVCCWriteOptions{},
-			))
-		}
-
-		output, err := print.DecodeWriteBatch(batch.Repr())
-		require.NoError(t, err)
-		err = batch.Commit(true)
-		require.NoError(t, err)
-		return strings.ReplaceAll(output, "\n\n", "\n")
+		return tc.mutate(t, func(batch storage.Batch) {
+			for i := 0; i < numEntries; i++ {
+				entryIndex := lastIndex + 1 + kvpb.RaftIndex(i)
+				require.NoError(t, storage.MVCCBlindPutProto(
+					ctx, batch,
+					sl.RaftLogKey(entryIndex), hlc.Timestamp{},
+					&raftpb.Entry{Index: uint64(entryIndex), Term: term},
+					storage.MVCCWriteOptions{},
+				))
+			}
+		})
 
 	case "create-range-data":
 		rangeID := dd.ScanArg[roachpb.RangeID](t, d, "range-id")
@@ -377,41 +334,33 @@ func TestReplicaLifecycleDataDriven(t *testing.T) {
 		require.True(t, numUserKeys > 0 || numSystemKeys > 0 || numLockTableKeys > 0)
 
 		rs := tc.mustGetRangeState(t, rangeID)
-		batch := tc.storage.NewBatch()
-		defer batch.Close()
-
 		ts := hlc.Timestamp{WallTime: 1}
-
 		getUserKey := func(i int) roachpb.Key {
 			return append(rs.desc.StartKey.AsRawKey(), strconv.Itoa(i)...)
 		}
 
-		// 1. User keys.
-		for i := 0; i < numUserKeys; i++ {
-			require.NoError(t, batch.PutMVCC(
-				storage.MVCCKey{Key: getUserKey(i), Timestamp: ts}, storage.MVCCValue{},
-			))
-		}
-		// 2. System keys.
-		for i := 0; i < numSystemKeys; i++ {
-			key := keys.TransactionKey(getUserKey(i), uuid.NamespaceDNS)
-			require.NoError(t, batch.PutMVCC(
-				storage.MVCCKey{Key: key, Timestamp: ts}, storage.MVCCValue{},
-			))
-		}
-		// 3. Lock table keys.
-		for i := 0; i < numLockTableKeys; i++ {
-			ek, _ := storage.LockTableKey{
-				Key: getUserKey(i), Strength: lock.Intent, TxnUUID: uuid.UUID{},
-			}.ToEngineKey(nil)
-			require.NoError(t, batch.PutEngineKey(ek, nil))
-		}
-
-		output, err := print.DecodeWriteBatch(batch.Repr())
-		require.NoError(t, err)
-		err = batch.Commit(true)
-		require.NoError(t, err)
-		return output
+		return tc.mutate(t, func(batch storage.Batch) {
+			// 1. User keys.
+			for i := 0; i < numUserKeys; i++ {
+				require.NoError(t, batch.PutMVCC(
+					storage.MVCCKey{Key: getUserKey(i), Timestamp: ts}, storage.MVCCValue{},
+				))
+			}
+			// 2. System keys.
+			for i := 0; i < numSystemKeys; i++ {
+				key := keys.TransactionKey(getUserKey(i), uuid.NamespaceDNS)
+				require.NoError(t, batch.PutMVCC(
+					storage.MVCCKey{Key: key, Timestamp: ts}, storage.MVCCValue{},
+				))
+			}
+			// 3. Lock table keys.
+			for i := 0; i < numLockTableKeys; i++ {
+				ek, _ := storage.LockTableKey{
+					Key: getUserKey(i), Strength: lock.Intent, TxnUUID: uuid.UUID{},
+				}.ToEngineKey(nil)
+				require.NoError(t, batch.PutEngineKey(ek, nil))
+			}
+		})
 
 	case "print-range-state":
 		var sb strings.Builder
@@ -500,6 +449,23 @@ func (tc *testCtx) close() {
 	tc.storage.Close()
 }
 
+// mutate executes a write operation on a batch and commits it. All KVs written
+// as part of the batch are returned as a string for the benefit of the
+// datadriven test output.
+func (tc *testCtx) mutate(t *testing.T, write func(storage.Batch)) string {
+	batch := tc.storage.NewBatch()
+	defer batch.Close()
+	write(batch)
+	output, err := print.DecodeWriteBatch(batch.Repr())
+	require.NoError(t, err)
+	require.NoError(t, batch.Commit(false))
+	// TODO(arul): There may be double new lines in the output (see tryTxn in
+	// debug_print.go) that we need to strip out for the benefit of the
+	// datadriven test driver. Until that TODO is addressed, we manually split
+	// things out here.
+	return strings.ReplaceAll(output, "\n\n", "\n")
+}
+
 // newRangeState constructs a new rangeState for the supplied descriptor.
 func newRangeState(desc roachpb.RangeDescriptor) *rangeState {
 	gcThreshold := hlc.Timestamp{WallTime: 4}
