Skip to content

Commit 0421f4d

Browse files
authored
Simple timer support
1 parent 63af8ef commit 0421f4d

File tree

3 files changed

+93
-20
lines changed

3 files changed

+93
-20
lines changed

backend/redis/keys.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ func pendingEventsKey(instanceID string) string {
1616
return fmt.Sprintf("pending-events:%v", instanceID)
1717
}
1818

19+
func futureEventsKey() string {
20+
return "future-events"
21+
}
22+
1923
func historyKey(instanceID string) string {
2024
return fmt.Sprintf("history:%v", instanceID)
2125
}

backend/redis/workflow.go

Lines changed: 75 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,62 @@ package redis
33
import (
44
"context"
55
"encoding/json"
6-
"log"
6+
"strconv"
7+
"time"
78

89
"github.com/cschleiden/go-workflows/backend"
910
"github.com/cschleiden/go-workflows/backend/redis/taskqueue"
1011
"github.com/cschleiden/go-workflows/internal/core"
1112
"github.com/cschleiden/go-workflows/internal/history"
1213
"github.com/cschleiden/go-workflows/internal/task"
1314
"github.com/cschleiden/go-workflows/workflow"
15+
"github.com/go-redis/redis/v8"
1416
"github.com/pkg/errors"
1517
)
1618

