Skip to content

Commit ba13511

Browse files
committed
Explicitly switch between timer modes
1 parent 604f1f4 commit ba13511

File tree

6 files changed

+317
-66
lines changed

6 files changed

+317
-66
lines changed

tester/options.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package tester
2+
3+
import (
4+
"time"
5+
6+
"github.com/cschleiden/go-workflows/internal/converter"
7+
"github.com/cschleiden/go-workflows/log"
8+
)
9+
10+
type WorkflowTesterOption func(*options)
11+
12+
func WithLogger(logger log.Logger) WorkflowTesterOption {
13+
return func(o *options) {
14+
o.Logger = logger
15+
}
16+
}
17+
18+
func WithConverter(converter converter.Converter) WorkflowTesterOption {
19+
return func(o *options) {
20+
o.Converter = converter
21+
}
22+
}
23+
24+
func WithTestTimeout(timeout time.Duration) WorkflowTesterOption {
25+
return func(o *options) {
26+
o.TestTimeout = timeout
27+
}
28+
}

tester/tester.go

Lines changed: 117 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,29 @@ type testTimer struct {
4545
// ScheduleEventID is the ID of the schedule event for this timer
4646
ScheduleEventID int64
4747

48-
// At is the time this timer is scheduled for
48+
// At is the time this timer is scheduled for in test time
4949
At time.Time
5050

51+
// WallClockAt is the time this timer is scheduled for in wall-clock time
52+
WallClockAt time.Time
53+
5154
// Callback is called when the timer should fire.
52-
Callback func()
55+
Callback *func()
56+
57+
TimerEvent *history.WorkflowEvent
5358

5459
wallClockTimer *clock.Timer
5560
}
5661

