Skip to content

Commit ffa358c

Browse files
authored
Merge pull request #195 from cschleiden/remove-instances
Support removing workflow instances
2 parents 0a66dd0 + 15460fd commit ffa358c

File tree

12 files changed

+217
-13
lines changed

12 files changed

+217
-13
lines changed

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,19 @@ wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
236236
InstanceID: uuid.NewString(),
237237
}, Workflow1, "input-for-workflow")
238238
if err != nil {
239+
// ...
240+
}
241+
```
242+
243+
### Removing workflow instances
244+
245+
`RemoveWorkflowInstance` on a client instance will remove that workflow instance including all history data from the backend. A workflow instance needs to be in the finished state before calling this, otherwise an error will be returned.
246+
247+
```go
248+
err = c.RemoveWorkflowInstance(ctx, workflowInstance)
249+
if err != nil {
250+
// ...
251+
}
239252
```
240253

241254
### Canceling workflows

backend/backend.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
var ErrInstanceNotFound = errors.New("workflow instance not found")
1818
var ErrInstanceAlreadyExists = errors.New("workflow instance already exists")
19+
var ErrInstanceNotFinished = errors.New("workflow instance is not finished")
1920

2021
const TracerName = "go-workflow"
2122

@@ -27,6 +28,9 @@ type Backend interface {
2728
// CancelWorkflowInstance cancels a running workflow instance
2829
CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error
2930

31+
// RemoveWorkflowInstance removes a workflow instance
32+
RemoveWorkflowInstance(ctx context.Context, instance *workflow.Instance) error
33+
3034
// GetWorkflowInstanceState returns the state of the given workflow instance
3135
GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error)
3236

backend/mock_Backend.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,22 @@ type mysqlBackend struct {
6262
options backend.Options
6363
}
6464

65-
// CreateWorkflowInstance creates a new workflow instance
65+
func (b *mysqlBackend) Logger() log.Logger {
66+
return b.options.Logger
67+
}
68+
69+
func (b *mysqlBackend) Tracer() trace.Tracer {
70+
return b.options.TracerProvider.Tracer(backend.TracerName)
71+
}
72+
73+
func (b *mysqlBackend) Metrics() metrics.Client {
74+
return b.options.Metrics.WithTags(metrics.Tags{metrickeys.Backend: "mysql"})
75+
}
76+
77+
func (b *mysqlBackend) Converter() converter.Converter {
78+
return b.options.Converter
79+
}
80+
6681
func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
6782
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
6883
Isolation: sql.LevelReadCommitted,
@@ -89,20 +104,37 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *wor
89104
return nil
90105
}
91106

92-
func (b *mysqlBackend) Logger() log.Logger {
93-
return b.options.Logger
94-
}
107+
func (b *mysqlBackend) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error {
108+
tx, err := b.db.BeginTx(ctx, nil)
109+
if err != nil {
110+
return err
111+
}
112+
defer tx.Rollback()
95113

96-
func (b *mysqlBackend) Tracer() trace.Tracer {
97-
return b.options.TracerProvider.Tracer(backend.TracerName)
98-
}
114+
instanceID := instance.InstanceID
99115

100-
func (b *mysqlBackend) Metrics() metrics.Client {
101-
return b.options.Metrics.WithTags(metrics.Tags{metrickeys.Backend: "mysql"})
102-
}
116+
row := tx.QueryRowContext(ctx, "SELECT completed_at FROM `instances` WHERE instance_id = ? LIMIT 1", instanceID)
117+
var completedAt sql.NullTime
118+
if err := row.Scan(&completedAt); err != nil {
119+
if err == sql.ErrNoRows {
120+
return backend.ErrInstanceNotFound
121+
}
122+
}
103123

104-
func (b *mysqlBackend) Converter() converter.Converter {
105-
return b.options.Converter
124+
if !completedAt.Valid {
125+
return backend.ErrInstanceNotFinished
126+
}
127+
128+
// Delete from instances and history tables
129+
if _, err := tx.ExecContext(ctx, "DELETE FROM `instances` WHERE instance_id = ?", instanceID); err != nil {
130+
return err
131+
}
132+
133+
if _, err := tx.ExecContext(ctx, "DELETE FROM `history` WHERE instance_id = ?", instanceID); err != nil {
134+
return err
135+
}
136+
137+
return tx.Commit()
106138
}
107139

108140
func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {

backend/redis/delete.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
redis "github.com/redis/go-redis/v9"
8+
)
9+
10+
// KEYS[1] - instance key
11+
// KEYS[2] - pending events key
12+
// KEYS[3] - history key
13+
// KEYS[4] - instances-by-creation key
14+
// ARGV[1] - instance ID
15+
var deleteCmd = redis.NewScript(
16+
`redis.call("DEL", KEYS[1], KEYS[2], KEYS[3])
17+
return redis.call("ZREM", KEYS[4], ARGV[1])`)
18+
19+
// deleteInstance deletes an instance from Redis. It does not attempt to remove any future events or pending
20+
// workflow tasks. It's assumed that the instance is in the finished state.
21+
//
22+
// Note: might want to revisit this in the future if we want to support removing hung instances.
23+
func deleteInstance(ctx context.Context, rdb redis.UniversalClient, instanceID string) error {
24+
if err := deleteCmd.Run(ctx, rdb, []string{
25+
instanceKey(instanceID),
26+
pendingEventsKey(instanceID),
27+
historyKey(instanceID),
28+
instancesByCreation(),
29+
}, instanceID).Err(); err != nil {
30+
return fmt.Errorf("failed to delete instance: %w", err)
31+
}
32+
33+
return nil
34+
}

backend/redis/instance.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,20 @@ func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *co
109109
return nil
110110
}
111111

112+
func (rb *redisBackend) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error {
113+
i, err := readInstance(ctx, rb.rdb, instance.InstanceID)
114+
if err != nil {
115+
return err
116+
}
117+
118+
// Check state
119+
if i.State != core.WorkflowInstanceStateFinished {
120+
return backend.ErrInstanceNotFinished
121+
}
122+
123+
return deleteInstance(ctx, rb.rdb, instance.InstanceID)
124+
}
125+
112126
type instanceState struct {
113127
Instance *core.WorkflowInstance `json:"instance,omitempty"`
114128
State core.WorkflowInstanceState `json:"state,omitempty"`

backend/redis/keys.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ func instanceKey(instanceID string) string {
88
return fmt.Sprintf("instance:%v", instanceID)
99
}
1010

11+
// instancesByCreation returns the key for the ZSET that contains all instances sorted by creation date. The score is the
12+
// creation time. Used for listing all workflow instances in the diagnostics UI.
1113
func instancesByCreation() string {
1214
return "instances-by-creation"
1315
}

backend/redis/redis.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
6969
activityQueue: activityQueue,
7070
}
7171

72-
// Preload scripts here. Usually redis-go attempts to execute them first, and the if redis doesn't know
72+
// Preload scripts here. Usually redis-go attempts to execute them first, and if redis doesn't know
7373
// them, loads them. This doesn't work when using (transactional) pipelines, so eagerly load them on startup.
7474
ctx := context.Background()
7575
cmds := map[string]*redis.StringCmd{
@@ -79,6 +79,7 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
7979
"removeFutureEventCmd": removeFutureEventCmd.Load(ctx, rb.rdb),
8080
"removePendingEventsCmd": removePendingEventsCmd.Load(ctx, rb.rdb),
8181
"requeueInstanceCmd": requeueInstanceCmd.Load(ctx, rb.rdb),
82+
"deleteInstanceCmd": deleteCmd.Load(ctx, rb.rdb),
8283
}
8384
for name, cmd := range cmds {
8485
// fmt.Println(name, cmd.Val())

backend/sqlite/sqlite.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ type sqliteBackend struct {
6464
options backend.Options
6565
}
6666

67+
var _ backend.Backend = (*sqliteBackend)(nil)
68+
6769
func (sb *sqliteBackend) Logger() log.Logger {
6870
return sb.options.Logger
6971
}
@@ -145,6 +147,39 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met
145147
return nil
146148
}
147149

150+
func (sb *sqliteBackend) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error {
151+
tx, err := sb.db.BeginTx(ctx, nil)
152+
if err != nil {
153+
return err
154+
}
155+
defer tx.Rollback()
156+
157+
instanceID := instance.InstanceID
158+
159+
row := tx.QueryRowContext(ctx, "SELECT completed_at FROM `instances` WHERE id = ? LIMIT 1", instanceID)
160+
var completedAt sql.NullTime
161+
if err := row.Scan(&completedAt); err != nil {
162+
if err == sql.ErrNoRows {
163+
return backend.ErrInstanceNotFound
164+
}
165+
}
166+
167+
if !completedAt.Valid {
168+
return backend.ErrInstanceNotFinished
169+
}
170+
171+
// Delete from instances and history tables
172+
if _, err := tx.ExecContext(ctx, "DELETE FROM `instances` WHERE id = ?", instanceID); err != nil {
173+
return err
174+
}
175+
176+
if _, err := tx.ExecContext(ctx, "DELETE FROM `history` WHERE instance_id = ?", instanceID); err != nil {
177+
return err
178+
}
179+
180+
return tx.Commit()
181+
}
182+
148183
func (sb *sqliteBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
149184
tx, err := sb.db.BeginTx(ctx, nil)
150185
if err != nil {

backend/test/backendtest.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,33 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
8181
require.Equal(t, *metadata, *task.Metadata)
8282
},
8383
},
84+
{
85+
name: "RemoveWorkflowInstance_ErrorWhenInstanceInProgress",
86+
f: func(t *testing.T, ctx context.Context, b backend.Backend) {
87+
instanceID := uuid.NewString()
88+
wfi := core.NewWorkflowInstance(instanceID)
89+
90+
err := b.CreateWorkflowInstance(
91+
ctx, wfi, history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}),
92+
)
93+
require.NoError(t, err)
94+
95+
err = b.RemoveWorkflowInstance(ctx, wfi)
96+
require.Error(t, err)
97+
require.Equal(t, backend.ErrInstanceNotFinished, err)
98+
},
99+
},
100+
{
101+
name: "RemoveWorkflowInstance_ErrorWhenInstanceDoesNotExist",
102+
f: func(t *testing.T, ctx context.Context, b backend.Backend) {
103+
instanceID := uuid.NewString()
104+
wfi := core.NewWorkflowInstance(instanceID)
105+
106+
err := b.RemoveWorkflowInstance(ctx, wfi)
107+
require.Error(t, err)
108+
require.Equal(t, backend.ErrInstanceNotFound, err)
109+
},
110+
},
84111
{
85112
name: "GetWorkflowTask_ReturnsNilWhenTimeout",
86113
f: func(t *testing.T, ctx context.Context, b backend.Backend) {

0 commit comments

Comments
 (0)