Skip to content

Commit a4ea6b5

Browse files
committed
Auto expire finished workflow instances
1 parent 3eb835f commit a4ea6b5

File tree

7 files changed

+144
-37
lines changed

7 files changed

+144
-37
lines changed

README.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ b := mysql.NewMysqlBackend("localhost", 3306, "root", "SqlPassw0rd", "simple")
145145
#### Redis
146146

147147
```go
148-
b, err := redis.NewRedisBackend("localhost:6379", "user", "RedisPassw0rd", 0)
148+
redisClient := ...
149+
b, err := redis.NewRedisBackend(redisClient)
149150
if err != nil {
150151
panic(err)
151152
}
@@ -251,6 +252,15 @@ if err != nil {
251252
}
252253
```
253254

255+
#### Automatically expiring finished workflow instances
256+
257+
For now this is only supported for the Redis backend. When an `AutoExpiration` is passed to the backend, finished workflow instances will be automatically removed after the specified duration. This works by setting a TTL on the Redis keys for finished workflow instances. If `AutoExpiration` is set to `0` (the default), no TTL will be set.
258+
259+
```go
260+
b, err := redis.NewRedisBackend(redisClient, redis.WithAutoExpiration(time.Hour * 48))
261+
// ...
262+
```
263+
254264
### Canceling workflows
255265

256266
Create a `Client` instance then then call `CancelWorkflow` to cancel a workflow. When a workflow is canceled, it's workflow context is canceled. Any subsequent calls to schedule activities or sub-workflows will immediately return an error, skipping their execution. Any activities already running when a workflow is canceled will still run to completion and their result will be available.

backend/redis/expire.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"strconv"
6+
"time"
7+
8+
redis "github.com/redis/go-redis/v9"
9+
)
10+
11+
// We can't have events for redis..we do not want to do it in-process..
12+
13+
// Set the given expiration time on all keys passed in
14+
// KEYS[1] - instances-by-creation key
15+
// KEYS[2] - instances-expiring key
16+
// KEYS[3] - instance key
17+
// KEYS[4] - pending events key
18+
// KEYS[5] - history key
19+
// ARGV[1] - current timestamp
20+
// ARGV[2] - expiration time in seconds
21+
// ARGV[3] - expiration timestamp in unix milliseconds
22+
// ARGV[4] - instance ID
23+
var expireCmd = redis.NewScript(
24+
`-- Find instances which have already expired and remove from the index set
25+
local expiredInstances = redis.call("ZRANGE", KEYS[2], "-inf", ARGV[1], "BYSCORE")
26+
for i = 1, #expiredInstances do
27+
local instanceID = expiredInstances[i]
28+
redis.call("ZREM", KEYS[1], instanceID) -- index set
29+
redis.call("ZREM", KEYS[2], instanceID) -- expiration set
30+
end
31+
32+
-- Add expiration time for future cleanup
33+
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[4])
34+
35+
-- Set expiration on all keys
36+
for i = 3, #KEYS do
37+
redis.call("EXPIRE", KEYS[i], ARGV[2])
38+
end
39+
40+
return 0
41+
`,
42+
)
43+
44+
func setWorkflowInstanceExpiration(ctx context.Context, rdb redis.UniversalClient, instanceID string, expiration time.Duration) error {
45+
now := time.Now().UnixMilli()
46+
nowStr := strconv.FormatInt(now, 10)
47+
48+
exp := time.Now().Add(expiration).UnixMilli()
49+
expStr := strconv.FormatInt(exp, 10)
50+
51+
return expireCmd.Run(ctx, rdb, []string{
52+
instancesByCreation(),
53+
instancesExpiring(),
54+
instanceKey(instanceID),
55+
pendingEventsKey(instanceID),
56+
historyKey(instanceID),
57+
},
58+
nowStr,
59+
expiration.Seconds(),
60+
expStr,
61+
instanceID,
62+
).Err()
63+
}

backend/redis/expire_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/cschleiden/go-workflows/backend"
9+
"github.com/cschleiden/go-workflows/client"
10+
"github.com/cschleiden/go-workflows/worker"
11+
"github.com/cschleiden/go-workflows/workflow"
12+
"github.com/google/uuid"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func Test_AutoExpiration(t *testing.T) {
17+
if testing.Short() {
18+
t.Skip()
19+
}
20+
21+
autoExpirationTime := time.Second * 1
22+
23+
redisClient := getClient()
24+
setup := getCreateBackend(redisClient, WithAutoExpiration(autoExpirationTime))
25+
b := setup()
26+
27+
c := client.New(b)
28+
w := worker.New(b, &worker.DefaultWorkerOptions)
29+
30+
ctx, cancel := context.WithCancel(context.Background())
31+
32+
require.NoError(t, w.Start(ctx))
33+
34+
wf := func(ctx workflow.Context) error {
35+
return nil
36+
}
37+
38+
wfi, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
39+
InstanceID: uuid.NewString(),
40+
}, wf)
41+
require.NoError(t, err)
42+
43+
require.NoError(t, c.WaitForWorkflowInstance(ctx, wfi, time.Second*10))
44+
45+
// Wait for redis to expire the keys
46+
time.Sleep(autoExpirationTime)
47+
48+
_, err = b.GetWorkflowInstanceState(ctx, wfi)
49+
require.ErrorIs(t, err, backend.ErrInstanceNotFound)
50+
51+
cancel()
52+
require.NoError(t, w.WaitForCompletion())
53+
}

