Skip to content

Commit e57e126

Browse files
authored
Extract adding events
1 parent eb622d6 commit e57e126

File tree

3 files changed

+44
-60
lines changed

3 files changed

+44
-60
lines changed

backend/redis/events.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
7+
"github.com/cschleiden/go-workflows/internal/history"
8+
"github.com/go-redis/redis/v8"
9+
"github.com/pkg/errors"
10+
)
11+
12+
func addEventToStream(ctx context.Context, rdb redis.UniversalClient, streamKey string, event *history.Event) error {
13+
eventData, err := json.Marshal(event)
14+
if err != nil {
15+
return err
16+
}
17+
18+
if err := rdb.XAdd(ctx, &redis.XAddArgs{
19+
Stream: streamKey,
20+
ID: "*",
21+
Values: map[string]interface{}{
22+
"event": string(eventData),
23+
},
24+
}).Err(); err != nil {
25+
return errors.Wrap(err, "could not add event to stream")
26+
}
27+
28+
return nil
29+
}

backend/redis/instance.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,11 @@ func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance cor
6767
}
6868

6969
type instanceState struct {
70-
InstanceID string `json:"instance_id,omitempty"`
71-
ExecutionID string `json:"execution_id,omitempty"`
72-
State backend.WorkflowState `json:"state,omitempty"`
73-
CreatedAt time.Time `json:"created_at,omitempty"`
74-
CompletedAt *time.Time `json:"completed_at,omitempty"`
75-
LastMessageID string `redis:"last_message_id"`
70+
InstanceID string `json:"instance_id,omitempty"`
71+
ExecutionID string `json:"execution_id,omitempty"`
72+
State backend.WorkflowState `json:"state,omitempty"`
73+
CreatedAt time.Time `json:"created_at,omitempty"`
74+
CompletedAt *time.Time `json:"completed_at,omitempty"`
7675
}
7776

7877
func createInstance(ctx context.Context, rdb redis.UniversalClient, instance core.WorkflowInstance, state *instanceState) error {

backend/redis/workflow.go

Lines changed: 10 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ import (
1010
"github.com/cschleiden/go-workflows/internal/history"
1111
"github.com/cschleiden/go-workflows/internal/task"
1212
"github.com/cschleiden/go-workflows/workflow"
13-
"github.com/go-redis/redis/v8"
1413
"github.com/pkg/errors"
1514
)
1615

1716
func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, error) {
17+
// TODO: Check for timer events, and add them to pending events if required
18+
1819
instanceTask, err := rb.workflowQueue.Dequeue(ctx, rb.options.WorkflowLockTimeout, rb.options.BlockTimeout)
1920
if err != nil {
2021
return nil, err
@@ -70,8 +71,6 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
7071
// Remove all pending events
7172
rb.rdb.XTrim(ctx, pendingEventsKey(instanceTask.ID), 0)
7273

73-
log.Println("Returned task for ", instanceTask)
74-
7574
return &task.Workflow{
7675
ID: instanceTask.ID,
7776
WorkflowInstance: core.NewWorkflowInstance(instanceTask.ID, instanceState.ExecutionID),
@@ -85,30 +84,11 @@ func (rb *redisBackend) ExtendWorkflowTask(ctx context.Context, taskID string, i
8584
}
8685

8786
func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string, instance core.WorkflowInstance, state backend.WorkflowState, executedEvents []history.Event, activityEvents []history.Event, workflowEvents []history.WorkflowEvent) error {
88-
89-
// Add events to stream
90-
var lastMessageID string
91-
87+
// Add executed events to the history
9288
for _, executedEvent := range executedEvents {
93-
// TODO: Use pipeline
94-
eventData, err := json.Marshal(executedEvent)
95-
if err != nil {
89+
if err := addEventToStream(ctx, rb.rdb, historyKey(instance.GetInstanceID()), &executedEvent); err != nil {
9690
return err
9791
}
98-
99-
cmd := rb.rdb.XAdd(ctx, &redis.XAddArgs{
100-
Stream: historyKey(instance.GetInstanceID()),
101-
ID: "*",
102-
Values: map[string]interface{}{
103-
"event": string(eventData),
104-
},
105-
})
106-
id, err := cmd.Result()
107-
if err != nil {
108-
return errors.Wrap(err, "could not create event stream")
109-
}
110-
111-
lastMessageID = id
11292
}
11393

11494
// Send new events to the respective streams
@@ -130,27 +110,15 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
130110

131111
// Insert pending events for target instance
132112
for _, event := range events {
133-
// TODO: Use pipeline
134-
eventData, err := json.Marshal(event)
135-
if err != nil {
113+
if err := addEventToStream(ctx, rb.rdb, pendingEventsKey(instance.GetInstanceID()), &event); err != nil {
136114
return err
137115
}
138-
139-
cmd := rb.rdb.XAdd(ctx, &redis.XAddArgs{
140-
Stream: pendingEventsKey(targetInstance.GetInstanceID()),
141-
ID: "*",
142-
Values: map[string]interface{}{
143-
"event": string(eventData),
144-
},
145-
})
146-
_, err = cmd.Result()
147-
if err != nil {
148-
return errors.Wrap(err, "could not create event stream")
149-
}
150116
}
151117

152118
// TODO: Delay unlocking the current instance. Can we find a better way here?
153119
if targetInstance != instance {
120+
// TODO: Make sure this task is not already enqueued
121+
154122
if err := rb.workflowQueue.Enqueue(ctx, targetInstance.GetInstanceID(), nil); err != nil {
155123
return errors.Wrap(err, "could not add instance to locked instances set")
156124
}
@@ -164,7 +132,6 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
164132
}
165133

166134
instanceState.State = state
167-
instanceState.LastMessageID = lastMessageID // TODO: Do we need this?
168135

169136
if err := updateInstance(ctx, rb.rdb, instance.GetInstanceID(), instanceState); err != nil {
170137
return errors.Wrap(err, "could not update workflow instance")
@@ -182,7 +149,7 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
182149
}
183150
}
184151

185-
// Unlock instance
152+
// Complete workflow task and unlock instance
186153
if err := rb.workflowQueue.Complete(ctx, taskID); err != nil {
187154
return errors.Wrap(err, "could not complete workflow task")
188155
}
@@ -193,22 +160,11 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
193160
}
194161

195162
func (rb *redisBackend) addWorkflowInstanceEvent(ctx context.Context, instance core.WorkflowInstance, event history.Event) error {
196-
// Add pending event
197-
eventData, err := json.Marshal(event)
198-
if err != nil {
163+
// Add event to pending events for instance
164+
if err := addEventToStream(ctx, rb.rdb, pendingEventsKey(instance.GetInstanceID()), &event); err != nil {
199165
return err
200166
}
201167

202-
if err := rb.rdb.XAdd(ctx, &redis.XAddArgs{
203-
Stream: pendingEventsKey(instance.GetInstanceID()),
204-
ID: "*",
205-
Values: map[string]interface{}{
206-
"event": string(eventData),
207-
},
208-
}).Err(); err != nil {
209-
return errors.Wrap(err, "could not add event to stream")
210-
}
211-
212168
// Queue workflow task
213169
// TODO: Ensure this can only be queued once
214170
if err := rb.workflowQueue.Enqueue(ctx, instance.GetInstanceID(), nil); err != nil {

0 commit comments

Comments
 (0)