Skip to content

Commit 14f0350

Browse files
committed
Support removing workflow instances
1 parent c798be9 commit 14f0350

File tree

10 files changed

+242
-0
lines changed

10 files changed

+242
-0
lines changed

backend/backend.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ type Backend interface {
2828
// RemoveWorkflowInstance removes a workflow instance
2929
RemoveWorkflowInstance(ctx context.Context, instance *workflow.Instance) error
3030

31+
// RemoveWorkflowInstances removes multiple workflow instances
32+
RemoveWorkflowInstances(ctx context.Context, options ...RemovalOption) error
33+
3134
// GetWorkflowInstanceState returns the state of the given workflow instance
3235
GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error)
3336

backend/mock_Backend.go

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,72 @@ func (b *mysqlBackend) RemoveWorkflowInstance(ctx context.Context, instance *cor
190190
return tx.Commit()
191191
}
192192

193+
func (b *mysqlBackend) RemoveWorkflowInstances(ctx context.Context, options ...backend.RemovalOption) error {
194+
ro := &backend.RemovalOptions{}
195+
for _, opt := range options {
196+
opt(ro)
197+
}
198+
199+
rows, err := b.db.QueryContext(ctx, `SELECT instance_id, execution_id FROM instances WHERE completed_at < ?`, ro.FinishedBefore)
200+
if err != nil {
201+
return err
202+
}
203+
204+
instanceIDs := []string{}
205+
executionIDs := []string{}
206+
for rows.Next() {
207+
var id, executionID string
208+
if err := rows.Scan(&id, &executionID); err != nil {
209+
return err
210+
}
211+
212+
instanceIDs = append(instanceIDs, id)
213+
executionIDs = append(executionIDs, executionID)
214+
}
215+
216+
batchSize := 100
217+
for i := 0; i < len(instanceIDs); i += batchSize {
218+
instanceIDs := instanceIDs[i:min(i+batchSize, len(instanceIDs))]
219+
executionIDs := executionIDs[i:min(i+batchSize, len(executionIDs))]
220+
221+
tx, err := b.db.BeginTx(ctx, nil)
222+
if err != nil {
223+
return err
224+
}
225+
226+
defer tx.Rollback()
227+
228+
placeholders := strings.Repeat(",?", len(instanceIDs)-1)
229+
whereCondition := fmt.Sprintf("instance_id IN (?%v) AND execution_id IN (?%v)", placeholders, placeholders)
230+
args := make([]interface{}, 0, len(instanceIDs)*2)
231+
for i := range instanceIDs {
232+
args = append(args, instanceIDs[i])
233+
}
234+
for i := range executionIDs {
235+
args = append(args, executionIDs[i])
236+
}
237+
238+
// Delete from instances, history and attributes tables
239+
if _, err := tx.ExecContext(ctx, fmt.Sprintf("DELETE FROM `instances` WHERE %v", whereCondition), args...); err != nil {
240+
return err
241+
}
242+
243+
if _, err := tx.ExecContext(ctx, fmt.Sprintf("DELETE FROM `history` WHERE %v", whereCondition), args...); err != nil {
244+
return err
245+
}
246+
247+
if _, err := tx.ExecContext(ctx, fmt.Sprintf("DELETE FROM `attributes` WHERE %v", whereCondition), args...); err != nil {
248+
return err
249+
}
250+
251+
if err := tx.Commit(); err != nil {
252+
return err
253+
}
254+
}
255+
256+
return nil
257+
}
258+
193259
func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
194260
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
195261
Isolation: sql.LevelReadCommitted,

backend/redis/instance.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package redis
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
67
"fmt"
78
"time"
89

@@ -157,6 +158,10 @@ func (rb *redisBackend) RemoveWorkflowInstance(ctx context.Context, instance *co
157158
return rb.deleteInstance(ctx, instance)
158159
}
159160

