Skip to content

Commit c54d9f0

Browse files
authored
Merge pull request #273 from cschleiden/cschleiden/redis-payloads2
2 parents 2d8c65e + d309315 commit c54d9f0

File tree

5 files changed

+136
-28
lines changed

5 files changed

+136
-28
lines changed

backend/redis/events.go

Lines changed: 74 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,66 @@ package redis
33
import (
44
"context"
55
"encoding/json"
6+
"fmt"
67
"strconv"
78

89
"github.com/cschleiden/go-workflows/backend/history"
910
"github.com/cschleiden/go-workflows/core"
1011
"github.com/redis/go-redis/v9"
1112
)
1213

14+
type eventWithoutAttributes struct {
15+
*history.Event
16+
}
17+
18+
func (e *eventWithoutAttributes) MarshalJSON() ([]byte, error) {
19+
return json.Marshal(&struct {
20+
*history.Event
21+
Attributes interface{} `json:"attr"`
22+
}{
23+
Event: e.Event,
24+
Attributes: nil,
25+
})
26+
}
27+
28+
func marshalEventWithoutAttributes(event *history.Event) (string, error) {
29+
data, err := json.Marshal(&eventWithoutAttributes{event})
30+
if err != nil {
31+
return "", err
32+
}
33+
34+
return string(data), nil
35+
}
36+
37+
// KEYS[1..n] - payload keys
38+
// ARGV[1..n] - payload values
39+
var addPayloadsCmd = redis.NewScript(`
40+
for i = 1, #ARGV do
41+
redis.pcall("SET", KEYS[i], ARGV[i], "NX")
42+
end
43+
44+
return 0
45+
`)
46+
47+
func addEventPayloads(ctx context.Context, p redis.Pipeliner, events []*history.Event) error {
48+
keys := make([]string, 0)
49+
values := make([]interface{}, 0)
50+
51+
for _, event := range events {
52+
payload, err := json.Marshal(event.Attributes)
53+
if err != nil {
54+
return fmt.Errorf("marshaling event payload: %w", err)
55+
}
56+
57+
keys = append(keys, payloadKey(event.ID))
58+
values = append(values, string(payload))
59+
}
60+
61+
return addPayloadsCmd.Run(ctx, p, keys, values...).Err()
62+
}
63+
1364
func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, event *history.Event) error {
14-
eventData, err := json.Marshal(event)
65+
eventData, err := marshalEventWithoutAttributes(event)
1566
if err != nil {
1667
return err
1768
}
@@ -37,10 +88,10 @@ var addEventsToStreamCmd = redis.NewScript(`
3788
return msgID
3889
`)
3990

40-
func addEventsToHistoryStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []*history.Event) error {
91+
func addEventsToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []*history.Event) error {
4192
eventsData := make([]string, 0)
4293
for _, event := range events {
43-
eventData, err := json.Marshal(event)
94+
eventData, err := marshalEventWithoutAttributes(event)
4495
if err != nil {
4596
return err
4697
}
@@ -56,37 +107,49 @@ func addEventsToHistoryStreamP(ctx context.Context, p redis.Pipeliner, streamKey
56107
return nil
57108
}
58109

110+
// Adds an event to be delivered in the future. Not cluster-safe.
59111
// KEYS[1] - future event zset key
60112
// KEYS[2] - future event key
113+
// KEYS[3] - future event payload key
61114
// ARGV[1] - timestamp
62115
// ARGV[2] - Instance segment
63-
// ARGV[3] - event payload
116+
// ARGV[3] - event data
117+
// ARGV[4] - event payload
64118
var addFutureEventCmd = redis.NewScript(`
65119
redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
66-
return redis.call("HSET", KEYS[2], "instance", ARGV[2], "event", ARGV[3])
120+
redis.call("HSET", KEYS[2], "instance", ARGV[2], "event", ARGV[3], "payload", KEYS[3])
121+
redis.call("SET", KEYS[3], ARGV[4], "NX")
122+
return 0
67123
`)
68124

69125
func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error {
70-
eventData, err := json.Marshal(event)
126+
eventData, err := marshalEventWithoutAttributes(event)
71127
if err != nil {
72128
return err
73129
}
74130

75-
addFutureEventCmd.Run(
131+
payloadEventData, err := json.Marshal(event.Attributes)
132+
if err != nil {
133+
return err
134+
}
135+
136+
return addFutureEventCmd.Run(
76137
ctx, p,
77-
[]string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID)},
138+
[]string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID), payloadKey(event.ID)},
78139
strconv.FormatInt(event.VisibleAt.UnixMilli(), 10),
79140
instanceSegment(instance),
80141
string(eventData),
81-
)
82-
83-
return nil
142+
string(payloadEventData),
143+
).Err()
84144
}
85145

