Skip to content

Commit e0728be

Browse files
authored
Merge pull request #198 from cschleiden/redis/expire-finished-instances
Automatic expiration of finished workflow instances
2 parents c47f5f3 + 376000a commit e0728be

File tree

8 files changed

+247
-59
lines changed

8 files changed

+247
-59
lines changed

README.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ b := mysql.NewMysqlBackend("localhost", 3306, "root", "SqlPassw0rd", "simple")
145145
#### Redis
146146

147147
```go
148-
b, err := redis.NewRedisBackend("localhost:6379", "user", "RedisPassw0rd", 0)
148+
redisClient := ...
149+
b, err := redis.NewRedisBackend(redisClient)
149150
if err != nil {
150151
panic(err)
151152
}
@@ -251,6 +252,15 @@ if err != nil {
251252
}
252253
```
253254

255+
#### Automatically expiring finished workflow instances
256+
257+
For now this is only supported for the Redis backend. When an `AutoExpiration` is passed to the backend, finished workflow instances will be automatically removed after the specified duration. This works by setting a TTL on the Redis keys for finished workflow instances. If `AutoExpiration` is set to `0` (the default), no TTL will be set.
258+
259+
```go
260+
b, err := redis.NewRedisBackend(redisClient, redis.WithAutoExpiration(time.Hour * 48))
261+
// ...
262+
```
263+
254264
### Canceling workflows
255265

256266
Create a `Client` instance then then call `CancelWorkflow` to cancel a workflow. When a workflow is canceled, it's workflow context is canceled. Any subsequent calls to schedule activities or sub-workflows will immediately return an error, skipping their execution. Any activities already running when a workflow is canceled will still run to completion and their result will be available.

backend/redis/expire.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"strconv"
6+
"time"
7+
8+
redis "github.com/redis/go-redis/v9"
9+
)
10+
11+
// We can't have events for redis..we do not want to do it in-process..
12+
13+
// Set the given expiration time on all keys passed in
14+
// KEYS[1] - instances-by-creation key
15+
// KEYS[2] - instances-expiring key
16+
// KEYS[3] - instance key
17+
// KEYS[4] - pending events key
18+
// KEYS[5] - history key
19+
// ARGV[1] - current timestamp
20+
// ARGV[2] - expiration time in seconds
21+
// ARGV[3] - expiration timestamp in unix milliseconds
22+
// ARGV[4] - instance ID
23+
var expireCmd = redis.NewScript(
24+
`-- Find instances which have already expired and remove from the index set
25+
local expiredInstances = redis.call("ZRANGE", KEYS[2], "-inf", ARGV[1], "BYSCORE")
26+
for i = 1, #expiredInstances do
27+
local instanceID = expiredInstances[i]
28+
redis.call("ZREM", KEYS[1], instanceID) -- index set
29+
redis.call("ZREM", KEYS[2], instanceID) -- expiration set
30+
end
31+
32+
-- Add expiration time for future cleanup
33+
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[4])
34+
35+
-- Set expiration on all keys
36+
for i = 3, #KEYS do
37+
redis.call("EXPIRE", KEYS[i], ARGV[2])
38+
end
39+
40+
return 0
41+
`,
42+
)
43+
44+
func setWorkflowInstanceExpiration(ctx context.Context, rdb redis.UniversalClient, instanceID string, expiration time.Duration) error {
45+
now := time.Now().UnixMilli()
46+
nowStr := strconv.FormatInt(now, 10)
47+
48+
exp := time.Now().Add(expiration).UnixMilli()
49+
expStr := strconv.FormatInt(exp, 10)
50+
51+
return expireCmd.Run(ctx, rdb, []string{
52+
instancesByCreation(),
53+
instancesExpiring(),
54+
instanceKey(instanceID),
55+
pendingEventsKey(instanceID),
56+
historyKey(instanceID),
57+
},
58+
nowStr,
59+
expiration.Seconds(),
60+
expStr,
61+
instanceID,
62+
).Err()
63+
}

