Skip to content

Commit 929cfd2

Browse files
authored
Merge pull request #324 from cschleiden/prefix-keys
Support key prefix for redis backend
2 parents 6594a78 + 5537496 commit 929cfd2

File tree

17 files changed

+169
-113
lines changed

17 files changed

+169
-113
lines changed

backend/redis/delete.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@ var deleteCmd = redis.NewScript(
2323
// workflow tasks. It's assumed that the instance is in the finished state.
2424
//
2525
// Note: might want to revisit this in the future if we want to support removing hung instances.
26-
func deleteInstance(ctx context.Context, rdb redis.UniversalClient, instance *core.WorkflowInstance) error {
27-
if err := deleteCmd.Run(ctx, rdb, []string{
28-
instanceKey(instance),
29-
pendingEventsKey(instance),
30-
historyKey(instance),
31-
payloadKey(instance),
32-
activeInstanceExecutionKey(instance.InstanceID),
33-
instancesByCreation(),
26+
func (rb *redisBackend) deleteInstance(ctx context.Context, instance *core.WorkflowInstance) error {
27+
if err := deleteCmd.Run(ctx, rb.rdb, []string{
28+
rb.keys.instanceKey(instance),
29+
rb.keys.pendingEventsKey(instance),
30+
rb.keys.historyKey(instance),
31+
rb.keys.payloadKey(instance),
32+
rb.keys.activeInstanceExecutionKey(instance.InstanceID),
33+
rb.keys.instancesByCreation(),
3434
}, instanceSegment(instance)).Err(); err != nil {
3535
return fmt.Errorf("failed to delete instance: %w", err)
3636
}

backend/redis/diagnostics.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func (rb *redisBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
1818

1919
if afterInstanceID != "" {
2020
afterID := instanceSegment(core.NewWorkflowInstance(afterInstanceID, afterExecutionID))
21-
scores, err := rb.rdb.ZMScore(ctx, instancesByCreation(), afterID).Result()
21+
scores, err := rb.rdb.ZMScore(ctx, rb.keys.instancesByCreation(), afterID).Result()
2222
if err != nil {
2323
return nil, fmt.Errorf("getting instance score for %v: %w", afterID, err)
2424
}
@@ -35,7 +35,7 @@ func (rb *redisBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
3535
}
3636

3737
result, err := rb.rdb.ZRangeArgs(ctx, redis.ZRangeArgs{
38-
Key: instancesByCreation(),
38+
Key: rb.keys.instancesByCreation(),
3939
Stop: max,
4040
Start: "-inf",
4141
ByScore: true,
@@ -49,7 +49,7 @@ func (rb *redisBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
4949
instanceKeys := make([]string, 0)
5050
for _, r := range result {
5151
instanceSegment := r
52-
instanceKeys = append(instanceKeys, instanceKeyFromSegment(instanceSegment))
52+
instanceKeys = append(instanceKeys, rb.keys.instanceKeyFromSegment(instanceSegment))
5353
}
5454

5555
if len(instanceKeys) == 0 {
@@ -85,7 +85,7 @@ func (rb *redisBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
8585
}
8686

8787
func (rb *redisBackend) GetWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceRef, error) {
88-
instanceState, err := readInstance(ctx, rb.rdb, instanceKey(instance))
88+
instanceState, err := readInstance(ctx, rb.rdb, rb.keys.instanceKey(instance))
8989
if err != nil {
9090
return nil, err
9191
}

backend/redis/events.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ var addPayloadsCmd = redis.NewScript(`
4343
return 0
4444
`)
4545

46-
func addEventPayloadsP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, events []*history.Event) error {
46+
func (rb *redisBackend) addEventPayloadsP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, events []*history.Event) error {
4747
args := make([]interface{}, 0)
4848

4949
for _, event := range events {
@@ -55,7 +55,7 @@ func addEventPayloadsP(ctx context.Context, p redis.Pipeliner, instance *core.Wo
5555
args = append(args, event.ID, string(payload))
5656
}
5757

58-
return addPayloadsCmd.Run(ctx, p, []string{payloadKey(instance)}, args...).Err()
58+
return addPayloadsCmd.Run(ctx, p, []string{rb.keys.payloadKey(instance)}, args...).Err()
5959
}
6060

6161
func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, event *history.Event) error {

backend/redis/events_future.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func scheduleFutureEvents(ctx context.Context, rb *redisBackend) error {
5454
queueKeys := rb.workflowQueue.Keys()
5555

5656
if _, err := futureEventsCmd.Run(ctx, rb.rdb, []string{
57-
futureEventsKey(),
57+
rb.keys.futureEventsKey(),
5858
queueKeys.StreamKey,
5959
queueKeys.SetKey,
6060
}, nowStr).Result(); err != nil && err != redis.Nil {

backend/redis/expire.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,20 @@ var expireCmd = redis.NewScript(
4343
`,
4444
)
4545

46-
func setWorkflowInstanceExpiration(ctx context.Context, rdb redis.UniversalClient, instance *core.WorkflowInstance, expiration time.Duration) error {
46+
func (rb *redisBackend) setWorkflowInstanceExpiration(ctx context.Context, instance *core.WorkflowInstance, expiration time.Duration) error {
4747
now := time.Now().UnixMilli()
4848
nowStr := strconv.FormatInt(now, 10)
4949

5050
exp := time.Now().Add(expiration).UnixMilli()
5151
expStr := strconv.FormatInt(exp, 10)
5252

53-
return expireCmd.Run(ctx, rdb, []string{
54-
instancesByCreation(),
55-
instancesExpiring(),
56-
instanceKey(instance),
57-
pendingEventsKey(instance),
58-
historyKey(instance),
59-
payloadKey(instance),
53+
return expireCmd.Run(ctx, rb.rdb, []string{
54+
rb.keys.instancesByCreation(),
55+
rb.keys.instancesExpiring(),
56+
rb.keys.instanceKey(instance),
57+
rb.keys.pendingEventsKey(instance),
58+
rb.keys.historyKey(instance),
59+
rb.keys.payloadKey(instance),
6060
},
6161
nowStr,
6262
expiration.Seconds(),

backend/redis/instance.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
4343
}
4444

4545
_, err = createWorkflowInstanceCmd.Run(ctx, rb.rdb, []string{
46-
instanceKey(instance),
47-
activeInstanceExecutionKey(instance.InstanceID),
48-
pendingEventsKey(instance),
49-
payloadKey(instance),
50-
instancesActive(),
51-
instancesByCreation(),
46+
rb.keys.instanceKey(instance),
47+
rb.keys.activeInstanceExecutionKey(instance.InstanceID),
48+
rb.keys.pendingEventsKey(instance),
49+
rb.keys.payloadKey(instance),
50+
rb.keys.instancesActive(),
51+
rb.keys.instancesByCreation(),
5252
keyInfo.SetKey,
5353
keyInfo.StreamKey,
5454
},
@@ -81,7 +81,7 @@ func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
8181
start = "(" + historyID(*lastSequenceID)
8282
}
8383

84-
msgs, err := rb.rdb.XRange(ctx, historyKey(instance), start, "+").Result()
84+
msgs, err := rb.rdb.XRange(ctx, rb.keys.historyKey(instance), start, "+").Result()
8585
if err != nil {
8686
return nil, err
8787
}
@@ -98,7 +98,7 @@ func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
9898
events = append(events, event)
9999
}
100100

101-
res, err := rb.rdb.HMGet(ctx, payloadKey(instance), payloadKeys...).Result()
101+
res, err := rb.rdb.HMGet(ctx, rb.keys.payloadKey(instance), payloadKeys...).Result()
102102
if err != nil {
103103
return nil, fmt.Errorf("reading payloads: %w", err)
104104
}
@@ -114,7 +114,7 @@ func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
114114
}
115115

116116
func (rb *redisBackend) GetWorkflowInstanceState(ctx context.Context, instance *core.WorkflowInstance) (core.WorkflowInstanceState, error) {
117-
instanceState, err := readInstance(ctx, rb.rdb, instanceKey(instance))
117+
instanceState, err := readInstance(ctx, rb.rdb, rb.keys.instanceKey(instance))
118118
if err != nil {
119119
return core.WorkflowInstanceStateActive, err
120120
}
@@ -124,7 +124,7 @@ func (rb *redisBackend) GetWorkflowInstanceState(ctx context.Context, instance *
124124

125125
func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error {
126126
// Read the instance to check if it exists
127-
_, err := readInstance(ctx, rb.rdb, instanceKey(instance))
127+
_, err := readInstance(ctx, rb.rdb, rb.keys.instanceKey(instance))
128128
if err != nil {
129129
return err
130130
}
@@ -141,7 +141,7 @@ func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *co
141141
}
142142

143143
func (rb *redisBackend) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error {
144-
i, err := readInstance(ctx, rb.rdb, instanceKey(instance))
144+
i, err := readInstance(ctx, rb.rdb, rb.keys.instanceKey(instance))
145145
if err != nil {
146146
return err
147147
}
@@ -151,7 +151,7 @@ func (rb *redisBackend) RemoveWorkflowInstance(ctx context.Context, instance *co
151151
return backend.ErrInstanceNotFinished
152152
}
153153

154-
return deleteInstance(ctx, rb.rdb, instance)
154+
return rb.deleteInstance(ctx, instance)
155155
}
156156

157157
type instanceState struct {
@@ -199,8 +199,8 @@ func readInstancePipelineCmd(cmd *redis.StringCmd) (*instanceState, error) {
199199
return &state, nil
200200
}
201201

202-
func readActiveInstanceExecution(ctx context.Context, rdb redis.UniversalClient, instanceID string) (*core.WorkflowInstance, error) {
203-
val, err := rdb.Get(ctx, activeInstanceExecutionKey(instanceID)).Result()
202+
func (rb *redisBackend) readActiveInstanceExecution(ctx context.Context, instanceID string) (*core.WorkflowInstance, error) {
203+
val, err := rb.rdb.Get(ctx, rb.keys.activeInstanceExecutionKey(instanceID)).Result()
204204
if err != nil {
205205
if err == redis.Nil {
206206
return nil, nil

backend/redis/keys.go

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,59 +6,72 @@ import (
66
"github.com/cschleiden/go-workflows/core"
77
)
88

9+
type keys struct {
10+
// Ensure prefix ends with `:`
11+
prefix string
12+
}
13+
14+
func newKeys(prefix string) *keys {
15+
if prefix != "" && prefix[len(prefix)-1] != ':' {
16+
prefix += ":"
17+
}
18+
19+
return &keys{prefix: prefix}
20+
}
21+
922
// activeInstanceExecutionKey returns the key for the latest execution of the given instance
10-
func activeInstanceExecutionKey(instanceID string) string {
11-
return fmt.Sprintf("active-instance-execution:%v", instanceID)
23+
func (k *keys) activeInstanceExecutionKey(instanceID string) string {
24+
return fmt.Sprintf("%sactive-instance-execution:%v", k.prefix, instanceID)
1225
}
1326

1427
func instanceSegment(instance *core.WorkflowInstance) string {
1528
return fmt.Sprintf("%v:%v", instance.InstanceID, instance.ExecutionID)
1629
}
1730

18-
func instanceKey(instance *core.WorkflowInstance) string {
19-
return instanceKeyFromSegment(instanceSegment(instance))
31+
func (k *keys) instanceKey(instance *core.WorkflowInstance) string {
32+
return k.instanceKeyFromSegment(instanceSegment(instance))
2033
}
2134

22-
func instanceKeyFromSegment(segment string) string {
23-
return fmt.Sprintf("instance:%v", segment)
35+
func (k *keys) instanceKeyFromSegment(segment string) string {
36+
return fmt.Sprintf("%sinstance:%v", k.prefix, segment)
2437
}
2538

2639
// instancesByCreation returns the key for the ZSET that contains all instances sorted by creation date. The score is the
2740
// creation time as a unix timestamp. Used for listing all workflow instances in the diagnostics UI.
28-
func instancesByCreation() string {
29-
return "instances-by-creation"
41+
func (k *keys) instancesByCreation() string {
42+
return fmt.Sprintf("%sinstances-by-creation", k.prefix)
3043
}
3144

3245
// instancesActive returns the key for the SET that contains all active instances. Used for reporting active workflow
3346
// instances in stats.
34-
func instancesActive() string {
35-
return "instances-active"
47+
func (k *keys) instancesActive() string {
48+
return fmt.Sprintf("%sinstances-active", k.prefix)
3649
}
3750

38-
func instancesExpiring() string {
39-
return "instances-expiring"
51+
func (k *keys) instancesExpiring() string {
52+
return fmt.Sprintf("%sinstances-expiring", k.prefix)
4053
}
4154

42-
func pendingEventsKey(instance *core.WorkflowInstance) string {
43-
return fmt.Sprintf("pending-events:%v", instanceSegment(instance))
55+
func (k *keys) pendingEventsKey(instance *core.WorkflowInstance) string {
56+
return fmt.Sprintf("%spending-events:%v", k.prefix, instanceSegment(instance))
4457
}
4558

46-
func historyKey(instance *core.WorkflowInstance) string {
47-
return fmt.Sprintf("history:%v", instanceSegment(instance))
59+
func (k *keys) historyKey(instance *core.WorkflowInstance) string {
60+
return fmt.Sprintf("%shistory:%v", k.prefix, instanceSegment(instance))
4861
}
4962

5063
func historyID(sequenceID int64) string {
5164
return fmt.Sprintf("%v-0", sequenceID)
5265
}
5366

54-
func futureEventsKey() string {
55-
return "future-events"
67+
func (k *keys) futureEventsKey() string {
68+
return fmt.Sprintf("%sfuture-events", k.prefix)
5669
}
5770

58-
func futureEventKey(instance *core.WorkflowInstance, scheduleEventID int64) string {
59-
return fmt.Sprintf("future-event:%v:%v", instanceSegment(instance), scheduleEventID)
71+
func (k *keys) futureEventKey(instance *core.WorkflowInstance, scheduleEventID int64) string {
72+
return fmt.Sprintf("%sfuture-event:%v:%v", k.prefix, instanceSegment(instance), scheduleEventID)
6073
}
6174

62-
func payloadKey(instance *core.WorkflowInstance) string {
63-
return fmt.Sprintf("payload:%v", instanceSegment(instance))
75+
func (k *keys) payloadKey(instance *core.WorkflowInstance) string {
76+
return fmt.Sprintf("%spayload:%v", k.prefix, instanceSegment(instance))
6477
}

backend/redis/keys_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package redis
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func Test_newKeys(t *testing.T) {
10+
t.Run("WithEmptyPrefix", func(t *testing.T) {
11+
k := newKeys("")
12+
require.Equal(t, "", k.prefix)
13+
})
14+
15+
t.Run("WithNonEmptyPrefixWithoutColon", func(t *testing.T) {
16+
k := newKeys("prefix")
17+
require.Equal(t, "prefix:", k.prefix)
18+
})
19+
20+
t.Run("WithNonEmptyPrefixWithColon", func(t *testing.T) {
21+
k := newKeys("prefix:")
22+
require.Equal(t, "prefix:", k.prefix)
23+
})
24+
}

backend/redis/options.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,23 @@ type RedisOptions struct {
1313

1414
AutoExpiration time.Duration
1515
AutoExpirationContinueAsNew time.Duration
16+
17+
KeyPrefix string
1618
}
1719

1820
type RedisBackendOption func(*RedisOptions)
1921

20-
func WithBlockTimeout(timeout time.Duration) RedisBackendOption {
22+
// WithKeyPrefix sets the prefix for all keys used in the Redis backend.
23+
func WithKeyPrefix(prefix string) RedisBackendOption {
2124
return func(o *RedisOptions) {
22-
o.BlockTimeout = timeout
25+
o.KeyPrefix = prefix
2326
}
2427
}
2528

26-
func WithBackendOptions(opts ...backend.BackendOption) RedisBackendOption {
29+
// WithBlockTimeout sets the timeout for blocking operations like dequeuing a workflow or activity task
30+
func WithBlockTimeout(timeout time.Duration) RedisBackendOption {
2731
return func(o *RedisOptions) {
28-
for _, opt := range opts {
29-
opt(&o.Options)
30-
}
32+
o.BlockTimeout = timeout
3133
}
3234
}
3335

@@ -47,3 +49,11 @@ func WithAutoExpirationContinueAsNew(expireContinuedAsNewRunsAfter time.Duration
4749
o.AutoExpirationContinueAsNew = expireContinuedAsNewRunsAfter
4850
}
4951
}
52+
53+
func WithBackendOptions(opts ...backend.BackendOption) RedisBackendOption {
54+
return func(o *RedisOptions) {
55+
for _, opt := range opts {
56+
opt(&o.Options)
57+
}
58+
}
59+
}

backend/redis/queue.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,21 @@ type KeyInfo struct {
3434
SetKey string
3535
}
3636

37-
func newTaskQueue[T any](rdb redis.UniversalClient, tasktype string) (*taskQueue[T], error) {
37+
func newTaskQueue[T any](rdb redis.UniversalClient, keyPrefix string, tasktype string) (*taskQueue[T], error) {
38+
// Ensure the key prefix ends with a colon
39+
if keyPrefix != "" && keyPrefix[len(keyPrefix)-1] != ':' {
40+
keyPrefix += ":"
41+
}
42+
3843
tq := &taskQueue[T]{
3944
tasktype: tasktype,
40-
setKey: "task-set:" + tasktype,
41-
streamKey: "task-stream:" + tasktype,
45+
setKey: keyPrefix + "task-set:" + tasktype,
46+
streamKey: keyPrefix + "task-stream:" + tasktype,
4247
groupName: "task-workers",
4348
workerName: uuid.NewString(),
4449
}
4550

46-
// Pre-load script
51+
// Pre-load scripts
4752
cmds := map[string]*redis.StringCmd{
4853
"createGroupCmd": createGroupCmd.Load(context.Background(), rdb),
4954
"enqueueCmd": enqueueCmd.Load(context.Background(), rdb),

0 commit comments

Comments
 (0)