Skip to content

Commit 0226322

Browse files
authored
Merge pull request #55 from cschleiden/timer-cancellation
Improve timer cancellation (for Redis)
2 parents c2e1d06 + 411eb9c commit 0226322

File tree

21 files changed

+178
-86
lines changed

21 files changed

+178
-86
lines changed

.github/workflows/go.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ name: Build & Test
22

33
on:
44
push:
5-
branches: [ main ]
65
pull_request:
76
branches: [ main ]
87

.vscode/launch.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
"type": "go",
4646
"request": "launch",
4747
"mode": "debug",
48-
"program": "${workspaceFolder}/samples/cancellation/cancellation.go"
48+
"program": "${workspaceFolder}/samples/cancelation/cancelation.go"
4949
},
5050
{
5151
"name": "Launch subworkflow sample",

backend/redis/events.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@ func addEventToStream(ctx context.Context, rdb redis.UniversalClient, streamKey
3030
return &msgID, nil
3131
}
3232

33+
// KEYS[1] - future event zset key
34+
// KEYS[2] - future event key
35+
// ARGV[1] - timestamp
36+
// ARGV[2] - event payload
37+
var addFutureEventCmd = redis.NewScript(`
38+
redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
39+
redis.call("SET", KEYS[2], ARGV[2])
40+
`)
41+
3342
func addFutureEvent(ctx context.Context, rdb redis.UniversalClient, instance *core.WorkflowInstance, event *history.Event) error {
3443
futureEvent := &futureEvent{
3544
Instance: instance,
@@ -41,12 +50,32 @@ func addFutureEvent(ctx context.Context, rdb redis.UniversalClient, instance *co
4150
return err
4251
}
4352

44-
if err := rdb.ZAdd(ctx, futureEventsKey(), &redis.Z{
45-
Member: eventData,
46-
Score: float64(event.VisibleAt.Unix()),
47-
}).Err(); err != nil {
53+
if err := addFutureEventCmd.Run(
54+
ctx,
55+
rdb,
56+
[]string{futureEventsKey(), futureEventKey(instance.InstanceID, event.ScheduleEventID)},
57+
event.VisibleAt.Unix(),
58+
string(eventData),
59+
).Err(); err != nil && err != redis.Nil {
4860
return fmt.Errorf("adding future event: %w", err)
4961
}
5062

5163
return nil
5264
}
65+
66+
// KEYS[1] - future event zset key
67+
// KEYS[2] - future event key
68+
var removeFutureEventCmd = redis.NewScript(`
69+
redis.call("ZREM", KEYS[1], KEYS[2])
70+
redis.call("DEL", KEYS[2])
71+
`)
72+
73+
func removeFutureEvent(ctx context.Context, rdb redis.UniversalClient, instance *core.WorkflowInstance, event *history.Event) error {
74+
key := futureEventKey(instance.InstanceID, event.ScheduleEventID)
75+
76+
if err := removeFutureEventCmd.Run(ctx, rdb, []string{futureEventsKey(), key}).Err(); err != nil && err != redis.Nil {
77+
return fmt.Errorf("removing future event: %w", err)
78+
}
79+
80+
return nil
81+
}

backend/redis/keys.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,7 @@ func historyKey(instanceID string) string {
2323
func futureEventsKey() string {
2424
return "future-events"
2525
}
26+
27+
func futureEventKey(instanceID string, scheduleEventID int64) string {
28+
return fmt.Sprintf("future-event:%v:%v", instanceID, scheduleEventID)
29+
}

backend/redis/workflow.go

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ type futureEvent struct {
2525
// KEYS[1] - future event set key
2626
// ARGV[1] - current timestamp for zrange
2727
var futureEventsCmd = redis.NewScript(`
28-
local events = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
28+
local events = redis.call("ZRANGE", KEYS[1], "-inf", ARGV[1], "BYSCORE")
2929
if events ~= false and #events ~= 0 then
3030
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
31+
return redis.call("MGET", unpack(events))
3132
end
32-
return events
3333
`)
3434

3535
func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, error) {
@@ -38,43 +38,45 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
3838
nowStr := strconv.Itoa(int(now))
3939

4040
result, err := futureEventsCmd.Run(ctx, rb.rdb, []string{futureEventsKey()}, nowStr).Result()
41-
if err != nil {
41+
if err != nil && err != redis.Nil {
4242
return nil, fmt.Errorf("checking future events: %w", err)
4343
}
4444

45-
for _, eventR := range result.([]interface{}) {
46-
eventStr := eventR.(string)
47-
var futureEvent futureEvent
48-
if err := json.Unmarshal([]byte(eventStr), &futureEvent); err != nil {
49-
return nil, fmt.Errorf("unmarshaling event: %w", err)
50-
}
45+
if result != nil {
46+
for _, eventR := range result.([]interface{}) {
47+
eventStr := eventR.(string)
48+
var futureEvent futureEvent
49+
if err := json.Unmarshal([]byte(eventStr), &futureEvent); err != nil {
50+
return nil, fmt.Errorf("unmarshaling event: %w", err)
51+
}
5152

52-
instanceState, err := readInstance(ctx, rb.rdb, futureEvent.Instance.InstanceID)
53-
if err != nil {
54-
if err == backend.ErrInstanceNotFound {
55-
rb.options.Logger.Debug("Ignoring future event for non-existing instance", "instance_id", futureEvent.Instance.InstanceID, "event_id", futureEvent.Event.ID)
56-
continue
57-
} else {
58-
return nil, fmt.Errorf("reading instance: %w", err)
53+
instanceState, err := readInstance(ctx, rb.rdb, futureEvent.Instance.InstanceID)
54+
if err != nil {
55+
if err == backend.ErrInstanceNotFound {
56+
rb.options.Logger.Debug("Ignoring future event for non-existing instance", "instance_id", futureEvent.Instance.InstanceID, "event_id", futureEvent.Event.ID)
57+
continue
58+
} else {
59+
return nil, fmt.Errorf("reading instance: %w", err)
60+
}
5961
}
60-
}
6162

62-
if instanceState.State != backend.WorkflowStateActive {
63-
rb.options.Logger.Debug("Ignoring future event for already completed instance", "instance_id", futureEvent.Instance.InstanceID, "event_id", futureEvent.Event.ID)
64-
continue
65-
}
63+
if instanceState.State != backend.WorkflowStateActive {
64+
rb.options.Logger.Debug("Ignoring future event for already completed instance", "instance_id", futureEvent.Instance.InstanceID, "event_id", futureEvent.Event.ID)
65+
continue
66+
}
6667

67-
msgID, err := addEventToStream(ctx, rb.rdb, pendingEventsKey(futureEvent.Instance.InstanceID), futureEvent.Event)
68-
if err != nil {
69-
return nil, fmt.Errorf("adding future event to stream: %w", err)
70-
}
68+
msgID, err := addEventToStream(ctx, rb.rdb, pendingEventsKey(futureEvent.Instance.InstanceID), futureEvent.Event)
69+
if err != nil {
70+
return nil, fmt.Errorf("adding future event to stream: %w", err)
71+
}
7172

72-
// Instance now has at least one pending event, try to queue task
73-
if _, err := rb.workflowQueue.Enqueue(ctx, futureEvent.Instance.InstanceID, &workflowTaskData{
74-
LastPendingEventMessageID: *msgID,
75-
}); err != nil {
76-
if err != taskqueue.ErrTaskAlreadyInQueue {
77-
return nil, fmt.Errorf("queueing workflow task: %w", err)
73+
// Instance now has at least one pending event, try to queue task
74+
if _, err := rb.workflowQueue.Enqueue(ctx, futureEvent.Instance.InstanceID, &workflowTaskData{
75+
LastPendingEventMessageID: *msgID,
76+
}); err != nil {
77+
if err != taskqueue.ErrTaskAlreadyInQueue {
78+
return nil, fmt.Errorf("queueing workflow task: %w", err)
79+
}
7880
}
7981
}
8082
}
@@ -171,7 +173,11 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
171173

172174
// TODO: use pipelines
173175
for _, event := range events {
174-
if event.VisibleAt != nil {
176+
if event.Type == history.EventType_TimerCanceled {
177+
if err := removeFutureEvent(ctx, rb.rdb, targetInstance, &event); err != nil {
178+
return err
179+
}
180+
} else if event.VisibleAt != nil {
175181
// Add future events
176182
if err := addFutureEvent(ctx, rb.rdb, targetInstance, &event); err != nil {
177183
return err

internal/command/command.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,15 @@ func NewScheduleTimerCommand(id int64, at time.Time) Command {
9595
}
9696

9797
type CancelTimerCommandAttr struct {
98-
TimerID int
98+
TimerScheduleEventID int64
9999
}
100100

101-
func NewCancelTimerCommand(id int64, timerID int) Command {
101+
func NewCancelTimerCommand(id int64, timerID int64) Command {
102102
return Command{
103103
ID: id,
104104
Type: CommandType_CancelTimer,
105105
Attr: &CancelTimerCommandAttr{
106-
TimerID: timerID,
106+
TimerScheduleEventID: timerID,
107107
},
108108
}
109109
}

internal/history/history.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const (
2929

3030
EventType_TimerScheduled
3131
EventType_TimerFired
32+
EventType_TimerCanceled
3233

3334
EventType_SignalReceived
3435

internal/history/serialization.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ func DeserializeAttributes(eventType EventType, attributes []byte) (attr interfa
6060
attr = &TimerScheduledAttributes{}
6161
case EventType_TimerFired:
6262
attr = &TimerFiredAttributes{}
63+
case EventType_TimerCanceled:
64+
attr = &TimerCanceledAttributes{}
6365

6466
case EventType_SubWorkflowScheduled:
6567
attr = &SubWorkflowScheduledAttributes{}

internal/history/timer_canceled.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package history
2+
3+
type TimerCanceledAttributes struct{}
File renamed without changes.

0 commit comments

Comments
 (0)