Skip to content

Commit 354b2be

Browse files
authored
Send timer cancelation message
For the Redis backend, timer cancelation message will also remove future scheduled events. For other backends this keeps the current behavior.
1 parent c2e1d06 commit 354b2be

File tree

20 files changed

+178
-85
lines changed

20 files changed

+178
-85
lines changed

.vscode/launch.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
"type": "go",
4646
"request": "launch",
4747
"mode": "debug",
48-
"program": "${workspaceFolder}/samples/cancellation/cancellation.go"
48+
"program": "${workspaceFolder}/samples/cancelation/cancelation.go"
4949
},
5050
{
5151
"name": "Launch subworkflow sample",

backend/redis/events.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@ func addEventToStream(ctx context.Context, rdb redis.UniversalClient, streamKey
3030
return &msgID, nil
3131
}
3232

33+
// KEYS[1] - future event zset key
34+
// KEYS[2] - future event key
35+
// ARGV[1] - timestamp
36+
// ARGV[2] - event payload
37+
var addFutureEventCmd = redis.NewScript(`
38+
redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
39+
redis.call("SET", KEYS[2], ARGV[2])
40+
`)
41+
3342
func addFutureEvent(ctx context.Context, rdb redis.UniversalClient, instance *core.WorkflowInstance, event *history.Event) error {
3443
futureEvent := &futureEvent{
3544
Instance: instance,
@@ -41,12 +50,32 @@ func addFutureEvent(ctx context.Context, rdb redis.UniversalClient, instance *co
4150
return err
4251
}
4352

44-
if err := rdb.ZAdd(ctx, futureEventsKey(), &redis.Z{
45-
Member: eventData,
46-
Score: float64(event.VisibleAt.Unix()),
47-
}).Err(); err != nil {
53+
if err := addFutureEventCmd.Run(
54+
ctx,
55+
rdb,
56+
[]string{futureEventsKey(), futureEventKey(instance.InstanceID, event.ScheduleEventID)},
57+
event.VisibleAt.Unix(),
58+
string(eventData),
59+
).Err(); err != nil && err != redis.Nil {
4860
return fmt.Errorf("adding future event: %w", err)
4961
}
5062

5163
return nil
5264
}
65+
66+
// KEYS[1] - future event zset key
67+
// KEYS[2] - future event key
68+
var removeFutureEventCmd = redis.NewScript(`
69+
redis.call("ZREM", KEYS[1], KEYS[2])
70+
redis.call("DEL", KEYS[2])
71+
`)
72+
73+
func removeFutureEvent(ctx context.Context, rdb redis.UniversalClient, instance *core.WorkflowInstance, event *history.Event) error {
74+
key := futureEventKey(instance.InstanceID, event.ScheduleEventID)
75+
76+
if err := removeFutureEventCmd.Run(ctx, rdb, []string{futureEventsKey(), key}).Err(); err != nil && err != redis.Nil {
77+
return fmt.Errorf("removing future event: %w", err)
78+
}
79+
80+
return nil
81+
}

backend/redis/keys.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,7 @@ func historyKey(instanceID string) string {
2323
func futureEventsKey() string {
2424
return "future-events"
2525
}
26+
27+
func futureEventKey(instanceID string, scheduleEventID int64) string {
28+
return fmt.Sprintf("future-event:%v:%v", instanceID, scheduleEventID)
29+
}

backend/redis/workflow.go

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ type futureEvent struct {
2525
// KEYS[1] - future event set key
2626
// ARGV[1] - current timestamp for zrange
2727
var futureEventsCmd = redis.NewScript(`
28-
local events = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
28+
local events = redis.call("ZRANGE", KEYS[1], "-inf", ARGV[1], "BYSCORE")
2929
if events ~= false and #events ~= 0 then
3030
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
31+
return redis.call("MGET", unpack(events))
3132
end
32-
return events
3333
`)
3434

3535
func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, error) {
@@ -38,43 +38,45 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
3838
nowStr := strconv.Itoa(int(now))
3939

4040
result, err := futureEventsCmd.Run(ctx, rb.rdb, []string{futureEventsKey()}, nowStr).Result()
41-
if err != nil {
41+
if err != nil && err != redis.Nil {
4242
return nil, fmt.Errorf("checking future events: %w", err)
4343
}
4444

45-
for _, eventR := range result.([]interface{}) {
46-
eventStr := eventR.(string)
47-
var futureEvent futureEvent
48-
if err := json.Unmarshal([]byte(eventStr), &futureEvent); err != nil {
49-
return nil, fmt.Errorf("unmarshaling event: %w", err)
50-
}
45+
if result != nil {
46+
for _, eventR := range result.([]interface{}) {
47+
eventStr := eventR.(string)
48+
var futureEvent futureEvent
49+
if err := json.Unmarshal([]byte(eventStr), &futureEvent); err != nil {
50+
return nil, fmt.Errorf("unmarshaling event: %w", err)
51+
}
5152

52-
instanceState, err := readInstance(ctx, rb.rdb, futureEvent.Instance.InstanceID)
53-
if err != nil {
54-
if err == backend.ErrInstanceNotFound {
55-
rb.options.Logger.Debug("Ignoring future event for non-existing instance", "instance_id", futureEvent.Instance.InstanceID, "event_id", futureEvent.Event.ID)
56-
continue
57-
} else {
58-
return nil, fmt.Errorf("reading instance: %w", err)
53+
instanceState, err := readInstance(ctx, rb.rdb, futureEvent.Instance.InstanceID)
54+
if err != nil {
55+
if err == backend.ErrInstanceNotFound {
56+
rb.options.Logger.Debug("Ignoring future event for non-existing instance", "instance_id", futureEvent.Instance.InstanceID, "event_id", futureEvent.Event.ID)
57+
continue
58+
} else {
59+
return nil, fmt.Errorf("reading instance: %w", err)
60+
}
5961
}
60-
}
6162

62-
if instanceState.State != backend.WorkflowStateActive {
63-
rb.options.Logger.Debug("Ignoring future event for already completed instance", "instance_id", futureEvent.Instance.InstanceID, "event_id", futureEvent.Event.ID)
64-
continue
65-
}
63+
if instanceState.State != backend.WorkflowStateActive {
64+
rb.options.Logger.Debug("Ignoring future event for already completed instance", "instance_id", futureEvent.Instance.InstanceID, "event_id", futureEvent.Event.ID)
65+
continue
66+
}
6667

67-
msgID, err := addEventToStream(ctx, rb.rdb, pendingEventsKey(futureEvent.Instance.InstanceID), futureEvent.Event)
68-
if err != nil {
69-
return nil, fmt.Errorf("adding future event to stream: %w", err)
70-
}
68+
msgID, err := addEventToStream(ctx, rb.rdb, pendingEventsKey(futureEvent.Instance.InstanceID), futureEvent.Event)
69+
if err != nil {
70+
return nil, fmt.Errorf("adding future event to stream: %w", err)
71+
}
7172

72-
// Instance now has at least one pending event, try to queue task
73-
if _, err := rb.workflowQueue.Enqueue(ctx, futureEvent.Instance.InstanceID, &workflowTaskData{
74-
LastPendingEventMessageID: *msgID,
75-
}); err != nil {
76-
if err != taskqueue.ErrTaskAlreadyInQueue {
77-
return nil, fmt.Errorf("queueing workflow task: %w", err)
73+
// Instance now has at least one pending event, try to queue task
74+
if _, err := rb.workflowQueue.Enqueue(ctx, futureEvent.Instance.InstanceID, &workflowTaskData{
75+
LastPendingEventMessageID: *msgID,
76+
}); err != nil {
77+
if err != taskqueue.ErrTaskAlreadyInQueue {
78+
return nil, fmt.Errorf("queueing workflow task: %w", err)
79+
}
7880
}
7981
}
8082
}
@@ -171,7 +173,11 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
171173

172174
// TODO: use pipelines
173175
for _, event := range events {
174-
if event.VisibleAt != nil {
176+
if event.Type == history.EventType_TimerCanceled {
177+
if err := removeFutureEvent(ctx, rb.rdb, targetInstance, &event); err != nil {
178+
return err
179+
}
180+
} else if event.VisibleAt != nil {
175181
// Add future events
176182
if err := addFutureEvent(ctx, rb.rdb, targetInstance, &event); err != nil {
177183
return err

internal/command/command.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,15 @@ func NewScheduleTimerCommand(id int64, at time.Time) Command {
9595
}
9696

9797
type CancelTimerCommandAttr struct {
98-
TimerID int
98+
TimerScheduleEventID int64
9999
}
100100

101-
func NewCancelTimerCommand(id int64, timerID int) Command {
101+
func NewCancelTimerCommand(id int64, timerID int64) Command {
102102
return Command{
103103
ID: id,
104104
Type: CommandType_CancelTimer,
105105
Attr: &CancelTimerCommandAttr{
106-
TimerID: timerID,
106+
TimerScheduleEventID: timerID,
107107
},
108108
}
109109
}

internal/history/history.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const (
2929

3030
EventType_TimerScheduled
3131
EventType_TimerFired
32+
EventType_TimerCanceled
3233

3334
EventType_SignalReceived
3435

internal/history/serialization.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ func DeserializeAttributes(eventType EventType, attributes []byte) (attr interfa
6060
attr = &TimerScheduledAttributes{}
6161
case EventType_TimerFired:
6262
attr = &TimerFiredAttributes{}
63+
case EventType_TimerCanceled:
64+
attr = &TimerCanceledAttributes{}
6365

6466
case EventType_SubWorkflowScheduled:
6567
attr = &SubWorkflowScheduledAttributes{}

internal/history/timer_canceled.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package history
2+
3+
type TimerCanceledAttributes struct{}
File renamed without changes.

internal/sync/channel.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ type Channel[T any] interface {
1111

1212
Receive(ctx Context) (v T, ok bool)
1313

14-
ReceiveNonblocking(ctx Context) (v T, ok bool)
14+
ReceiveNonBlocking(ctx Context) (v T, ok bool)
1515

1616
Close()
1717
}
@@ -21,6 +21,8 @@ type ChannelInternal[T any] interface {
2121

2222
ReceiveNonBlocking(ctx Context) (v T, ok bool)
2323

24+
// AddReceiveCallback adds a callback that is called once when a value is sent to the channel. This is similar
25+
// to the blocking `Receive` method, but is not blocking a coroutine.
2426
AddReceiveCallback(cb func(v T, ok bool))
2527
}
2628

@@ -144,7 +146,7 @@ func (c *channel[T]) Receive(ctx Context) (v T, ok bool) {
144146
}
145147
}
146148

147-
func (c *channel[T]) ReceiveNonblocking(ctx Context) (T, bool) {
149+
func (c *channel[T]) ReceiveNonBlocking(ctx Context) (T, bool) {
148150
if v, ok, rok := c.tryReceive(); rok {
149151
return v, ok
150152
}
@@ -226,15 +228,6 @@ func (c *channel[T]) AddReceiveCallback(cb func(v T, ok bool)) {
226228
c.receivers = append(c.receivers, cb)
227229
}
228230

229-
func (c *channel[T]) ReceiveNonBlocking(ctx Context) (T, bool) {
230-
if v, ok, rok := c.tryReceive(); rok {
231-
return v, ok
232-
}
233-
234-
var z T
235-
return z, false
236-
}
237-
238231
func (c *channel[T]) Closed() bool {
239232
return c.closed
240233
}

0 commit comments

Comments
 (0)