Skip to content

Commit 8faa59b

Browse files
authored
Implement diagnostics interface for Redis backend
1 parent de2eff0 commit 8faa59b

File tree

4 files changed

+101
-2
lines changed

4 files changed

+101
-2
lines changed

backend/redis/diagnostics.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
8+
"github.com/cschleiden/go-workflows/web"
9+
"github.com/go-redis/redis/v8"
10+
)
11+
12+
var _ web.WebBackend = (*redisBackend)(nil)
13+
14+
func (rb *redisBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID string, count int) ([]*web.WorkflowInstanceRef, error) {
15+
max := "+inf"
16+
17+
if afterInstanceID != "" {
18+
scores, err := rb.rdb.ZMScore(ctx, instancesByCreation(), afterInstanceID).Result()
19+
if err != nil {
20+
return nil, fmt.Errorf("getting instance score for %v: %w", afterInstanceID, err)
21+
}
22+
23+
if len(scores) == 0 {
24+
rb.Logger().Error("could not find instance %v", "afterInstanceID", afterInstanceID)
25+
return nil, nil
26+
}
27+
28+
max = fmt.Sprintf("(%v", int64(scores[0]))
29+
}
30+
31+
result, err := rb.rdb.ZRangeArgs(ctx, redis.ZRangeArgs{
32+
Key: instancesByCreation(),
33+
Stop: max,
34+
Start: "-inf",
35+
ByScore: true,
36+
Rev: true,
37+
Count: int64(count),
38+
}).Result()
39+
if err != nil {
40+
return nil, fmt.Errorf("getting instances after %v: %w", max, err)
41+
}
42+
43+
instanceIDs := make([]string, 0)
44+
for _, r := range result {
45+
instanceID := r
46+
instanceIDs = append(instanceIDs, instanceKey(instanceID))
47+
}
48+
49+
instances, err := rb.rdb.MGet(ctx, instanceIDs...).Result()
50+
if err != nil {
51+
return nil, fmt.Errorf("getting instances: %w", err)
52+
}
53+
54+
var instanceRefs []*web.WorkflowInstanceRef
55+
for _, instance := range instances {
56+
var state instanceState
57+
if err := json.Unmarshal([]byte(instance.(string)), &state); err != nil {
58+
return nil, fmt.Errorf("unmarshaling instance state: %w", err)
59+
}
60+
61+
instanceRefs = append(instanceRefs, &web.WorkflowInstanceRef{
62+
Instance: state.Instance,
63+
CreatedAt: state.CreatedAt,
64+
CompletedAt: state.CompletedAt,
65+
State: state.State,
66+
})
67+
}
68+
69+
return instanceRefs, nil
70+
}
71+
72+
func (rb *redisBackend) GetWorkflowInstance(ctx context.Context, instanceID string) (*web.WorkflowInstanceRef, error) {
73+
instance, err := readInstance(ctx, rb.rdb, instanceID)
74+
if err != nil {
75+
return nil, err
76+
}
77+
78+
return &web.WorkflowInstanceRef{
79+
Instance: instance.Instance,
80+
CreatedAt: instance.CreatedAt,
81+
CompletedAt: instance.CompletedAt,
82+
State: instance.State,
83+
}, nil
84+
}

backend/redis/instance.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,12 @@ type instanceState struct {
119119
func createInstance(ctx context.Context, rdb redis.UniversalClient, instance *core.WorkflowInstance, ignoreDuplicate bool) error {
120120
key := instanceKey(instance.InstanceID)
121121

122+
createdAt := time.Now()
123+
122124
b, err := json.Marshal(&instanceState{
123125
Instance: instance,
124126
State: backend.WorkflowStateActive,
125-
CreatedAt: time.Now(),
127+
CreatedAt: createdAt,
126128
})
127129
if err != nil {
128130
return fmt.Errorf("marshaling instance state: %w", err)
@@ -148,6 +150,13 @@ func createInstance(ctx context.Context, rdb redis.UniversalClient, instance *co
148150
}
149151
}
150152

153+
if err := rdb.ZAdd(ctx, instancesByCreation(), &redis.Z{
154+
Member: instance.InstanceID,
155+
Score: float64(createdAt.UnixMilli()),
156+
}).Err(); err != nil {
157+
return fmt.Errorf("storing instance reference: %w", err)
158+
}
159+
151160
return nil
152161
}
153162

@@ -164,6 +173,8 @@ func updateInstance(ctx context.Context, rdb redis.UniversalClient, instanceID s
164173
return fmt.Errorf("updating instance: %w", err)
165174
}
166175

176+
// CreatedAt does not change, so skip updating the instancesByCreation() ZSET
177+
167178
return nil
168179
}
169180

backend/redis/keys.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ func instanceKey(instanceID string) string {
88
return fmt.Sprintf("instance:%v", instanceID)
99
}
1010

11+
func instancesByCreation() string {
12+
return "instances-by-creation"
13+
}
14+
1115
func subInstanceKey(instanceID string) string {
1216
return fmt.Sprintf("sub-instance:%v", instanceID)
1317
}

backend/redis/redis.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func WithBackendOptions(opts ...backend.BackendOption) RedisBackendOption {
3434
}
3535
}
3636

37-
func NewRedisBackend(address, username, password string, db int, opts ...RedisBackendOption) (backend.Backend, error) {
37+
func NewRedisBackend(address, username, password string, db int, opts ...RedisBackendOption) (*redisBackend, error) {
3838
client := redis.NewUniversalClient(&redis.UniversalOptions{
3939
Addrs: []string{address},
4040
Username: username,

0 commit comments

Comments
 (0)