Skip to content

Commit dbb31b3

Browse files
authored
Merge pull request #279 from cschleiden/redis-metrics
Add pending workflow tasks metric to stats interface
2 parents ed3ea0e + 92d33fb commit dbb31b3

File tree

5 files changed

+99
-7
lines changed

5 files changed

+99
-7
lines changed

backend/mysql/stats.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7+
"time"
78

89
"github.com/cschleiden/go-workflows/backend"
10+
"github.com/cschleiden/go-workflows/core"
911
)
1012

1113
func (b *mysqlBackend) GetStats(ctx context.Context) (*backend.Stats, error) {
@@ -19,6 +21,7 @@ func (b *mysqlBackend) GetStats(ctx context.Context) (*backend.Stats, error) {
1921
}
2022
defer tx.Rollback()
2123

24+
// Get active instances
2225
row := tx.QueryRowContext(
2326
ctx,
2427
"SELECT COUNT(*) FROM instances i WHERE i.completed_at IS NULL",
@@ -34,6 +37,34 @@ func (b *mysqlBackend) GetStats(ctx context.Context) (*backend.Stats, error) {
3437

3538
s.ActiveWorkflowInstances = activeInstances
3639

40+
// Get workflow instances ready to be picked up
41+
now := time.Now()
42+
row = tx.QueryRowContext(
43+
ctx,
44+
`SELECT COUNT(*)
45+
FROM instances i
46+
INNER JOIN pending_events pe ON i.instance_id = pe.instance_id
47+
WHERE
48+
state = ? AND i.completed_at IS NULL
49+
AND (pe.visible_at IS NULL OR pe.visible_at <= ?)
50+
AND (i.locked_until IS NULL OR i.locked_until < ?)
51+
LIMIT 1
52+
FOR UPDATE OF i SKIP LOCKED`,
53+
core.WorkflowInstanceStateActive,
54+
now, // event.visible_at
55+
now, // locked_until
56+
)
57+
if err := row.Err(); err != nil {
58+
return nil, fmt.Errorf("failed to query active instances: %w", err)
59+
}
60+
61+
var pendingInstances int64
62+
if err := row.Scan(&pendingInstances); err != nil {
63+
return nil, fmt.Errorf("failed to scan active instances: %w", err)
64+
}
65+
66+
s.PendingWorkflowTasks = pendingInstances
67+
3768
// Get pending activities
3869
row = tx.QueryRowContext(
3970
ctx,

backend/redis/stats.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ func (rb *redisBackend) GetStats(ctx context.Context) (*backend.Stats, error) {
2020

2121
s.ActiveWorkflowInstances = activeInstances
2222

23+
// get pending workflow tasks
24+
pendingWorkflows, err := rb.workflowQueue.Size(ctx, rb.rdb)
25+
if err != nil {
26+
return nil, fmt.Errorf("getting active workflows: %w", err)
27+
}
28+
29+
s.PendingWorkflowTasks = pendingWorkflows
30+
2331
// get pending activities
2432
pendingActivities, err := rb.activityQueue.Size(ctx, rb.rdb)
2533
if err != nil {

backend/sqlite/stats.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7+
"time"
78

89
"github.com/cschleiden/go-workflows/backend"
10+
"github.com/cschleiden/go-workflows/core"
911
)
1012

1113
func (b *sqliteBackend) GetStats(ctx context.Context) (*backend.Stats, error) {
@@ -34,6 +36,35 @@ func (b *sqliteBackend) GetStats(ctx context.Context) (*backend.Stats, error) {
3436

3537
s.ActiveWorkflowInstances = activeInstances
3638

39+
// Get workflow instances ready to be picked up
40+
now := time.Now()
41+
row = tx.QueryRowContext(
42+
ctx,
43+
`SELECT COUNT(*) FROM instances i
44+
WHERE
45+
(locked_until IS NULL OR locked_until < ?)
46+
AND state = ? AND i.completed_at IS NULL
47+
AND EXISTS (
48+
SELECT 1
49+
FROM pending_events
50+
WHERE instance_id = i.id AND execution_id = i.execution_id AND (visible_at IS NULL OR visible_at <= ?)
51+
)
52+
LIMIT 1`,
53+
now, // locked_until
54+
core.WorkflowInstanceStateActive, // state
55+
now, // pending_event.visible_at
56+
)
57+
if err := row.Err(); err != nil {
58+
return nil, fmt.Errorf("failed to query active instances: %w", err)
59+
}
60+
61+
var pendingInstances int64
62+
if err := row.Scan(&pendingInstances); err != nil {
63+
return nil, fmt.Errorf("failed to scan active instances: %w", err)
64+
}
65+
66+
s.PendingWorkflowTasks = pendingInstances
67+
3768
// Get pending activities
3869
row = tx.QueryRowContext(
3970
ctx,

backend/stats.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,11 @@ package backend
33
type Stats struct {
44
ActiveWorkflowInstances int64
55

6+
// PendingWorkflowTasks are the number of workflow tasks that are currently in the queue,
7+
// waiting to be processed by a worker
8+
PendingWorkflowTasks int64
9+
10+
// PendingActivities are the number of activities that are currently in the queue,
11+
// waiting to be processed by a worker
612
PendingActivities int64
713
}

backend/test/e2e_stats.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ var e2eStatsTests = []backendTest{
1515
{
1616
name: "Stats_ActiveInstance",
1717
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
18-
as := make(chan bool, 1)
19-
af := make(chan bool, 1)
18+
activityRunning := make(chan bool, 1)
19+
activityBlocked := make(chan bool, 1)
2020

2121
a := func(ctx context.Context) error {
22-
as <- true
23-
<-af
22+
activityRunning <- true
23+
<-activityBlocked
2424

2525
return nil
2626
}
@@ -31,28 +31,44 @@ var e2eStatsTests = []backendTest{
3131

3232
return true, nil
3333
}
34-
register(t, ctx, w, []interface{}{wf}, []interface{}{a})
34+
35+
require.NoError(t, w.RegisterWorkflow(wf))
36+
require.NoError(t, w.RegisterActivity(a))
3537

3638
s, err := b.GetStats(ctx)
3739
require.NoError(t, err)
3840
require.Equal(t, int64(0), s.ActiveWorkflowInstances)
41+
require.Equal(t, int64(0), s.PendingWorkflowTasks)
3942
require.Equal(t, int64(0), s.PendingActivities)
4043

4144
wfi := runWorkflow(t, ctx, c, wf)
4245

4346
s, err = b.GetStats(ctx)
4447
require.NoError(t, err)
4548
require.Equal(t, int64(1), s.ActiveWorkflowInstances)
49+
require.Equal(t, int64(1), s.PendingWorkflowTasks)
50+
require.Equal(t, int64(0), s.PendingActivities)
51+
52+
// Start worker
53+
require.NoError(t, w.Start(ctx))
54+
55+
// Wait until the activity is running
56+
<-activityRunning
4657

47-
<-as
58+
s, err = b.GetStats(ctx)
59+
require.NoError(t, err)
60+
require.Equal(t, int64(0), s.PendingWorkflowTasks)
61+
require.Equal(t, int64(1), s.ActiveWorkflowInstances)
4862

4963
s, err = b.GetStats(ctx)
5064
require.NoError(t, err)
5165
require.Equal(t, int64(1), s.ActiveWorkflowInstances)
5266
require.Equal(t, int64(1), s.PendingActivities)
5367

54-
af <- true
68+
// Let the activity finish
69+
activityBlocked <- true
5570

71+
// Let the workflow finish
5672
err = c.SignalWorkflow(ctx, wfi.InstanceID, "test-signal", nil)
5773
require.NoError(t, err)
5874

0 commit comments

Comments
 (0)