Skip to content

Commit db4dea4

Browse files
committed
Make redis backend honor lastSequenceID when retrieving history
1 parent 46acbf7 commit db4dea4

File tree

3 files changed

+15
-4
lines changed

3 files changed

+15
-4
lines changed

backend/redis/events.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,21 @@ func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string,
3131
// ARGV[1] - event data as serialized strings
3232
var addEventsToStreamCmd = redis.NewScript(`
3333
local msgID = ""
34-
for i = 1, #ARGV do
35-
msgID = redis.call("XADD", KEYS[1], "*", "event", ARGV[i])
34+
for i = 1, #ARGV,2 do
35+
msgID = redis.call("XADD", KEYS[1], ARGV[i], "event", ARGV[i + 1])
3636
end
3737
return msgID
3838
`)
3939

40-
func addEventsToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []history.Event) error {
40+
func addEventsToHistoryStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []history.Event) error {
4141
eventsData := make([]string, 0)
4242
for _, event := range events {
4343
eventData, err := json.Marshal(event)
4444
if err != nil {
4545
return err
4646
}
4747

48+
eventsData = append(eventsData, historyID(event.SequenceID))
4849
eventsData = append(eventsData, string(eventData))
4950
}
5051

backend/redis/instance.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,13 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
5858
}
5959

6060
func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]history.Event, error) {
61-
msgs, err := rb.rdb.XRange(ctx, historyKey(instance.InstanceID), "-", "+").Result()
61+
start := "-"
62+
63+
if lastSequenceID != nil {
64+
start = "(" + historyID(*lastSequenceID)
65+
}
66+
67+
msgs, err := rb.rdb.XRange(ctx, historyKey(instance.InstanceID), start, "+").Result()
6268
if err != nil {
6369
return nil, err
6470
}

backend/redis/keys.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ func historyKey(instanceID string) string {
2020
return fmt.Sprintf("history:%v", instanceID)
2121
}
2222

23+
func historyID(sequenceID int64) string {
24+
return fmt.Sprintf("%v-0", sequenceID)
25+
}
26+
2327
func futureEventsKey() string {
2428
return "future-events"
2529
}

0 commit comments

Comments
 (0)