161+
func (rb *redisBackend) RemoveWorkflowInstances(ctx context.Context, options ...backend.RemovalOption) error {
162+
return errors.New("not supported, use auto expiration")
163+
}
164+
160165
type instanceState struct {
161166
Queue string `json:"queue"`
162167

backend/removal.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package backend
2+
3+
import (
4+
"time"
5+
)
6+
7+
type RemovalOptions struct {
8+
FinishedBefore time.Time
9+
}
10+
11+
type RemovalOption func(o *RemovalOptions)
12+
13+
func RemoveFinishedBefore(t time.Time) RemovalOption {
14+
return func(o *RemovalOptions) {
15+
o.FinishedBefore = t
16+
}
17+
}

backend/sqlite/sqlite.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,72 @@ func (sb *sqliteBackend) RemoveWorkflowInstance(ctx context.Context, instance *c
255255
return tx.Commit()
256256
}
257257

258+
func (sb *sqliteBackend) RemoveWorkflowInstances(ctx context.Context, options ...backend.RemovalOption) error {
259+
ro := &backend.RemovalOptions{}
260+
for _, opt := range options {
261+
opt(ro)
262+
}
263+
264+
rows, err := sb.db.QueryContext(ctx, `SELECT id, execution_id FROM instances WHERE completed_at < ?`, ro.FinishedBefore)
265+
if err != nil {
266+
return err
267+
}
268+
269+
instanceIDs := []string{}
270+
executionIDs := []string{}
271+
for rows.Next() {
272+
var id, executionID string
273+
if err := rows.Scan(&id, &executionID); err != nil {
274+
return err
275+
}
276+
277+
instanceIDs = append(instanceIDs, id)
278+
executionIDs = append(executionIDs, executionID)
279+
}
280+
281+
batchSize := 100
282+
for i := 0; i < len(instanceIDs); i += batchSize {
283+
instanceIDs := instanceIDs[i:min(i+batchSize, len(instanceIDs))]
284+
executionIDs := executionIDs[i:min(i+batchSize, len(executionIDs))]
285+
286+
tx, err := sb.db.BeginTx(ctx, nil)
287+
if err != nil {
288+
return err
289+
}
290+
291+
defer tx.Rollback()
292+
293+
placeholders := strings.Repeat(",?", len(instanceIDs)-1)
294+
whereCondition := fmt.Sprintf("id IN (?%v) AND execution_id IN (?%v)", placeholders, placeholders)
295+
args := make([]interface{}, 0, len(instanceIDs)*2)
296+
for i := range instanceIDs {
297+
args = append(args, instanceIDs[i])
298+
}
299+
for i := range executionIDs {
300+
args = append(args, executionIDs[i])
301+
}
302+
303+
// Delete from instances, history and attributes tables
304+
if _, err := tx.ExecContext(ctx, fmt.Sprintf("DELETE FROM `instances` WHERE %v", whereCondition), args...); err != nil {
305+
return err
306+
}
307+
308+
if _, err := tx.ExecContext(ctx, fmt.Sprintf("DELETE FROM `history` WHERE %v", whereCondition), args...); err != nil {
309+
return err
310+
}
311+
312+
if _, err := tx.ExecContext(ctx, fmt.Sprintf("DELETE FROM `attributes` WHERE %v", whereCondition), args...); err != nil {
313+
return err
314+
}
315+
316+
if err := tx.Commit(); err != nil {
317+
return err
318+
}
319+
}
320+
321+
return nil
322+
}
323+
258324
func (sb *sqliteBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
259325
tx, err := sb.db.BeginTx(ctx, nil)
260326
if err != nil {

backend/test/e2e.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,7 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti
822822
tests = append(tests, e2eStatsTests...)
823823
tests = append(tests, e2eDiagTests...)
824824
tests = append(tests, e2eQueueTests...)
825+
tests = append(tests, e2eRemovalTests...)
825826

826827
run := func(suffix string, workerOptions worker.Options) {
827828
for _, tt := range tests {

backend/test/e2e_removal.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/cschleiden/go-workflows/backend"
9+
"github.com/cschleiden/go-workflows/client"
10+
"github.com/cschleiden/go-workflows/worker"
11+
"github.com/cschleiden/go-workflows/workflow"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
var e2eRemovalTests = []backendTest{
16+
{
17+
name: "RemoveWorkflowInstances_Removes",
18+
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
19+
wf := func(ctx workflow.Context) (bool, error) {
20+
return true, nil
21+
}
22+
23+
register(t, ctx, w, []interface{}{wf}, nil)
24+
25+
workflowA := runWorkflow(t, ctx, c, wf)
26+
_, err := client.GetWorkflowResult[bool](ctx, c, workflowA, time.Second*10)
27+
require.NoError(t, err)
28+
29+
now := time.Now()
30+
time.Sleep(300 * time.Millisecond)
31+
32+
_, err = runWorkflowWithResult[bool](t, ctx, c, wf)
33+
require.NoError(t, err)
34+
35+
err = b.RemoveWorkflowInstances(ctx, backend.RemoveFinishedBefore(now))
36+
require.NoError(t, err)
37+
38+
_, err = c.GetWorkflowInstanceState(ctx, workflowA)
39+
require.ErrorIs(t, err, backend.ErrInstanceNotFound)
40+
},
41+
},
42+
}

client/client.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,8 @@ func GetWorkflowResult[T any](ctx context.Context, c *Client, instance *workflow
266266
}
267267

268268
// RemoveWorkflowInstance removes the given workflow instance from the backend.
269+
//
270+
// Instance needs to be in a completed state.
269271
func (c *Client) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error {
270272
ctx, span := c.backend.Tracer().Start(ctx, "RemoveWorkflowInstance", trace.WithAttributes(
271273
attribute.String(log.InstanceIDKey, instance.InstanceID),
@@ -274,3 +276,11 @@ func (c *Client) RemoveWorkflowInstance(ctx context.Context, instance *core.Work
274276

275277
return c.backend.RemoveWorkflowInstance(ctx, instance)
276278
}
279+
280+
// RemoveWorkflowInstances removes completed workflow instances from the backend.
281+
func (c *Client) RemoveWorkflowInstances(ctx context.Context, options ...backend.RemovalOption) error {
282+
ctx, span := c.backend.Tracer().Start(ctx, "RemoveWorkflowInstances")
283+
defer span.End()
284+
285+
return c.backend.RemoveWorkflowInstances(ctx, options...)
286+
}

docs/source/includes/_guide.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,17 @@ if err != nil {
604604

605605
`RemoveWorkflowInstance` on a client instance will remove that workflow instance including all history data from the backend. A workflow instance needs to be in the finished state before calling this, otherwise an error will be returned.
606606

607+
<div style="clear: both"></div>
608+
609+
```go
610+
err = c.RemoveWorkflowInstances(ctx, backend.RemoveFinishedBefore(time.Now().Add(-time.Hour * 24))
611+
if err != nil {
612+
// ...
613+
}
614+
```
615+
616+
`RemoveWorkflowInstances` on a client instance will remove all finished workflow instances that match the given condition(s). Currently only `RemoveFinishedBefore` is supported.
617+
607618
### Automatically expiring finished workflow instances
608619

609620
```go

0 commit comments

Comments
 (0)