Skip to content

Commit 6034491

Browse files
authored
Merge pull request #155 from cschleiden/cschleiden/tester-timers
Do not run tester timers while activities are running
2 parents 4093d8f + fd0a48e commit 6034491

File tree

2 files changed

+53
-12
lines changed

2 files changed

+53
-12
lines changed

tester/tester.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ type workflowTester[TResult any] struct {
121121

122122
workflowHistory []history.Event
123123
clock *clock.Mock
124+
startTime time.Time
124125

125126
timers []*testTimer
126127
callbacks chan func() *history.WorkflowEvent
@@ -149,7 +150,7 @@ func WithTestTimeout(timeout time.Duration) WorkflowTesterOption {
149150
}
150151

151152
func NewWorkflowTester[TResult any](wf interface{}, opts ...WorkflowTesterOption) WorkflowTester[TResult] {
152-
// Start with the current wall-clock tiem
153+
// Start with the current wall-clock time
153154
clock := clock.NewMock()
154155
clock.Set(time.Now())
155156

@@ -246,6 +247,9 @@ func (wt *workflowTester[TResult]) OnSubWorkflow(workflow interface{}, args ...i
246247
}
247248

248249
func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
250+
// Record start time of test run
251+
wt.startTime = wt.clock.Now()
252+
249253
// Start workflow under test
250254
initialEvent := wt.getInitialEvent(wt.wf, args)
251255
wt.addWorkflow(wt.wfi, initialEvent)
@@ -295,6 +299,16 @@ func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
295299
}
296300
}
297301

302+
// Schedule activities
303+
for _, event := range result.ActivityEvents {
304+
gotNewEvents = true
305+
306+
a := event.Attributes.(*history.ActivityScheduledAttributes)
307+
wt.logger.Debug("Activity event", "activity", a.Name)
308+
309+
wt.scheduleActivity(tw.instance, event)
310+
}
311+
298312
for _, workflowEvent := range result.WorkflowEvents {
299313
gotNewEvents = true
300314
wt.logger.Debug("Workflow event", "event_type", workflowEvent.HistoryEvent.Type)
@@ -314,11 +328,6 @@ func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
314328

315329
wt.scheduleTimer(tw.instance, timerEvent)
316330
}
317-
318-
// Schedule activities
319-
for _, event := range result.ActivityEvents {
320-
wt.scheduleActivity(tw.instance, event)
321-
}
322331
}
323332

324333
for !wt.workflowFinished && !gotNewEvents {
@@ -334,12 +343,10 @@ func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
334343
default:
335344
}
336345

337-
if len(wt.timers) > 0 {
338-
// Take first timer and execute it
339-
sort.SliceStable(wt.timers, func(i, j int) bool {
340-
return wt.timers[i].At.Before(wt.timers[j].At)
341-
})
346+
// If there are no running activities and timers, skip time and jump to the next scheduled timer
342347

348+
if atomic.LoadInt32(&wt.runningActivities) == 0 && len(wt.timers) > 0 {
349+
// Take first timer and execute it
343350
t := wt.timers[0]
344351
wt.timers = wt.timers[1:]
345352

@@ -431,8 +438,9 @@ func (wt *workflowTester[TResult]) AssertExpectations(t *testing.T) {
431438
func (wt *workflowTester[TResult]) scheduleActivity(wfi *core.WorkflowInstance, event history.Event) {
432439
e := event.Attributes.(*history.ActivityScheduledAttributes)
433440

441+
atomic.AddInt32(&wt.runningActivities, 1)
442+
434443
go func() {
435-
atomic.AddInt32(&wt.runningActivities, 1)
436444
defer atomic.AddInt32(&wt.runningActivities, -1)
437445

438446
var activityErr error
@@ -542,6 +550,10 @@ func (wt *workflowTester[TResult]) scheduleTimer(instance *core.WorkflowInstance
542550
}
543551
},
544552
})
553+
554+
sort.SliceStable(wt.timers, func(i, j int) bool {
555+
return wt.timers[i].At.Before(wt.timers[j].At)
556+
})
545557
}
546558

547559
func (wt *workflowTester[TResult]) cancelTimer(instance *core.WorkflowInstance, event history.Event) {

tester/tester_timers_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package tester
22

33
import (
4+
"context"
45
"testing"
56
"time"
67

@@ -133,3 +134,31 @@ func workflowTimerRespondingWithoutNewEvents(ctx workflow.Context) error {
133134

134135
return nil
135136
}
137+
138+
func Test_TimerDuringActivities(t *testing.T) {
139+
activityFinished := false
140+
141+
act := func(ctx context.Context) error {
142+
time.Sleep(100 * time.Millisecond)
143+
activityFinished = true
144+
return nil
145+
}
146+
147+
wf := func(ctx workflow.Context) error {
148+
_, err := workflow.ExecuteActivity[any](ctx, workflow.DefaultActivityOptions, act).Get(ctx)
149+
return err
150+
}
151+
152+
tester := NewWorkflowTester[timerResult](wf)
153+
tester.Registry().RegisterActivity(act)
154+
155+
tester.ScheduleCallback(time.Duration(50*time.Millisecond), func() {
156+
require.True(t, activityFinished, "Activity should have finished before timer is fired")
157+
})
158+
159+
tester.Execute()
160+
161+
require.True(t, tester.WorkflowFinished())
162+
_, wrErr := tester.WorkflowResult()
163+
require.Empty(t, wrErr)
164+
}

0 commit comments

Comments
 (0)