19+
type futureEvent struct {
20+
Instance *core.WorkflowInstance `json:"instance,omitempty"`
21+
Event *history.Event `json:"event,omitempty"`
22+
}
23+
24+
// KEYS[1] - future event set key
25+
// ARGV[1] - current timestamp for zrange
26+
var futureEventsCmd = redis.NewScript(`
27+
local events = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
28+
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
29+
return events
30+
`)
31+
1732
func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, error) {
18-
// TODO: Check for timer events, and add them to pending events if required
33+
// Check for future events
34+
now := time.Now().Unix()
35+
nowStr := strconv.Itoa(int(now))
36+
37+
result, err := futureEventsCmd.Run(ctx, rb.rdb, []string{futureEventsKey()}, nowStr).Result()
38+
if err != nil {
39+
return nil, errors.Wrap(err, "could not check future events")
40+
}
41+
42+
for _, eventR := range result.([]interface{}) {
43+
eventStr := eventR.(string)
44+
var event futureEvent
45+
if err := json.Unmarshal([]byte(eventStr), &event); err != nil {
46+
return nil, errors.Wrap(err, "could not unmarshal event")
47+
}
48+
49+
if err := addEventToStream(ctx, rb.rdb, pendingEventsKey(event.Instance.InstanceID), event.Event); err != nil {
50+
return nil, errors.Wrap(err, "could not add future event to stream")
51+
}
52+
53+
// Instance now has at least one pending event, try to queue task
54+
if _, err := rb.workflowQueue.Enqueue(ctx, event.Instance.InstanceID, nil); err != nil {
55+
if err != taskqueue.ErrTaskAlreadyInQueue {
56+
return nil, errors.Wrap(err, "could not queue workflow")
57+
}
58+
}
59+
}
1960

61+
// Try to get a workflow task
2062
instanceTask, err := rb.workflowQueue.Dequeue(ctx, rb.options.WorkflowLockTimeout, rb.options.BlockTimeout)
2163
if err != nil {
2264
return nil, err
@@ -68,6 +110,7 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
68110
}
69111

70112
// Remove all pending events
113+
// TODO: What happens if the worker dies and this task gets picked up by another one?
71114
rb.rdb.XTrim(ctx, pendingEventsKey(instanceTask.ID), 0)
72115

73116
return &task.Workflow{
@@ -102,21 +145,45 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
102145

103146
for targetInstance, events := range groupedEvents {
104147
if instance.InstanceID != targetInstance.InstanceID {
105-
// Create new instance
148+
// Try to create a new instance
106149
if err := createInstance(ctx, rb.rdb, targetInstance, true); err != nil {
107150
return err
108151
}
109152
}
110153

111154
// Insert pending events for target instance
155+
addedPendingEvent := false
156+
112157
for _, event := range events {
113-
if err := addEventToStream(ctx, rb.rdb, pendingEventsKey(targetInstance.InstanceID), &event); err != nil {
114-
return err
158+
if event.VisibleAt != nil {
159+
// Add future event in sorted set
160+
futureEvent := &futureEvent{
161+
Instance: targetInstance,
162+
Event: &event,
163+
}
164+
165+
eventData, err := json.Marshal(futureEvent)
166+
if err != nil {
167+
return err
168+
}
169+
170+
if err := rb.rdb.ZAdd(ctx, futureEventsKey(), &redis.Z{
171+
Member: eventData,
172+
Score: float64(event.VisibleAt.Unix()),
173+
}).Err(); err != nil {
174+
return errors.Wrap(err, "could not add future event")
175+
}
176+
} else {
177+
// Add pending event to stream
178+
if err := addEventToStream(ctx, rb.rdb, pendingEventsKey(targetInstance.InstanceID), &event); err != nil {
179+
return err
180+
}
181+
182+
addedPendingEvent = true
115183
}
116184
}
117185

118-
// TODO: Delay unlocking the current instance. Can we find a better way here?
119-
if targetInstance != instance {
186+
if addedPendingEvent && targetInstance != instance {
120187
if _, err := rb.workflowQueue.Enqueue(ctx, targetInstance.InstanceID, nil); err != nil {
121188
if err != taskqueue.ErrTaskAlreadyInQueue {
122189
return errors.Wrap(err, "could not add instance to locked instances set")
@@ -153,7 +220,7 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
153220
return errors.Wrap(err, "could not complete workflow task")
154221
}
155222

156-
// If there are pending events, enqueue the instance again
223+
// If there are pending events, queue the instance again
157224
pendingCount, err := rb.rdb.XLen(ctx, pendingEventsKey(instance.InstanceID)).Result()
158225
if err != nil {
159226
return errors.Wrap(err, "could not read event stream")
@@ -167,8 +234,6 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
167234
}
168235
}
169236

170-
log.Println("Unlocked workflow task", instance.InstanceID)
171-
172237
return nil
173238
}
174239

samples/timer/timer.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import (
88
"time"
99

1010
"github.com/cschleiden/go-workflows/backend"
11+
"github.com/cschleiden/go-workflows/backend/redis"
1112
"github.com/cschleiden/go-workflows/backend/sqlite"
1213
"github.com/cschleiden/go-workflows/client"
14+
"github.com/cschleiden/go-workflows/samples"
1315
"github.com/cschleiden/go-workflows/worker"
1416
"github.com/cschleiden/go-workflows/workflow"
1517
"github.com/google/uuid"
@@ -20,6 +22,10 @@ func main() {
2022
ctx, cancel := context.WithCancel(ctx)
2123

2224
b := sqlite.NewInMemoryBackend()
25+
b, err := redis.NewRedisBackend("localhost:6379", "", "RedisPassw0rd", 0)
26+
if err != nil {
27+
panic(err)
28+
}
2329

2430
// Run worker
2531
go RunWorker(ctx, b)
@@ -60,27 +66,25 @@ func RunWorker(ctx context.Context, mb backend.Backend) {
6066
}
6167

6268
func Workflow1(ctx workflow.Context, msg string) (string, error) {
63-
log.Println("Entering Workflow1")
64-
log.Println("\tWorkflow instance input:", msg)
65-
log.Println("\tIsReplaying:", workflow.Replaying(ctx))
69+
samples.Trace(ctx, "Entering Workflow1, input: ", msg)
6670

6771
defer func() {
68-
log.Println("Leaving Workflow1")
72+
samples.Trace(ctx, "Leaving Workflow1")
6973
}()
7074

7175
a1 := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12)
7276

73-
tctx, cancel := workflow.WithCancel(ctx)
77+
tctx, _ := workflow.WithCancel(ctx)
7478
t := workflow.ScheduleTimer(tctx, 2*time.Second)
75-
cancel()
79+
// cancel()
7680

7781
workflow.Select(
7882
ctx,
7983
workflow.Await(t, func(ctx workflow.Context, f workflow.Future[struct{}]) {
8084
if _, err := f.Get(ctx); err != nil {
81-
log.Println("Timer canceled, IsReplaying:", workflow.Replaying(ctx))
85+
samples.Trace(ctx, "Timer canceled")
8286
} else {
83-
log.Println("Timer fired, IsReplaying:", workflow.Replaying(ctx))
87+
samples.Trace(ctx, "Timer fired")
8488
}
8589
}),
8690
workflow.Await(a1, func(ctx workflow.Context, f workflow.Future[int]) {
@@ -89,7 +93,7 @@ func Workflow1(ctx workflow.Context, msg string) (string, error) {
8993
panic(err)
9094
}
9195

92-
log.Println("Activity result", r, ", IsReplaying:", workflow.Replaying(ctx))
96+
samples.Trace(ctx, "Activity result", r)
9397

9498
// Cancel timer
9599
// cancel()
@@ -102,7 +106,7 @@ func Workflow1(ctx workflow.Context, msg string) (string, error) {
102106
func Activity1(ctx context.Context, a, b int) (int, error) {
103107
log.Println("Entering Activity1")
104108

105-
time.Sleep(3 * time.Second)
109+
time.Sleep(10 * time.Second)
106110

107111
defer func() {
108112
log.Println("Leaving Activity1")

0 commit comments

Comments
 (0)