Skip to content

Commit 30abdcc

Browse files
authored
Merge pull request #56 from cschleiden/sub-workflow-cancellation
Support sub-workflow cancellation via context cancellation
2 parents 0226322 + 9d854ec commit 30abdcc

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+568
-234
lines changed

.github/workflows/go.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,4 @@ jobs:
3131
- name: Tests
3232
run: |
3333
sudo /etc/init.d/mysql start
34-
go test -v ./...
34+
go test -race -timeout 30s -count 1 ./...

.vscode/launch.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@
4141
"program": "${workspaceFolder}/samples/signal/signal.go"
4242
},
4343
{
44-
"name": "Launch cancel sample",
44+
"name": "Launch cancellation sample",
4545
"type": "go",
4646
"request": "launch",
4747
"mode": "debug",
48-
"program": "${workspaceFolder}/samples/cancelation/cancelation.go"
48+
"program": "${workspaceFolder}/samples/cancellation/cancellation.go"
4949
},
5050
{
5151
"name": "Launch subworkflow sample",

README.md

Lines changed: 63 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,57 @@ wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
237237
if err != nil {
238238
```
239239
240+
### Canceling workflows
241+
242+
Create a `Client` instance, then call `CancelWorkflowInstance` to cancel a workflow. When a workflow is canceled, its workflow context is canceled. Any subsequent calls to schedule activities or sub-workflows will immediately return an error, skipping their execution. Any activities already running when a workflow is canceled will still run to completion and their result will be available.
243+
244+
Sub-workflows will be canceled if their parent workflow is canceled.
245+
246+
```go
247+
var c client.Client
248+
err = c.CancelWorkflowInstance(context.Background(), workflowInstance)
249+
if err != nil {
250+
panic("could not cancel workflow")
251+
}
252+
```
253+
254+
#### Perform any cleanup
255+
256+
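If you need to run any activities or make calls using `workflow.Context` after the workflow has been canceled, create a new context with `workflow.NewDisconnectedContext` and use it for the cleanup work, as shown below.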
257+
258+
```go
259+
func Workflow2(ctx workflow.Context, msg string) (string, error) {
260+
defer func() {
261+
if errors.Is(ctx.Err(), workflow.Canceled) {
262+
// Workflow was canceled. Get new context to perform any cleanup activities
263+
ctx := workflow.NewDisconnectedContext(ctx)
264+
265+
// Execute the cleanup activity
266+
if err := workflow.ExecuteActivity(ctx, ActivityCleanup).Get(ctx, nil); err != nil {
267+
return errors.Wrap(err, "could not perform cleanup")
268+
}
269+
}
270+
}()
271+
272+
r1, err := workflow.ExecuteActivity[int](ctx, ActivityCancel, 1, 2).Get(ctx)
273+
if err != nil { // <---- Workflow is canceled while this activity is running
274+
return errors.Wrap(err, "could not get ActivityCancel result")
275+
}
276+
277+
// r1 will contain the result of ActivityCancel
278+
// ⬇ ActivitySkip will be skipped immediately
279+
r2, err := workflow.ExecuteActivity(ctx, ActivitySkip, 1, 2).Get(ctx)
280+
if err != nil {
281+
return errors.Wrap(err, "could not get ActivitySkip result")
282+
}
283+
284+
return "some result", nil
285+
}
286+
```
287+
240288
### Running activities
241289
242-
From a workflow, call `workflow.ExecuteActivity` to execute an activity. The call returns a `Future` you can await to get the result or any error it might return.
290+
From a workflow, call `workflow.ExecuteActivity` to execute an activity. The call returns a `Future[T]` you can await to get the result or any error it might return.
243291
244292
```go
245293
r1, err := workflow.ExecuteActivity[int](ctx, Activity1, 35, 12, nil, "test").Get(ctx)
@@ -250,6 +298,10 @@ if err != nil {
250298
log.Println(r1)
251299
```
252300
301+
#### Canceling activities
302+
303+
Canceling activities is not supported at this time.
304+
253305
### Timers
254306
255307
You can schedule timers to fire at any point in the future by calling `workflow.ScheduleTimer`. It returns a `Future` you can await to wait for the timer to fire.
@@ -283,44 +335,37 @@ id, _ := workflow.SideEffect[string](ctx, func(ctx workflow.Context) string) {
283335
284336
### Running sub-workflows
285337
286-
Call `workflow.CreateSubWorkflowInstance` to start a sub-workflow.
338+
Call `workflow.CreateSubWorkflowInstance` to start a sub-workflow. The returned `Future` will resolve once the sub-workflow has finished.
287339
288340
```go
289341
func Workflow1(ctx workflow.Context, msg string) error {
290-
wr, err := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowInstanceOptions{}, Workflow2, "some input").Get(ctx, &wr)
342+
result, err := workflow.CreateSubWorkflowInstance[int](
343+
ctx, workflow.SubWorkflowInstanceOptions{}, SubWorkflow, "some input").Get(ctx)
291344
if err != nil {
292345
return errors.Wrap(err, "could not get sub workflow result")
293346
}
294347

295-
log.Println("Sub workflow result:", wr)
348+
logger.Debug("Sub workflow result:", "result", result)
349+
296350
return nil
297351
}
298352

299-
func Workflow2(ctx workflow.Context, msg string) (int, error) {
300-
r1, err := workflow.ExecuteActivity[int](ctx, Activity1, 35, 12).Get(ctx, &r1)
353+
func SubWorkflow(ctx workflow.Context, msg string) (int, error) {
354+
r1, err := workflow.ExecuteActivity[int](ctx, Activity1, 35, 12).Get(ctx)
301355
if err != nil {
302356
return 0, errors.Wrap(err, "could not get activity result")
303357
}
304358

305-
log.Println("A1 result:", r1)
306-
359+
logger.Debug("A1 result:", "r1", r1)
307360
return r1, nil
308361
}
309362
```
310363
311-
### Canceling workflows
364+
#### Canceling sub-workflows
312365
313-
Create a `Client` instance then then call `CancelWorkflow` to cancel a workflow. When a workflow is canceled, it's workflow context is canceled. Any subsequent calls to schedule activities or sub-workflows will immediately return an error, skipping their execution. Activities or sub-workflows already running when a workflow is canceled will still run to completion and their result will be available.
366+
Similar to timer cancellation, you can pass a cancelable context to `CreateSubWorkflowInstance` and cancel the sub-workflow that way. Reacting to the cancellation is the same as canceling a workflow via the `Client`. See [Canceling workflows](#canceling-workflows) for more details.
314367
315-
Sub-workflows will be canceled if their parent workflow is canceled.
316368
317-
```go
318-
var c client.Client
319-
err = c.CancelWorkflowInstance(context.Background(), workflowInstance)
320-
if err != nil {
321-
panic("could not cancel workflow")
322-
}
323-
```
324369
325370
### `select`
326371
@@ -399,39 +444,6 @@ workflow.Select(
399444
)
400445
```
401446

402-
403-
#### Perform any cleanup
404-
405-
```go
406-
func Workflow2(ctx workflow.Context, msg string) (string, error) {
407-
defer func() {
408-
if errors.Is(ctx.Err(), workflow.Canceled) {
409-
// Workflow was canceled. Get new context to perform any cleanup activities
410-
ctx := workflow.NewDisconnectedContext(ctx)
411-
412-
// Execute the cleanup activity
413-
if err := workflow.ExecuteActivity(ctx, ActivityCleanup).Get(ctx, nil); err != nil {
414-
return errors.Wrap(err, "could not perform cleanup")
415-
}
416-
}
417-
}()
418-
419-
r1, err := workflow.ExecuteActivity[int](ctx, ActivityCancel, 1, 2).Get(ctx)
420-
if err != nil { // <---- Workflow is canceled while this activity is running
421-
return errors.Wrap(err, "could not get ActivityCancel result")
422-
}
423-
424-
// r1 will contain the result of ActivityCancel
425-
// ⬇ ActivitySkip will be skipped immediately
426-
r2, err := workflow.ExecuteActivity(ctx, ActivitySkip, 1, 2).Get(ctx)
427-
if err != nil {
428-
return errors.Wrap(err, "could not get ActivitySkip result")
429-
}
430-
431-
return "some result", nil
432-
}
433-
```
434-
435447
### Unit testing
436448

437449
go-workflows includes support for testing workflows, a simple example using mocked activities:

backend/mysql/mysql.go

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,9 @@ func (b *mysqlBackend) Logger() log.Logger {
8989
}
9090

9191
func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
92-
tx, err := b.db.BeginTx(ctx, nil)
92+
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
93+
Isolation: sql.LevelReadCommitted,
94+
})
9395
if err != nil {
9496
return err
9597
}
@@ -98,30 +100,37 @@ func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *wor
98100
instanceID := instance.InstanceID
99101

100102
// Cancel workflow instance
101-
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
102-
return fmt.Errorf("inserting cancellation event: %w", err)
103+
// TODO: Combine this with the event insertion
104+
res := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? LIMIT 1", instanceID)
105+
if err := res.Scan(nil); err == sql.ErrNoRows {
106+
return backend.ErrInstanceNotFound
103107
}
104108

105109
// Recursively, find any sub-workflow instance to cancel
106-
for {
107-
row := tx.QueryRowContext(ctx, "SELECT instance_id FROM `instances` WHERE parent_instance_id = ? AND completed_at IS NULL LIMIT 1", instanceID)
108-
109-
var subWorkflowInstanceID string
110-
if err := row.Scan(&subWorkflowInstanceID); err != nil {
111-
if err == sql.ErrNoRows {
112-
// No more sub-workflow instances to cancel
113-
break
114-
}
110+
toCancel := []string{instance.InstanceID}
115111

116-
return fmt.Errorf("getting workflow instance for cancelling: %w", err)
117-
}
112+
for len(toCancel) > 0 {
113+
toCancelID := toCancel[0]
114+
toCancel = toCancel[1:]
118115

119-
// Cancel sub-workflow instance
120-
if err := insertNewEvents(ctx, tx, subWorkflowInstanceID, []history.Event{*event}); err != nil {
116+
if err := insertNewEvents(ctx, tx, toCancelID, []history.Event{*event}); err != nil {
121117
return fmt.Errorf("inserting cancellation event: %w", err)
122118
}
123119

124-
instanceID = subWorkflowInstanceID
120+
rows, err := tx.QueryContext(ctx, "SELECT instance_id FROM `instances` WHERE parent_instance_id = ? AND completed_at IS NULL", toCancelID)
121+
defer rows.Close()
122+
if err != nil {
123+
return fmt.Errorf("finding sub-workflow instances: %w", err)
124+
}
125+
126+
for rows.Next() {
127+
var subWorkflowInstanceID string
128+
if err := rows.Scan(&subWorkflowInstanceID); err != nil {
129+
return fmt.Errorf("geting workflow instance for canceling: %w", err)
130+
}
131+
132+
toCancel = append(toCancel, subWorkflowInstanceID)
133+
}
125134
}
126135

127136
return tx.Commit()

backend/mysql/mysql_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,15 @@ func Test_MysqlBackend(t *testing.T) {
2222
t.Skip()
2323
}
2424

25-
dbName := "test_" + strings.Replace(uuid.NewString(), "-", "", -1)
25+
var dbName string
2626

2727
test.BackendTest(t, func() backend.Backend {
2828
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
2929
if err != nil {
3030
panic(err)
3131
}
3232

33+
dbName = "test_" + strings.Replace(uuid.NewString(), "-", "", -1)
3334
if _, err := db.Exec("CREATE DATABASE " + dbName); err != nil {
3435
panic(fmt.Errorf("creating database: %w", err))
3536
}

backend/redis/instance.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,12 @@ func (rb *redisBackend) GetWorkflowInstanceState(ctx context.Context, instance *
7979
}
8080

8181
func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error {
82-
// Recursively, find any sub-workflow instance to cancel
82+
_, err := readInstance(ctx, rb.rdb, instance.InstanceID)
83+
if err != nil {
84+
return err
85+
}
86+
87+
// Recursively, find any sub-workflow instances to cancel
8388
toCancel := make([]*core.WorkflowInstance, 0)
8489
toCancel = append(toCancel, instance)
8590
for len(toCancel) > 0 {
@@ -94,7 +99,7 @@ func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *co
9499
// Find sub-workflows
95100
subInstances, err := subWorkflowInstances(ctx, rb.rdb, instance)
96101
if err != nil {
97-
return err
102+
return fmt.Errorf("finding sub-workflow instances for cancellation: %w", err)
98103
}
99104

100105
toCancel = append(toCancel, subInstances...)

backend/redis/workflow.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
182182
if err := addFutureEvent(ctx, rb.rdb, targetInstance, &event); err != nil {
183183
return err
184184
}
185+
} else if event.Type == history.EventType_SubWorkflowCancellationRequested {
186+
if err := rb.CancelWorkflowInstance(ctx, targetInstance, &event); err != nil {
187+
return err
188+
}
185189
} else {
186190
// Add pending event to stream
187191
lastPendingMessageID, err = addEventToStream(ctx, rb.rdb, pendingEventsKey(targetInstance.InstanceID), &event)

backend/sqlite/schema.sql

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,29 @@ CREATE INDEX IF NOT EXISTS `idx_instances_locked_until_completed_at` ON `instanc
1414
CREATE INDEX IF NOT EXISTS `idx_instances_parent_instance_id` ON `instances` (`parent_instance_id`);
1515

1616
CREATE TABLE IF NOT EXISTS `pending_events` (
17-
`id` TEXT PRIMARY KEY,
17+
`id` TEXT,
1818
`sequence_id` INTEGER NOT NULL, -- not used but keep for now for query compat
1919
`instance_id` TEXT NOT NULL,
2020
`event_type` INTEGER NOT NULL,
2121
`timestamp` DATETIME NOT NULL,
2222
`schedule_event_id` INT NOT NULL,
2323
`attributes` BLOB NOT NULL,
24-
`visible_at` DATETIME NULL
24+
`visible_at` DATETIME NULL,
25+
PRIMARY KEY(`id`, `instance_id`)
2526
);
2627

2728
CREATE INDEX IF NOT EXISTS `idx_pending_events_instance_id_visible_at` ON `pending_events` (`instance_id`, `visible_at`);
2829

2930
CREATE TABLE IF NOT EXISTS `history` (
30-
`id` TEXT PRIMARY KEY,
31+
`id` TEXT,
3132
`sequence_id` INTEGER NOT NULL,
3233
`instance_id` TEXT NOT NULL,
3334
`event_type` INTEGER NOT NULL,
3435
`timestamp` DATETIME NOT NULL,
3536
`schedule_event_id` INT NOT NULL,
3637
`attributes` BLOB NOT NULL,
37-
`visible_at` DATETIME NULL
38+
`visible_at` DATETIME NULL,
39+
PRIMARY KEY(`id`, `instance_id`)
3840
);
3941

4042
CREATE INDEX IF NOT EXISTS `idx_history_instance_sequence_id` ON `history` (`instance_id`, `sequence_id`);

backend/sqlite/sqlite.go

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -133,31 +133,37 @@ func (sb *sqliteBackend) CancelWorkflowInstance(ctx context.Context, instance *w
133133

134134
instanceID := instance.InstanceID
135135

136-
// Cancel workflow instance
137-
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
138-
return fmt.Errorf("inserting cancellation event: %w", err)
136+
// TODO: Combine with event insertion
137+
res := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? LIMIT 1", instanceID)
138+
if err := res.Scan(nil); err == sql.ErrNoRows {
139+
return backend.ErrInstanceNotFound
139140
}
140141

141142
// Recursively, find any sub-workflow instance to cancel
142-
for {
143-
row := tx.QueryRowContext(ctx, "SELECT id FROM `instances` WHERE parent_instance_id = ? AND completed_at IS NULL LIMIT 1", instanceID)
144-
145-
var subWorkflowInstanceID string
146-
if err := row.Scan(&subWorkflowInstanceID); err != nil {
147-
if err == sql.ErrNoRows {
148-
// No more sub-workflow instances to cancel
149-
break
150-
}
143+
toCancel := []string{instance.InstanceID}
151144

152-
return fmt.Errorf("geting workflow instance for cancelling: %w", err)
153-
}
145+
for len(toCancel) > 0 {
146+
toCancelID := toCancel[0]
147+
toCancel = toCancel[1:]
154148

155-
// Cancel sub-workflow instance
156-
if err := insertNewEvents(ctx, tx, subWorkflowInstanceID, []history.Event{*event}); err != nil {
149+
if err := insertNewEvents(ctx, tx, toCancelID, []history.Event{*event}); err != nil {
157150
return fmt.Errorf("inserting cancellation event: %w", err)
158151
}
159152

160-
instanceID = subWorkflowInstanceID
153+
rows, err := tx.QueryContext(ctx, "SELECT id FROM `instances` WHERE parent_instance_id = ? AND completed_at IS NULL", toCancelID)
154+
defer rows.Close()
155+
if err != nil {
156+
return fmt.Errorf("finding sub-workflow instances: %w", err)
157+
}
158+
159+
for rows.Next() {
160+
var subWorkflowInstanceID string
161+
if err := rows.Scan(&subWorkflowInstanceID); err != nil {
162+
return fmt.Errorf("geting workflow instance for canceling: %w", err)
163+
}
164+
165+
toCancel = append(toCancel, subWorkflowInstanceID)
166+
}
161167
}
162168

163169
return tx.Commit()

0 commit comments

Comments
 (0)