Skip to content

Commit 6c332fa

Browse files
authored
Merge pull request #297 from cschleiden/redis-diag
Track newly created instances correctly
2 parents d7edac9 + 85d008c commit 6c332fa

File tree

6 files changed

+68
-3
lines changed

6 files changed

+68
-3
lines changed

backend/redis/diagnostics_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/cschleiden/go-workflows/client"
8+
"github.com/cschleiden/go-workflows/diag"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func Test_Diag_GetWorkflowInstances(t *testing.T) {
13+
if testing.Short() {
14+
t.Skip()
15+
}
16+
17+
rclient := getClient()
18+
setup := getCreateBackend(rclient)
19+
20+
b := setup()
21+
22+
t.Cleanup(func() {
23+
b.Close()
24+
})
25+
26+
bd := b.(diag.Backend)
27+
28+
ctx := context.Background()
29+
instances, err := bd.GetWorkflowInstances(ctx, "", "", 5)
30+
require.NoError(t, err)
31+
require.Len(t, instances, 0)
32+
33+
c := client.New(b)
34+
35+
_, err = c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
36+
InstanceID: "ex1",
37+
}, "some-workflow")
38+
require.NoError(t, err)
39+
40+
instances, err = bd.GetWorkflowInstances(ctx, "", "", 5)
41+
require.NoError(t, err)
42+
require.Len(t, instances, 1)
43+
require.Equal(t, "ex1", instances[0].Instance.InstanceID)
44+
}

backend/redis/instance.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
4848
pendingEventsKey(instance),
4949
payloadKey(instance),
5050
instancesActive(),
51+
instancesByCreation(),
5152
keyInfo.SetKey,
5253
keyInfo.StreamKey,
5354
},
@@ -57,6 +58,7 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
5758
event.ID,
5859
eventData,
5960
payloadData,
61+
time.Now().UTC().Unix(),
6062
).Result()
6163

6264
if err != nil {

backend/redis/keys.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ func instanceKeyFromSegment(segment string) string {
2424
}
2525

2626
// instancesByCreation returns the key for the ZSET that contains all instances sorted by creation date. The score is the
27-
// creation time. Used for listing all workflow instances in the diagnostics UI.
27+
// creation time as a unix timestamp. Used for listing all workflow instances in the diagnostics UI.
2828
func instancesByCreation() string {
2929
return "instances-by-creation"
3030
}
3131

32+
// instancesActive returns the key for the SET that contains all active instances. Used for reporting active workflow
33+
// instances in stats.
3234
func instancesActive() string {
3335
return "instances-active"
3436
}

backend/redis/scripts/complete_workflow_task.lua

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ local pendingEventsKey = getKey()
2121
local payloadHashKey = getKey()
2222
local futureEventZSetKey = getKey()
2323
local activeInstancesKey = getKey()
24+
local instancesByCreation = getKey()
2425

2526
local workflowSetKey = getKey()
2627
local workflowStreamKey = getKey()
@@ -59,6 +60,7 @@ redis.call("XDEL", pendingEventsKey, lastPendingEventMessageId)
5960

6061
-- Update instance state
6162
local now = getArgv()
63+
local nowUnix = tonumber(getArgv())
6264
local state = tonumber(getArgv())
6365

6466
-- State constants
@@ -164,6 +166,7 @@ for i = 1, otherWorkflowInstances do
164166

165167
-- Track active instance
166168
redis.call("SADD", activeInstancesKey, targetInstanceSegment)
169+
redis.call("ZADD", instancesByCreation, nowUnix, targetInstanceSegment)
167170
end
168171
end
169172

backend/redis/scripts/create_workflow_instance.lua

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ local pendingEventsKey = getKey()
1919
local payloadHashKey = getKey()
2020

2121
local instancesActiveKey = getKey()
22+
local instancesByCreation = getKey()
2223

2324
local workflowSetKey = getKey()
2425
local workflowStreamKey = getKey()
@@ -50,6 +51,9 @@ redis.call("XADD", pendingEventsKey, "*", "event", eventData)
5051
local payload = getArgv()
5152
redis.pcall("HSETNX", payloadHashKey, eventId, payload)
5253

54+
local creationTimestamp = tonumber(getArgv())
55+
redis.call("ZADD", instancesByCreation, creationTimestamp, instanceSegment)
56+
5357
-- queue workflow task
5458
local added = redis.call("SADD", workflowSetKey, instanceSegment)
5559
if added == 1 then

backend/redis/workflow.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
110110
payloadKey(instance),
111111
futureEventsKey(),
112112
instancesActive(),
113+
instancesByCreation(),
113114
queueKeys.SetKey,
114115
queueKeys.StreamKey,
115116
)
@@ -137,8 +138,17 @@ func (rb *redisBackend) CompleteWorkflowTask(
137138
args = append(args, lastPendingEventMessageID)
138139

139140
// Update instance state and update active execution
140-
nowStr := time.Now().Format(time.RFC3339)
141-
args = append(args, string(nowStr), int(state), int(core.WorkflowInstanceStateContinuedAsNew), int(core.WorkflowInstanceStateFinished))
141+
now := time.Now().UTC()
142+
nowStr := now.Format(time.RFC3339)
143+
nowUnix := now.Unix()
144+
args = append(
145+
args,
146+
string(nowStr),
147+
nowUnix,
148+
int(state),
149+
int(core.WorkflowInstanceStateContinuedAsNew),
150+
int(core.WorkflowInstanceStateFinished),
151+
)
142152
keys = append(keys, activeInstanceExecutionKey(instance.InstanceID))
143153

144154
// Remove canceled timers

0 commit comments

Comments
 (0)