@@ -29,7 +29,7 @@ import (
29
29
// in that cluster.
30
30
type Simulator struct {
31
31
log.AmbientContext
32
- onRecording func (storeID state.StoreID , rec tracingpb.Recording )
32
+ onRecording func (storeID state.StoreID , atDuration time. Duration , rec tracingpb.Recording )
33
33
34
34
curr time.Time
35
35
end time.Time
@@ -100,9 +100,9 @@ func NewSimulator(
100
100
101
101
s := & Simulator {
102
102
AmbientContext : log .MakeTestingAmbientCtxWithNewTracer (),
103
- onRecording : func (storeID state.StoreID , rec tracingpb.Recording ) {
103
+ onRecording : func (storeID state.StoreID , atDuration time. Duration , rec tracingpb.Recording ) {
104
104
if fn := settings .OnRecording ; fn != nil {
105
- fn (int64 (storeID ), rec )
105
+ fn (int64 (storeID ), atDuration , rec )
106
106
}
107
107
},
108
108
curr : settings .StartTime ,
@@ -323,6 +323,25 @@ func (s *Simulator) tickStoreClocks(tick time.Time) {
323
323
s .state .TickClock (tick )
324
324
}
325
325
326
+ func (s * Simulator ) doAndMaybeTrace (
327
+ ctx context.Context ,
328
+ storeID state.StoreID ,
329
+ tick time.Time ,
330
+ op string ,
331
+ f func (ctx context.Context ),
332
+ ) {
333
+ atDuration := tick .Sub (s .settings .StartTime )
334
+
335
+ var finishAndGetRecording func () tracingpb.Recording
336
+ if s .onRecording != nil {
337
+ ctx , finishAndGetRecording = tracing .ContextWithRecordingSpan (ctx , s .Tracer , op )
338
+ }
339
+ f (ctx )
340
+ if finishAndGetRecording != nil {
341
+ s .onRecording (storeID , atDuration , finishAndGetRecording ())
342
+ }
343
+ }
344
+
326
345
// tickQueues iterates over the next replicas for each store to
327
346
// consider. It then enqueues each of these and ticks the replicate queue for
328
347
// processing.
@@ -334,10 +353,16 @@ func (s *Simulator) tickQueues(ctx context.Context, tick time.Time, state state.
334
353
335
354
// Tick the split queue.
336
355
s .sqs [storeID ].Tick (ctx , tick , state )
356
+
337
357
// Tick the replicate queue.
338
- s .rqs [storeID ].Tick (ctx , tick , state )
358
+ s .doAndMaybeTrace (ctx , storeID , tick , "replicateQueue.PlanOneChange" , func (ctx context.Context ) {
359
+ s .rqs [storeID ].Tick (ctx , tick , state )
360
+ })
361
+
339
362
// Tick the lease queue.
340
- s .lqs [storeID ].Tick (ctx , tick , state )
363
+ s .doAndMaybeTrace (ctx , storeID , tick , "leaseQueue.PlanOneChange" , func (ctx context.Context ) {
364
+ s .lqs [storeID ].Tick (ctx , tick , state )
365
+ })
341
366
342
367
// Tick changes that may have been enqueued with a lower completion
343
368
// than the current tick, from the queues.
@@ -389,14 +414,9 @@ func (s *Simulator) tickMMStoreRebalancers(ctx context.Context, tick time.Time,
389
414
stores := s .state .Stores ()
390
415
s .shuffler (len (stores ), func (i , j int ) { stores [i ], stores [j ] = stores [j ], stores [i ] })
391
416
for _ , store := range stores {
392
- var finishAndGetRecording func () tracingpb.Recording
393
- if s .onRecording != nil {
394
- ctx , finishAndGetRecording = tracing .ContextWithRecordingSpan (ctx , s .Tracer , "mma.ComputeChanges" )
395
- }
396
- s .mmSRs [store .StoreID ()].Tick (ctx , tick , state )
397
- if finishAndGetRecording != nil {
398
- s .onRecording (store .StoreID (), finishAndGetRecording ())
399
- }
417
+ s .doAndMaybeTrace (ctx , store .StoreID (), tick , "mma.ComputeChanges" , func (ctx context.Context ) {
418
+ s .mmSRs [store .StoreID ()].Tick (ctx , tick , state )
419
+ })
400
420
}
401
421
}
402
422
0 commit comments