Skip to content

Commit cf95b44

Browse files
authored
Merge pull request #218 from cschleiden/backend-stats
Expose simple backend stats for active instances/activities
2 parents 6f51384 + 0eb0314 commit cf95b44

File tree

14 files changed

+277
-8
lines changed

14 files changed

+277
-8
lines changed

backend/backend.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ type Backend interface {
6868
// ExtendActivityTask extends the lock of an activity task
6969
ExtendActivityTask(ctx context.Context, activityID string) error
7070

71+
// GetStats returns stats about the backend
72+
GetStats(ctx context.Context) (*Stats, error)
73+
7174
// Logger returns the configured logger for the backend
7275
Logger() log.Logger
7376

backend/mock_Backend.go

Lines changed: 31 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/stats.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package mysql
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
8+
"github.com/cschleiden/go-workflows/backend"
9+
)
10+
11+
func (b *mysqlBackend) GetStats(ctx context.Context) (*backend.Stats, error) {
12+
s := &backend.Stats{}
13+
14+
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
15+
Isolation: sql.LevelReadCommitted,
16+
})
17+
if err != nil {
18+
return nil, fmt.Errorf("failed to start transaction: %w", err)
19+
}
20+
defer tx.Rollback()
21+
22+
row := tx.QueryRowContext(
23+
ctx,
24+
"SELECT COUNT(*) FROM instances i WHERE i.completed_at IS NULL",
25+
)
26+
if err := row.Err(); err != nil {
27+
return nil, fmt.Errorf("failed to query active instances: %w", err)
28+
}
29+
30+
var activeInstances int64
31+
if err := row.Scan(&activeInstances); err != nil {
32+
return nil, fmt.Errorf("failed to scan active instances: %w", err)
33+
}
34+
35+
s.ActiveWorkflowInstances = activeInstances
36+
37+
// Get pending activities
38+
row = tx.QueryRowContext(
39+
ctx,
40+
"SELECT COUNT(*) FROM activities")
41+
if err := row.Err(); err != nil {
42+
return nil, fmt.Errorf("failed to query active activities: %w", err)
43+
}
44+
45+
var pendingActivities int64
46+
if err := row.Scan(&pendingActivities); err != nil {
47+
return nil, fmt.Errorf("failed to scan active activities: %w", err)
48+
}
49+
50+
s.PendingActivities = pendingActivities
51+
52+
return s, nil
53+
}

backend/redis/instance.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.Work
160160
Score: float64(createdAt.UnixMilli()),
161161
})
162162

163+
p.SAdd(ctx, instancesActive(), instanceSegment(instance))
164+
163165
return nil
164166
}
165167

@@ -173,6 +175,10 @@ func updateInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.Work
173175

174176
p.Set(ctx, key, string(b), 0)
175177

178+
if state.State != core.WorkflowInstanceStateActive {
179+
p.SRem(ctx, instancesActive(), instanceSegment(instance))
180+
}
181+
176182
// CreatedAt does not change, so skip updating the instancesByCreation() ZSET
177183

178184
return nil

backend/redis/keys.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ func instancesByCreation() string {
2929
return "instances-by-creation"
3030
}
3131

32+
func instancesActive() string {
33+
return "instances-active"
34+
}
35+
3236
func instancesExpiring() string {
3337
return "instances-expiring"
3438
}

backend/redis/queue.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ func (q *taskQueue[T]) Keys() KeyInfo {
7474
}
7575
}
7676

77+
func (q *taskQueue[T]) Size(ctx context.Context, rdb redis.UniversalClient) (int64, error) {
78+
return rdb.XLen(ctx, q.streamKey).Result()
79+
}
80+
7781
// KEYS[1] = set
7882
// KEYS[2] = stream
7983
// ARGV[1] = caller provided id of the task

backend/redis/stats.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/cschleiden/go-workflows/backend"
8+
)
9+
10+
func (rb *redisBackend) GetStats(ctx context.Context) (*backend.Stats, error) {
11+
var err error
12+
13+
s := &backend.Stats{}
14+
15+
// get workflow instances
16+
activeInstances, err := rb.rdb.SCard(ctx, instancesActive()).Result()
17+
if err != nil {
18+
return nil, fmt.Errorf("getting active instances: %w", err)
19+
}
20+
21+
s.ActiveWorkflowInstances = activeInstances
22+
23+
// get pending activities
24+
pendingActivities, err := rb.activityQueue.Size(ctx, rb.rdb)
25+
if err != nil {
26+
return nil, fmt.Errorf("getting active activities: %w", err)
27+
}
28+
29+
s.PendingActivities = pendingActivities
30+
31+
return s, nil
32+
}

backend/sqlite/sqlite.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ func (sb *sqliteBackend) Converter() converter.Converter {
8383
return sb.options.Converter
8484
}
8585

86-
func (b *sqliteBackend) ContextPropagators() []contextpropagation.ContextPropagator {
87-
return b.options.ContextPropagators
86+
func (sb *sqliteBackend) ContextPropagators() []contextpropagation.ContextPropagator {
87+
return sb.options.ContextPropagators
8888
}
8989

9090
func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {

backend/sqlite/stats.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package sqlite
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
8+
"github.com/cschleiden/go-workflows/backend"
9+
)
10+
11+
func (b *sqliteBackend) GetStats(ctx context.Context) (*backend.Stats, error) {
12+
s := &backend.Stats{}
13+
14+
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
15+
Isolation: sql.LevelReadCommitted,
16+
})
17+
if err != nil {
18+
return nil, fmt.Errorf("failed to start transaction: %w", err)
19+
}
20+
defer tx.Rollback()
21+
22+
row := tx.QueryRowContext(
23+
ctx,
24+
"SELECT COUNT(*) FROM instances i WHERE i.completed_at IS NULL",
25+
)
26+
if err := row.Err(); err != nil {
27+
return nil, fmt.Errorf("failed to query active instances: %w", err)
28+
}
29+
30+
var activeInstances int64
31+
if err := row.Scan(&activeInstances); err != nil {
32+
return nil, fmt.Errorf("failed to scan active instances: %w", err)
33+
}
34+
35+
s.ActiveWorkflowInstances = activeInstances
36+
37+
// Get pending activities
38+
row = tx.QueryRowContext(
39+
ctx,
40+
"SELECT COUNT(*) FROM activities")
41+
if err := row.Err(); err != nil {
42+
return nil, fmt.Errorf("failed to query active activities: %w", err)
43+
}
44+
45+
var pendingActivities int64
46+
if err := row.Scan(&pendingActivities); err != nil {
47+
return nil, fmt.Errorf("failed to scan active activities: %w", err)
48+
}
49+
50+
s.PendingActivities = pendingActivities
51+
52+
return s, nil
53+
}

backend/stats.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package backend
2+
3+
type Stats struct {
4+
ActiveWorkflowInstances int64
5+
6+
PendingActivities int64
7+
}

0 commit comments

Comments
 (0)