Skip to content

Commit 450bf5b

Browse files
authored
Merge pull request #293 from cschleiden/redis-atomic-create-instance
Refactor workflow instance creation
2 parents 8f4daea + d0a7662 commit 450bf5b

File tree

5 files changed

+116
-25
lines changed

5 files changed

+116
-25
lines changed

backend/redis/diagnostics.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ func (rb *redisBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
5252
instanceKeys = append(instanceKeys, instanceKeyFromSegment(instanceSegment))
5353
}
5454

55+
if len(instanceKeys) == 0 {
56+
return nil, nil
57+
}
58+
5559
instances, err := rb.rdb.MGet(ctx, instanceKeys...).Result()
5660
if err != nil {
5761
return nil, fmt.Errorf("getting instances: %w", err)

backend/redis/instance.go

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,41 +15,60 @@ import (
1515
)
1616

1717
func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
18-
activeInstance, err := readActiveInstanceExecution(ctx, rb.rdb, instance.InstanceID)
19-
if err != nil && err != backend.ErrInstanceNotFound {
20-
return err
21-
}
18+
keyInfo := rb.workflowQueue.Keys()
2219

23-
if activeInstance != nil {
24-
return backend.ErrInstanceAlreadyExists
20+
instanceState, err := json.Marshal(&instanceState{
21+
Instance: instance,
22+
State: core.WorkflowInstanceStateActive,
23+
Metadata: event.Attributes.(*history.ExecutionStartedAttributes).Metadata,
24+
CreatedAt: time.Now(),
25+
})
26+
if err != nil {
27+
return fmt.Errorf("marshaling instance state: %w", err)
2528
}
2629

27-
p := rb.rdb.TxPipeline()
28-
29-
if err := createInstanceP(ctx, p, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata); err != nil {
30-
return err
30+
activeInstance, err := json.Marshal(instance)
31+
if err != nil {
32+
return fmt.Errorf("marshaling instance: %w", err)
3133
}
3234

33-
// Create event stream with initial event
34-
if err := addEventPayloadsP(ctx, p, instance, []*history.Event{event}); err != nil {
35-
return fmt.Errorf("adding event payloads: %w", err)
35+
eventData, err := marshalEventWithoutAttributes(event)
36+
if err != nil {
37+
return fmt.Errorf("marshaling event: %w", err)
3638
}
3739

38-
if err := addEventToStreamP(ctx, p, pendingEventsKey(instance), event); err != nil {
39-
return fmt.Errorf("adding event to stream: %w", err)
40+
payloadData, err := json.Marshal(event.Attributes)
41+
if err != nil {
42+
return fmt.Errorf("marshaling event payload: %w", err)
4043
}
4144

42-
// Queue workflow instance task
43-
if err := rb.workflowQueue.Enqueue(ctx, p, instanceSegment(instance), nil); err != nil {
44-
return fmt.Errorf("queueing workflow task: %w", err)
45-
}
45+
_, err = createWorkflowInstanceCmd.Run(ctx, rb.rdb, []string{
46+
instanceKey(instance),
47+
activeInstanceExecutionKey(instance.InstanceID),
48+
pendingEventsKey(instance),
49+
payloadKey(instance),
50+
instancesActive(),
51+
keyInfo.SetKey,
52+
keyInfo.StreamKey,
53+
},
54+
instanceSegment(instance),
55+
string(instanceState),
56+
string(activeInstance),
57+
event.ID,
58+
eventData,
59+
payloadData,
60+
).Result()
61+
62+
if err != nil {
63+
if _, ok := err.(redis.Error); ok {
64+
if err.Error() == "ERR InstanceAlreadyExists" {
65+
return backend.ErrInstanceAlreadyExists
66+
}
67+
}
4668

47-
if _, err := p.Exec(ctx); err != nil {
4869
return fmt.Errorf("creating workflow instance: %w", err)
4970
}
5071

51-
rb.options.Logger.Debug("Created new workflow instance")
52-
5372
return nil
5473
}
5574

backend/redis/redis.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ var _ backend.Backend = (*redisBackend)(nil)
2424
//go:embed scripts/*.lua
2525
var luaScripts embed.FS
2626

27-
var completeWorkflowTaskCmd *redis.Script
27+
var (
28+
createWorkflowInstanceCmd *redis.Script
29+
completeWorkflowTaskCmd *redis.Script
30+
)
2831

2932
func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (*redisBackend, error) {
3033
workflowQueue, err := newTaskQueue[any](client, "workflows")
@@ -75,7 +78,8 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
7578
// Load all Lua scripts
7679

7780
cmdMapping := map[string]**redis.Script{
78-
"complete_workflow_task.lua": &completeWorkflowTaskCmd,
81+
"create_workflow_instance.lua": &createWorkflowInstanceCmd,
82+
"complete_workflow_task.lua": &completeWorkflowTaskCmd,
7983
}
8084

8185
for scriptFile, cmd := range cmdMapping {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
local keyIdx = 1
2+
local argvIdx = 1
3+
4+
local getKey = function()
5+
local key = KEYS[keyIdx]
6+
keyIdx = keyIdx + 1
7+
return key
8+
end
9+
10+
local getArgv = function()
11+
local argv = ARGV[argvIdx]
12+
argvIdx = argvIdx + 1
13+
return argv
14+
end
15+
16+
local instanceKey = getKey()
17+
local activeInstanceExecutionKey = getKey()
18+
local pendingEventsKey = getKey()
19+
local payloadHashKey = getKey()
20+
21+
local instancesActiveKey = getKey()
22+
23+
local workflowSetKey = getKey()
24+
local workflowStreamKey = getKey()
25+
26+
local instanceSegment = getArgv()
27+
28+
-- Is there an existing instance with active execution?
29+
local instanceExists = redis.call("EXISTS", activeInstanceExecutionKey)
30+
if instanceExists == 1 then
31+
return redis.error_reply("ERR InstanceAlreadyExists")
32+
end
33+
34+
-- Create new instance
35+
local instanceState = getArgv()
36+
redis.call("SETNX", instanceKey, instanceState)
37+
38+
-- Set active execution
39+
local activeInstanceExecutionState = getArgv()
40+
redis.call("SET", activeInstanceExecutionKey, activeInstanceExecutionState)
41+
42+
-- Track active instance
43+
redis.call("SADD", instancesActiveKey, instanceSegment)
44+
45+
-- add initial event & payload
46+
local eventId = getArgv()
47+
local eventData = getArgv()
48+
redis.call("XADD", pendingEventsKey, "*", "event", eventData)
49+
50+
local payload = getArgv()
51+
redis.pcall("HSETNX", payloadHashKey, eventId, payload)
52+
53+
-- queue workflow task
54+
local added = redis.call("SADD", workflowSetKey, instanceSegment)
55+
if added == 1 then
56+
redis.call("XADD", workflowStreamKey, "*", "id", instanceSegment, "data", "")
57+
end
58+
59+
return true

client/client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,12 @@ func (c *Client) CreateWorkflowInstance(ctx context.Context, options WorkflowIns
8585
return nil, fmt.Errorf("creating workflow instance: %w", err)
8686
}
8787

88-
c.backend.Logger().Debug("Created workflow instance", log.InstanceIDKey, wfi.InstanceID, log.ExecutionIDKey, wfi.ExecutionID)
88+
c.backend.Logger().Debug(
89+
"Created workflow instance",
90+
log.InstanceIDKey, wfi.InstanceID,
91+
log.ExecutionIDKey, wfi.ExecutionID,
92+
log.WorkflowNameKey, workflowName,
93+
)
8994

9095
c.backend.Metrics().Counter(metrickeys.WorkflowInstanceCreated, metrics.Tags{}, 1)
9196

0 commit comments

Comments
 (0)