backend/redis/expire_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/cschleiden/go-workflows/backend"
9+
"github.com/cschleiden/go-workflows/client"
10+
"github.com/cschleiden/go-workflows/internal/core"
11+
"github.com/cschleiden/go-workflows/worker"
12+
"github.com/cschleiden/go-workflows/workflow"
13+
"github.com/google/uuid"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func Test_AutoExpiration(t *testing.T) {
18+
if testing.Short() {
19+
t.Skip()
20+
}
21+
22+
autoExpirationTime := time.Second * 1
23+
24+
redisClient := getClient()
25+
setup := getCreateBackend(redisClient, WithAutoExpiration(autoExpirationTime))
26+
b := setup()
27+
28+
c := client.New(b)
29+
w := worker.New(b, &worker.DefaultWorkerOptions)
30+
31+
ctx, cancel := context.WithCancel(context.Background())
32+
33+
require.NoError(t, w.Start(ctx))
34+
35+
wf := func(ctx workflow.Context) error {
36+
return nil
37+
}
38+
39+
w.RegisterWorkflow(wf)
40+
41+
wfi, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
42+
InstanceID: uuid.NewString(),
43+
}, wf)
44+
require.NoError(t, err)
45+
46+
require.NoError(t, c.WaitForWorkflowInstance(ctx, wfi, time.Second*10))
47+
48+
// Wait for redis to expire the keys
49+
time.Sleep(autoExpirationTime)
50+
51+
_, err = b.GetWorkflowInstanceState(ctx, wfi)
52+
require.ErrorIs(t, err, backend.ErrInstanceNotFound)
53+
54+
cancel()
55+
require.NoError(t, w.WaitForCompletion())
56+
}
57+
58+
func Test_AutoExpiration_SubWorkflow(t *testing.T) {
59+
if testing.Short() {
60+
t.Skip()
61+
}
62+
63+
autoExpirationTime := time.Second * 1
64+
65+
redisClient := getClient()
66+
setup := getCreateBackend(redisClient, WithAutoExpiration(autoExpirationTime))
67+
b := setup()
68+
69+
c := client.New(b)
70+
w := worker.New(b, &worker.DefaultWorkerOptions)
71+
72+
ctx, cancel := context.WithCancel(context.Background())
73+
74+
require.NoError(t, w.Start(ctx))
75+
76+
swf := func(ctx workflow.Context) (int, error) {
77+
return 42, nil
78+
}
79+
80+
swfInstanceID := uuid.NewString()
81+
82+
wf := func(ctx workflow.Context) (int, error) {
83+
r, err := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
84+
InstanceID: swfInstanceID,
85+
}, swf).Get(ctx)
86+
87+
workflow.ScheduleTimer(ctx, time.Second*2).Get(ctx)
88+
89+
return r, err
90+
}
91+
92+
w.RegisterWorkflow(wf)
93+
w.RegisterWorkflow(swf)
94+
95+
wfi, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
96+
InstanceID: uuid.NewString(),
97+
}, wf)
98+
require.NoError(t, err)
99+
100+
r, err := client.GetWorkflowResult[int](ctx, c, wfi, time.Second*10)
101+
require.NoError(t, err)
102+
require.Equal(t, 42, r)
103+
104+
// Subworkflow should be expired by now
105+
_, err = b.GetWorkflowInstanceState(ctx, core.NewWorkflowInstance(swfInstanceID))
106+
require.ErrorIs(t, err, backend.ErrInstanceNotFound)
107+
108+
// Wait for redis to expire the keys
109+
time.Sleep(autoExpirationTime)
110+
111+
// Main workflow should now be expired
112+
_, err = b.GetWorkflowInstanceState(ctx, wfi)
113+
require.ErrorIs(t, err, backend.ErrInstanceNotFound)
114+
115+
cancel()
116+
require.NoError(t, w.WaitForCompletion())
117+
}

backend/redis/keys.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ func instancesByCreation() string {
1414
return "instances-by-creation"
1515
}
1616

17+
func instancesExpiring() string {
18+
return "instances-expiring"
19+
}
20+
1721
func pendingEventsKey(instanceID string) string {
1822
return fmt.Sprintf("pending-events:%v", instanceID)
1923
}

backend/redis/options.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package redis
2+
3+
import (
4+
"time"
5+
6+
"github.com/cschleiden/go-workflows/backend"
7+
)
8+
9+
type RedisOptions struct {
10+
backend.Options
11+
12+
BlockTimeout time.Duration
13+
14+
AutoExpiration time.Duration
15+
}
16+
17+
type RedisBackendOption func(*RedisOptions)
18+
19+
func WithBlockTimeout(timeout time.Duration) RedisBackendOption {
20+
return func(o *RedisOptions) {
21+
o.BlockTimeout = timeout
22+
}
23+
}
24+
25+
func WithBackendOptions(opts ...backend.BackendOption) RedisBackendOption {
26+
return func(o *RedisOptions) {
27+
for _, opt := range opts {
28+
opt(&o.Options)
29+
}
30+
}
31+
}
32+
33+
// WithAutoExpiration sets the duration after which finished runs will expire from the data store.
34+
// If set to 0 (default), runs will never expire and need to be manually removed.
35+
func WithAutoExpiration(expireFinishedRunsAfter time.Duration) RedisBackendOption {
36+
return func(o *RedisOptions) {
37+
o.AutoExpiration = expireFinishedRunsAfter
38+
}
39+
}

