Skip to content

Commit 896df55

Browse files
authored
Merge pull request #105 from cschleiden/fix-redis-get-history
Honor `lastSequenceID` parameter when retrieving history from redis backend
2 parents 46acbf7 + 8fd06ff commit 896df55

File tree

8 files changed

+35
-28
lines changed

8 files changed

+35
-28
lines changed

backend/redis/events.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,21 @@ func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string,
3131
// ARGV[1] - event data as serialized strings
3232
var addEventsToStreamCmd = redis.NewScript(`
3333
local msgID = ""
34-
for i = 1, #ARGV do
35-
msgID = redis.call("XADD", KEYS[1], "*", "event", ARGV[i])
34+
for i = 1, #ARGV,2 do
35+
msgID = redis.call("XADD", KEYS[1], ARGV[i], "event", ARGV[i + 1])
3636
end
3737
return msgID
3838
`)
3939

40-
func addEventsToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []history.Event) error {
40+
func addEventsToHistoryStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []history.Event) error {
4141
eventsData := make([]string, 0)
4242
for _, event := range events {
4343
eventData, err := json.Marshal(event)
4444
if err != nil {
4545
return err
4646
}
4747

48+
eventsData = append(eventsData, historyID(event.SequenceID))
4849
eventsData = append(eventsData, string(eventData))
4950
}
5051

backend/redis/instance.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,13 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
5858
}
5959

