Skip to content

Commit 394711e

Browse files
authored
Merge pull request #361 from cschleiden/remove-instances2
Support auto-expiration for all backends
2 parents c83048f + 7101d0c commit 394711e

File tree

11 files changed

+276
-11
lines changed

11 files changed

+276
-11
lines changed

backend/backend.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,7 @@ type Backend interface {
9696

9797
// Close closes any underlying resources
9898
Close() error
99+
100+
// FeatureSupported returns true if the given feature is supported by the backend
101+
FeatureSupported(feature Feature) bool
99102
}

backend/features.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package backend
2+
3+
type Feature int
4+
5+
const (
6+
Feature_Expiration Feature = iota
7+
)

backend/mock_Backend.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ type mysqlBackend struct {
7575
options *options
7676
}
7777

78+
func (mb *mysqlBackend) FeatureSupported(feature backend.Feature) bool {
79+
return true
80+
}
81+
7882
func (mb *mysqlBackend) Close() error {
7983
return mb.db.Close()
8084
}
@@ -191,9 +195,9 @@ func (b *mysqlBackend) RemoveWorkflowInstance(ctx context.Context, instance *cor
191195
}
192196

193197
func (b *mysqlBackend) RemoveWorkflowInstances(ctx context.Context, options ...backend.RemovalOption) error {
194-
ro := &backend.RemovalOptions{}
198+
ro := backend.DefaultRemovalOptions
195199
for _, opt := range options {
196-
opt(ro)
200+
opt(&ro)
197201
}
198202

199203
rows, err := b.db.QueryContext(ctx, `SELECT instance_id, execution_id FROM instances WHERE completed_at < ?`, ro.FinishedBefore)
@@ -213,7 +217,7 @@ func (b *mysqlBackend) RemoveWorkflowInstances(ctx context.Context, options ...b
213217
executionIDs = append(executionIDs, executionID)
214218
}
215219

216-
batchSize := 100
220+
batchSize := ro.BatchSize
217221
for i := 0; i < len(instanceIDs); i += batchSize {
218222
instanceIDs := instanceIDs[i:min(i+batchSize, len(instanceIDs))]
219223
executionIDs := executionIDs[i:min(i+batchSize, len(executionIDs))]
@@ -365,7 +369,11 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w
365369

366370
func createInstance(ctx context.Context, tx *sql.Tx, queue workflow.Queue, wfi *workflow.Instance, metadata *workflow.Metadata) error {
367371
// Check for existing instance
368-
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? AND state = ? LIMIT 1", wfi.InstanceID, core.WorkflowInstanceStateActive).
372+
if err := tx.QueryRowContext(
373+
ctx,
374+
"SELECT 1 FROM `instances` WHERE instance_id = ? AND state = ? LIMIT 1",
375+
wfi.InstanceID,
376+
core.WorkflowInstanceStateActive).
369377
Scan(new(int)); err != sql.ErrNoRows {
370378
return backend.ErrInstanceAlreadyExists
371379
}

backend/redis/redis.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,12 @@ func (b *redisBackend) Options() *backend.Options {
143143
func (rb *redisBackend) Close() error {
144144
return rb.rdb.Close()
145145
}
146+
147+
func (rb *redisBackend) FeatureSupported(feature backend.Feature) bool {
148+
switch feature {
149+
case backend.Feature_Expiration:
150+
return false
151+
}
152+
153+
return true
154+
}

backend/removal.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ import (
66

77
type RemovalOptions struct {
88
FinishedBefore time.Time
9+
BatchSize int
10+
}
11+
12+
var DefaultRemovalOptions = RemovalOptions{
13+
BatchSize: 100,
914
}
1015

1116
type RemovalOption func(o *RemovalOptions)
@@ -15,3 +20,9 @@ func RemoveFinishedBefore(t time.Time) RemovalOption {
1520
o.FinishedBefore = t
1621
}
1722
}
23+
24+
func RemoveFinishedBatchSize(size int) RemovalOption {
25+
return func(o *RemovalOptions) {
26+
o.BatchSize = size
27+
}
28+
}

backend/sqlite/sqlite.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ type sqliteBackend struct {
100100

101101
var _ backend.Backend = (*sqliteBackend)(nil)
102102

103+
func (sb *sqliteBackend) FeatureSupported(feature backend.Feature) bool {
104+
return true
105+
}
106+
103107
func (sb *sqliteBackend) Close() error {
104108
if sb.memConn != nil {
105109
if err := sb.memConn.Close(); err != nil {
@@ -256,9 +260,9 @@ func (sb *sqliteBackend) RemoveWorkflowInstance(ctx context.Context, instance *c
256260
}
257261

258262
func (sb *sqliteBackend) RemoveWorkflowInstances(ctx context.Context, options ...backend.RemovalOption) error {
259-
ro := &backend.RemovalOptions{}
263+
ro := backend.DefaultRemovalOptions
260264
for _, opt := range options {
261-
opt(ro)
265+
opt(&ro)
262266
}
263267

264268
rows, err := sb.db.QueryContext(ctx, `SELECT id, execution_id FROM instances WHERE completed_at < ?`, ro.FinishedBefore)
@@ -278,7 +282,7 @@ func (sb *sqliteBackend) RemoveWorkflowInstances(ctx context.Context, options ..
278282
executionIDs = append(executionIDs, executionID)
279283
}
280284

281-
batchSize := 100
285+
batchSize := ro.BatchSize
282286
for i := 0; i < len(instanceIDs); i += batchSize {
283287
instanceIDs := instanceIDs[i:min(i+batchSize, len(instanceIDs))]
284288
executionIDs := executionIDs[i:min(i+batchSize, len(executionIDs))]

backend/test/e2e_removal.go

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515

1616
var e2eRemovalTests = []backendTest{
1717
{
18-
name: "RemoveWorkflowInstances_Removes",
18+
name: "RemoveWorkflowInstances/Removes",
1919
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
2020
wf := func(ctx workflow.Context) (bool, error) {
2121
return true, nil
@@ -28,20 +28,100 @@ var e2eRemovalTests = []backendTest{
2828
require.NoError(t, err)
2929

3030
now := time.Now()
31-
time.Sleep(300 * time.Millisecond)
3231

33-
_, err = runWorkflowWithResult[bool](t, ctx, c, wf)
32+
for i := 0; i < 10; i++ {
33+
time.Sleep(300 * time.Millisecond)
34+
35+
err = b.RemoveWorkflowInstances(ctx, backend.RemoveFinishedBefore(now))
36+
if errors.As(err, &backend.ErrNotSupported{}) {
37+
t.Skip()
38+
return
39+
}
40+
41+
require.NoError(t, err)
42+
43+
_, err = c.GetWorkflowInstanceState(ctx, workflowA)
44+
if errors.Is(err, backend.ErrInstanceNotFound) {
45+
break
46+
}
47+
}
48+
49+
require.ErrorIs(t, err, backend.ErrInstanceNotFound)
50+
},
51+
},
52+
{
53+
name: "AutoExpiration/StartsWorkflowAndRemoves",
54+
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
55+
wf := func(ctx workflow.Context) (bool, error) {
56+
return true, nil
57+
}
58+
59+
register(t, ctx, w, []interface{}{wf}, nil)
60+
61+
err := c.StartAutoExpiration(ctx, time.Millisecond)
62+
if errors.As(err, &backend.ErrNotSupported{}) {
63+
t.Skip()
64+
return
65+
}
66+
3467
require.NoError(t, err)
3568

36-
err = b.RemoveWorkflowInstances(ctx, backend.RemoveFinishedBefore(now))
69+
workflowA := runWorkflow(t, ctx, c, wf)
70+
_, err = client.GetWorkflowResult[bool](ctx, c, workflowA, time.Second*10)
71+
require.NoError(t, err)
72+
73+
for i := 0; i < 10; i++ {
74+
time.Sleep(100 * time.Millisecond)
75+
_, err = c.GetWorkflowInstanceState(ctx, workflowA)
76+
if err != backend.ErrInstanceNotFound {
77+
continue
78+
} else {
79+
break
80+
}
81+
}
82+
83+
require.ErrorIs(t, err, backend.ErrInstanceNotFound)
84+
},
85+
},
86+
{
87+
name: "AutoExpiration/UpdateDelay",
88+
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
89+
wf := func(ctx workflow.Context) (bool, error) {
90+
return true, nil
91+
}
92+
93+
register(t, ctx, w, []interface{}{wf}, nil)
94+
95+
err := c.StartAutoExpiration(ctx, time.Hour)
3796
if errors.As(err, &backend.ErrNotSupported{}) {
3897
t.Skip()
3998
return
4099
}
41100

42101
require.NoError(t, err)
43102

103+
workflowA := runWorkflow(t, ctx, c, wf)
104+
_, err = client.GetWorkflowResult[bool](ctx, c, workflowA, time.Second*10)
105+
require.NoError(t, err)
106+
107+
time.Sleep(1000 * time.Millisecond)
44108
_, err = c.GetWorkflowInstanceState(ctx, workflowA)
109+
require.NotErrorIs(t, err, backend.ErrInstanceNotFound)
110+
111+
// update delay
112+
err = c.StartAutoExpiration(ctx, time.Millisecond)
113+
require.NoError(t, err)
114+
115+
for i := 0; i < 10; i++ {
116+
time.Sleep(100 * time.Millisecond)
117+
_, err = c.GetWorkflowInstanceState(ctx, workflowA)
118+
if err != backend.ErrInstanceNotFound {
119+
continue
120+
} else {
121+
break
122+
}
123+
}
124+
45125
require.ErrorIs(t, err, backend.ErrInstanceNotFound)
46126
},
47127
},

client/expiration.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"time"
8+
9+
"github.com/cschleiden/go-workflows/backend"
10+
"github.com/cschleiden/go-workflows/internal/workflows"
11+
)
12+
13+
const expirationWorkflowInstanceID = "expiration"
14+
15+
// StartAutoExpiration starts a system workflow that will automatically expire workflow instances.
16+
//
17+
// The workflow will run every `delay` and remove all workflow instances finished before Now() - `delay`.
18+
func (c *Client) StartAutoExpiration(ctx context.Context, delay time.Duration) error {
19+
if !c.backend.FeatureSupported(backend.Feature_Expiration) {
20+
return backend.ErrNotSupported{}
21+
}
22+
23+
_, err := c.CreateWorkflowInstance(ctx, WorkflowInstanceOptions{
24+
InstanceID: expirationWorkflowInstanceID,
25+
}, workflows.ExpireWorkflowInstances, delay)
26+
if err != nil {
27+
if errors.Is(err, backend.ErrInstanceAlreadyExists) {
28+
err = c.SignalWorkflow(ctx, expirationWorkflowInstanceID, workflows.UpdateExpirationSignal, delay)
29+
if err != nil {
30+
return fmt.Errorf("updating expiration workflow: %w", err)
31+
}
32+
33+
return nil
34+
}
35+
36+
return fmt.Errorf("starting expiration workflow: %w", err)
37+
}
38+
39+
return nil
40+
}

internal/workflows/expiration.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package workflows
2+
3+
import (
4+
"context"
5+
"errors"
6+
"log/slog"
7+
"time"
8+
9+
"github.com/cschleiden/go-workflows/backend"
10+
"github.com/cschleiden/go-workflows/core"
11+
"github.com/cschleiden/go-workflows/internal/sync"
12+
"github.com/cschleiden/go-workflows/workflow"
13+
)
14+
15+
const (
16+
maxIterations = 10
17+
18+
UpdateExpirationSignal = "update-expiration"
19+
)
20+
21+
func ExpireWorkflowInstances(ctx workflow.Context, delay time.Duration) error {
22+
logger := workflow.Logger(ctx)
23+
24+
updates := workflow.NewSignalChannel[time.Duration](ctx, UpdateExpirationSignal)
25+
26+
for i := 0; i < maxIterations; i++ {
27+
tctx, cancelTimer := workflow.WithCancel(ctx)
28+
t := workflow.ScheduleTimer(tctx, delay)
29+
30+
timerFired := false
31+
for !timerFired {
32+
workflow.Select(ctx,
33+
workflow.Receive(updates, func(ctx workflow.Context, s time.Duration, _ bool) {
34+
delay = s
35+
36+
cancelTimer()
37+
tctx, cancelTimer = workflow.WithCancel(ctx)
38+
t = workflow.ScheduleTimer(tctx, delay)
39+
}),
40+
workflow.Await(t, func(ctx sync.Context, _ workflow.Future[any]) {
41+
timerFired = true
42+
}),
43+
)
44+
}
45+
46+
before := workflow.Now(ctx).Add(-delay)
47+
48+
logger.Info("removing workflow instances", slog.Time("before", before))
49+
50+
var a *Activities
51+
_, err := workflow.ExecuteActivity[any](
52+
ctx, workflow.ActivityOptions{
53+
Queue: core.QueueSystem,
54+
RetryOptions: workflow.RetryOptions{
55+
MaxAttempts: 2,
56+
},
57+
}, a.RemoveWorkflowInstances, before).Get(ctx)
58+
if err != nil {
59+
if errors.As(err, &backend.ErrNotSupported{}) {
60+
logger.Warn("removing workflow instances not supported")
61+
62+
// Stop execution
63+
return nil
64+
}
65+
66+
logger.Error("removing workflow instances", slog.Any("error", err))
67+
}
68+
}
69+
70+
return workflow.ContinueAsNew(ctx, delay)
71+
}
72+
73+
type Activities struct {
74+
Backend backend.Backend
75+
}
76+
77+
func (a *Activities) RemoveWorkflowInstances(ctx context.Context, before time.Time) error {
78+
return a.Backend.RemoveWorkflowInstances(ctx, backend.RemoveFinishedBefore(before))
79+
}

0 commit comments

Comments
 (0)