backend/redis/redis.go

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,6 @@ import (
1616
"go.opentelemetry.io/otel/trace"
1717
)
1818

19-
type RedisOptions struct {
20-
backend.Options
21-
22-
BlockTimeout time.Duration
23-
}
24-
25-
type RedisBackendOption func(*RedisOptions)
26-
27-
func WithBlockTimeout(timeout time.Duration) RedisBackendOption {
28-
return func(o *RedisOptions) {
29-
o.BlockTimeout = timeout
30-
}
31-
}
32-
33-
func WithBackendOptions(opts ...backend.BackendOption) RedisBackendOption {
34-
return func(o *RedisOptions) {
35-
for _, opt := range opts {
36-
opt(&o.Options)
37-
}
38-
}
39-
}
40-
4119
var _ backend.Backend = (*redisBackend)(nil)
4220

4321
func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (*redisBackend, error) {
@@ -80,6 +58,7 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
8058
"removePendingEventsCmd": removePendingEventsCmd.Load(ctx, rb.rdb),
8159
"requeueInstanceCmd": requeueInstanceCmd.Load(ctx, rb.rdb),
8260
"deleteInstanceCmd": deleteCmd.Load(ctx, rb.rdb),
61+
"expireInstanceCmd": expireCmd.Load(ctx, rb.rdb),
8362
}
8463
for name, cmd := range cmds {
8564
// fmt.Println(name, cmd.Val())

backend/redis/redis_test.go

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,8 @@ import (
88
"testing"
99
"time"
1010

11-
"github.com/cschleiden/go-workflows/backend"
1211
"github.com/cschleiden/go-workflows/backend/test"
1312
"github.com/cschleiden/go-workflows/internal/history"
14-
"github.com/cschleiden/go-workflows/log"
1513
"github.com/redis/go-redis/v9"
1614
)
1715

@@ -27,7 +25,7 @@ func Test_RedisBackend(t *testing.T) {
2725
}
2826

2927
client := getClient()
30-
setup := getCreateBackend(client, false)
28+
setup := getCreateBackend(client)
3129

3230
test.BackendTest(t, setup, nil)
3331
}
@@ -38,7 +36,7 @@ func Test_EndToEndRedisBackend(t *testing.T) {
3836
}
3937

4038
client := getClient()
41-
setup := getCreateBackend(client, false)
39+
setup := getCreateBackend(client)
4240

4341
test.EndToEndBackendTest(t, setup, nil)
4442
}
@@ -54,7 +52,7 @@ func getClient() redis.UniversalClient {
5452
return client
5553
}
5654

57-
func getCreateBackend(client redis.UniversalClient, ignoreLog bool) func() test.TestBackend {
55+
func getCreateBackend(client redis.UniversalClient, additionalOptions ...RedisBackendOption) func() test.TestBackend {
5856
return func() test.TestBackend {
5957
// Flush database
6058
if err := client.FlushDB(context.Background()).Err(); err != nil {
@@ -74,9 +72,7 @@ func getCreateBackend(client redis.UniversalClient, ignoreLog bool) func() test.
7472
WithBlockTimeout(time.Millisecond * 10),
7573
}
7674

77-
if ignoreLog {
78-
options = append(options, WithBackendOptions(backend.WithLogger(&nullLogger{})))
79-
}
75+
options = append(options, additionalOptions...)
8076

8177
b, err := NewRedisBackend(client, options...)
8278
if err != nil {
@@ -87,33 +83,6 @@ func getCreateBackend(client redis.UniversalClient, ignoreLog bool) func() test.
8783
}
8884
}
8985

90-
type nullLogger struct {
91-
defaultFields []interface{}
92-
}
93-
94-
// Debug implements log.Logger
95-
func (*nullLogger) Debug(msg string, fields ...interface{}) {
96-
}
97-
98-
// Error implements log.Logger
99-
func (*nullLogger) Error(msg string, fields ...interface{}) {
100-
}
101-
102-
// Panic implements log.Logger
103-
func (*nullLogger) Panic(msg string, fields ...interface{}) {
104-
}
105-
106-
// Warn implements log.Logger
107-
func (*nullLogger) Warn(msg string, fields ...interface{}) {
108-
}
109-
110-
// With implements log.Logger
111-
func (nl *nullLogger) With(fields ...interface{}) log.Logger {
112-
return nl
113-
}
114-
115-
var _ log.Logger = (*nullLogger)(nil)
116-
11786
var _ test.TestBackend = (*redisBackend)(nil)
11887

11988
// GetFutureEvents

0 commit comments

Comments
 (0)