Skip to content

Commit e6e027c

Browse files
authored
Merge pull request #169 from cschleiden/cschleiden/wall-clock-timers
Schedule wall-clock timers during activities
2 parents c528042 + be8cca4 commit e6e027c

File tree

3 files changed

+171
-61
lines changed

3 files changed

+171
-61
lines changed

tester/tester.go

Lines changed: 66 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,13 @@ type testTimer struct {
7575
// ScheduleEventID is the ID of the schedule event for this timer
7676
ScheduleEventID int64
7777

78-
// At is the time this timer is scheduled for. This will advance the mock clock
79-
// to this timestamp
78+
// At is the time this timer is scheduled for
8079
At time.Time
8180

82-
// Callback is called when the timer should fire. It can return a history event which
83-
// will be added to the event history being executed.
81+
// Callback is called when the timer should fire.
8482
Callback func()
83+
84+
wallClockTimer *clock.Timer
8585
}
8686

8787
type testWorkflow struct {
@@ -122,9 +122,12 @@ type workflowTester[TResult any] struct {
122122

123123
workflowHistory []*history.Event
124124
clock *clock.Mock
125+
wallClock clock.Clock
125126
startTime time.Time
126127

128+
sync.Map
127129
timers []*testTimer
130+
nextTimer *testTimer
128131
callbacks chan func() *history.WorkflowEvent
129132

130133
subWorkflowListener func(*core.WorkflowInstance, string)
@@ -159,9 +162,13 @@ func WithTestTimeout(timeout time.Duration) WorkflowTesterOption {
159162
}
160163

161164
func NewWorkflowTester[TResult any](wf interface{}, opts ...WorkflowTesterOption) WorkflowTester[TResult] {
162-
// Start with the current wall-clock time
163-
clock := clock.NewMock()
164-
clock.Set(time.Now())
165+
if err := margs.ReturnTypeMatch[TResult](wf); err != nil {
166+
panic(fmt.Sprintf("workflow return type does not match: %s", err))
167+
}
168+
169+
// Start with the current wall-c time
170+
c := clock.NewMock()
171+
c.Set(time.Now())
165172

166173
wfi := core.NewWorkflowInstance(uuid.NewString(), uuid.NewString())
167174
registry := workflow.NewRegistry()
@@ -195,12 +202,13 @@ func NewWorkflowTester[TResult any](wf interface{}, opts ...WorkflowTesterOption
195202
mockedWorkflows: make(map[string]bool),
196203

197204
workflowHistory: make([]*history.Event, 0),
198-
clock: clock,
205+
clock: c,
206+
wallClock: clock.New(),
199207

200208
timers: make([]*testTimer, 0),
201209
callbacks: make(chan func() *history.WorkflowEvent, 1024),
202210

203-
logger: options.Logger,
211+
logger: options.Logger.With("source", "tester"),
204212
tracer: tracer,
205213
converter: options.Converter,
206214
}
@@ -263,7 +271,7 @@ func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
263271
wt.addWorkflow(wt.wfi, initialEvent)
264272

265273
for !wt.workflowFinished {
266-
// Execute all workflows until no more events?
274+
// Execute all workflows until no more events
267275
gotNewEvents := false
268276

269277
for _, tw := range wt.testWorkflows {
@@ -334,14 +342,14 @@ func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
334342
// Schedule timers
335343
for _, timerEvent := range result.TimerEvents {
336344
gotNewEvents = true
337-
wt.logger.Debug("Timer event", "event_type", timerEvent.Type)
345+
wt.logger.Debug("Timer future event", "event_type", timerEvent.Type, "at", *timerEvent.VisibleAt)
338346

339347
wt.scheduleTimer(tw.instance, timerEvent)
340348
}
341349
}
342350

343351
for !wt.workflowFinished && !gotNewEvents {
344-
// No new events left and the workflow isn't finished yet. Check for timers or callbacks
352+
// No new events left and workflows aren't finished yet. Check for callbacks
345353
select {
346354
case callback := <-wt.callbacks:
347355
event := callback()
@@ -353,31 +361,52 @@ func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
353361
default:
354362
}
355363

356-
// If there are no running activities and timers, skip time and jump to the next scheduled timer
357-
358-
if atomic.LoadInt32(&wt.runningActivities) == 0 && len(wt.timers) > 0 {
364+
// No callbacks, try to fire any pending timers
365+
if len(wt.timers) > 0 && wt.nextTimer == nil {
359366
// Take first timer and execute it
360367
t := wt.timers[0]
361368
wt.timers = wt.timers[1:]
362369

363-
// Advance workflow clock to fire the timer
364-
wt.logger.Debug("Advancing workflow clock to fire timer")
365-
wt.clock.Set(t.At)
366-
t.Callback()
367-
} else {
368-
t := time.NewTimer(wt.options.TestTimeout)
369-
370-
select {
371-
case callback := <-wt.callbacks:
372-
event := callback()
373-
if event != nil {
374-
wt.sendEvent(event.WorkflowInstance, event.HistoryEvent)
375-
gotNewEvents = true
376-
}
377-
case <-t.C:
378-
t.Stop()
379-
panic("No new events generated during workflow execution and no pending timers, workflow blocked?")
370+
// If there are no running activities, we can time-travel to the next timer and execute it. Otherwise, if
371+
// there are running activities, only fire the timer if it is due.
372+
runningActivities := atomic.LoadInt32(&wt.runningActivities)
373+
if runningActivities > 0 {
374+
// Wall-clock mode
375+
wt.logger.Debug("Scheduling wall-clock timer", "at", t.At)
376+
377+
wt.nextTimer = t
378+
379+
remainingTime := wt.clock.Until(t.At)
380+
t.wallClockTimer = wt.wallClock.AfterFunc(remainingTime, func() {
381+
t.Callback()
382+
wt.nextTimer = nil
383+
})
384+
} else {
385+
// Time-travel mode
386+
wt.logger.Debug("Advancing workflow clock to fire timer", "to", t.At)
387+
388+
// Advance workflow clock and fire the timer
389+
wt.clock.Set(t.At)
390+
t.Callback()
380391
}
392+
393+
continue
394+
}
395+
396+
// Wait until a callback is ready or we hit the test/idle timeout
397+
t := time.NewTimer(wt.options.TestTimeout)
398+
399+
select {
400+
case callback := <-wt.callbacks:
401+
event := callback()
402+
if event != nil {
403+
wt.sendEvent(event.WorkflowInstance, event.HistoryEvent)
404+
gotNewEvents = true
405+
}
406+
407+
case <-t.C:
408+
t.Stop()
409+
panic("No new events generated during workflow execution and no pending timers, workflow blocked?")
381410
}
382411
}
383412
}
@@ -569,6 +598,11 @@ func (wt *workflowTester[TResult]) scheduleTimer(instance *core.WorkflowInstance
569598
func (wt *workflowTester[TResult]) cancelTimer(instance *core.WorkflowInstance, event *history.Event) {
570599
for i, t := range wt.timers {
571600
if t.Instance != nil && t.Instance.InstanceID == instance.InstanceID && t.ScheduleEventID == event.ScheduleEventID {
601+
// If this was the next timer to fire, stop the timer
602+
if t.wallClockTimer != nil {
603+
t.wallClockTimer.Stop()
604+
}
605+
572606
wt.timers = append(wt.timers[:i], wt.timers[i+1:]...)
573607
break
574608
}

tester/tester_activity_test.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package tester
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/cschleiden/go-workflows/workflow"
8+
"github.com/stretchr/testify/mock"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func Test_LongActivity(t *testing.T) {
13+
activity1 := func() (string, error) {
14+
return "activity", nil
15+
}
16+
17+
wf := func(ctx workflow.Context) (string, error) {
18+
tctx, cancel := workflow.WithCancel(ctx)
19+
20+
var r string
21+
var err error
22+
23+
workflow.Select(ctx,
24+
// Fire timer before `activity1` completes
25+
workflow.Await(workflow.ScheduleTimer(tctx, time.Millisecond*100), func(ctx workflow.Context, f workflow.Future[struct{}]) {
26+
// Timer fired
27+
r = "timer"
28+
}),
29+
workflow.Await(
30+
workflow.ExecuteActivity[string](
31+
ctx, workflow.DefaultActivityOptions, activity1),
32+
func(ctx workflow.Context, f workflow.Future[string]) {
33+
cancel() // cancel timer
34+
r, err = f.Get(ctx)
35+
}),
36+
)
37+
38+
if err != nil {
39+
return "", err
40+
}
41+
42+
return r, nil
43+
}
44+
45+
tester := NewWorkflowTester[string](wf, WithTestTimeout(time.Second*3))
46+
47+
tester.OnActivity(activity1).Run(func(args mock.Arguments) {
48+
time.Sleep(200 * time.Millisecond)
49+
}).Return("activity", nil)
50+
51+
tester.Execute()
52+
53+
require.True(t, tester.WorkflowFinished())
54+
wr, _ := tester.WorkflowResult()
55+
require.Equal(t, "timer", wr)
56+
tester.AssertExpectations(t)
57+
}
58+
59+
func Test_ActivityRaceWithSignal(t *testing.T) {
60+
activity1 := func() (string, error) {
61+
return "activity", nil
62+
}
63+
64+
wf := func(ctx workflow.Context) (string, error) {
65+
var r string
66+
var err error
67+
68+
workflow.Select(ctx,
69+
workflow.Await(
70+
workflow.ExecuteActivity[string](
71+
ctx, workflow.DefaultActivityOptions, activity1),
72+
func(ctx workflow.Context, f workflow.Future[string]) {
73+
r, err = f.Get(ctx)
74+
}),
75+
workflow.Receive(
76+
workflow.NewSignalChannel[any](ctx, "signal"),
77+
func(ctx workflow.Context, signal any, ok bool) {
78+
r = "signal"
79+
}),
80+
)
81+
82+
if err != nil {
83+
return "", err
84+
}
85+
86+
return r, nil
87+
}
88+
89+
tester := NewWorkflowTester[string](wf, WithTestTimeout(time.Second*3))
90+
91+
tester.OnActivity(activity1).Run(func(args mock.Arguments) {
92+
time.Sleep(200 * time.Millisecond)
93+
}).Return("activity", nil)
94+
95+
tester.ScheduleCallback(time.Millisecond*100, func() {
96+
tester.SignalWorkflow("signal", "stop")
97+
})
98+
99+
tester.Execute()
100+
101+
require.True(t, tester.WorkflowFinished())
102+
wr, _ := tester.WorkflowResult()
103+
require.Equal(t, "signal", wr)
104+
tester.AssertExpectations(t)
105+
}

tester/tester_timers_test.go

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package tester
22

33
import (
4-
"context"
54
"testing"
65
"time"
76

@@ -134,31 +133,3 @@ func workflowTimerRespondingWithoutNewEvents(ctx workflow.Context) error {
134133

135134
return nil
136135
}
137-
138-
func Test_TimerDuringActivities(t *testing.T) {
139-
activityFinished := false
140-
141-
act := func(ctx context.Context) error {
142-
time.Sleep(100 * time.Millisecond)
143-
activityFinished = true
144-
return nil
145-
}
146-
147-
wf := func(ctx workflow.Context) error {
148-
_, err := workflow.ExecuteActivity[any](ctx, workflow.DefaultActivityOptions, act).Get(ctx)
149-
return err
150-
}
151-
152-
tester := NewWorkflowTester[timerResult](wf)
153-
tester.Registry().RegisterActivity(act)
154-
155-
tester.ScheduleCallback(time.Duration(50*time.Millisecond), func() {
156-
require.True(t, activityFinished, "Activity should have finished before timer is fired")
157-
})
158-
159-
tester.Execute()
160-
161-
require.True(t, tester.WorkflowFinished())
162-
_, wrErr := tester.WorkflowResult()
163-
require.Empty(t, wrErr)
164-
}

0 commit comments

Comments
 (0)