Skip to content

Commit a20bbf5

Browse files
authored
Support sub-workflow cancellation
1 parent 153b8f3 commit a20bbf5

39 files changed

+554
-204
lines changed

.vscode/launch.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@
4141
"program": "${workspaceFolder}/samples/signal/signal.go"
4242
},
4343
{
44-
"name": "Launch cancel sample",
44+
"name": "Launch cancellation sample",
4545
"type": "go",
4646
"request": "launch",
4747
"mode": "debug",
48-
"program": "${workspaceFolder}/samples/cancelation/cancelation.go"
48+
"program": "${workspaceFolder}/samples/cancellation/cancellation.go"
4949
},
5050
{
5151
"name": "Launch subworkflow sample",

README.md

Lines changed: 63 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,57 @@ wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
237237
if err != nil {
238238
```
239239
240+
### Canceling workflows
241+
242+
Create a `Client` instance, then call `CancelWorkflowInstance` to cancel a workflow. When a workflow is canceled, its workflow context is canceled. Any subsequent calls to schedule activities or sub-workflows will immediately return an error, skipping their execution. Any activities already running when a workflow is canceled will still run to completion and their result will be available.
243+
244+
Sub-workflows will be canceled if their parent workflow is canceled.
245+
246+
```go
247+
var c client.Client
248+
err = c.CancelWorkflowInstance(context.Background(), workflowInstance)
249+
if err != nil {
250+
panic("could not cancel workflow")
251+
}
252+
```
253+
254+
#### Perform any cleanup
255+
256+
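If you need to run any activities or make calls using `workflow.Context` after the workflow has been canceled, get a new context with `workflow.NewDisconnectedContext` and use it for the cleanup work, since the original context is already canceled.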
257+
258+
```go
259+
func Workflow2(ctx workflow.Context, msg string) (string, error) {
260+
defer func() {
261+
if errors.Is(ctx.Err(), workflow.Canceled) {
262+
// Workflow was canceled. Get new context to perform any cleanup activities
263+
ctx := workflow.NewDisconnectedContext(ctx)
264+
265+
// Execute the cleanup activity
266+
if err := workflow.ExecuteActivity(ctx, ActivityCleanup).Get(ctx, nil); err != nil {
267+
return errors.Wrap(err, "could not perform cleanup")
268+
}
269+
}
270+
}()
271+
272+
r1, err := workflow.ExecuteActivity[int](ctx, ActivityCancel, 1, 2).Get(ctx)
273+
if err != nil { // <---- Workflow is canceled while this activity is running
274+
return errors.Wrap(err, "could not get ActivityCancel result")
275+
}
276+
277+
// r1 will contain the result of ActivityCancel
278+
// ⬇ ActivitySkip will be skipped immediately
279+
r2, err := workflow.ExecuteActivity(ctx, ActivitySkip, 1, 2).Get(ctx)
280+
if err != nil {
281+
return errors.Wrap(err, "could not get ActivitySkip result")
282+
}
283+
284+
return "some result", nil
285+
}
286+
```
287+
240288
### Running activities
241289
242-
From a workflow, call `workflow.ExecuteActivity` to execute an activity. The call returns a `Future` you can await to get the result or any error it might return.
290+
From a workflow, call `workflow.ExecuteActivity` to execute an activity. The call returns a `Future[T]` you can await to get the result or any error it might return.
243291
244292
```go
245293
r1, err := workflow.ExecuteActivity[int](ctx, Activity1, 35, 12, nil, "test").Get(ctx)
@@ -250,6 +298,10 @@ if err != nil {
250298
log.Println(r1)
251299
```
252300
301+
#### Canceling activities
302+
303+
Canceling activities is not supported at this time.
304+
253305
### Timers
254306
255307
You can schedule timers to fire at any point in the future by calling `workflow.ScheduleTimer`. It returns a `Future` you can await to wait for the timer to fire.
@@ -283,44 +335,37 @@ id, _ := workflow.SideEffect[string](ctx, func(ctx workflow.Context) string) {
283335
284336
### Running sub-workflows
285337
286-
Call `workflow.CreateSubWorkflowInstance` to start a sub-workflow.
338+
Call `workflow.CreateSubWorkflowInstance` to start a sub-workflow. The returned `Future` will resolve once the sub-workflow has finished.
287339
288340
```go
289341
func Workflow1(ctx workflow.Context, msg string) error {
290-
wr, err := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowInstanceOptions{}, Workflow2, "some input").Get(ctx, &wr)
342+
result, err := workflow.CreateSubWorkflowInstance[int](
343+
ctx, workflow.SubWorkflowInstanceOptions{}, SubWorkflow, "some input").Get(ctx)
291344
if err != nil {
292345
return errors.Wrap(err, "could not get sub workflow result")
293346
}
294347

295-
log.Println("Sub workflow result:", wr)
348+
logger.Debug("Sub workflow result:", "result", result)
349+
296350
return nil
297351
}
298352

299-
func Workflow2(ctx workflow.Context, msg string) (int, error) {
300-
r1, err := workflow.ExecuteActivity[int](ctx, Activity1, 35, 12).Get(ctx, &r1)
353+
func SubWorkflow(ctx workflow.Context, msg string) (int, error) {
354+
r1, err := workflow.ExecuteActivity[int](ctx, Activity1, 35, 12).Get(ctx)
301355
if err != nil {
302356
return 0, errors.Wrap(err, "could not get activity result")
303357
}
304358

305-
log.Println("A1 result:", r1)
306-
359+
logger.Debug("A1 result:", "r1", r1)
307360
return r1, nil
308361
}
309362
```
310363
311-
### Canceling workflows
364+
#### Canceling sub-workflows
312365
313-
Create a `Client` instance then then call `CancelWorkflow` to cancel a workflow. When a workflow is canceled, it's workflow context is canceled. Any subsequent calls to schedule activities or sub-workflows will immediately return an error, skipping their execution. Activities or sub-workflows already running when a workflow is canceled will still run to completion and their result will be available.
366+
Similar to timer cancellation, you can pass a cancelable context to `CreateSubWorkflowInstance` and cancel the sub-workflow that way. Inside the sub-workflow, reacting to the cancellation works the same way as when a workflow is canceled via the `Client`. See [Canceling workflows](#canceling-workflows) for more details.
314367
315-
Sub-workflows will be canceled if their parent workflow is canceled.
316368
317-
```go
318-
var c client.Client
319-
err = c.CancelWorkflowInstance(context.Background(), workflowInstance)
320-
if err != nil {
321-
panic("could not cancel workflow")
322-
}
323-
```
324369
325370
### `select`
326371
@@ -399,39 +444,6 @@ workflow.Select(
399444
)
400445
```
401446

402-
403-
#### Perform any cleanup
404-
405-
```go
406-
func Workflow2(ctx workflow.Context, msg string) (string, error) {
407-
defer func() {
408-
if errors.Is(ctx.Err(), workflow.Canceled) {
409-
// Workflow was canceled. Get new context to perform any cleanup activities
410-
ctx := workflow.NewDisconnectedContext(ctx)
411-
412-
// Execute the cleanup activity
413-
if err := workflow.ExecuteActivity(ctx, ActivityCleanup).Get(ctx, nil); err != nil {
414-
return errors.Wrap(err, "could not perform cleanup")
415-
}
416-
}
417-
}()
418-
419-
r1, err := workflow.ExecuteActivity[int](ctx, ActivityCancel, 1, 2).Get(ctx)
420-
if err != nil { // <---- Workflow is canceled while this activity is running
421-
return errors.Wrap(err, "could not get ActivityCancel result")
422-
}
423-
424-
// r1 will contain the result of ActivityCancel
425-
// ⬇ ActivitySkip will be skipped immediately
426-
r2, err := workflow.ExecuteActivity(ctx, ActivitySkip, 1, 2).Get(ctx)
427-
if err != nil {
428-
return errors.Wrap(err, "could not get ActivitySkip result")
429-
}
430-
431-
return "some result", nil
432-
}
433-
```
434-
435447
### Unit testing
436448

437449
go-workflows includes support for testing workflows, a simple example using mocked activities:

backend/mysql/mysql.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -98,30 +98,42 @@ func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *wor
9898
instanceID := instance.InstanceID
9999

100100
// Cancel workflow instance
101+
// TODO: Combine this with the event insertion
102+
res := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? LIMIT 1", instanceID)
103+
if err := res.Scan(nil); err == sql.ErrNoRows {
104+
return backend.ErrInstanceNotFound
105+
}
106+
101107
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
102108
return fmt.Errorf("inserting cancellation event: %w", err)
103109
}
104110

105111
// Recursively, find any sub-workflow instance to cancel
106-
for {
107-
row := tx.QueryRowContext(ctx, "SELECT instance_id FROM `instances` WHERE parent_instance_id = ? AND completed_at IS NULL LIMIT 1", instanceID)
108-
109-
var subWorkflowInstanceID string
110-
if err := row.Scan(&subWorkflowInstanceID); err != nil {
111-
if err == sql.ErrNoRows {
112-
// No more sub-workflow instances to cancel
113-
break
114-
}
112+
toCancel := []string{instance.InstanceID}
115113

116-
return fmt.Errorf("getting workflow instance for cancelling: %w", err)
117-
}
114+
for len(toCancel) > 0 {
115+
toCancelID := toCancel[0]
116+
toCancel = toCancel[1:]
118117

119-
// Cancel sub-workflow instance
120-
if err := insertNewEvents(ctx, tx, subWorkflowInstanceID, []history.Event{*event}); err != nil {
121-
return fmt.Errorf("inserting cancellation event: %w", err)
118+
rows, err := tx.QueryContext(ctx, "SELECT instance_id FROM `instances` WHERE parent_instance_id = ? AND completed_at IS NULL", toCancelID)
119+
defer rows.Close()
120+
if err != nil {
121+
return fmt.Errorf("finding sub-workflow instances: %w", err)
122122
}
123123

124-
instanceID = subWorkflowInstanceID
124+
for rows.Next() {
125+
var subWorkflowInstanceID string
126+
if err := rows.Scan(&subWorkflowInstanceID); err != nil {
127+
return fmt.Errorf("geting workflow instance for canceling: %w", err)
128+
}
129+
130+
// Cancel sub-workflow instance
131+
if err := insertNewEvents(ctx, tx, subWorkflowInstanceID, []history.Event{*event}); err != nil {
132+
return fmt.Errorf("inserting cancellation event: %w", err)
133+
}
134+
135+
toCancel = append(toCancel, subWorkflowInstanceID)
136+
}
125137
}
126138

127139
return tx.Commit()

backend/redis/instance.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,12 @@ func (rb *redisBackend) GetWorkflowInstanceState(ctx context.Context, instance *
7979
}
8080

8181
func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error {
82-
// Recursively, find any sub-workflow instance to cancel
82+
_, err := readInstance(ctx, rb.rdb, instance.InstanceID)
83+
if err != nil {
84+
return err
85+
}
86+
87+
// Recursively, find any sub-workflow instances to cancel
8388
toCancel := make([]*core.WorkflowInstance, 0)
8489
toCancel = append(toCancel, instance)
8590
for len(toCancel) > 0 {
@@ -94,7 +99,7 @@ func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *co
9499
// Find sub-workflows
95100
subInstances, err := subWorkflowInstances(ctx, rb.rdb, instance)
96101
if err != nil {
97-
return err
102+
return fmt.Errorf("finding sub-workflow instances for cancellation: %w", err)
98103
}
99104

100105
toCancel = append(toCancel, subInstances...)

backend/redis/workflow.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
182182
if err := addFutureEvent(ctx, rb.rdb, targetInstance, &event); err != nil {
183183
return err
184184
}
185+
} else if event.Type == history.EventType_SubWorkflowCancellationRequested {
186+
if err := rb.CancelWorkflowInstance(ctx, targetInstance, &event); err != nil {
187+
return err
188+
}
185189
} else {
186190
// Add pending event to stream
187191
lastPendingMessageID, err = addEventToStream(ctx, rb.rdb, pendingEventsKey(targetInstance.InstanceID), &event)

backend/sqlite/schema.sql

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,29 @@ CREATE INDEX IF NOT EXISTS `idx_instances_locked_until_completed_at` ON `instanc
1414
CREATE INDEX IF NOT EXISTS `idx_instances_parent_instance_id` ON `instances` (`parent_instance_id`);
1515

1616
CREATE TABLE IF NOT EXISTS `pending_events` (
17-
`id` TEXT PRIMARY KEY,
17+
`id` TEXT,
1818
`sequence_id` INTEGER NOT NULL, -- not used but keep for now for query compat
1919
`instance_id` TEXT NOT NULL,
2020
`event_type` INTEGER NOT NULL,
2121
`timestamp` DATETIME NOT NULL,
2222
`schedule_event_id` INT NOT NULL,
2323
`attributes` BLOB NOT NULL,
24-
`visible_at` DATETIME NULL
24+
`visible_at` DATETIME NULL,
25+
PRIMARY KEY(`id`, `instance_id`)
2526
);
2627

2728
CREATE INDEX IF NOT EXISTS `idx_pending_events_instance_id_visible_at` ON `pending_events` (`instance_id`, `visible_at`);
2829

2930
CREATE TABLE IF NOT EXISTS `history` (
30-
`id` TEXT PRIMARY KEY,
31+
`id` TEXT,
3132
`sequence_id` INTEGER NOT NULL,
3233
`instance_id` TEXT NOT NULL,
3334
`event_type` INTEGER NOT NULL,
3435
`timestamp` DATETIME NOT NULL,
3536
`schedule_event_id` INT NOT NULL,
3637
`attributes` BLOB NOT NULL,
37-
`visible_at` DATETIME NULL
38+
`visible_at` DATETIME NULL,
39+
PRIMARY KEY(`id`, `instance_id`)
3840
);
3941

4042
CREATE INDEX IF NOT EXISTS `idx_history_instance_sequence_id` ON `history` (`instance_id`, `sequence_id`);

backend/sqlite/sqlite.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -133,31 +133,43 @@ func (sb *sqliteBackend) CancelWorkflowInstance(ctx context.Context, instance *w
133133

134134
instanceID := instance.InstanceID
135135

136+
// TODO: Combine with event insertion
137+
res := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? LIMIT 1", instanceID)
138+
if err := res.Scan(nil); err == sql.ErrNoRows {
139+
return backend.ErrInstanceNotFound
140+
}
141+
136142
// Cancel workflow instance
137143
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
138144
return fmt.Errorf("inserting cancellation event: %w", err)
139145
}
140146

141147
// Recursively, find any sub-workflow instance to cancel
142-
for {
143-
row := tx.QueryRowContext(ctx, "SELECT id FROM `instances` WHERE parent_instance_id = ? AND completed_at IS NULL LIMIT 1", instanceID)
144-
145-
var subWorkflowInstanceID string
146-
if err := row.Scan(&subWorkflowInstanceID); err != nil {
147-
if err == sql.ErrNoRows {
148-
// No more sub-workflow instances to cancel
149-
break
150-
}
148+
toCancel := []string{instance.InstanceID}
151149

152-
return fmt.Errorf("geting workflow instance for cancelling: %w", err)
153-
}
150+
for len(toCancel) > 0 {
151+
toCancelID := toCancel[0]
152+
toCancel = toCancel[1:]
154153

155-
// Cancel sub-workflow instance
156-
if err := insertNewEvents(ctx, tx, subWorkflowInstanceID, []history.Event{*event}); err != nil {
157-
return fmt.Errorf("inserting cancellation event: %w", err)
154+
rows, err := tx.QueryContext(ctx, "SELECT id FROM `instances` WHERE parent_instance_id = ? AND completed_at IS NULL", toCancelID)
155+
defer rows.Close()
156+
if err != nil {
157+
return fmt.Errorf("finding sub-workflow instances: %w", err)
158158
}
159159

160-
instanceID = subWorkflowInstanceID
160+
for rows.Next() {
161+
var subWorkflowInstanceID string
162+
if err := rows.Scan(&subWorkflowInstanceID); err != nil {
163+
return fmt.Errorf("geting workflow instance for canceling: %w", err)
164+
}
165+
166+
// Cancel sub-workflow instance
167+
if err := insertNewEvents(ctx, tx, subWorkflowInstanceID, []history.Event{*event}); err != nil {
168+
return fmt.Errorf("inserting cancellation event: %w", err)
169+
}
170+
171+
toCancel = append(toCancel, subWorkflowInstanceID)
172+
}
161173
}
162174

163175
return tx.Commit()

0 commit comments

Comments
 (0)