Skip to content

Commit 46a336c

Browse files
authored
Merge pull request #286 from cschleiden/redis-payload-hash
Use HASH for event payloads
2 parents 7b9a605 + 0f811c5 commit 46a336c

File tree

4 files changed

+29
-28
lines changed

4 files changed

+29
-28
lines changed

backend/redis/events.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,31 +34,29 @@ func marshalEventWithoutAttributes(event *history.Event) (string, error) {
3434
return string(data), nil
3535
}
3636

37-
// KEYS[1..n] - payload keys
37+
// KEYS[1 - payload key
3838
// ARGV[1..n] - payload values
3939
var addPayloadsCmd = redis.NewScript(`
40-
for i = 1, #ARGV do
41-
redis.pcall("SET", KEYS[i], ARGV[i], "NX")
40+
for i = 1, #ARGV, 2 do
41+
redis.pcall("HSETNX", KEYS[1], ARGV[i], ARGV[i+1])
4242
end
4343
4444
return 0
4545
`)
4646

47-
func addEventPayloads(ctx context.Context, p redis.Pipeliner, events []*history.Event) error {
48-
keys := make([]string, 0)
49-
values := make([]interface{}, 0)
47+
func addEventPayloadsP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, events []*history.Event) error {
48+
args := make([]interface{}, 0)
5049

5150
for _, event := range events {
5251
payload, err := json.Marshal(event.Attributes)
5352
if err != nil {
5453
return fmt.Errorf("marshaling event payload: %w", err)
5554
}
5655

57-
keys = append(keys, payloadKey(event.ID))
58-
values = append(values, string(payload))
56+
args = append(args, event.ID, string(payload))
5957
}
6058

61-
return addPayloadsCmd.Run(ctx, p, keys, values...).Err()
59+
return addPayloadsCmd.Run(ctx, p, []string{payloadKey(instance)}, args...).Err()
6260
}
6361

6462
func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, event *history.Event) error {
@@ -110,15 +108,16 @@ func addEventsToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string
110108
// Adds an event to be delivered in the future. Not cluster-safe.
111109
// KEYS[1] - future event zset key
112110
// KEYS[2] - future event key
113-
// KEYS[3] - future event payload key
111+
// KEYS[3] - instance payload key
114112
// ARGV[1] - timestamp
115113
// ARGV[2] - Instance segment
116-
// ARGV[3] - event data
117-
// ARGV[4] - event payload
114+
// ARGV[3] - event id
115+
// ARGV[4] - event data
116+
// ARGV[5] - event payload
118117
var addFutureEventCmd = redis.NewScript(`
119118
redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
120-
redis.call("HSET", KEYS[2], "instance", ARGV[2], "event", ARGV[3], "payload", KEYS[3])
121-
redis.call("SET", KEYS[3], ARGV[4], "NX")
119+
redis.call("HSET", KEYS[2], "instance", ARGV[2], "id", ARGV[3], "event", ARGV[4])
120+
redis.call("HSETNX", KEYS[3], ARGV[3], ARGV[5])
122121
return 0
123122
`)
124123

@@ -135,9 +134,10 @@ func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.Work
135134

136135
return addFutureEventCmd.Run(
137136
ctx, p,
138-
[]string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID), payloadKey(event.ID)},
137+
[]string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID), payloadKey(instance)},
139138
strconv.FormatInt(event.VisibleAt.UnixMilli(), 10),
140139
instanceSegment(instance),
140+
event.ID,
141141
string(eventData),
142142
string(payloadEventData),
143143
).Err()
@@ -146,15 +146,16 @@ func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.Work
146146
// Remove a scheduled future event. Not cluster-safe.
147147
// KEYS[1] - future event zset key
148148
// KEYS[2] - future event key
149+
// KEYS[3] - instance payload key
149150
var removeFutureEventCmd = redis.NewScript(`
150151
redis.call("ZREM", KEYS[1], KEYS[2])
151-
local k = redis.call("HGET", KEYS[2], "payload")
152-
redis.call("DEL", k)
152+
local eventID = redis.call("HGET", KEYS[2], "id")
153+
redis.call("HDEL", KEYS[3], eventID)
153154
return redis.call("DEL", KEYS[2])
154155
`)
155156

