Skip to content

Commit 746569d

Browse files
committed
Deliver signals to sub-workflows as activities
1 parent 07c5fe9 commit 746569d

File tree

12 files changed

+100
-187
lines changed

12 files changed

+100
-187
lines changed

backend/test/e2e.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
331331
InstanceID: id,
332332
}, swf, 1)
333333

334-
if err := workflow.SignalWorkflow(ctx, id, "signal", "hello"); err != nil {
334+
if _, err := workflow.SignalWorkflow(ctx, id, "signal", "hello").Get(ctx); err != nil {
335335
return 0, err
336336
}
337337

@@ -346,6 +346,30 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
346346
require.Equal(t, 2, r)
347347
},
348348
},
349+
{
350+
name: "SubWorkflow_Signal_BeforeStarting",
351+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
352+
wf := func(ctx workflow.Context) (int, error) {
353+
id, _ := workflow.SideEffect(ctx, func(ctx workflow.Context) string {
354+
id := uuid.New().String()
355+
workflow.Logger(ctx).Warn("side effect", "id", id)
356+
return id
357+
}).Get(ctx)
358+
359+
if _, err := workflow.SignalWorkflow(ctx, id, "signal", "hello").Get(ctx); err != nil {
360+
return 0, err
361+
}
362+
363+
return 42, nil
364+
}
365+
register(t, ctx, w, []interface{}{wf}, nil)
366+
367+
instance := runWorkflow(t, ctx, c, wf)
368+
369+
_, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*20)
370+
require.ErrorContains(t, err, backend.ErrInstanceNotFound.Error())
371+
},
372+
},
349373
{
350374
name: "Timer_CancelWorkflowInstance",
351375
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {

internal/command/signal.go

Lines changed: 0 additions & 83 deletions
This file was deleted.

internal/command/signal_test.go

Lines changed: 0 additions & 44 deletions
This file was deleted.

internal/history/history.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,6 @@ const (
5353

5454
// Recorded result of a side-efect
5555
EventType_SideEffectResult
56-
57-
// Signal other workflow
58-
EventType_SignalWorkflow
5956
)
6057

6158
func (et EventType) String() string {
@@ -101,9 +98,6 @@ func (et EventType) String() string {
10198
case EventType_SideEffectResult:
10299
return "SideEffectResult"
103100

104-
case EventType_SignalWorkflow:
105-
return "WorkflowSignalRequested"
106-
107101
default:
108102
return "Unknown"
109103
}

internal/history/serialization.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,6 @@ func DeserializeAttributes(eventType EventType, attributes []byte) (attr interfa
7373
case EventType_SubWorkflowFailed:
7474
attr = &SubWorkflowFailedAttributes{}
7575

76-
case EventType_SignalWorkflow:
77-
attr = &SignalWorkflowAttributes{}
78-
7976
default:
8077
return nil, errors.New("unknown event type when deserializing attributes")
8178
}

internal/history/signal_workflow.go

Lines changed: 0 additions & 8 deletions
This file was deleted.

internal/signals/activities.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package signals
2+
3+
import (
4+
"context"
5+
)
6+
7+
type Signaler interface {
8+
SignalWorkflow(ctx context.Context, instanceID string, name string, arg interface{}) error
9+
}
10+
11+
type Activities struct {
12+
Signaler Signaler
13+
}
14+
15+
func (a *Activities) DeliverWorkflowSignal(ctx context.Context, instanceID, signalName string, arg interface{}) error {
16+
return a.Signaler.SignalWorkflow(ctx, instanceID, signalName, arg)
17+
}

internal/workflow/executor.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -320,9 +320,6 @@ func (e *executor) executeEvent(event history.Event) error {
320320
case history.EventType_SubWorkflowCompleted:
321321
err = e.handleSubWorkflowCompleted(event, event.Attributes.(*history.SubWorkflowCompletedAttributes))
322322

323-
case history.EventType_SignalWorkflow:
324-
err = e.handleSignalWorkflow(event, event.Attributes.(*history.SignalWorkflowAttributes))
325-
326323
default:
327324
return fmt.Errorf("unknown event type: %v", event.Type)
328325
}
@@ -554,12 +551,10 @@ func (e *executor) handleSubWorkflowFailed(event history.Event, a *history.SubWo
554551

555552
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
556553
if c == nil {
557-
// TODO: Adjust
558554
return fmt.Errorf("previous workflow execution scheduled a sub-workflow execution")
559555
}
560556

561557
if _, ok := c.(*command.ScheduleSubWorkflowCommand); !ok {
562-
// TODO: Adjust
563558
return fmt.Errorf("previous workflow execution cancelled a sub-workflow execution, not: %v", c.Type())
564559
}
565560

@@ -582,12 +577,10 @@ func (e *executor) handleSubWorkflowCompleted(event history.Event, a *history.Su
582577

583578
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
584579
if c == nil {
585-
// TODO: Adjust
586580
return fmt.Errorf("previous workflow execution cancelled a sub-workflow execution")
587581
}
588582

589583
if _, ok := c.(*command.ScheduleSubWorkflowCommand); !ok {
590-
// TODO: Adjust
591584
return fmt.Errorf("previous workflow execution cancelled a sub-workflow execution, not: %v", c.Type())
592585
}
593586

@@ -603,22 +596,6 @@ func (e *executor) handleSignalReceived(event history.Event, a *history.SignalRe
603596
return e.workflow.Continue()
604597
}
605598

606-
func (e *executor) handleSignalWorkflow(event history.Event, a *history.SignalWorkflowAttributes) error {
607-
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
608-
if c == nil {
609-
return fmt.Errorf("previous workflow execution requested a signal")
610-
}
611-
612-
sewc, ok := c.(*command.SignalWorkflowCommand)
613-
if !ok {
614-
return fmt.Errorf("previous workflow execution requested to signal a workflow, not: %v", c.Type())
615-
}
616-
617-
sewc.Done()
618-
619-
return e.workflow.Continue()
620-
}
621-
622599
func (e *executor) handleSideEffectResult(event history.Event, a *history.SideEffectResultAttributes) error {
623600
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
624601
if c == nil {

tester/tester.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cschleiden/go-workflows/internal/history"
2020
"github.com/cschleiden/go-workflows/internal/logger"
2121
"github.com/cschleiden/go-workflows/internal/payload"
22+
"github.com/cschleiden/go-workflows/internal/signals"
2223
"github.com/cschleiden/go-workflows/internal/task"
2324
"github.com/cschleiden/go-workflows/internal/workflow"
2425
"github.com/cschleiden/go-workflows/log"
@@ -66,7 +67,7 @@ type WorkflowTester[TResult any] interface {
6667
}
6768

6869
type testTimer struct {
69-
// At is the timer this timer is scheduled for. This will advance the mock clock
70+
// At is the time this timer is scheduled for. This will advance the mock clock
7071
// to this timestamp
7172
At time.Time
7273

@@ -184,6 +185,10 @@ func NewWorkflowTester[TResult any](wf interface{}, opts ...WorkflowTesterOption
184185
tracer: tracer,
185186
}
186187

188+
// Register internal activities
189+
signalActivities := &signals.Activities{Signaler: &signaler[TResult]{wt}}
190+
registry.RegisterActivity(signalActivities)
191+
187192
// Always register the workflow under test
188193
if err := wt.registry.RegisterWorkflow(wf); err != nil {
189194
panic(fmt.Sprintf("could not workflow under test: %v", err))
@@ -635,3 +640,13 @@ func getNextWorkflowTask(wfi *core.WorkflowInstance, history []history.Event, ne
635640
NewEvents: newEvents,
636641
}
637642
}
643+
644+
type signaler[T any] struct {
645+
wt *workflowTester[T]
646+
}
647+
648+
func (s *signaler[T]) SignalWorkflow(ctx context.Context, instanceID string, name string, arg interface{}) error {
649+
return nil
650+
}
651+
652+
var _ signals.Signaler = (*signaler[any])(nil)

tester/tester_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77
"time"
88

9+
"github.com/cschleiden/go-workflows/backend"
910
"github.com/cschleiden/go-workflows/internal/sync"
1011
"github.com/cschleiden/go-workflows/workflow"
1112
"github.com/stretchr/testify/mock"
@@ -234,3 +235,31 @@ func workflowSignal(ctx workflow.Context) (string, error) {
234235

235236
return val, nil
236237
}
238+
239+
func Test_SignalSubWorkflowBeforeScheduling(t *testing.T) {
240+
tester := NewWorkflowTester[string](workflowSubWorkFlowsAndSignals)
241+
242+
tester.Execute()
243+
244+
require.True(t, tester.WorkflowFinished())
245+
wfR, wfErr := tester.WorkflowResult()
246+
require.Empty(t, wfErr)
247+
require.IsType(t, "", wfR)
248+
}
249+
250+
func workflowSubWorkFlowsAndSignals(ctx workflow.Context) (string, error) {
251+
_, err := workflow.SignalWorkflow(ctx, "subworkflow", "test", "").Get(ctx)
252+
if err != backend.ErrInstanceNotFound {
253+
return "", err
254+
}
255+
256+
workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
257+
InstanceID: "subworkflow",
258+
}, workflowSum, 1, 2).Get(ctx)
259+
260+
return "finished without errors!", nil
261+
}
262+
263+
func workflowSum(ctx workflow.Context, valA, valB int) (int, error) {
264+
return valA + valB, nil
265+
}

0 commit comments

Comments
 (0)