146+
// Remove a scheduled future event. Not cluster-safe.
86147
// KEYS[1] - future event zset key
87148
// KEYS[2] - future event key
88149
var removeFutureEventCmd = redis.NewScript(`
89150
redis.call("ZREM", KEYS[1], KEYS[2])
151+
local k = redis.call("HGET", KEYS[2], "payload")
152+
redis.call("DEL", k)
90153
return redis.call("DEL", KEYS[2])
91154
`)
92155

backend/redis/instance.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,14 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
3030
return err
3131
}
3232

33-
// Create event stream
34-
eventData, err := json.Marshal(event)
35-
if err != nil {
36-
return err
33+
// Create event stream with initial event
34+
if err := addEventPayloads(ctx, p, []*history.Event{event}); err != nil {
35+
return fmt.Errorf("adding event payloads: %w", err)
3736
}
3837

39-
p.XAdd(ctx, &redis.XAddArgs{
40-
Stream: pendingEventsKey(instance),
41-
ID: "*",
42-
Values: map[string]interface{}{
43-
"event": string(eventData),
44-
},
45-
})
38+
if err := addEventToStreamP(ctx, p, pendingEventsKey(instance), event); err != nil {
39+
return fmt.Errorf("adding event to stream: %w", err)
40+
}
4641

4742
// Queue workflow instance task
4843
if err := rb.workflowQueue.Enqueue(ctx, p, instanceSegment(instance), nil); err != nil {
@@ -70,16 +65,30 @@ func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
7065
return nil, err
7166
}
7267

68+
payloadKeys := make([]string, 0, len(msgs))
7369
var events []*history.Event
7470
for _, msg := range msgs {
7571
var event *history.Event
7672
if err := json.Unmarshal([]byte(msg.Values["event"].(string)), &event); err != nil {
7773
return nil, fmt.Errorf("unmarshaling event: %w", err)
7874
}
7975

76+
payloadKeys = append(payloadKeys, payloadKey(event.ID))
8077
events = append(events, event)
8178
}
8279

80+
res, err := rb.rdb.MGet(ctx, payloadKeys...).Result()
81+
if err != nil {
82+
return nil, fmt.Errorf("reading payloads: %w", err)
83+
}
84+
85+
for i, event := range events {
86+
event.Attributes, err = history.DeserializeAttributes(event.Type, []byte(res[i].(string)))
87+
if err != nil {
88+
return nil, fmt.Errorf("deserializing attributes for event %v: %w", event.Type, err)
89+
}
90+
}
91+
8392
return events, nil
8493
}
8594

backend/redis/keys.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,7 @@ func futureEventsKey() string {
5656
func futureEventKey(instance *core.WorkflowInstance, scheduleEventID int64) string {
5757
return fmt.Sprintf("future-event:%v:%v:%v", instance.InstanceID, instance.ExecutionID, scheduleEventID)
5858
}
59+
60+
func payloadKey(eventID string) string {
61+
return fmt.Sprintf("payload:%v", eventID)
62+
}

backend/redis/redis.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
6060
"requeueInstanceCmd": requeueInstanceCmd.Load(ctx, rb.rdb),
6161
"deleteInstanceCmd": deleteCmd.Load(ctx, rb.rdb),
6262
"expireInstanceCmd": expireCmd.Load(ctx, rb.rdb),
63+
"addPayloadsCmd": addPayloadsCmd.Load(ctx, rb.rdb),
6364
}
6465
for name, cmd := range cmds {
6566
// fmt.Println(name, cmd.Val())

backend/redis/workflow.go

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowT
9090
return nil, fmt.Errorf("reading event stream: %w", err)
9191
}
9292

93+
payloadKeys := make([]string, 0, len(msgs))
9394
newEvents := make([]*history.Event, 0, len(msgs))
9495
for _, msg := range msgs {
9596
var event *history.Event
@@ -98,9 +99,25 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowT
9899
return nil, fmt.Errorf("unmarshaling event: %w", err)
99100
}
100101

102+
payloadKeys = append(payloadKeys, payloadKey(event.ID))
101103
newEvents = append(newEvents, event)
102104
}
103105