156157
// removeFutureEvent removes a scheduled future event for the given event. Events are associated via their ScheduleEventID
157158
func removeFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) {
158159
key := futureEventKey(instance, event.ScheduleEventID)
159-
removeFutureEventCmd.Run(ctx, p, []string{futureEventsKey(), key})
160+
removeFutureEventCmd.Run(ctx, p, []string{futureEventsKey(), key, payloadKey(instance)})
160161
}

backend/redis/instance.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
3131
}
3232

3333
// Create event stream with initial event
34-
if err := addEventPayloads(ctx, p, []*history.Event{event}); err != nil {
34+
if err := addEventPayloadsP(ctx, p, instance, []*history.Event{event}); err != nil {
3535
return fmt.Errorf("adding event payloads: %w", err)
3636
}
3737

@@ -73,11 +73,11 @@ func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
7373
return nil, fmt.Errorf("unmarshaling event: %w", err)
7474
}
7575

76-
payloadKeys = append(payloadKeys, payloadKey(event.ID))
76+
payloadKeys = append(payloadKeys, event.ID)
7777
events = append(events, event)
7878
}
7979

80-
res, err := rb.rdb.MGet(ctx, payloadKeys...).Result()
80+
res, err := rb.rdb.HMGet(ctx, payloadKey(instance), payloadKeys...).Result()
8181
if err != nil {
8282
return nil, fmt.Errorf("reading payloads: %w", err)
8383
}

backend/redis/keys.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,6 @@ func futureEventKey(instance *core.WorkflowInstance, scheduleEventID int64) stri
5757
return fmt.Sprintf("future-event:%v:%v:%v", instance.InstanceID, instance.ExecutionID, scheduleEventID)
5858
}
5959

60-
func payloadKey(eventID string) string {
61-
return fmt.Sprintf("payload:%v", eventID)
60+
func payloadKey(instance *core.WorkflowInstance) string {
61+
return fmt.Sprintf("payload:%v", instanceSegment(instance))
6262
}

backend/redis/workflow.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,13 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowT
9999
return nil, fmt.Errorf("unmarshaling event: %w", err)
100100
}
101101

102-
payloadKeys = append(payloadKeys, payloadKey(event.ID))
102+
payloadKeys = append(payloadKeys, event.ID)
103103
newEvents = append(newEvents, event)
104104
}
105105

106106
// Fetch event payloads
107107
if len(payloadKeys) > 0 {
108-
res, err := rb.rdb.MGet(ctx, payloadKeys...).Result()
108+
res, err := rb.rdb.HMGet(ctx, payloadKey(instanceState.Instance), payloadKeys...).Result()
109109
if err != nil {
110110
return nil, fmt.Errorf("reading payloads: %w", err)
111111
}
@@ -182,7 +182,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
182182
p := rb.rdb.TxPipeline()
183183

184184
// Add executed events to the history
185-
if err := addEventPayloads(ctx, p, executedEvents); err != nil {
185+
if err := addEventPayloadsP(ctx, p, instance, executedEvents); err != nil {
186186
return fmt.Errorf("adding event payloads: %w", err)
187187
}
188188

@@ -220,7 +220,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
220220
}
221221

222222
// Add pending event to stream
223-
if err := addEventPayloads(ctx, p, []*history.Event{m.HistoryEvent}); err != nil {
223+
if err := addEventPayloadsP(ctx, p, &targetInstance, []*history.Event{m.HistoryEvent}); err != nil {
224224
return fmt.Errorf("adding event payloads: %w", err)
225225
}
226226

@@ -327,7 +327,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
327327

328328
func (rb *redisBackend) addWorkflowInstanceEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error {
329329
// Add event to pending events for instance
330-
if err := addEventPayloads(ctx, p, []*history.Event{event}); err != nil {
330+
if err := addEventPayloadsP(ctx, p, instance, []*history.Event{event}); err != nil {
331331
return err
332332
}
333333

0 commit comments

Comments
 (0)