6060
func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]history.Event, error) {
61-
msgs, err := rb.rdb.XRange(ctx, historyKey(instance.InstanceID), "-", "+").Result()
61+
start := "-"
62+
63+
if lastSequenceID != nil {
64+
start = "(" + historyID(*lastSequenceID)
65+
}
66+
67+
msgs, err := rb.rdb.XRange(ctx, historyKey(instance.InstanceID), start, "+").Result()
6268
if err != nil {
6369
return nil, err
6470
}

backend/redis/keys.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ func historyKey(instanceID string) string {
2020
return fmt.Sprintf("history:%v", instanceID)
2121
}
2222

23+
func historyID(sequenceID int64) string {
24+
return fmt.Sprintf("%v-0", sequenceID)
25+
}
26+
2327
func futureEventsKey() string {
2428
return "future-events"
2529
}

backend/redis/queue.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package redis
33
import (
44
"context"
55
"encoding/json"
6-
"errors"
76
"fmt"
87
"time"
98

@@ -30,8 +29,6 @@ type TaskItem[T any] struct {
3029
Data T
3130
}
3231

33-
var errTaskAlreadyInQueue = errors.New("task already in queue")
34-
3532
type KeyInfo struct {
3633
StreamKey string
3734
SetKey string
@@ -80,7 +77,7 @@ func (q *taskQueue[T]) Keys() KeyInfo {
8077
}
8178
}
8279

83-
// KEYS[1] = stream
80+
// KEYS[1] = set
8481
// KEYS[2] = stream
8582
// ARGV[1] = caller provided id of the task
8683
// ARGV[2] = additional data to store with the task

backend/redis/queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func Test_TaskQueue(t *testing.T) {
7373
_, err = client.Pipelined(ctx, func(p redis.Pipeliner) error {
7474
return q.Enqueue(ctx, p, "t1", nil)
7575
})
76-
require.Error(t, errTaskAlreadyInQueue, err)
76+
require.NoError(t, err)
7777

7878
task, err := q.Dequeue(ctx, client, lockTimeout, blockTimeout)
7979
require.NoError(t, err)

backend/redis/signal.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,11 @@ func (rb *redisBackend) SignalWorkflow(ctx context.Context, instanceID string, e
2525
))
2626
defer span.End()
2727

28-
if _, err = rb.rdb.Pipelined(ctx, func(p redis.Pipeliner) error {
29-
if err := addEventToStreamP(ctx, p, pendingEventsKey(instanceID), &event); err != nil {
28+
if _, err = rb.rdb.TxPipelined(ctx, func(p redis.Pipeliner) error {
29+
if err := rb.addWorkflowInstanceEventP(ctx, p, instanceState.Instance, &event); err != nil {
3030
return fmt.Errorf("adding event to stream: %w", err)
3131
}
3232

33-
if err := rb.workflowQueue.Enqueue(ctx, p, instanceID, nil); err != nil {
34-
if err != errTaskAlreadyInQueue {
35-
return fmt.Errorf("queueing workflow task: %w", err)
36-
}
37-
}
38-
3933
return nil
4034
}); err != nil {
4135
return err

backend/redis/workflow.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
104104

105105
if instanceState.State == backend.WorkflowStateFinished {
106106
l := rb.Logger().With(
107-
"task_id", instanceTask.ID,
107+
"task_id", instanceTask.TaskID,
108+
"id", instanceTask.ID,
108109
"instance_id", instanceState.Instance.InstanceID)
109110

110111
// This should never happen. For now, log information and then panic.
@@ -153,8 +154,8 @@ var removePendingEventsCmd = redis.NewScript(`
153154
var requeueInstanceCmd = redis.NewScript(`
154155
local pending_events = redis.call("XLEN", KEYS[1])
155156
if pending_events > 0 then
156-
local already_queued = redis.call("SADD", KEYS[3], ARGV[1])
157-
if already_queued ~= 0 then
157+
local added = redis.call("SADD", KEYS[3], ARGV[1])
158+
if added == 1 then
158159
redis.call("XADD", KEYS[2], "*", "id", ARGV[1], "data", "")
159160
end
160161
end
@@ -181,7 +182,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
181182
p := rb.rdb.TxPipeline()
182183

183184
// Add executed events to the history
184-
if err := addEventsToStreamP(ctx, p, historyKey(instance.InstanceID), executedEvents); err != nil {
185+
if err := addEventsToHistoryStreamP(ctx, p, historyKey(instance.InstanceID), executedEvents); err != nil {
185186
return fmt.Errorf("serializing : %w", err)
186187
}
187188

@@ -192,7 +193,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
192193
}
193194
}
194195

195-
// Scheduler timers
196+
// Schedule timers
196197
for _, timerEvent := range timerEvents {
197198
if err := addFutureEventP(ctx, p, instance, &timerEvent); err != nil {
198199
return err
@@ -220,12 +221,10 @@ func (rb *redisBackend) CompleteWorkflowTask(
220221
}
221222
}
222223

223-
// If any pending message was added, try to queue workflow task
224+
// Try to queue workflow task
224225
if targetInstance != instance {
225226
if err := rb.workflowQueue.Enqueue(ctx, p, targetInstance.InstanceID, nil); err != nil {
226-
if err != errTaskAlreadyInQueue {
227-
return fmt.Errorf("adding instance to locked instances set: %w", err)
228-
}
227+
return fmt.Errorf("enqueuing workflow task: %w", err)
229228
}
230229
}
231230
}

internal/workflow/executor.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,12 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
9797

9898
logger := e.logger.With("task_id", t.ID, "instance_id", t.WorkflowInstance.InstanceID)
9999

100-
logger.Debug("Executing workflow task")
100+
logger.Debug("Executing workflow task", "task_last_sequence_id", t.LastSequenceID)
101101

102102
skipNewEvents := false
103103

104104
if t.LastSequenceID > e.lastSequenceID {
105-
logger.Debug("Task has newer history than current state, fetching and replaying history", "task_sequence_id", t.LastSequenceID, "sequence_id", e.lastSequenceID)
105+
logger.Debug("Task has newer history than current state, fetching and replaying history", "task_sequence_id", t.LastSequenceID, "local_sequence_id", e.lastSequenceID)
106106

107107
h, err := e.historyProvider.GetWorkflowInstanceHistory(ctx, t.WorkflowInstance, &e.lastSequenceID)
108108
if err != nil {
@@ -188,6 +188,10 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
188188
func (e *executor) replayHistory(history []history.Event) error {
189189
e.workflowState.SetReplaying(true)
190190
for _, event := range history {
191+
if event.SequenceID < e.lastSequenceID {
192+
panic("history has older events than current state")
193+
}
194+
191195
if err := e.executeEvent(event); err != nil {
192196
return err
193197
}
@@ -216,6 +220,8 @@ func (e *executor) executeNewEvents(newEvents []history.Event) ([]history.Event,
216220

217221
func (e *executor) Close() {
218222
if e.workflow != nil {
223+
e.logger.Debug("Stopping workflow executor", "instance_id", e.workflowState.Instance().InstanceID)
224+
219225
// End workflow if running to prevent leaking goroutines
220226
e.workflow.Close()
221227
}

0 commit comments

Comments
 (0)