106+
// Fetch event payloads
107+
if len(payloadKeys) > 0 {
108+
res, err := rb.rdb.MGet(ctx, payloadKeys...).Result()
109+
if err != nil {
110+
return nil, fmt.Errorf("reading payloads: %w", err)
111+
}
112+
113+
for i, event := range newEvents {
114+
event.Attributes, err = history.DeserializeAttributes(event.Type, []byte(res[i].(string)))
115+
if err != nil {
116+
return nil, fmt.Errorf("deserializing attributes for event %v: %w", event.Type, err)
117+
}
118+
}
119+
}
120+
104121
return &backend.WorkflowTask{
105122
ID: instanceTask.TaskID,
106123
WorkflowInstance: instanceState.Instance,
@@ -161,11 +178,15 @@ func (rb *redisBackend) CompleteWorkflowTask(
161178

162179
// Check-point the workflow. We guarantee that no other worker is working on this workflow instance at this point via the
163180
// task queue, so we don't need to WATCH the keys, we just need to make sure all commands are executed atomically to prevent
164-
// a worker crashing in the middle of this execution.
181+
// bad state if a worker crashes in the middle of this execution.
165182
p := rb.rdb.TxPipeline()
166183

167184
// Add executed events to the history
168-
if err := addEventsToHistoryStreamP(ctx, p, historyKey(instance), executedEvents); err != nil {
185+
if err := addEventPayloads(ctx, p, executedEvents); err != nil {
186+
return fmt.Errorf("adding event payloads: %w", err)
187+
}
188+
189+
if err := addEventsToStreamP(ctx, p, historyKey(instance), executedEvents); err != nil {
169190
return fmt.Errorf("serializing : %w", err)
170191
}
171192

@@ -179,7 +200,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
179200
// Schedule timers
180201
for _, timerEvent := range timerEvents {
181202
if err := addFutureEventP(ctx, p, instance, timerEvent); err != nil {
182-
return err
203+
return fmt.Errorf("adding future event: %w", err)
183204
}
184205
}
185206

@@ -199,8 +220,12 @@ func (rb *redisBackend) CompleteWorkflowTask(
199220
}
200221

201222
// Add pending event to stream
223+
if err := addEventPayloads(ctx, p, []*history.Event{m.HistoryEvent}); err != nil {
224+
return fmt.Errorf("adding event payloads: %w", err)
225+
}
226+
202227
if err := addEventToStreamP(ctx, p, pendingEventsKey(&targetInstance), m.HistoryEvent); err != nil {
203-
return err
228+
return fmt.Errorf("adding event to stream: %w", err)
204229
}
205230
}
206231

@@ -243,7 +268,9 @@ func (rb *redisBackend) CompleteWorkflowTask(
243268
// Remove executed pending events
244269
if task.CustomData != nil {
245270
lastPendingEventMessageID := task.CustomData.(string)
246-
removePendingEventsCmd.Run(ctx, p, []string{pendingEventsKey(instance)}, lastPendingEventMessageID)
271+
if err := removePendingEventsCmd.Run(ctx, p, []string{pendingEventsKey(instance)}, lastPendingEventMessageID).Err(); err != nil {
272+
return fmt.Errorf("removing pending events: %w", err)
273+
}
247274
}
248275

249276
// Complete workflow task and unlock instance.
@@ -268,7 +295,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
268295

269296
for _, cmd := range executedCmds {
270297
if cmdErr := cmd.Err(); cmdErr != nil {
271-
rb.Logger().Debug("redis command error", log.NamespaceKey+".redis.cmd", cmd.FullName(), log.NamespaceKey+".redis.cmdErr", cmdErr.Error())
298+
rb.Logger().Debug("redis command error", log.NamespaceKey+".redis.cmd", cmd.Name(), "cmdString", cmd.String(), log.NamespaceKey+".redis.cmdErr", cmdErr.Error())
272299
}
273300
}
274301

@@ -300,6 +327,10 @@ func (rb *redisBackend) CompleteWorkflowTask(
300327

301328
func (rb *redisBackend) addWorkflowInstanceEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error {
302329
// Add event to pending events for instance
330+
if err := addEventPayloads(ctx, p, []*history.Event{event}); err != nil {
331+
return err
332+
}
333+
303334
if err := addEventToStreamP(ctx, p, pendingEventsKey(instance), event); err != nil {
304335
return err
305336
}

0 commit comments

Comments
 (0)