@@ -38,6 +38,39 @@ func (t *testHistoryProvider) GetWorkflowInstanceHistory(ctx context.Context, in
38
38
return t .history , nil
39
39
}
40
40
41
+ type testTimer struct {
42
+ // Instance is the workflow instance this timer is for
43
+ Instance * core.WorkflowInstance
44
+
45
+ // ScheduleEventID is the ID of the schedule event for this timer
46
+ ScheduleEventID int64
47
+
48
+ // At is the time this timer is scheduled for in test time
49
+ At time.Time
50
+
51
+ // Callback is called when the timer should fire.
52
+ Callback * func ()
53
+
54
+ TimerEvent * history.WorkflowEvent
55
+
56
+ wallClockTimer * clock.Timer
57
+ }
58
+
59
+ func (tt * testTimer ) fire () * history.WorkflowEvent {
60
+ if tt .Callback != nil {
61
+ (* tt .Callback )()
62
+ return nil
63
+ }
64
+
65
+ return tt .TimerEvent
66
+ }
67
+
68
+ type testWorkflow struct {
69
+ instance * core.WorkflowInstance
70
+ history []* history.Event
71
+ pendingEvents []* history.Event
72
+ }
73
+
41
74
type WorkflowTester [TResult any ] interface {
42
75
// Now returns the current time of the simulated clock in the tester
43
76
Now () time.Time
@@ -68,34 +101,6 @@ type WorkflowTester[TResult any] interface {
68
101
ListenSubWorkflow (listener func (instance * core.WorkflowInstance , name string ))
69
102
}
70
103
71
- type testTimer struct {
72
- // Instance is the workflow instance this timer is for
73
- Instance * core.WorkflowInstance
74
-
75
- // ScheduleEventID is the ID of the schedule event for this timer
76
- ScheduleEventID int64
77
-
78
- // At is the time this timer is scheduled for
79
- At time.Time
80
-
81
- // Callback is called when the timer should fire.
82
- Callback func ()
83
-
84
- wallClockTimer * clock.Timer
85
- }
86
-
87
- type testWorkflow struct {
88
- instance * core.WorkflowInstance
89
- history []* history.Event
90
- pendingEvents []* history.Event
91
- }
92
-
93
- type options struct {
94
- TestTimeout time.Duration
95
- Logger log.Logger
96
- Converter converter.Converter
97
- }
98
-
99
104
type workflowTester [TResult any ] struct {
100
105
options * options
101
106
@@ -123,11 +128,16 @@ type workflowTester[TResult any] struct {
123
128
workflowHistory []* history.Event
124
129
clock * clock.Mock
125
130
wallClock clock.Clock
126
- startTime time.Time
127
131
128
- sync.Map
129
- timers []* testTimer
130
- nextTimer * testTimer
132
+ // Wall-clock start time of the workflow test run
133
+ startTime time.Time
134
+
135
+ timers []* testTimer
136
+ wallClockTimer * clock.Timer
137
+
138
+ // timerWallClockStart time.Time
139
+ timerMode timeMode
140
+
131
141
callbacks chan func () * history.WorkflowEvent
132
142
133
143
subWorkflowListener func (* core.WorkflowInstance , string )
@@ -141,27 +151,9 @@ type workflowTester[TResult any] struct {
141
151
converter converter.Converter
142
152
}
143
153
144
- type WorkflowTesterOption func (* options )
145
-
146
- func WithLogger (logger log.Logger ) WorkflowTesterOption {
147
- return func (o * options ) {
148
- o .Logger = logger
149
- }
150
- }
151
-
152
- func WithConverter (converter converter.Converter ) WorkflowTesterOption {
153
- return func (o * options ) {
154
- o .Converter = converter
155
- }
156
- }
154
+ var _ WorkflowTester [any ] = (* workflowTester [any ])(nil )
157
155
158
- func WithTestTimeout (timeout time.Duration ) WorkflowTesterOption {
159
- return func (o * options ) {
160
- o .TestTimeout = timeout
161
- }
162
- }
163
-
164
- func NewWorkflowTester [TResult any ](wf interface {}, opts ... WorkflowTesterOption ) WorkflowTester [TResult ] {
156
+ func NewWorkflowTester [TResult any ](wf interface {}, opts ... WorkflowTesterOption ) * workflowTester [TResult ] {
165
157
if err := margs.ReturnTypeMatch [TResult ](wf ); err != nil {
166
158
panic (fmt .Sprintf ("workflow return type does not match: %s" , err ))
167
159
}
@@ -207,6 +199,7 @@ func NewWorkflowTester[TResult any](wf interface{}, opts ...WorkflowTesterOption
207
199
208
200
timers : make ([]* testTimer , 0 ),
209
201
callbacks : make (chan func () * history.WorkflowEvent , 1024 ),
202
+ timerMode : TM_TimeTravel ,
210
203
211
204
logger : options .Logger .With ("source" , "tester" ),
212
205
tracer : tracer ,
@@ -235,8 +228,9 @@ func (wt *workflowTester[TResult]) Registry() *workflow.Registry {
235
228
236
229
func (wt * workflowTester [TResult ]) ScheduleCallback (delay time.Duration , callback func ()) {
237
230
wt .timers = append (wt .timers , & testTimer {
238
- At : wt .clock .Now ().Add (delay ),
239
- Callback : callback ,
231
+ At : wt .clock .Now ().Add (delay ),
232
+ Callback : & callback ,
233
+ TimerEvent : nil ,
240
234
})
241
235
}
242
236
@@ -362,34 +356,8 @@ func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
362
356
}
363
357
364
358
// No callbacks, try to fire any pending timers
365
- if len (wt .timers ) > 0 && wt .nextTimer == nil {
366
- // Take first timer and execute it
367
- t := wt .timers [0 ]
368
- wt .timers = wt .timers [1 :]
369
-
370
- // If there are no running activities, we can time-travel to the next timer and execute it. Otherwise, if
371
- // there are running activities, only fire the timer if it is due.
372
- runningActivities := atomic .LoadInt32 (& wt .runningActivities )
373
- if runningActivities > 0 {
374
- // Wall-clock mode
375
- wt .logger .Debug ("Scheduling wall-clock timer" , "at" , t .At )
376
-
377
- wt .nextTimer = t
378
-
379
- remainingTime := wt .clock .Until (t .At )
380
- t .wallClockTimer = wt .wallClock .AfterFunc (remainingTime , func () {
381
- t .Callback ()
382
- wt .nextTimer = nil
383
- })
384
- } else {
385
- // Time-travel mode
386
- wt .logger .Debug ("Advancing workflow clock to fire timer" , "to" , t .At )
387
-
388
- // Advance workflow clock and fire the timer
389
- wt .clock .Set (t .At )
390
- t .Callback ()
391
- }
392
-
359
+ if wt .fireTimer () {
360
+ // Timer fired
393
361
continue
394
362
}
395
363
@@ -412,6 +380,86 @@ func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
412
380
}
413
381
}
414
382
383
+ func (wt * workflowTester [TResult ]) fireTimer () bool {
384
+ if len (wt .timers ) == 0 {
385
+ // No timers to fire
386
+ return false
387
+ }
388
+
389
+ // Determine mode we should be in and transition if it doesn't match the current one
390
+ newMode := wt .newTimerMode ()
391
+ if wt .timerMode != newMode {
392
+ wt .logger .Debug ("Transitioning timer mode" , "from" , wt .timerMode , "to" , newMode )
393
+
394
+ // Transition timer mode
395
+ switch newMode {
396
+ case TM_TimeTravel :
397
+ if wt .wallClockTimer != nil {
398
+ wt .wallClockTimer .Stop ()
399
+ wt .wallClockTimer = nil
400
+ }
401
+
402
+ case TM_WallClock :
403
+ // Going from time-travel to wall-clock mode. Nothing to do here.
404
+ }
405
+
406
+ wt .timerMode = newMode
407
+ }
408
+
409
+ switch wt .timerMode {
410
+ case TM_TimeTravel :
411
+ {
412
+ // Pop first timer and execute it
413
+ t := wt .timers [0 ]
414
+ wt .timers = wt .timers [1 :]
415
+
416
+ wt .logger .Debug ("Advancing workflow clock to fire timer" , "to" , t .At )
417
+
418
+ // Advance workflow clock and fire the timer
419
+ wt .clock .Set (t .At )
420
+ wt .callbacks <- t .fire
421
+ return true
422
+ }
423
+
424
+ case TM_WallClock :
425
+ {
426
+ if wt .wallClockTimer != nil {
427
+ // Wall-clock timer already scheduled
428
+ return false
429
+ }
430
+
431
+ t := wt .timers [0 ]
432
+
433
+ wt .logger .Debug ("Scheduling wall-clock timer" , "at" , t .At )
434
+
435
+ // Determine when this should run
436
+ remainingTime := t .At .Sub (wt .clock .Now ())
437
+
438
+ // Schedule timer
439
+ wt .wallClockTimer = wt .wallClock .AfterFunc (remainingTime , func () {
440
+ wt .callbacks <- func () * history.WorkflowEvent {
441
+ // Remove timer
442
+ wt .timers = wt .timers [1 :]
443
+ wt .wallClockTimer = nil
444
+
445
+ return t .fire ()
446
+ }
447
+ })
448
+ }
449
+ }
450
+
451
+ return false
452
+ }
453
+
454
+ func (wt * workflowTester [TResult ]) newTimerMode () timeMode {
455
+ runningActivities := atomic .LoadInt32 (& wt .runningActivities )
456
+ if runningActivities > 0 {
457
+ return TM_WallClock
458
+ }
459
+
460
+ return TM_TimeTravel
461
+ }
462
+
415
463
func (wt * workflowTester [TResult ]) sendEvent (wfi * core.WorkflowInstance , event * history.Event ) {
416
464
w := wt .getWorkflow (wfi )
417
465
@@ -580,13 +628,9 @@ func (wt *workflowTester[TResult]) scheduleTimer(instance *core.WorkflowInstance
580
628
Instance : instance ,
581
629
ScheduleEventID : event .ScheduleEventID ,
582
630
At : e .At ,
583
- Callback : func () {
584
- wt .callbacks <- func () * history.WorkflowEvent {
585
- return & history.WorkflowEvent {
586
- WorkflowInstance : instance ,
587
- HistoryEvent : event ,
588
- }
589
- }
631
+ TimerEvent : & history.WorkflowEvent {
632
+ WorkflowInstance : instance ,
633
+ HistoryEvent : event ,
590
634
},
591
635
})
592
636
0 commit comments