Skip to content

Commit 5f2c009

Browse files
authored
Merge pull request #152 from cschleiden/cschleiden/fix-signal-tester
Fix signaling in tester workflows
2 parents 26d75c9 + d6fed7b commit 5f2c009

File tree

3 files changed

+110
-32
lines changed

3 files changed

+110
-32
lines changed

tester/tester.go

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import (
55
"fmt"
66
"reflect"
77
"sort"
8+
"sync"
89
"sync/atomic"
910
"testing"
1011
"time"
1112

1213
"github.com/benbjohnson/clock"
14+
"github.com/cschleiden/go-workflows/backend"
1315
"github.com/cschleiden/go-workflows/internal/activity"
1416
margs "github.com/cschleiden/go-workflows/internal/args"
1517
"github.com/cschleiden/go-workflows/internal/command"
@@ -50,7 +52,7 @@ type WorkflowTester[TResult any] interface {
5052

5153
SignalWorkflow(signalName string, value interface{})
5254

53-
SignalWorkflowInstance(wfi *core.WorkflowInstance, signalName string, value interface{})
55+
SignalWorkflowInstance(wfi *core.WorkflowInstance, signalName string, value interface{}) error
5456

5557
WorkflowFinished() bool
5658

@@ -95,7 +97,9 @@ type workflowTester[TResult any] struct {
9597
wfi *core.WorkflowInstance
9698

9799
// Workflows
98-
testWorkflows []*testWorkflow
100+
mtw sync.RWMutex
101+
testWorkflowsByInstanceID map[string]*testWorkflow
102+
testWorkflows []*testWorkflow
99103

100104
workflowFinished bool
101105
workflowResult payload.Payload
@@ -167,7 +171,8 @@ func NewWorkflowTester[TResult any](wf interface{}, opts ...WorkflowTesterOption
167171
wfi: wfi,
168172
registry: registry,
169173

170-
testWorkflows: make([]*testWorkflow, 0),
174+
testWorkflows: make([]*testWorkflow, 0),
175+
testWorkflowsByInstanceID: make(map[string]*testWorkflow),
171176

172177
ma: &mock.Mock{},
173178
mockedActivities: make(map[string]bool),
@@ -236,11 +241,8 @@ func (wt *workflowTester[TResult]) OnSubWorkflow(workflow interface{}, args ...i
236241

237242
func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
238243
// Start workflow under test
239-
wt.testWorkflows = append(wt.testWorkflows, &testWorkflow{
240-
instance: wt.wfi,
241-
pendingEvents: []history.Event{wt.getInitialEvent(wt.wf, args)},
242-
history: make([]history.Event, 0),
243-
})
244+
initialEvent := wt.getInitialEvent(wt.wf, args)
245+
wt.addWorkflow(wt.wfi, initialEvent)
244246

245247
for !wt.workflowFinished {
246248
// Execute all workflows until no more events?
@@ -356,23 +358,10 @@ func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
356358
}
357359

358360
func (wt *workflowTester[TResult]) sendEvent(wfi *core.WorkflowInstance, event history.Event) {
359-
var w *testWorkflow
360-
for _, tw := range wt.testWorkflows {
361-
if tw.instance.InstanceID == wfi.InstanceID {
362-
w = tw
363-
break
364-
}
365-
}
361+
w := wt.getWorkflow(wfi)
366362

367363
if w == nil {
368-
// Workflow not mocked, create new instance
369-
w = &testWorkflow{
370-
instance: wfi,
371-
history: []history.Event{},
372-
pendingEvents: []history.Event{},
373-
}
374-
375-
wt.testWorkflows = append(wt.testWorkflows, w)
364+
panic(fmt.Sprintf("tried to send event to instance %s which does not exist", wfi.InstanceID))
376365
}
377366

378367
w.pendingEvents = append(w.pendingEvents, event)
@@ -382,7 +371,11 @@ func (wt *workflowTester[TResult]) SignalWorkflow(name string, value interface{}
382371
wt.SignalWorkflowInstance(wt.wfi, name, value)
383372
}
384373

385-
func (wt *workflowTester[TResult]) SignalWorkflowInstance(wfi *core.WorkflowInstance, name string, value interface{}) {
374+
func (wt *workflowTester[TResult]) SignalWorkflowInstance(wfi *core.WorkflowInstance, name string, value interface{}) error {
375+
if wt.getWorkflow(wfi) == nil {
376+
return backend.ErrInstanceNotFound
377+
}
378+
386379
arg, err := converter.DefaultConverter.To(value)
387380
if err != nil {
388381
panic("Could not convert signal value to string" + err.Error())
@@ -403,6 +396,8 @@ func (wt *workflowTester[TResult]) SignalWorkflowInstance(wfi *core.WorkflowInst
403396
HistoryEvent: e,
404397
}
405398
}
399+
400+
return nil
406401
}
407402

408403
func (wt *workflowTester[TResult]) WorkflowFinished() bool {
@@ -538,6 +533,28 @@ func (wt *workflowTester[TResult]) scheduleTimer(instance *core.WorkflowInstance
538533
})
539534
}
540535

536+
func (wt *workflowTester[TResult]) getWorkflow(instance *core.WorkflowInstance) *testWorkflow {
537+
wt.mtw.RLock()
538+
defer wt.mtw.RUnlock()
539+
540+
return wt.testWorkflowsByInstanceID[instance.InstanceID]
541+
}
542+
543+
func (wt *workflowTester[TResult]) addWorkflow(instance *core.WorkflowInstance, initialEvent history.Event) *testWorkflow {
544+
wt.mtw.Lock()
545+
defer wt.mtw.Unlock()
546+
547+
tw := &testWorkflow{
548+
instance: instance,
549+
pendingEvents: []history.Event{initialEvent},
550+
history: make([]history.Event, 0),
551+
}
552+
wt.testWorkflows = append(wt.testWorkflows, tw)
553+
wt.testWorkflowsByInstanceID[instance.InstanceID] = tw
554+
555+
return tw
556+
}
557+
541558
func (wt *workflowTester[TResult]) scheduleSubWorkflow(event history.WorkflowEvent) {
542559
a := event.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
543560

@@ -568,7 +585,7 @@ func (wt *workflowTester[TResult]) scheduleSubWorkflow(event history.WorkflowEve
568585

569586
if !wt.mockedWorkflows[a.Name] {
570587
// Workflow not mocked, allow event to be processed
571-
wt.sendEvent(event.WorkflowInstance, event.HistoryEvent)
588+
wt.addWorkflow(event.WorkflowInstance, event.HistoryEvent)
572589
return
573590
}
574591

@@ -646,7 +663,7 @@ type signaler[T any] struct {
646663
}
647664

648665
func (s *signaler[T]) SignalWorkflow(ctx context.Context, instanceID string, name string, arg interface{}) error {
649-
return nil
666+
return s.wt.SignalWorkflowInstance(core.NewWorkflowInstance(instanceID, ""), name, arg)
650667
}
651668

652669
var _ signals.Signaler = (*signaler[any])(nil)

tester/tester_subworkflow_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func Test_SubWorkflow_Signals(t *testing.T) {
140140
})
141141

142142
tester.ScheduleCallback(time.Millisecond, func() {
143-
tester.SignalWorkflowInstance(subWorkflowInstance, "subworkflow-signal", "42")
143+
require.Nil(t, tester.SignalWorkflowInstance(subWorkflowInstance, "subworkflow-signal", "42"))
144144
})
145145

146146
tester.Execute("hello")

tester/tester_test.go

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,34 @@ func workflowTimerCancellation(ctx workflow.Context) (time.Time, error) {
204204
return workflow.Now(ctx), nil
205205
}
206206

207+
func Test_TimerRespondingWithoutNewEvents(t *testing.T) {
208+
tester := NewWorkflowTester[time.Time](workflowTimerRespondingWithoutNewEvents)
209+
210+
tester.ScheduleCallback(time.Duration(2*time.Second), func() {
211+
tester.SignalWorkflow("signal", "s42")
212+
})
213+
214+
tester.Execute()
215+
216+
require.True(t, tester.WorkflowFinished())
217+
218+
_, err := tester.WorkflowResult()
219+
require.Empty(t, err)
220+
}
221+
222+
func workflowTimerRespondingWithoutNewEvents(ctx workflow.Context) error {
223+
workflow.ScheduleTimer(ctx, 1*time.Second).Get(ctx)
224+
225+
workflow.Select(
226+
ctx,
227+
workflow.Receive(workflow.NewSignalChannel[any](ctx, "signal"), func(ctx workflow.Context, signal any, ok bool) {
228+
// do nothing
229+
}),
230+
)
231+
232+
return nil
233+
}
234+
207235
func Test_Signals(t *testing.T) {
208236
tester := NewWorkflowTester[string](workflowSignal)
209237
tester.ScheduleCallback(time.Duration(5*time.Second), func() {
@@ -243,7 +271,7 @@ func Test_SignalSubWorkflowBeforeScheduling(t *testing.T) {
243271

244272
require.True(t, tester.WorkflowFinished())
245273
wfR, wfErr := tester.WorkflowResult()
246-
require.Empty(t, wfErr)
274+
require.Equal(t, backend.ErrInstanceNotFound.Error(), wfErr)
247275
require.IsType(t, "", wfR)
248276
}
249277

@@ -253,13 +281,46 @@ func workflowSubWorkFlowsAndSignals(ctx workflow.Context) (string, error) {
253281
return "", err
254282
}
255283

256-
workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
257-
InstanceID: "subworkflow",
258-
}, workflowSum, 1, 2).Get(ctx)
259-
260284
return "finished without errors!", nil
261285
}
262286

263287
func workflowSum(ctx workflow.Context, valA, valB int) (int, error) {
264288
return valA + valB, nil
265289
}
290+
291+
func Test_SignalSubWorkflow(t *testing.T) {
292+
tester := NewWorkflowTester[int](workflowSubworkflowSignal)
293+
require.NoError(t, tester.Registry().RegisterWorkflow(waitForSignal))
294+
295+
tester.Execute()
296+
297+
require.True(t, tester.WorkflowFinished())
298+
wfR, wfErr := tester.WorkflowResult()
299+
require.Empty(t, wfErr)
300+
require.Equal(t, 42, wfR)
301+
}
302+
303+
func workflowSubworkflowSignal(ctx workflow.Context) (int, error) {
304+
sw := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
305+
InstanceID: "subworkflow",
306+
}, waitForSignal)
307+
308+
_, err := workflow.SignalWorkflow(ctx, "subworkflow", "signal", "").Get(ctx)
309+
if err != nil {
310+
return 0, err
311+
}
312+
313+
// Wait for subworkflow and return result
314+
return sw.Get(ctx)
315+
}
316+
317+
func waitForSignal(ctx workflow.Context) (int, error) {
318+
workflow.Select(
319+
ctx,
320+
workflow.Receive(workflow.NewSignalChannel[any](ctx, "signal"), func(ctx workflow.Context, signal any, ok bool) {
321+
// Do nothing
322+
}),
323+
)
324+
325+
return 42, nil
326+
}

0 commit comments

Comments
 (0)