Skip to content

Commit a988f49

Browse files
committed
Support sending signals to other workflows
1 parent f8ffcd7 commit a988f49

File tree

9 files changed

+297
-0
lines changed

9 files changed

+297
-0
lines changed

backend/test/e2e.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,38 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
288288
require.Equal(t, backend.ErrInstanceNotFound, err)
289289
},
290290
},
291+
{
292+
name: "SubWorkflow_Signal",
293+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
294+
swf := func(ctx workflow.Context, i int) (int, error) {
295+
workflow.NewSignalChannel[string](ctx, "signal").Receive(ctx)
296+
297+
return i * 2, nil
298+
}
299+
wf := func(ctx workflow.Context) (int, error) {
300+
id, _ := workflow.SideEffect(ctx, func(ctx workflow.Context) string {
301+
return uuid.New().String()
302+
}).Get(ctx)
303+
304+
f := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
305+
InstanceID: id,
306+
}, swf, 1)
307+
308+
if err := workflow.SignalWorkflow(ctx, id, "signal", "hello"); err != nil {
309+
return 0, err
310+
}
311+
312+
return f.Get(ctx)
313+
}
314+
register(t, ctx, w, []interface{}{wf, swf}, nil)
315+
316+
instance := runWorkflow(t, ctx, c, wf)
317+
318+
r, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*20)
319+
require.NoError(t, err)
320+
require.Equal(t, 2, r)
321+
},
322+
},
291323
{
292324
name: "Timer_CancelWorkflowInstance",
293325
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {

internal/command/signal.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package command
2+
3+
import (
4+
"github.com/benbjohnson/clock"
5+
"github.com/cschleiden/go-workflows/internal/core"
6+
"github.com/cschleiden/go-workflows/internal/history"
7+
"github.com/cschleiden/go-workflows/internal/payload"
8+
)
9+
10+
type SignalWorkflowCommand struct {
11+
command
12+
13+
Instance *core.WorkflowInstance
14+
15+
Name string
16+
Arg payload.Payload
17+
}
18+
19+
var _ Command = (*SignalWorkflowCommand)(nil)
20+
21+
func NewSignalWorkflowCommand(
22+
id int64, workflowInstanceID, name string, arg payload.Payload,
23+
) *SignalWorkflowCommand {
24+
return &SignalWorkflowCommand{
25+
command: command{
26+
state: CommandState_Pending,
27+
id: id,
28+
},
29+
30+
Instance: core.NewWorkflowInstance(workflowInstanceID, ""), // TODO: Do we need a special identifier for an empty execution id?
31+
32+
Name: name,
33+
Arg: arg,
34+
}
35+
}
36+
37+
func (*SignalWorkflowCommand) Type() string {
38+
return "WorkflowSignal"
39+
}
40+
41+
func (c *SignalWorkflowCommand) Commit(clock clock.Clock) *CommandResult {
42+
c.commit()
43+
44+
return &CommandResult{
45+
// Record signal requested
46+
Events: []history.Event{
47+
history.NewPendingEvent(
48+
clock.Now(),
49+
history.EventType_SignalWorkflow,
50+
&history.SignalWorkflowAttributes{
51+
Name: c.Name,
52+
Arg: c.Arg,
53+
},
54+
history.ScheduleEventID(c.id),
55+
),
56+
},
57+
// Send event to workflow instance
58+
WorkflowEvents: []history.WorkflowEvent{
59+
{
60+
WorkflowInstance: c.Instance,
61+
HistoryEvent: history.NewPendingEvent(
62+
clock.Now(),
63+
history.EventType_SignalReceived,
64+
&history.SignalReceivedAttributes{
65+
Name: c.Name,
66+
Arg: c.Arg,
67+
},
68+
),
69+
},
70+
},
71+
}
72+
}

internal/history/history.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,50 @@ type EventType uint
1212
const (
1313
_ EventType = iota
1414

15+
// Workflow has started
1516
EventType_WorkflowExecutionStarted
17+
// Workflow has finished
1618
EventType_WorkflowExecutionFinished
19+
// Workflow has been terminated (not yet used)
1720
EventType_WorkflowExecutionTerminated
21+
// Workflow has been canceled
1822
EventType_WorkflowExecutionCanceled
1923

24+
// Workflow task has been started. This event is added to the history every time a workflow task is
25+
// picked up by the worker.
2026
EventType_WorkflowTaskStarted
2127

28+
// SubWorkflow has been scheduled
2229
EventType_SubWorkflowScheduled
30+
// SubWorkflow cancellation has been requested
2331
EventType_SubWorkflowCancellationRequested
32+
// SubWorkflow has completed
2433
EventType_SubWorkflowCompleted
34+
// SubWorkflow has failed
2535
EventType_SubWorkflowFailed
2636

37+
// Activity task has been scheduled
2738
EventType_ActivityScheduled
39+
// Activity task has been completed
2840
EventType_ActivityCompleted
41+
// Activity task has failed
2942
EventType_ActivityFailed
3043

44+
// Timer has been scheduled
3145
EventType_TimerScheduled
46+
// Timer has fired. This is the event received by a workflow when a previously scheduled timer fires.
3247
EventType_TimerFired
48+
// Timer has been canceled.
3349
EventType_TimerCanceled
3450

51+
// Workflow has received a signal
3552
EventType_SignalReceived
3653

54+
// Recorded result of a side-efect
3755
EventType_SideEffectResult
56+
57+
// Signal other workflow
58+
EventType_SignalWorkflow
3859
)
3960

4061
func (et EventType) String() string {
@@ -79,6 +100,10 @@ func (et EventType) String() string {
79100

80101
case EventType_SideEffectResult:
81102
return "SideEffectResult"
103+
104+
case EventType_SignalWorkflow:
105+
return "WorkflowSignalRequested"
106+
82107
default:
83108
return "Unknown"
84109
}

internal/history/serialization.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ func DeserializeAttributes(eventType EventType, attributes []byte) (attr interfa
7373
case EventType_SubWorkflowFailed:
7474
attr = &SubWorkflowFailedAttributes{}
7575

76+
case EventType_SignalWorkflow:
77+
attr = &SignalWorkflowAttributes{}
78+
7679
default:
7780
return nil, errors.New("unknown event type when deserializing attributes")
7881
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package history
2+
3+
import "github.com/cschleiden/go-workflows/internal/payload"
4+
5+
type SignalWorkflowAttributes struct {
6+
Name string `json:"name,omitempty"`
7+
Arg payload.Payload `json:"arg,omitempty"`
8+
}

internal/workflow/executor.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,9 @@ func (e *executor) executeEvent(event history.Event) error {
288288
case history.EventType_SubWorkflowCompleted:
289289
err = e.handleSubWorkflowCompleted(event, event.Attributes.(*history.SubWorkflowCompletedAttributes))
290290

291+
case history.EventType_SignalWorkflow:
292+
err = e.handleSignalWorkflow(event, event.Attributes.(*history.SignalWorkflowAttributes))
293+
291294
default:
292295
return fmt.Errorf("unknown event type: %v", event.Type)
293296
}
@@ -494,6 +497,31 @@ func (e *executor) handleSignalReceived(event history.Event, a *history.SignalRe
494497
return e.workflow.Continue()
495498
}
496499

500+
func (e *executor) handleSignalWorkflow(event history.Event, a *history.SignalWorkflowAttributes) error {
501+
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
502+
if c == nil {
503+
return fmt.Errorf("previous workflow execution requested a signal")
504+
}
505+
506+
sewc, ok := c.(*command.SignalWorkflowCommand)
507+
if !ok {
508+
return fmt.Errorf("previous workflow execution requested to signal a workflow, not: %v", c.Type())
509+
}
510+
511+
sewc.Done()
512+
513+
f, ok := e.workflowState.FutureByScheduleEventID(event.ScheduleEventID)
514+
if !ok {
515+
return errors.New("no pending future found for workflow signal requested event")
516+
}
517+
518+
if err := f(nil, nil); err != nil {
519+
return fmt.Errorf("setting workflow signal requested result: %w", err)
520+
}
521+
522+
return e.workflow.Continue()
523+
}
524+
497525
func (e *executor) handleSideEffectResult(event history.Event, a *history.SideEffectResultAttributes) error {
498526
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
499527
if c == nil {

samples/runner.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
package samples
22

33
import (
4+
"context"
45
"flag"
6+
"log"
7+
"net/http"
58
"time"
69

710
"github.com/cschleiden/go-workflows/backend"
811
"github.com/cschleiden/go-workflows/backend/mysql"
912
"github.com/cschleiden/go-workflows/backend/redis"
1013
"github.com/cschleiden/go-workflows/backend/sqlite"
14+
"github.com/cschleiden/go-workflows/diag"
1115
redisv8 "github.com/go-redis/redis/v8"
1216
)
1317

@@ -34,11 +38,21 @@ func GetBackend(name string, opt ...backend.BackendOption) backend.Backend {
3438
WriteTimeout: time.Second * 30,
3539
ReadTimeout: time.Second * 30,
3640
})
41+
42+
rclient.FlushAll(context.Background()).Result()
43+
3744
b, err := redis.NewRedisBackend(rclient, redis.WithBackendOptions(opt...))
3845
if err != nil {
3946
panic(err)
4047
}
4148

49+
// Start diagnostic server under /diag
50+
m := http.NewServeMux()
51+
m.Handle("/diag/", http.StripPrefix("/diag", diag.NewServeMux(b)))
52+
go http.ListenAndServe(":3000", m)
53+
54+
log.Println("Debug UI available at http://localhost:3000/diag")
55+
4256
return b
4357

4458
default:
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"os"
8+
"os/signal"
9+
10+
"github.com/cschleiden/go-workflows/backend"
11+
"github.com/cschleiden/go-workflows/client"
12+
"github.com/cschleiden/go-workflows/samples"
13+
"github.com/cschleiden/go-workflows/worker"
14+
"github.com/cschleiden/go-workflows/workflow"
15+
"github.com/google/uuid"
16+
)
17+
18+
func main() {
19+
ctx := context.Background()
20+
21+
b := samples.GetBackend("subworkflow-signal")
22+
23+
// Run worker
24+
go RunWorker(ctx, b)
25+
26+
// Start workflow via client
27+
c := client.New(b)
28+
29+
startWorkflow(ctx, c)
30+
31+
c2 := make(chan os.Signal, 1)
32+
signal.Notify(c2, os.Interrupt)
33+
<-c2
34+
}
35+
36+
func startWorkflow(ctx context.Context, c client.Client) {
37+
subID := uuid.NewString()
38+
39+
wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
40+
InstanceID: uuid.NewString(),
41+
}, Workflow1, "Hello world", subID)
42+
if err != nil {
43+
panic("could not start workflow")
44+
}
45+
46+
log.Println("Started workflow", wf.InstanceID)
47+
}
48+
49+
func RunWorker(ctx context.Context, mb backend.Backend) {
50+
w := worker.New(mb, nil)
51+
52+
w.RegisterWorkflow(Workflow1)
53+
w.RegisterWorkflow(SubWorkflow1)
54+
55+
if err := w.Start(ctx); err != nil {
56+
panic("could not start worker")
57+
}
58+
}
59+
60+
func Workflow1(ctx workflow.Context, msg string, subID string) (string, error) {
61+
logger := workflow.Logger(ctx)
62+
logger.Debug("Entering Workflow1")
63+
64+
logger.Debug("Scheduling sub-workflow")
65+
fsw := workflow.CreateSubWorkflowInstance[string](ctx, workflow.SubWorkflowOptions{
66+
InstanceID: subID,
67+
}, SubWorkflow1)
68+
69+
if err := workflow.SignalWorkflow(ctx, subID, "sub-signal", 42); err != nil {
70+
return "", fmt.Errorf("could not signal sub-workflow: %w", err)
71+
}
72+
73+
r, err := fsw.Get(ctx)
74+
if err != nil {
75+
return "", fmt.Errorf("could not get sub-workflow result: %w", err)
76+
}
77+
78+
return r, nil
79+
}
80+
81+
func SubWorkflow1(ctx workflow.Context) (string, error) {
82+
logger := workflow.Logger(ctx)
83+
logger.Debug("Waiting for signal in sub-workflow")
84+
85+
c := workflow.NewSignalChannel[int](ctx, "sub-signal")
86+
c.Receive(ctx)
87+
88+
logger.Debug("Received sub-workflow signal")
89+
90+
return "World", nil
91+
}

0 commit comments

Comments
 (0)