62+
func (tt *testTimer) fire() *history.WorkflowEvent {
63+
if tt.Callback != nil {
64+
(*tt.Callback)()
65+
return nil
66+
}
67+
68+
return tt.TimerEvent
69+
}
70+
5771
type testWorkflow struct {
5872
instance *core.WorkflowInstance
5973
history []*history.Event
@@ -93,11 +107,16 @@ type workflowTester[TResult any] struct {
93107
workflowHistory []*history.Event
94108
clock *clock.Mock
95109
wallClock clock.Clock
96-
startTime time.Time
97110

98-
sync.Map
99-
timers []*testTimer
100-
nextTimer *testTimer
111+
// Wall-clock start time of the workflow test run
112+
startTime time.Time
113+
114+
timers []*testTimer
115+
wallClockTimer *clock.Timer
116+
117+
// timerWallClockStart time.Time
118+
timerMode timeMode
119+
101120
callbacks chan func() *history.WorkflowEvent
102121

103122
subWorkflowListener func(*core.WorkflowInstance, string)
@@ -111,26 +130,6 @@ type workflowTester[TResult any] struct {
111130
converter converter.Converter
112131
}
113132

114-
type WorkflowTesterOption func(*options)
115-
116-
func WithLogger(logger log.Logger) WorkflowTesterOption {
117-
return func(o *options) {
118-
o.Logger = logger
119-
}
120-
}
121-
122-
func WithConverter(converter converter.Converter) WorkflowTesterOption {
123-
return func(o *options) {
124-
o.Converter = converter
125-
}
126-
}
127-
128-
func WithTestTimeout(timeout time.Duration) WorkflowTesterOption {
129-
return func(o *options) {
130-
o.TestTimeout = timeout
131-
}
132-
}
133-
134133
func NewWorkflowTester[TResult any](wf interface{}, opts ...WorkflowTesterOption) *workflowTester[TResult] {
135134
if err := margs.ReturnTypeMatch[TResult](wf); err != nil {
136135
panic(fmt.Sprintf("workflow return type does not match: %s", err))
@@ -177,6 +176,7 @@ func NewWorkflowTester[TResult any](wf interface{}, opts ...WorkflowTesterOption
177176

178177
timers: make([]*testTimer, 0),
179178
callbacks: make(chan func() *history.WorkflowEvent, 1024),
179+
timerMode: TM_TimeTravel,
180180

181181
logger: options.Logger.With("source", "tester"),
182182
tracer: tracer,
@@ -205,8 +205,9 @@ func (wt *workflowTester[TResult]) Registry() *workflow.Registry {
205205

206206
func (wt *workflowTester[TResult]) ScheduleCallback(delay time.Duration, callback func()) {
207207
wt.timers = append(wt.timers, &testTimer{
208-
At: wt.clock.Now().Add(delay),
209-
Callback: callback,
208+
At: wt.clock.Now().Add(delay),
209+
Callback: &callback,
210+
TimerEvent: nil,
210211
})
211212
}
212213

@@ -332,34 +333,8 @@ func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
332333
}
333334

334335
// No callbacks, try to fire any pending timers
335-
if len(wt.timers) > 0 && wt.nextTimer == nil {
336-
// Take first timer and execute it
337-
t := wt.timers[0]
338-
wt.timers = wt.timers[1:]
339-
340-
// If there are no running activities, we can time-travel to the next timer and execute it. Otherwise, if
341-
// there are running activities, only fire the timer if it is due.
342-
runningActivities := atomic.LoadInt32(&wt.runningActivities)
343-
if runningActivities > 0 {
344-
// Wall-clock mode
345-
wt.logger.Debug("Scheduling wall-clock timer", "at", t.At)
346-
347-
wt.nextTimer = t
348-
349-
remainingTime := wt.clock.Until(t.At)
350-
t.wallClockTimer = wt.wallClock.AfterFunc(remainingTime, func() {
351-
t.Callback()
352-
wt.nextTimer = nil
353-
})
354-
} else {
355-
// Time-travel mode
356-
wt.logger.Debug("Advancing workflow clock to fire timer", "to", t.At)
357-
358-
// Advance workflow clock and fire the timer
359-
wt.clock.Set(t.At)
360-
t.Callback()
361-
}
362-
336+
if wt.fireTimer() {
337+
// Timer fired
363338
continue
364339
}
365340

@@ -382,6 +357,88 @@ func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
382357
}
383358
}
384359

360+
func (wt *workflowTester[TResult]) fireTimer() bool {
361+
if len(wt.timers) == 0 {
362+
// No timers to fire
363+
return false
364+
}
365+
366+
// Determine mode we should be in and transition if it doesn't match the current one
367+
newMode := wt.newTimerMode()
368+
if wt.timerMode != newMode {
369+
wt.logger.Debug("Transitioning timer mode", "from", wt.timerMode, "to", newMode)
370+
371+
// Transition timer mode
372+
switch newMode {
373+
case TM_TimeTravel:
374+
if wt.wallClockTimer != nil {
375+
wt.wallClockTimer.Stop()
376+
wt.wallClockTimer = nil
377+
}
378+
379+
case TM_WallClock:
380+
// Going from time-travel to wall-clock mode. Nothing to do here.
381+
}
382+
383+
wt.timerMode = newMode
384+
}
385+
386+
switch wt.timerMode {
387+
case TM_TimeTravel:
388+
{
389+
// Pop first timer and execute it
390+
t := wt.timers[0]
391+
wt.timers = wt.timers[1:]
392+
393+
wt.logger.Debug("Advancing workflow clock to fire timer", "to", t.At)
394+
395+
// Advance workflow clock and fire the timer
396+
wt.clock.Set(t.At)
397+
wt.callbacks <- t.fire
398+
return true
399+
}
400+
401+
case TM_WallClock:
402+
{
403+
t := wt.timers[0]
404+
405+
wt.logger.Debug("Scheduling wall-clock timer", "at", t.WallClockAt)
406+
407+
// wt.nextTimer = t
408+
409+
if wt.wallClock.Now().After(t.WallClockAt) {
410+
// Fire timer
411+
wt.timers = wt.timers[1:]
412+
wt.callbacks <- t.fire
413+
414+
return true
415+
} else if wt.wallClockTimer == nil {
416+
// Schedule timer
417+
wt.wallClockTimer = wt.wallClock.AfterFunc(t.WallClockAt.Sub(wt.wallClock.Now()), func() {
418+
wt.callbacks <- func() *history.WorkflowEvent {
419+
// Remove timer
420+
wt.timers = wt.timers[1:]
421+
wt.wallClockTimer = nil
422+
423+
return t.fire()
424+
}
425+
})
426+
}
427+
}
428+
}
429+
430+
return false
431+
}
432+
433+
func (wt *workflowTester[TResult]) newTimerMode() timeMode {
434+
runningActivities := atomic.LoadInt32(&wt.runningActivities)
435+
if runningActivities > 0 {
436+
return TM_WallClock
437+
}
438+
439+
return TM_TimeTravel
440+
}
441+
385442
func (wt *workflowTester[TResult]) sendEvent(wfi *core.WorkflowInstance, event *history.Event) {
386443
w := wt.getWorkflow(wfi)
387444

@@ -550,13 +607,10 @@ func (wt *workflowTester[TResult]) scheduleTimer(instance *core.WorkflowInstance
550607
Instance: instance,
551608
ScheduleEventID: event.ScheduleEventID,
552609
At: e.At,
553-
Callback: func() {
554-
wt.callbacks <- func() *history.WorkflowEvent {
555-
return &history.WorkflowEvent{
556-
WorkflowInstance: instance,
557-
HistoryEvent: event,
558-
}
559-
}
610+
WallClockAt: wt.wallClock.Now().Add(e.At.Sub(wt.clock.Now())),
611+
TimerEvent: &history.WorkflowEvent{
612+
WorkflowInstance: instance,
613+
HistoryEvent: event,
560614
},
561615
})
562616

tester/tester_activity_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ func Test_LongActivity(t *testing.T) {
6060

6161
func Test_ActivityRaceWithSignal(t *testing.T) {
6262
activity1 := func() (string, error) {
63+
time.Sleep(200 * time.Millisecond)
64+
6365
return "activity", nil
6466
}
6567

@@ -90,9 +92,7 @@ func Test_ActivityRaceWithSignal(t *testing.T) {
9092

9193
tester := NewWorkflowTester[string](wf, WithTestTimeout(time.Second*3))
9294

93-
tester.OnActivity(activity1).Run(func(args mock.Arguments) {
94-
time.Sleep(200 * time.Millisecond)
95-
}).Return("activity", nil)
95+
tester.OnActivity(activity1).Return("activity", nil)
9696

9797
tester.ScheduleCallback(time.Millisecond*100, func() {
9898
tester.SignalWorkflow("signal", "stop")

tester/tester_logger_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package tester
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"github.com/cschleiden/go-workflows/internal/logger"
8+
"github.com/cschleiden/go-workflows/log"
9+
)
10+
11+
type debugLogger struct {
12+
defaultFields []interface{}
13+
lines *[]string
14+
l log.Logger
15+
}
16+
17+
func newDebugLogger() *debugLogger {
18+
lines := []string{}
19+
return &debugLogger{
20+
lines: &lines,
21+
defaultFields: []interface{}{},
22+
l: logger.NewDefaultLogger(),
23+
}
24+
}
25+
26+
func (dl *debugLogger) hasLine(msg string) bool {
27+
for _, line := range *dl.lines {
28+
if strings.Contains(line, msg) {
29+
return true
30+
}
31+
}
32+
33+
return false
34+
}
35+
36+
func (dl *debugLogger) formatFields(level, msg string, fields ...interface{}) string {
37+
var result []string
38+
39+
result = append(result, fmt.Sprintf("|%s| %s", level, msg))
40+
41+
for i := 0; i < len(dl.defaultFields)/2; i++ {
42+
result = append(result, fmt.Sprintf("%v=%v", dl.defaultFields[i*2], dl.defaultFields[i*2+1]))
43+
}
44+
45+
for i := 0; i < len(fields)/2; i++ {
46+
result = append(result, fmt.Sprintf("%v=%v", fields[i*2], fields[i*2+1]))
47+
}
48+
49+
return strings.Join(result, " ")
50+
}
51+
52+
func (dl *debugLogger) addLine(level, msg string, fields ...interface{}) {
53+
// Persist for debugging
54+
*dl.lines = append(*dl.lines, dl.formatFields(level, msg, fields...))
55+
}
56+
57+
func (dl *debugLogger) Debug(msg string, fields ...interface{}) {
58+
dl.addLine("DEBUG", msg, fields...)
59+
dl.l.Debug(msg, fields...)
60+
}
61+
62+
func (dl *debugLogger) Error(msg string, fields ...interface{}) {
63+
dl.addLine("ERROR", msg, fields...)
64+
dl.l.Error(msg, fields...)
65+
}
66+
67+
func (dl *debugLogger) Panic(msg string, fields ...interface{}) {
68+
dl.addLine("PANIC", msg, fields...)
69+
dl.l.Panic(msg, fields...)
70+
}
71+
72+
func (dl *debugLogger) Warn(msg string, fields ...interface{}) {
73+
dl.addLine("WARN", msg, fields...)
74+
dl.l.Warn(msg, fields...)
75+
}
76+
77+
func (dl *debugLogger) With(fields ...interface{}) log.Logger {
78+
return &debugLogger{
79+
lines: dl.lines, // Keep this here
80+
defaultFields: append(dl.defaultFields, fields...),
81+
l: dl.l.With(fields...),
82+
}
83+
}
84+
85+
var _ log.Logger = (*debugLogger)(nil)

0 commit comments

Comments
 (0)