Skip to content

Commit 301f210

Browse files
authored
Merge branch 'main' into support-retry-cancellation
2 parents 09f65a7 + 5003264 commit 301f210

File tree

11 files changed

+205
-133
lines changed

11 files changed

+205
-133
lines changed

backend/redis/activity.go

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package redis
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/cschleiden/go-workflows/backend"
78
"github.com/cschleiden/go-workflows/backend/history"
@@ -48,17 +49,36 @@ func (rb *redisBackend) CompleteActivityTask(ctx context.Context, task *backend.
4849
return err
4950
}
5051

51-
p := rb.rdb.TxPipeline()
52-
53-
if err := rb.addWorkflowInstanceEventP(ctx, p, workflow.Queue(instanceState.Queue), task.WorkflowInstance, result); err != nil {
52+
// Marshal event data
53+
eventData, payload, err := marshalEvent(result)
54+
if err != nil {
5455
return err
5556
}
5657

57-
// Unlock activity
58-
if _, err := rb.activityQueue.Complete(ctx, p, task.Queue, task.ID); err != nil {
59-
return err
58+
activityQueueKeys := rb.activityQueue.Keys(task.Queue)
59+
workflowQueueKeys := rb.workflowQueue.Keys(workflow.Queue(instanceState.Queue))
60+
61+
err = completeActivityTaskCmd.Run(ctx, rb.rdb,
62+
[]string{
63+
activityQueueKeys.SetKey,
64+
activityQueueKeys.StreamKey,
65+
rb.keys.pendingEventsKey(task.WorkflowInstance),
66+
rb.keys.payloadKey(task.WorkflowInstance),
67+
rb.workflowQueue.queueSetKey,
68+
workflowQueueKeys.SetKey,
69+
workflowQueueKeys.StreamKey,
70+
},
71+
task.ID,
72+
rb.activityQueue.groupName,
73+
result.ID,
74+
eventData,
75+
payload,
76+
rb.workflowQueue.groupName,
77+
instanceSegment(task.WorkflowInstance),
78+
).Err()
79+
if err != nil {
80+
return fmt.Errorf("completing activity task: %w", err)
6081
}
6182

62-
_, err = p.Exec(ctx)
63-
return err
83+
return nil
6484
}

backend/redis/delete.go

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,14 @@ import (
55
"fmt"
66

77
"github.com/cschleiden/go-workflows/core"
8-
redis "github.com/redis/go-redis/v9"
98
)
109

11-
// KEYS[1] - instance key
12-
// KEYS[2] - pending events key
13-
// KEYS[3] - history key
14-
// KEYS[4] - payload key
15-
// KEYS[5] - active-instance-execution key
16-
// KEYS[6] - instances-by-creation key
17-
// ARGV[1] - instance segment
18-
var deleteCmd = redis.NewScript(
19-
`redis.call("DEL", KEYS[1], KEYS[2], KEYS[3], KEYS[4], KEYS[5])
20-
return redis.call("ZREM", KEYS[6], ARGV[1])`)
21-
2210
// deleteInstance deletes an instance from Redis. It does not attempt to remove any future events or pending
2311
// workflow tasks. It's assumed that the instance is in the finished state.
2412
//
2513
// Note: might want to revisit this in the future if we want to support removing hung instances.
2614
func (rb *redisBackend) deleteInstance(ctx context.Context, instance *core.WorkflowInstance) error {
27-
if err := deleteCmd.Run(ctx, rb.rdb, []string{
15+
if err := deleteInstanceCmd.Run(ctx, rb.rdb, []string{
2816
rb.keys.instanceKey(instance),
2917
rb.keys.pendingEventsKey(instance),
3018
rb.keys.historyKey(instance),

backend/redis/events.go

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
11
package redis
22

33
import (
4-
"context"
54
"encoding/json"
6-
"fmt"
75

86
"github.com/cschleiden/go-workflows/backend/history"
9-
"github.com/cschleiden/go-workflows/core"
10-
"github.com/redis/go-redis/v9"
117
)
128

139
type eventWithoutAttributes struct {
@@ -32,43 +28,3 @@ func marshalEventWithoutAttributes(event *history.Event) (string, error) {
3228

3329
return string(data), nil
3430
}
35-
36-
// KEYS[1 - payload key
37-
// ARGV[1..n] - payload values
38-
var addPayloadsCmd = redis.NewScript(`
39-
for i = 1, #ARGV, 2 do
40-
redis.pcall("HSETNX", KEYS[1], ARGV[i], ARGV[i+1])
41-
end
42-
43-
return 0
44-
`)
45-
46-
func (rb *redisBackend) addEventPayloadsP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, events []*history.Event) error {
47-
args := make([]interface{}, 0)
48-
49-
for _, event := range events {
50-
payload, err := json.Marshal(event.Attributes)
51-
if err != nil {
52-
return fmt.Errorf("marshaling event payload: %w", err)
53-
}
54-
55-
args = append(args, event.ID, string(payload))
56-
}
57-
58-
return addPayloadsCmd.Run(ctx, p, []string{rb.keys.payloadKey(instance)}, args...).Err()
59-
}
60-
61-
func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, event *history.Event) error {
62-
eventData, err := marshalEventWithoutAttributes(event)
63-
if err != nil {
64-
return err
65-
}
66-
67-
return p.XAdd(ctx, &redis.XAddArgs{
68-
Stream: streamKey,
69-
ID: "*",
70-
Values: map[string]any{
71-
"event": eventData,
72-
},
73-
}).Err()
74-
}

backend/redis/instance.go

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,9 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
3434
return fmt.Errorf("marshaling instance: %w", err)
3535
}
3636

37-
eventData, err := marshalEventWithoutAttributes(event)
37+
eventData, payloadData, err := marshalEvent(event)
3838
if err != nil {
39-
return fmt.Errorf("marshaling event: %w", err)
40-
}
41-
42-
payloadData, err := json.Marshal(event.Attributes)
43-
if err != nil {
44-
return fmt.Errorf("marshaling event payload: %w", err)
39+
return err
4540
}
4641

4742
keyInfo := rb.workflowQueue.Keys(a.Queue)
@@ -133,12 +128,27 @@ func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *co
133128
return err
134129
}
135130

131+
// Prepare event data
132+
eventData, payloadData, err := marshalEvent(event)
133+
if err != nil {
134+
return err
135+
}
136+
137+
keyInfo := rb.workflowQueue.Keys(workflow.Queue(instanceState.Queue))
138+
136139
// Cancel instance
137-
if _, err := rb.rdb.Pipelined(ctx, func(p redis.Pipeliner) error {
138-
return rb.addWorkflowInstanceEventP(ctx, p, workflow.Queue(instanceState.Queue), instance, event)
139-
}); err != nil {
140-
// fmt.Println(cmds)
141-
return fmt.Errorf("adding cancellation event to workflow instance: %w", err)
140+
if err := cancelWorkflowInstanceCmd.Run(ctx, rb.rdb, []string{
141+
rb.keys.payloadKey(instance),
142+
rb.keys.pendingEventsKey(instance),
143+
keyInfo.SetKey,
144+
keyInfo.StreamKey,
145+
},
146+
event.ID,
147+
eventData,
148+
payloadData,
149+
instanceSegment(instance),
150+
).Err(); err != nil {
151+
return fmt.Errorf("canceling workflow instance: %w", err)
142152
}
143153

144154
return nil

backend/redis/redis.go

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@ var luaScripts embed.FS
2424
var (
2525
createWorkflowInstanceCmd *redis.Script
2626
completeWorkflowTaskCmd *redis.Script
27+
completeActivityTaskCmd *redis.Script
28+
deleteInstanceCmd *redis.Script
2729
futureEventsCmd *redis.Script
2830
expireWorkflowInstanceCmd *redis.Script
31+
cancelWorkflowInstanceCmd *redis.Script
32+
signalWorkflowCmd *redis.Script
2933
)
3034

3135
func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (*redisBackend, error) {
@@ -60,26 +64,16 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
6064
activityQueue: activityQueue,
6165
}
6266

63-
// Preload scripts here. Usually redis-go attempts to execute them first, and if redis doesn't know
64-
// them, loads them. This doesn't work when using (transactional) pipelines, so eagerly load them on startup.
65-
cmds := map[string]*redis.StringCmd{
66-
"deleteInstanceCmd": deleteCmd.Load(ctx, rb.rdb),
67-
"addPayloadsCmd": addPayloadsCmd.Load(ctx, rb.rdb),
68-
}
69-
for name, cmd := range cmds {
70-
// fmt.Println(name, cmd.Val())
71-
72-
if cmd.Err() != nil {
73-
return nil, fmt.Errorf("loading redis script: %v %w", name, cmd.Err())
74-
}
75-
}
76-
7767
// Load all Lua scripts
7868
cmdMapping := map[string]**redis.Script{
79-
"create_workflow_instance.lua": &createWorkflowInstanceCmd,
69+
"cancel_workflow_instance.lua": &cancelWorkflowInstanceCmd,
70+
"complete_activity_task.lua": &completeActivityTaskCmd,
8071
"complete_workflow_task.lua": &completeWorkflowTaskCmd,
81-
"schedule_future_events.lua": &futureEventsCmd,
72+
"create_workflow_instance.lua": &createWorkflowInstanceCmd,
73+
"delete_instance.lua": &deleteInstanceCmd,
8274
"expire_workflow_instance.lua": &expireWorkflowInstanceCmd,
75+
"schedule_future_events.lua": &futureEventsCmd,
76+
"signal_workflow.lua": &signalWorkflowCmd,
8377
}
8478

8579
if err := loadScripts(ctx, rb.rdb, cmdMapping); err != nil {
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
local payloadHashKey = KEYS[1]
2+
local pendingEventsKey = KEYS[2]
3+
local workflowSetKey = KEYS[3]
4+
local workflowStreamKey = KEYS[4]
5+
6+
local eventId = ARGV[1]
7+
local eventData = ARGV[2]
8+
local payload = ARGV[3]
9+
local instanceSegment = ARGV[4]
10+
11+
-- Add event payload
12+
redis.pcall("HSETNX", payloadHashKey, eventId, payload)
13+
14+
-- Add event to pending events stream
15+
redis.call("XADD", pendingEventsKey, "*", "event", eventData)
16+
17+
-- Queue workflow task
18+
local added = redis.call("SADD", workflowSetKey, instanceSegment)
19+
if added == 1 then
20+
redis.call("XADD", workflowStreamKey, "*", "id", instanceSegment, "data", "")
21+
end
22+
23+
return true
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
-- Complete an activity task, add the result event to the workflow instance, and enqueue the workflow task
2+
-- KEYS[1] = activity set key
3+
-- KEYS[2] = activity stream key
4+
-- KEYS[3] = pending events stream key
5+
-- KEYS[4] = payload hash key
6+
-- KEYS[5] = workflow queues set key
7+
-- KEYS[6] = workflow set key (for specific queue)
8+
-- KEYS[7] = workflow stream key (for specific queue)
9+
-- ARGV[1] = task id (activity)
10+
-- ARGV[2] = group name (activity group)
11+
-- ARGV[3] = event id
12+
-- ARGV[4] = event data (json, without attributes)
13+
-- ARGV[5] = payload data (json, can be empty)
14+
-- ARGV[6] = workflow queue group name
15+
-- ARGV[7] = workflow instance segment id
16+
17+
-- Complete the activity task (from queue/complete.lua)
18+
local task = redis.call("XRANGE", KEYS[2], ARGV[1], ARGV[1])
19+
if #task == 0 then
20+
return nil
21+
end
22+
23+
local id = task[1][2][2]
24+
redis.call("SREM", KEYS[1], id)
25+
redis.call("XACK", KEYS[2], "NOMKSTREAM", ARGV[2], ARGV[1])
26+
redis.call("XDEL", KEYS[2], ARGV[1])
27+
28+
-- Add event to pending events stream for workflow instance
29+
redis.call("XADD", KEYS[3], "*", "event", ARGV[4])
30+
31+
-- Store payload if provided (only if not empty)
32+
if ARGV[5] ~= "" then
33+
redis.pcall("HSETNX", KEYS[4], ARGV[3], ARGV[5])
34+
end
35+
36+
-- Enqueue workflow task (from queue/enqueue.lua)
37+
redis.call("SADD", KEYS[5], KEYS[6])
38+
local added = redis.call("SADD", KEYS[6], ARGV[7])
39+
if added == 1 then
40+
redis.call("XADD", KEYS[7], "*", "id", ARGV[7], "data", "")
41+
end
42+
43+
return true
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
local instanceKey = KEYS[1]
2+
local pendingEventsKey = KEYS[2]
3+
local historyKey = KEYS[3]
4+
local payloadKey = KEYS[4]
5+
local activeInstanceExecutionKey = KEYS[5]
6+
local instancesByCreationKey = KEYS[6]
7+
8+
local instanceSegment = ARGV[1]
9+
10+
-- Delete all instance-related keys
11+
redis.call("DEL", instanceKey, pendingEventsKey, historyKey, payloadKey, activeInstanceExecutionKey)
12+
13+
-- Remove instance from sorted set
14+
return redis.call("ZREM", instancesByCreationKey, instanceSegment)
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
-- Signal a workflow instance by adding an event to its pending events stream and queuing it
2+
--
3+
-- KEYS[1] - payload hash key
4+
-- KEYS[2] - pending events stream key
5+
-- KEYS[3] - workflow task set key
6+
-- KEYS[4] - workflow task stream key
7+
--
8+
-- ARGV[1] - event id
9+
-- ARGV[2] - event data (JSON)
10+
-- ARGV[3] - event payload (JSON)
11+
-- ARGV[4] - instance segment
12+
13+
local payloadHashKey = KEYS[1]
14+
local pendingEventsKey = KEYS[2]
15+
local workflowSetKey = KEYS[3]
16+
local workflowStreamKey = KEYS[4]
17+
18+
local eventId = ARGV[1]
19+
local eventData = ARGV[2]
20+
local payload = ARGV[3]
21+
local instanceSegment = ARGV[4]
22+
23+
-- Add event payload
24+
redis.pcall("HSETNX", payloadHashKey, eventId, payload)
25+
26+
-- Add event to pending events stream
27+
redis.call("XADD", pendingEventsKey, "*", "event", eventData)
28+
29+
-- Queue workflow task
30+
local added = redis.call("SADD", workflowSetKey, instanceSegment)
31+
if added == 1 then
32+
redis.call("XADD", workflowStreamKey, "*", "id", instanceSegment, "data", "")
33+
end
34+
35+
return true

0 commit comments

Comments
 (0)