Skip to content

Commit 26d75c9

Browse files
authored
Merge pull request #151 from cschleiden/cschleiden/handle-signal-errors
Deliver signals from within workflows as activities
2 parents af7b6ed + c75152d commit 26d75c9

File tree

18 files changed

+114
-292
lines changed

18 files changed

+114
-292
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ func Workflow(ctx workflow.Context) error {
352352
353353
```go
354354
func Workflow(ctx workflow.Context) error {
355-
if err := workflow.SignalWorkflow(ctx, "sub-instance-id", "signal-name", "value"); err != nil {
355+
if _, err := workflow.SignalWorkflow(ctx, "sub-instance-id", "signal-name", "value").Get(ctx); err != nil {
356356
// Handle error
357357
}
358358
}

backend/backend.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type Backend interface {
3434
GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]history.Event, error)
3535

3636
// SignalWorkflow signals a running workflow instance
37+
//
38+
// If the given instance does not exist, it will return an error
3739
SignalWorkflow(ctx context.Context, instanceID string, event history.Event) error
3840

3941
// GetWorkflowInstance returns a pending workflow task or nil if there are no pending worflow executions

backend/test/e2e.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
331331
InstanceID: id,
332332
}, swf, 1)
333333

334-
if err := workflow.SignalWorkflow(ctx, id, "signal", "hello"); err != nil {
334+
if _, err := workflow.SignalWorkflow(ctx, id, "signal", "hello").Get(ctx); err != nil {
335335
return 0, err
336336
}
337337

@@ -346,6 +346,30 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
346346
require.Equal(t, 2, r)
347347
},
348348
},
349+
{
350+
name: "SubWorkflow_Signal_BeforeStarting",
351+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
352+
wf := func(ctx workflow.Context) (int, error) {
353+
id, _ := workflow.SideEffect(ctx, func(ctx workflow.Context) string {
354+
id := uuid.New().String()
355+
workflow.Logger(ctx).Warn("side effect", "id", id)
356+
return id
357+
}).Get(ctx)
358+
359+
if _, err := workflow.SignalWorkflow(ctx, id, "signal", "hello").Get(ctx); err != nil {
360+
return 0, err
361+
}
362+
363+
return 42, nil
364+
}
365+
register(t, ctx, w, []interface{}{wf}, nil)
366+
367+
instance := runWorkflow(t, ctx, c, wf)
368+
369+
_, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*20)
370+
require.ErrorContains(t, err, backend.ErrInstanceNotFound.Error())
371+
},
372+
},
349373
{
350374
name: "Timer_CancelWorkflowInstance",
351375
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {

internal/command/signal.go

Lines changed: 0 additions & 83 deletions
This file was deleted.

internal/command/signal_test.go

Lines changed: 0 additions & 44 deletions
This file was deleted.

internal/history/history.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,6 @@ const (
5353

5454
// Recorded result of a side-efect
5555
EventType_SideEffectResult
56-
57-
// Signal other workflow
58-
EventType_SignalWorkflow
5956
)
6057

6158
func (et EventType) String() string {
@@ -101,9 +98,6 @@ func (et EventType) String() string {
10198
case EventType_SideEffectResult:
10299
return "SideEffectResult"
103100

104-
case EventType_SignalWorkflow:
105-
return "WorkflowSignalRequested"
106-
107101
default:
108102
return "Unknown"
109103
}

internal/history/serialization.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,6 @@ func DeserializeAttributes(eventType EventType, attributes []byte) (attr interfa
7373
case EventType_SubWorkflowFailed:
7474
attr = &SubWorkflowFailedAttributes{}
7575

76-
case EventType_SignalWorkflow:
77-
attr = &SignalWorkflowAttributes{}
78-
7976
default:
8077
return nil, errors.New("unknown event type when deserializing attributes")
8178
}

internal/history/signal_workflow.go

Lines changed: 0 additions & 8 deletions
This file was deleted.

internal/signals/activities.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package signals
2+
3+
import (
4+
"context"
5+
)
6+
7+
type Signaler interface {
8+
SignalWorkflow(ctx context.Context, instanceID string, name string, arg interface{}) error
9+
}
10+
11+
type Activities struct {
12+
Signaler Signaler
13+
}
14+
15+
func (a *Activities) DeliverWorkflowSignal(ctx context.Context, instanceID, signalName string, arg interface{}) error {
16+
return a.Signaler.SignalWorkflow(ctx, instanceID, signalName, arg)
17+
}

internal/worker/workflow.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,11 +192,8 @@ func (ww *WorkflowWorker) getExecutor(ctx context.Context, t *task.Workflow) (wo
192192
}
193193

194194
if !ok {
195-
executor, err = workflow.NewExecutor(
195+
executor = workflow.NewExecutor(
196196
ww.backend.Logger(), ww.backend.Tracer(), ww.registry, ww.backend, t.WorkflowInstance, clock.New())
197-
if err != nil {
198-
return nil, fmt.Errorf("creating workflow executor: %w", err)
199-
}
200197
}
201198

202199
// Cache executor instance for future continuation tasks, or refresh last access time

0 commit comments

Comments
 (0)