Skip to content

Commit d6fed7b

Browse files
committed
More reliably send events between unit-test workflows
1 parent 6962b6c commit d6fed7b

File tree

3 files changed

+45
-34
lines changed

3 files changed

+45
-34
lines changed

tester/tester.go

Lines changed: 43 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import (
55
"fmt"
66
"reflect"
77
"sort"
8+
"sync"
89
"sync/atomic"
910
"testing"
1011
"time"
1112

1213
"github.com/benbjohnson/clock"
14+
"github.com/cschleiden/go-workflows/backend"
1315
"github.com/cschleiden/go-workflows/internal/activity"
1416
margs "github.com/cschleiden/go-workflows/internal/args"
1517
"github.com/cschleiden/go-workflows/internal/command"
@@ -50,7 +52,7 @@ type WorkflowTester[TResult any] interface {
5052

5153
SignalWorkflow(signalName string, value interface{})
5254

53-
SignalWorkflowInstance(wfi *core.WorkflowInstance, signalName string, value interface{})
55+
SignalWorkflowInstance(wfi *core.WorkflowInstance, signalName string, value interface{}) error
5456

5557
WorkflowFinished() bool
5658

@@ -95,7 +97,9 @@ type workflowTester[TResult any] struct {
9597
wfi *core.WorkflowInstance
9698

9799
// Workflows
98-
testWorkflows []*testWorkflow
100+
mtw sync.RWMutex
101+
testWorkflowsByInstanceID map[string]*testWorkflow
102+
testWorkflows []*testWorkflow
99103

100104
workflowFinished bool
101105
workflowResult payload.Payload
@@ -167,7 +171,8 @@ func NewWorkflowTester[TResult any](wf interface{}, opts ...WorkflowTesterOption
167171
wfi: wfi,
168172
registry: registry,
169173

170-
testWorkflows: make([]*testWorkflow, 0),
174+
testWorkflows: make([]*testWorkflow, 0),
175+
testWorkflowsByInstanceID: make(map[string]*testWorkflow),
171176

172177
ma: &mock.Mock{},
173178
mockedActivities: make(map[string]bool),
@@ -236,11 +241,8 @@ func (wt *workflowTester[TResult]) OnSubWorkflow(workflow interface{}, args ...i
236241

237242
func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
238243
// Start workflow under test
239-
wt.testWorkflows = append(wt.testWorkflows, &testWorkflow{
240-
instance: wt.wfi,
241-
pendingEvents: []history.Event{wt.getInitialEvent(wt.wf, args)},
242-
history: make([]history.Event, 0),
243-
})
244+
initialEvent := wt.getInitialEvent(wt.wf, args)
245+
wt.addWorkflow(wt.wfi, initialEvent)
244246

245247
for !wt.workflowFinished {
246248
// Execute all workflows until no more events?
@@ -356,23 +358,10 @@ func (wt *workflowTester[TResult]) Execute(args ...interface{}) {
356358
}
357359

358360
func (wt *workflowTester[TResult]) sendEvent(wfi *core.WorkflowInstance, event history.Event) {
359-
var w *testWorkflow
360-
for _, tw := range wt.testWorkflows {
361-
if tw.instance.InstanceID == wfi.InstanceID {
362-
w = tw
363-
break
364-
}
365-
}
361+
w := wt.getWorkflow(wfi)
366362

367363
if w == nil {
368-
// Workflow not mocked, create new instance
369-
w = &testWorkflow{
370-
instance: wfi,
371-
history: []history.Event{},
372-
pendingEvents: []history.Event{},
373-
}
374-
375-
wt.testWorkflows = append(wt.testWorkflows, w)
364+
panic(fmt.Sprintf("tried to send event to instance %s which does not exist", wfi.InstanceID))
376365
}
377366

378367
w.pendingEvents = append(w.pendingEvents, event)
@@ -382,7 +371,11 @@ func (wt *workflowTester[TResult]) SignalWorkflow(name string, value interface{}
382371
wt.SignalWorkflowInstance(wt.wfi, name, value)
383372
}
384373

385-
func (wt *workflowTester[TResult]) SignalWorkflowInstance(wfi *core.WorkflowInstance, name string, value interface{}) {
374+
func (wt *workflowTester[TResult]) SignalWorkflowInstance(wfi *core.WorkflowInstance, name string, value interface{}) error {
375+
if wt.getWorkflow(wfi) == nil {
376+
return backend.ErrInstanceNotFound
377+
}
378+
386379
arg, err := converter.DefaultConverter.To(value)
387380
if err != nil {
388381
panic("Could not convert signal value to string" + err.Error())
@@ -403,6 +396,8 @@ func (wt *workflowTester[TResult]) SignalWorkflowInstance(wfi *core.WorkflowInst
403396
HistoryEvent: e,
404397
}
405398
}
399+
400+
return nil
406401
}
407402

408403
func (wt *workflowTester[TResult]) WorkflowFinished() bool {
@@ -538,6 +533,28 @@ func (wt *workflowTester[TResult]) scheduleTimer(instance *core.WorkflowInstance
538533
})
539534
}
540535

536+
func (wt *workflowTester[TResult]) getWorkflow(instance *core.WorkflowInstance) *testWorkflow {
537+
wt.mtw.RLock()
538+
defer wt.mtw.RUnlock()
539+
540+
return wt.testWorkflowsByInstanceID[instance.InstanceID]
541+
}
542+
543+
func (wt *workflowTester[TResult]) addWorkflow(instance *core.WorkflowInstance, initialEvent history.Event) *testWorkflow {
544+
wt.mtw.Lock()
545+
defer wt.mtw.Unlock()
546+
547+
tw := &testWorkflow{
548+
instance: instance,
549+
pendingEvents: []history.Event{initialEvent},
550+
history: make([]history.Event, 0),
551+
}
552+
wt.testWorkflows = append(wt.testWorkflows, tw)
553+
wt.testWorkflowsByInstanceID[instance.InstanceID] = tw
554+
555+
return tw
556+
}
557+
541558
func (wt *workflowTester[TResult]) scheduleSubWorkflow(event history.WorkflowEvent) {
542559
a := event.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
543560

@@ -568,7 +585,7 @@ func (wt *workflowTester[TResult]) scheduleSubWorkflow(event history.WorkflowEve
568585

569586
if !wt.mockedWorkflows[a.Name] {
570587
// Workflow not mocked, allow event to be processed
571-
wt.sendEvent(event.WorkflowInstance, event.HistoryEvent)
588+
wt.addWorkflow(event.WorkflowInstance, event.HistoryEvent)
572589
return
573590
}
574591

@@ -646,9 +663,7 @@ type signaler[T any] struct {
646663
}
647664

648665
func (s *signaler[T]) SignalWorkflow(ctx context.Context, instanceID string, name string, arg interface{}) error {
649-
s.wt.SignalWorkflowInstance(core.NewWorkflowInstance(instanceID, ""), name, arg)
650-
651-
return nil
666+
return s.wt.SignalWorkflowInstance(core.NewWorkflowInstance(instanceID, ""), name, arg)
652667
}
653668

654669
var _ signals.Signaler = (*signaler[any])(nil)

tester/tester_subworkflow_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func Test_SubWorkflow_Signals(t *testing.T) {
140140
})
141141

142142
tester.ScheduleCallback(time.Millisecond, func() {
143-
tester.SignalWorkflowInstance(subWorkflowInstance, "subworkflow-signal", "42")
143+
require.Nil(t, tester.SignalWorkflowInstance(subWorkflowInstance, "subworkflow-signal", "42"))
144144
})
145145

146146
tester.Execute("hello")

tester/tester_test.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ func Test_SignalSubWorkflowBeforeScheduling(t *testing.T) {
271271

272272
require.True(t, tester.WorkflowFinished())
273273
wfR, wfErr := tester.WorkflowResult()
274-
require.Empty(t, wfErr)
274+
require.Equal(t, backend.ErrInstanceNotFound.Error(), wfErr)
275275
require.IsType(t, "", wfR)
276276
}
277277

@@ -281,10 +281,6 @@ func workflowSubWorkFlowsAndSignals(ctx workflow.Context) (string, error) {
281281
return "", err
282282
}
283283

284-
workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
285-
InstanceID: "subworkflow",
286-
}, workflowSum, 1, 2).Get(ctx)
287-
288284
return "finished without errors!", nil
289285
}
290286

0 commit comments

Comments
 (0)