Skip to content

Commit 1354d0b

Browse files
authored
Work around bug with pending events
1 parent 7a392a6 commit 1354d0b

File tree

6 files changed

+106
-46
lines changed

6 files changed

+106
-46
lines changed

backend/redis/events.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,48 @@ import (
44
"context"
55
"encoding/json"
66

7+
"github.com/cschleiden/go-workflows/internal/core"
78
"github.com/cschleiden/go-workflows/internal/history"
89
"github.com/go-redis/redis/v8"
910
"github.com/pkg/errors"
1011
)
1112

12-
func addEventToStream(ctx context.Context, rdb redis.UniversalClient, streamKey string, event *history.Event) error {
13+
func addEventToStream(ctx context.Context, rdb redis.UniversalClient, streamKey string, event *history.Event) (*string, error) {
1314
eventData, err := json.Marshal(event)
1415
if err != nil {
15-
return err
16+
return nil, err
1617
}
1718

18-
if err := rdb.XAdd(ctx, &redis.XAddArgs{
19+
msgID, err := rdb.XAdd(ctx, &redis.XAddArgs{
1920
Stream: streamKey,
2021
ID: "*",
2122
Values: map[string]interface{}{
2223
"event": string(eventData),
2324
},
25+
}).Result()
26+
if err != nil {
27+
return nil, errors.Wrap(err, "could not add event to stream")
28+
}
29+
30+
return &msgID, nil
31+
}
32+
33+
func addFutureEvent(ctx context.Context, rdb redis.UniversalClient, instance *core.WorkflowInstance, event *history.Event) error {
34+
futureEvent := &futureEvent{
35+
Instance: instance,
36+
Event: event,
37+
}
38+
39+
eventData, err := json.Marshal(futureEvent)
40+
if err != nil {
41+
return err
42+
}
43+
44+
if err := rdb.ZAdd(ctx, futureEventsKey(), &redis.Z{
45+
Member: eventData,
46+
Score: float64(event.VisibleAt.Unix()),
2447
}).Err(); err != nil {
25-
return errors.Wrap(err, "could not add event to stream")
48+
return errors.Wrap(err, "could not add future event")
2649
}
2750

2851
return nil

backend/redis/instance.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, event histor
2424
return err
2525
}
2626

27-
_, err = rb.rdb.XAdd(ctx, &redis.XAddArgs{
27+
msgID, err := rb.rdb.XAdd(ctx, &redis.XAddArgs{
2828
Stream: pendingEventsKey(event.WorkflowInstance.InstanceID),
2929
ID: "*",
3030
Values: map[string]interface{}{
@@ -36,7 +36,9 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, event histor
3636
}
3737

3838
// Queue workflow instance task
39-
if _, err := rb.workflowQueue.Enqueue(ctx, event.WorkflowInstance.InstanceID, nil); err != nil {
39+
if _, err := rb.workflowQueue.Enqueue(ctx, event.WorkflowInstance.InstanceID, &workflowTaskData{
40+
LastPendingEventMessageID: msgID,
41+
}); err != nil {
4042
if err != taskqueue.ErrTaskAlreadyInQueue {
4143
return errors.Wrap(err, "could not queue workflow task")
4244
}

backend/redis/redis.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func NewRedisBackend(address, username, password string, db int, opts ...RedisBa
4646
panic(err)
4747
}
4848

49-
workflowQueue, err := taskqueue.New[any](client, "workflows")
49+
workflowQueue, err := taskqueue.New[workflowTaskData](client, "workflows")
5050
if err != nil {
5151
return nil, errors.Wrap(err, "could not create workflow task queue")
5252
}
@@ -81,7 +81,7 @@ type redisBackend struct {
8181
rdb redis.UniversalClient
8282
options *RedisOptions
8383

84-
workflowQueue taskqueue.TaskQueue[any]
84+
workflowQueue taskqueue.TaskQueue[workflowTaskData]
8585
activityQueue taskqueue.TaskQueue[activityData]
8686
}
8787

@@ -91,3 +91,7 @@ type activityData struct {
9191
ID string `json:"id,omitempty"`
9292
Event history.Event `json:"event,omitempty"`
9393
}
94+
95+
type workflowTaskData struct {
96+
LastPendingEventMessageID string `json:"last_pending_event_message_id,omitempty"`
97+
}

backend/redis/signal.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@ import (
99
)
1010

1111
func (rb *redisBackend) SignalWorkflow(ctx context.Context, instanceID string, event history.Event) error {
12-
if err := addEventToStream(ctx, rb.rdb, pendingEventsKey(instanceID), &event); err != nil {
12+
msgID, err := addEventToStream(ctx, rb.rdb, pendingEventsKey(instanceID), &event)
13+
if err != nil {
1314
return errors.Wrap(err, "could not add event to stream")
1415
}
1516

16-
if _, err := rb.workflowQueue.Enqueue(ctx, instanceID, nil); err != nil {
17+
if _, err := rb.workflowQueue.Enqueue(ctx, instanceID, &workflowTaskData{
18+
LastPendingEventMessageID: *msgID,
19+
}); err != nil {
1720
if err != taskqueue.ErrTaskAlreadyInQueue {
1821
return errors.Wrap(err, "could not queue workflow task")
1922
}

backend/redis/taskqueue/queue.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type TaskQueue[T any] interface {
3737
Dequeue(ctx context.Context, lockTimeout, timeout time.Duration) (*TaskItem[T], error)
3838
Extend(ctx context.Context, taskID string) error
3939
Complete(ctx context.Context, taskID string) error
40+
Data(ctx context.Context, taskID string) (*TaskItem[T], error)
4041
}
4142

4243
func New[T any](rdb redis.UniversalClient, tasktype string) (TaskQueue[T], error) {
@@ -172,6 +173,15 @@ func (q *taskQueue[T]) Complete(ctx context.Context, taskID string) error {
172173
return nil
173174
}
174175

176+
func (q *taskQueue[T]) Data(ctx context.Context, taskID string) (*TaskItem[T], error) {
177+
msg, err := q.rdb.XRange(ctx, q.streamKey, taskID, taskID).Result()
178+
if err != nil && err != redis.Nil {
179+
return nil, errors.Wrap(nil, "could not find task")
180+
}
181+
182+
return msgToTaskItem[T](&msg[0])
183+
}
184+
175185
func (q *taskQueue[T]) recover(ctx context.Context, idleTimeout time.Duration) (*TaskItem[T], error) {
176186
// Ignore the start argument, we are deleting tasks as they are completed, so we'll always
177187
// start this scan from the beginning.

backend/redis/workflow.go

Lines changed: 54 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type futureEvent struct {
2121
Event *history.Event `json:"event,omitempty"`
2222
}
2323

24+
// Return all events with a visibleAt timestamp in the past. Also remove them from the set
2425
// KEYS[1] - future event set key
2526
// ARGV[1] - current timestamp for zrange
2627
var futureEventsCmd = redis.NewScript(`
@@ -46,14 +47,17 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
4647
return nil, errors.Wrap(err, "could not unmarshal event")
4748
}
4849

49-
if err := addEventToStream(ctx, rb.rdb, pendingEventsKey(event.Instance.InstanceID), event.Event); err != nil {
50+
msgID, err := addEventToStream(ctx, rb.rdb, pendingEventsKey(event.Instance.InstanceID), event.Event)
51+
if err != nil {
5052
return nil, errors.Wrap(err, "could not add future event to stream")
5153
}
5254

5355
// Instance now has at least one pending event, try to queue task
54-
if _, err := rb.workflowQueue.Enqueue(ctx, event.Instance.InstanceID, nil); err != nil {
56+
if _, err := rb.workflowQueue.Enqueue(ctx, event.Instance.InstanceID, &workflowTaskData{
57+
LastPendingEventMessageID: *msgID,
58+
}); err != nil {
5559
if err != taskqueue.ErrTaskAlreadyInQueue {
56-
return nil, errors.Wrap(err, "could not queue workflow")
60+
return nil, errors.Wrap(err, "could not queue workflow task")
5761
}
5862
}
5963
}
@@ -109,10 +113,6 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
109113
newEvents = append(newEvents, event)
110114
}
111115

112-
// Remove all pending events
113-
// TODO: What happens if the worker dies and this task gets picked up by another one?
114-
rb.rdb.XTrim(ctx, pendingEventsKey(instanceTask.ID), 0)
115-
116116
return &task.Workflow{
117117
ID: instanceTask.TaskID,
118118
WorkflowInstance: instanceState.Instance,
@@ -125,15 +125,31 @@ func (rb *redisBackend) ExtendWorkflowTask(ctx context.Context, taskID string, i
125125
return rb.workflowQueue.Extend(ctx, taskID)
126126
}
127127

128+
// Remove all pending events before (and including) a given message id
129+
// KEYS[1] - pending events stream key
130+
// ARGV[1] - message id
131+
var removePendingEventsCmd = redis.NewScript(`
132+
local trimmed = redis.call("XTRIM", KEYS[1], "MINID", ARGV[1])
133+
local deleted = redis.call("XDEL", KEYS[1], ARGV[1])
134+
local removed = trimmed + deleted
135+
return removed
136+
`)
137+
128138
func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string, instance *core.WorkflowInstance, state backend.WorkflowState, executedEvents []history.Event, activityEvents []history.Event, workflowEvents []history.WorkflowEvent) error {
139+
task, err := rb.workflowQueue.Data(ctx, taskID)
140+
if err != nil {
141+
return errors.Wrap(err, "could not get workflow task")
142+
}
143+
129144
// Add executed events to the history
145+
// TODO: Use pipeline
130146
for _, executedEvent := range executedEvents {
131-
if err := addEventToStream(ctx, rb.rdb, historyKey(instance.InstanceID), &executedEvent); err != nil {
147+
if _, err := addEventToStream(ctx, rb.rdb, historyKey(instance.InstanceID), &executedEvent); err != nil {
132148
return err
133149
}
134150
}
135151

136-
// Send new events to the respective streams
152+
// Send new workflow events to the respective streams
137153
groupedEvents := make(map[*workflow.Instance][]history.Event)
138154
for _, m := range workflowEvents {
139155
if _, ok := groupedEvents[m.WorkflowInstance]; !ok {
@@ -145,46 +161,36 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
145161

146162
for targetInstance, events := range groupedEvents {
147163
if instance.InstanceID != targetInstance.InstanceID {
148-
// Try to create a new instance
164+
// Instance might not exist, try to create a new instance ignoring any duplicates
149165
if err := createInstance(ctx, rb.rdb, targetInstance, true); err != nil {
150166
return err
151167
}
152168
}
153169

154170
// Insert pending events for target instance
155-
addedPendingEvent := false
171+
var lastPendingMessageID *string
156172

173+
// TODO: use pipelines
157174
for _, event := range events {
158175
if event.VisibleAt != nil {
159-
// Add future event in sorted set
160-
futureEvent := &futureEvent{
161-
Instance: targetInstance,
162-
Event: &event,
163-
}
164-
165-
eventData, err := json.Marshal(futureEvent)
166-
if err != nil {
176+
// Add future events
177+
if err := addFutureEvent(ctx, rb.rdb, targetInstance, &event); err != nil {
167178
return err
168179
}
169-
170-
if err := rb.rdb.ZAdd(ctx, futureEventsKey(), &redis.Z{
171-
Member: eventData,
172-
Score: float64(event.VisibleAt.Unix()),
173-
}).Err(); err != nil {
174-
return errors.Wrap(err, "could not add future event")
175-
}
176180
} else {
177181
// Add pending event to stream
178-
if err := addEventToStream(ctx, rb.rdb, pendingEventsKey(targetInstance.InstanceID), &event); err != nil {
182+
lastPendingMessageID, err = addEventToStream(ctx, rb.rdb, pendingEventsKey(targetInstance.InstanceID), &event)
183+
if err != nil {
179184
return err
180185
}
181-
182-
addedPendingEvent = true
183186
}
184187
}
185188

186-
if addedPendingEvent && targetInstance != instance {
187-
if _, err := rb.workflowQueue.Enqueue(ctx, targetInstance.InstanceID, nil); err != nil {
189+
// If any pending message was added, try to queue workflow task
190+
if lastPendingMessageID != nil && targetInstance != instance {
191+
if _, err := rb.workflowQueue.Enqueue(ctx, targetInstance.InstanceID, &workflowTaskData{
192+
LastPendingEventMessageID: *lastPendingMessageID,
193+
}); err != nil {
188194
if err != taskqueue.ErrTaskAlreadyInQueue {
189195
return errors.Wrap(err, "could not add instance to locked instances set")
190196
}
@@ -215,19 +221,28 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
215221
}
216222
}
217223

224+
// Remove executed pending events
225+
_, err = removePendingEventsCmd.Run(ctx, rb.rdb, []string{pendingEventsKey(instance.InstanceID)}, task.Data.LastPendingEventMessageID).Result()
226+
if err != nil {
227+
return errors.Wrap(err, "could not remove pending events")
228+
}
229+
// log.Printf("Removed %v pending events", removed)
230+
218231
// Complete workflow task and unlock instance
219232
if err := rb.workflowQueue.Complete(ctx, taskID); err != nil {
220233
return errors.Wrap(err, "could not complete workflow task")
221234
}
222235

223236
// If there are pending events, queue the instance again
224-
pendingCount, err := rb.rdb.XLen(ctx, pendingEventsKey(instance.InstanceID)).Result()
237+
msgIDs, err := rb.rdb.XRevRangeN(ctx, pendingEventsKey(instance.InstanceID), "+", "-", 1).Result()
225238
if err != nil {
226239
return errors.Wrap(err, "could not read event stream")
227240
}
228241

229-
if state != backend.WorkflowStateFinished && pendingCount > 0 {
230-
if _, err := rb.workflowQueue.Enqueue(ctx, instance.InstanceID, nil); err != nil {
242+
if state != backend.WorkflowStateFinished && len(msgIDs) > 0 {
243+
if _, err := rb.workflowQueue.Enqueue(ctx, instance.InstanceID, &workflowTaskData{
244+
LastPendingEventMessageID: msgIDs[0].ID,
245+
}); err != nil {
231246
if err != taskqueue.ErrTaskAlreadyInQueue {
232247
return errors.Wrap(err, "could not queue workflow")
233248
}
@@ -239,12 +254,15 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
239254

240255
func (rb *redisBackend) addWorkflowInstanceEvent(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error {
241256
// Add event to pending events for instance
242-
if err := addEventToStream(ctx, rb.rdb, pendingEventsKey(instance.InstanceID), event); err != nil {
257+
msgID, err := addEventToStream(ctx, rb.rdb, pendingEventsKey(instance.InstanceID), event)
258+
if err != nil {
243259
return err
244260
}
245261

246262
// Queue workflow task
247-
if _, err := rb.workflowQueue.Enqueue(ctx, instance.InstanceID, nil); err != nil {
263+
if _, err := rb.workflowQueue.Enqueue(ctx, instance.InstanceID, &workflowTaskData{
264+
LastPendingEventMessageID: *msgID,
265+
}); err != nil {
248266
if err != taskqueue.ErrTaskAlreadyInQueue {
249267
return errors.Wrap(err, "could not queue workflow")
250268
}

0 commit comments

Comments
 (0)