Skip to content

Commit 761e5e8

Browse files
authored
Merge pull request #122 from cschleiden/signal-other-workflows
Signal other workflows
2 parents f8ffcd7 + df5d91e commit 761e5e8

File tree

15 files changed

+344
-35
lines changed

15 files changed

+344
-35
lines changed

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,16 @@ func Workflow(ctx workflow.Context) error {
348348
}
349349
```
350350
351+
#### Signaling workflows from within workflows
352+
353+
```go
354+
func Workflow(ctx workflow.Context) error {
355+
if err := workflow.SignalWorkflow(ctx, "sub-instance-id", "signal-name", "value"); err != nil {
356+
// Handle error
357+
}
358+
}
359+
```
360+
351361
### Executing side effects
352362
353363
Sometimes scheduling an activity is too much overhead for a simple side effect. For those scenarios you can use `workflow.SideEffect`. You can pass a func which will be executed only once inline with its result being recorded in the history. Subsequent executions of the workflow will return the previously recorded result.

backend/mysql/mysql.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -510,22 +510,27 @@ func (b *mysqlBackend) CompleteWorkflowTask(
510510
}
511511

512512
// Insert new workflow events
513-
groupedEvents := history.EventsByWorkflowInstance(workflowEvents)
513+
groupedEvents := history.EventsByWorkflowInstanceID(workflowEvents)
514514

515-
for targetInstance, events := range groupedEvents {
516-
for _, event := range events {
517-
if event.Type == history.EventType_WorkflowExecutionStarted {
518-
a := event.Attributes.(*history.ExecutionStartedAttributes)
515+
for targetInstanceID, events := range groupedEvents {
516+
for _, m := range events {
517+
if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted {
518+
a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
519519
// Create new instance
520-
if err := createInstance(ctx, tx, &targetInstance, a.Metadata, true); err != nil {
520+
if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata, true); err != nil {
521521
return err
522522
}
523523

524524
break
525525
}
526526
}
527527

528-
if err := insertPendingEvents(ctx, tx, targetInstance.InstanceID, events); err != nil {
528+
historyEvents := []history.Event{}
529+
for _, m := range events {
530+
historyEvents = append(historyEvents, m.HistoryEvent)
531+
}
532+
533+
if err := insertPendingEvents(ctx, tx, targetInstanceID, historyEvents); err != nil {
529534
return fmt.Errorf("inserting messages: %w", err)
530535
}
531536
}

backend/redis/workflow.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -201,29 +201,29 @@ func (rb *redisBackend) CompleteWorkflowTask(
201201
}
202202

203203
// Send new workflow events to the respective streams
204-
groupedEvents := history.EventsByWorkflowInstance(workflowEvents)
205-
for targetInstance, events := range groupedEvents {
204+
groupedEvents := history.EventsByWorkflowInstanceID(workflowEvents)
205+
for targetInstanceID, events := range groupedEvents {
206206
// Insert pending events for target instance
207-
for _, event := range events {
208-
event := event
207+
for _, m := range events {
208+
m := m
209209

210-
if event.Type == history.EventType_WorkflowExecutionStarted {
210+
if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted {
211211
// Create new instance
212-
a := event.Attributes.(*history.ExecutionStartedAttributes)
213-
if err := createInstanceP(ctx, p, &targetInstance, a.Metadata, true); err != nil {
212+
a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
213+
if err := createInstanceP(ctx, p, m.WorkflowInstance, a.Metadata, true); err != nil {
214214
return err
215215
}
216216
}
217217

218218
// Add pending event to stream
219-
if err := addEventToStreamP(ctx, p, pendingEventsKey(targetInstance.InstanceID), &event); err != nil {
219+
if err := addEventToStreamP(ctx, p, pendingEventsKey(targetInstanceID), &m.HistoryEvent); err != nil {
220220
return err
221221
}
222222
}
223223

224224
// Try to queue workflow task
225-
if targetInstance != *instance {
226-
if err := rb.workflowQueue.Enqueue(ctx, p, targetInstance.InstanceID, nil); err != nil {
225+
if targetInstanceID != instance.InstanceID {
226+
if err := rb.workflowQueue.Enqueue(ctx, p, targetInstanceID, nil); err != nil {
227227
return fmt.Errorf("enqueuing workflow task: %w", err)
228228
}
229229
}

backend/sqlite/sqlite.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -399,14 +399,14 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
399399
}
400400

401401
// Insert new workflow events
402-
groupedEvents := history.EventsByWorkflowInstance(workflowEvents)
402+
groupedEvents := history.EventsByWorkflowInstanceID(workflowEvents)
403403

404-
for targetInstance, events := range groupedEvents {
405-
for _, event := range events {
406-
if event.Type == history.EventType_WorkflowExecutionStarted {
407-
a := event.Attributes.(*history.ExecutionStartedAttributes)
404+
for targetInstanceID, events := range groupedEvents {
405+
for _, m := range events {
406+
if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted {
407+
a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
408408
// Create new instance
409-
if err := createInstance(ctx, tx, &targetInstance, a.Metadata, true); err != nil {
409+
if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata, true); err != nil {
410410
return err
411411
}
412412

@@ -415,7 +415,11 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
415415
}
416416

417417
// Insert pending events for target instance
418-
if err := insertPendingEvents(ctx, tx, targetInstance.InstanceID, events); err != nil {
418+
historyEvents := []history.Event{}
419+
for _, m := range events {
420+
historyEvents = append(historyEvents, m.HistoryEvent)
421+
}
422+
if err := insertPendingEvents(ctx, tx, targetInstanceID, historyEvents); err != nil {
419423
return fmt.Errorf("inserting messages: %w", err)
420424
}
421425
}

backend/test/e2e.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,40 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
288288
require.Equal(t, backend.ErrInstanceNotFound, err)
289289
},
290290
},
291+
{
292+
name: "SubWorkflow_Signal",
293+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
294+
swf := func(ctx workflow.Context, i int) (int, error) {
295+
workflow.NewSignalChannel[string](ctx, "signal").Receive(ctx)
296+
297+
return i * 2, nil
298+
}
299+
wf := func(ctx workflow.Context) (int, error) {
300+
id, _ := workflow.SideEffect(ctx, func(ctx workflow.Context) string {
301+
id := uuid.New().String()
302+
workflow.Logger(ctx).Warn("side effect", "id", id)
303+
return id
304+
}).Get(ctx)
305+
306+
f := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
307+
InstanceID: id,
308+
}, swf, 1)
309+
310+
if err := workflow.SignalWorkflow(ctx, id, "signal", "hello"); err != nil {
311+
return 0, err
312+
}
313+
314+
return f.Get(ctx)
315+
}
316+
register(t, ctx, w, []interface{}{wf, swf}, nil)
317+
318+
instance := runWorkflow(t, ctx, c, wf)
319+
320+
r, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*20)
321+
require.NoError(t, err)
322+
require.Equal(t, 2, r)
323+
},
324+
},
291325
{
292326
name: "Timer_CancelWorkflowInstance",
293327
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {

internal/command/signal.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package command
2+
3+
import (
4+
"github.com/benbjohnson/clock"
5+
"github.com/cschleiden/go-workflows/internal/core"
6+
"github.com/cschleiden/go-workflows/internal/history"
7+
"github.com/cschleiden/go-workflows/internal/payload"
8+
)
9+
10+
type SignalWorkflowCommand struct {
11+
command
12+
13+
Instance *core.WorkflowInstance
14+
15+
Name string
16+
Arg payload.Payload
17+
}
18+
19+
var _ Command = (*SignalWorkflowCommand)(nil)
20+
21+
func NewSignalWorkflowCommand(
22+
id int64, workflowInstanceID, name string, arg payload.Payload,
23+
) *SignalWorkflowCommand {
24+
return &SignalWorkflowCommand{
25+
command: command{
26+
state: CommandState_Pending,
27+
id: id,
28+
},
29+
30+
Instance: core.NewWorkflowInstance(workflowInstanceID, ""), // TODO: Do we need a special identifier for an empty execution id?
31+
32+
Name: name,
33+
Arg: arg,
34+
}
35+
}
36+
37+
func (*SignalWorkflowCommand) Type() string {
38+
return "WorkflowSignal"
39+
}
40+
41+
func (c *SignalWorkflowCommand) Commit(clock clock.Clock) *CommandResult {
42+
c.commit()
43+
44+
return &CommandResult{
45+
// Record signal requested
46+
Events: []history.Event{
47+
history.NewPendingEvent(
48+
clock.Now(),
49+
history.EventType_SignalWorkflow,
50+
&history.SignalWorkflowAttributes{
51+
Name: c.Name,
52+
Arg: c.Arg,
53+
},
54+
history.ScheduleEventID(c.id),
55+
),
56+
},
57+
// Send event to workflow instance
58+
WorkflowEvents: []history.WorkflowEvent{
59+
{
60+
WorkflowInstance: c.Instance,
61+
HistoryEvent: history.NewPendingEvent(
62+
clock.Now(),
63+
history.EventType_SignalReceived,
64+
&history.SignalReceivedAttributes{
65+
Name: c.Name,
66+
Arg: c.Arg,
67+
},
68+
),
69+
},
70+
},
71+
}
72+
}

internal/history/grouping.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
11
package history
22

3-
import "github.com/cschleiden/go-workflows/internal/core"
4-
5-
func EventsByWorkflowInstance(events []WorkflowEvent) map[core.WorkflowInstance][]Event {
6-
groupedEvents := make(map[core.WorkflowInstance][]Event)
3+
func EventsByWorkflowInstanceID(events []WorkflowEvent) map[string][]WorkflowEvent {
4+
groupedEvents := make(map[string][]WorkflowEvent)
75

86
for _, m := range events {
97
instance := *m.WorkflowInstance
108

11-
if _, ok := groupedEvents[instance]; !ok {
12-
groupedEvents[instance] = []Event{}
9+
if _, ok := groupedEvents[instance.InstanceID]; !ok {
10+
groupedEvents[instance.InstanceID] = []WorkflowEvent{}
1311
}
1412

15-
groupedEvents[instance] = append(groupedEvents[instance], m.HistoryEvent)
13+
groupedEvents[instance.InstanceID] = append(groupedEvents[instance.InstanceID], m)
1614
}
1715

1816
return groupedEvents

internal/history/grouping_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,19 @@ import (
1212
func TestGrouping_MultipleEventsSameInstance(t *testing.T) {
1313
id := uuid.NewString()
1414

15-
r := EventsByWorkflowInstance([]WorkflowEvent{
15+
r := EventsByWorkflowInstanceID([]WorkflowEvent{
1616
{
1717
WorkflowInstance: core.NewWorkflowInstance(id, "exid"),
1818
HistoryEvent: NewPendingEvent(time.Now(), EventType_SubWorkflowScheduled, &SubWorkflowScheduledAttributes{}),
1919
},
2020
{
21-
WorkflowInstance: core.NewWorkflowInstance(id, "exid"),
22-
HistoryEvent: NewPendingEvent(time.Now(), EventType_SubWorkflowScheduled, &SubWorkflowScheduledAttributes{}),
21+
WorkflowInstance: core.NewWorkflowInstance(id, ""),
22+
HistoryEvent: NewPendingEvent(time.Now(), EventType_SignalReceived, &SubWorkflowScheduledAttributes{}),
2323
},
2424
})
2525

2626
require.Len(t, r, 1)
27-
require.Len(t, r[*core.NewWorkflowInstance(id, "exid")], 2)
27+
require.Len(t, r[id], 2)
28+
require.Equal(t, r[id][0].HistoryEvent.Type, EventType_SubWorkflowScheduled)
29+
require.Equal(t, r[id][1].HistoryEvent.Type, EventType_SignalReceived)
2830
}

internal/history/history.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,50 @@ type EventType uint
1212
const (
1313
_ EventType = iota
1414

15+
// Workflow has started
1516
EventType_WorkflowExecutionStarted
17+
// Workflow has finished
1618
EventType_WorkflowExecutionFinished
19+
// Workflow has been terminated (not yet used)
1720
EventType_WorkflowExecutionTerminated
21+
// Workflow has been canceled
1822
EventType_WorkflowExecutionCanceled
1923

24+
// Workflow task has been started. This event is added to the history every time a workflow task is
25+
// picked up by the worker.
2026
EventType_WorkflowTaskStarted
2127

28+
// SubWorkflow has been scheduled
2229
EventType_SubWorkflowScheduled
30+
// SubWorkflow cancellation has been requested
2331
EventType_SubWorkflowCancellationRequested
32+
// SubWorkflow has completed
2433
EventType_SubWorkflowCompleted
34+
// SubWorkflow has failed
2535
EventType_SubWorkflowFailed
2636

37+
// Activity task has been scheduled
2738
EventType_ActivityScheduled
39+
// Activity task has been completed
2840
EventType_ActivityCompleted
41+
// Activity task has failed
2942
EventType_ActivityFailed
3043

44+
// Timer has been scheduled
3145
EventType_TimerScheduled
46+
// Timer has fired. This is the event received by a workflow when a previously scheduled timer fires.
3247
EventType_TimerFired
48+
// Timer has been canceled.
3349
EventType_TimerCanceled
3450

51+
// Workflow has received a signal
3552
EventType_SignalReceived
3653

54+
// Recorded result of a side-efect
3755
EventType_SideEffectResult
56+
57+
// Signal other workflow
58+
EventType_SignalWorkflow
3859
)
3960

4061
func (et EventType) String() string {
@@ -79,6 +100,10 @@ func (et EventType) String() string {
79100

80101
case EventType_SideEffectResult:
81102
return "SideEffectResult"
103+
104+
case EventType_SignalWorkflow:
105+
return "WorkflowSignalRequested"
106+
82107
default:
83108
return "Unknown"
84109
}

internal/history/serialization.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ func DeserializeAttributes(eventType EventType, attributes []byte) (attr interfa
7373
case EventType_SubWorkflowFailed:
7474
attr = &SubWorkflowFailedAttributes{}
7575

76+
case EventType_SignalWorkflow:
77+
attr = &SignalWorkflowAttributes{}
78+
7679
default:
7780
return nil, errors.New("unknown event type when deserializing attributes")
7881
}

0 commit comments

Comments
 (0)