Skip to content

Commit 9d854ec

Browse files
authored
Fix recursive cancellation for sql backends
1 parent 808b1e0 commit 9d854ec

File tree

3 files changed

+12
-21
lines changed

3 files changed

+12
-21
lines changed

.github/workflows/go.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,4 @@ jobs:
3131
- name: Tests
3232
run: |
3333
sudo /etc/init.d/mysql start
34-
go test -v ./...
34+
go test -race -timeout 30s -count 1 ./...

backend/mysql/mysql.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,9 @@ func (b *mysqlBackend) Logger() log.Logger {
8989
}
9090

9191
func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
92-
tx, err := b.db.BeginTx(ctx, nil)
92+
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
93+
Isolation: sql.LevelReadCommitted,
94+
})
9395
if err != nil {
9496
return err
9597
}
@@ -104,17 +106,17 @@ func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *wor
104106
return backend.ErrInstanceNotFound
105107
}
106108

107-
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
108-
return fmt.Errorf("inserting cancellation event: %w", err)
109-
}
110-
111109
// Recursively, find any sub-workflow instance to cancel
112110
toCancel := []string{instance.InstanceID}
113111

114112
for len(toCancel) > 0 {
115113
toCancelID := toCancel[0]
116114
toCancel = toCancel[1:]
117115

116+
if err := insertNewEvents(ctx, tx, toCancelID, []history.Event{*event}); err != nil {
117+
return fmt.Errorf("inserting cancellation event: %w", err)
118+
}
119+
118120
rows, err := tx.QueryContext(ctx, "SELECT instance_id FROM `instances` WHERE parent_instance_id = ? AND completed_at IS NULL", toCancelID)
119121
defer rows.Close()
120122
if err != nil {
@@ -127,11 +129,6 @@ func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *wor
127129
return fmt.Errorf("geting workflow instance for canceling: %w", err)
128130
}
129131

130-
// Cancel sub-workflow instance
131-
if err := insertNewEvents(ctx, tx, subWorkflowInstanceID, []history.Event{*event}); err != nil {
132-
return fmt.Errorf("inserting cancellation event: %w", err)
133-
}
134-
135132
toCancel = append(toCancel, subWorkflowInstanceID)
136133
}
137134
}

backend/sqlite/sqlite.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -139,18 +139,17 @@ func (sb *sqliteBackend) CancelWorkflowInstance(ctx context.Context, instance *w
139139
return backend.ErrInstanceNotFound
140140
}
141141

142-
// Cancel workflow instance
143-
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
144-
return fmt.Errorf("inserting cancellation event: %w", err)
145-
}
146-
147142
// Recursively, find any sub-workflow instance to cancel
148143
toCancel := []string{instance.InstanceID}
149144

150145
for len(toCancel) > 0 {
151146
toCancelID := toCancel[0]
152147
toCancel = toCancel[1:]
153148

149+
if err := insertNewEvents(ctx, tx, toCancelID, []history.Event{*event}); err != nil {
150+
return fmt.Errorf("inserting cancellation event: %w", err)
151+
}
152+
154153
rows, err := tx.QueryContext(ctx, "SELECT id FROM `instances` WHERE parent_instance_id = ? AND completed_at IS NULL", toCancelID)
155154
defer rows.Close()
156155
if err != nil {
@@ -163,11 +162,6 @@ func (sb *sqliteBackend) CancelWorkflowInstance(ctx context.Context, instance *w
163162
return fmt.Errorf("geting workflow instance for canceling: %w", err)
164163
}
165164

166-
// Cancel sub-workflow instance
167-
if err := insertNewEvents(ctx, tx, subWorkflowInstanceID, []history.Event{*event}); err != nil {
168-
return fmt.Errorf("inserting cancellation event: %w", err)
169-
}
170-
171165
toCancel = append(toCancel, subWorkflowInstanceID)
172166
}
173167
}

0 commit comments

Comments
 (0)