backend/redis/keys.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ func instancesByCreation() string {
1414
return "instances-by-creation"
1515
}
1616

17+
func instancesExpiring() string {
18+
return "instances-expiring"
19+
}
20+
1721
func pendingEventsKey(instanceID string) string {
1822
return fmt.Sprintf("pending-events:%v", instanceID)
1923
}

backend/redis/redis.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
5858
"removePendingEventsCmd": removePendingEventsCmd.Load(ctx, rb.rdb),
5959
"requeueInstanceCmd": requeueInstanceCmd.Load(ctx, rb.rdb),
6060
"deleteInstanceCmd": deleteCmd.Load(ctx, rb.rdb),
61+
"expireInstanceCmd": expireCmd.Load(ctx, rb.rdb),
6162
}
6263
for name, cmd := range cmds {
6364
// fmt.Println(name, cmd.Val())

backend/redis/redis_test.go

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,8 @@ import (
88
"testing"
99
"time"
1010

11-
"github.com/cschleiden/go-workflows/backend"
1211
"github.com/cschleiden/go-workflows/backend/test"
1312
"github.com/cschleiden/go-workflows/internal/history"
14-
"github.com/cschleiden/go-workflows/log"
1513
"github.com/redis/go-redis/v9"
1614
)
1715

@@ -27,7 +25,7 @@ func Test_RedisBackend(t *testing.T) {
2725
}
2826

2927
client := getClient()
30-
setup := getCreateBackend(client, false)
28+
setup := getCreateBackend(client)
3129

3230
test.BackendTest(t, setup, nil)
3331
}
@@ -38,7 +36,7 @@ func Test_EndToEndRedisBackend(t *testing.T) {
3836
}
3937

4038
client := getClient()
41-
setup := getCreateBackend(client, false)
39+
setup := getCreateBackend(client)
4240

4341
test.EndToEndBackendTest(t, setup, nil)
4442
}
@@ -54,7 +52,7 @@ func getClient() redis.UniversalClient {
5452
return client
5553
}
5654

57-
func getCreateBackend(client redis.UniversalClient, ignoreLog bool) func() test.TestBackend {
55+
func getCreateBackend(client redis.UniversalClient, additionalOptions ...RedisBackendOption) func() test.TestBackend {
5856
return func() test.TestBackend {
5957
// Flush database
6058
if err := client.FlushDB(context.Background()).Err(); err != nil {
@@ -74,9 +72,7 @@ func getCreateBackend(client redis.UniversalClient, ignoreLog bool) func() test.
7472
WithBlockTimeout(time.Millisecond * 10),
7573
}
7674

77-
if ignoreLog {
78-
options = append(options, WithBackendOptions(backend.WithLogger(&nullLogger{})))
79-
}
75+
options = append(options, additionalOptions...)
8076

8177
b, err := NewRedisBackend(client, options...)
8278
if err != nil {
@@ -87,33 +83,6 @@ func getCreateBackend(client redis.UniversalClient, ignoreLog bool) func() test.
8783
}
8884
}
8985

90-
type nullLogger struct {
91-
defaultFields []interface{}
92-
}
93-
94-
// Debug implements log.Logger
95-
func (*nullLogger) Debug(msg string, fields ...interface{}) {
96-
}
97-
98-
// Error implements log.Logger
99-
func (*nullLogger) Error(msg string, fields ...interface{}) {
100-
}
101-
102-
// Panic implements log.Logger
103-
func (*nullLogger) Panic(msg string, fields ...interface{}) {
104-
}
105-
106-
// Warn implements log.Logger
107-
func (*nullLogger) Warn(msg string, fields ...interface{}) {
108-
}
109-
110-
// With implements log.Logger
111-
func (nl *nullLogger) With(fields ...interface{}) log.Logger {
112-
return nl
113-
}
114-
115-
var _ log.Logger = (*nullLogger)(nil)
116-
11786
var _ test.TestBackend = (*redisBackend)(nil)
11887

11988
// GetFutureEvents

backend/redis/workflow.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
203203
}
204204
}
205205

206-
// Try to queue workflow task
206+
// Try to enqueue workflow task
207207
if targetInstanceID != instance.InstanceID {
208208
if err := rb.workflowQueue.Enqueue(ctx, p, targetInstanceID, nil); err != nil {
209209
return fmt.Errorf("enqueuing workflow task: %w", err)
@@ -273,12 +273,19 @@ func (rb *redisBackend) CompleteWorkflowTask(
273273
}
274274

275275
if state == core.WorkflowInstanceStateFinished {
276+
// Trace workflow completion
276277
ctx = tracing.UnmarshalSpan(ctx, instanceState.Metadata)
277278
_, span := rb.Tracer().Start(ctx, "WorkflowComplete",
278279
trace.WithAttributes(
279280
attribute.String("workflow_instance_id", instanceState.Instance.InstanceID),
280281
))
281282
span.End()
283+
284+
if rb.options.AutoExpiration > 0 {
285+
if err := setWorkflowInstanceExpiration(ctx, rb.rdb, instance.InstanceID, rb.options.AutoExpiration); err != nil {
286+
return fmt.Errorf("setting workflow instance expiration: %w", err)
287+
}
288+
}
282289
}
283290

284291
return nil

0 commit comments

Comments
 (0)