Skip to content

Commit 626b33c

Browse files
committed
Support emitting metrics
1 parent b339b8f commit 626b33c

File tree

10 files changed

+130
-17
lines changed

10 files changed

+130
-17
lines changed

client/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import (
1414
"github.com/cschleiden/go-workflows/internal/core"
1515
"github.com/cschleiden/go-workflows/internal/fn"
1616
"github.com/cschleiden/go-workflows/internal/history"
17+
"github.com/cschleiden/go-workflows/internal/metrickeys"
1718
"github.com/cschleiden/go-workflows/internal/tracing"
19+
"github.com/cschleiden/go-workflows/metrics"
1820
"github.com/cschleiden/go-workflows/workflow"
1921
"github.com/google/uuid"
2022
"go.opentelemetry.io/otel/attribute"
@@ -88,6 +90,8 @@ func (c *client) CreateWorkflowInstance(ctx context.Context, options WorkflowIns
8890

8991
c.backend.Logger().Debug("Created workflow instance", "instance_id", wfi.InstanceID, "execution_id", wfi.ExecutionID)
9092

93+
c.backend.Metrics().Counter(metrickeys.WorkflowInstanceCreated, metrics.Tags{}, 1)
94+
9195
return wfi, nil
9296
}
9397

internal/metrickeys/metrickeys.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package metrickeys
2+
3+
const (
4+
Prefix = "workflows."
5+
6+
// Workflows
7+
WorkflowInstanceCreated = Prefix + "workflow.created"
8+
WorkflowInstanceFinished = Prefix + "workflow.finished"
9+
10+
WorkflowTaskScheduled = Prefix + "workflow.task.scheduled"
11+
WorkflowTaskProcessed = Prefix + "workflow.task.processed"
12+
WorkflowTaskDelay = Prefix + "workflow.task.time_in_queue"
13+
14+
WorkflowInstanceCacheSize = Prefix + "workflow.cache.size"
15+
WorkflowInstanceCacheEviction = Prefix + "workflow.cache.eviction"
16+
17+
// Activities
18+
ActivityTaskScheduled = Prefix + "activity.task.scheduled"
19+
ActivityTaskProcessed = Prefix + "activity.task.processed"
20+
ActivityTaskDelay = Prefix + "activity.task.time_in_queue"
21+
)
22+
23+
// Tag names
24+
const (
25+
// Backend being used
26+
Backend = "backend"
27+
28+
// Reason for evicting an entry from the workflow instance cache
29+
EvictionReason = "reason"
30+
31+
ActivityName = "activity"
32+
)

internal/metrics/noop.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package metrics
33
import (
44
"time"
55

6-
m "github.com/cschleiden/go-workflows/metrics"
6+
"github.com/cschleiden/go-workflows/metrics"
77
)
88

99
type noopMetricsClient struct {
@@ -13,20 +13,21 @@ func NewNoopMetricsClient() *noopMetricsClient {
1313
return &noopMetricsClient{}
1414
}
1515

16-
var _ m.Client = (*noopMetricsClient)(nil)
16+
var _ metrics.Client = (*noopMetricsClient)(nil)
1717

18-
func (*noopMetricsClient) Counter(name string, tags m.Tags, value float64) {
18+
func (*noopMetricsClient) Counter(name string, tags metrics.Tags, value int64) {
1919
}
2020

21-
// Distribution implements metrics.Client
22-
func (*noopMetricsClient) Distribution(name string, tags m.Tags, value float64) {
21+
func (*noopMetricsClient) Distribution(name string, tags metrics.Tags, value float64) {
2322
}
2423

25-
// Timing implements metrics.Client
26-
func (*noopMetricsClient) Timing(name string, tags m.Tags, duration time.Duration) {
24+
func (*noopMetricsClient) Gauge(name string, tags metrics.Tags, value int64) {
25+
26+
}
27+
28+
func (*noopMetricsClient) Timing(name string, tags metrics.Tags, duration time.Duration) {
2729
}
2830

29-
// WithTags implements metrics.Client
30-
func (nmc *noopMetricsClient) WithTags(tags m.Tags) m.Client {
31+
func (nmc *noopMetricsClient) WithTags(tags metrics.Tags) metrics.Client {
3132
return nmc
3233
}

internal/worker/activity.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ import (
1010
"github.com/cschleiden/go-workflows/backend"
1111
"github.com/cschleiden/go-workflows/internal/activity"
1212
"github.com/cschleiden/go-workflows/internal/history"
13+
"github.com/cschleiden/go-workflows/internal/metrickeys"
1314
"github.com/cschleiden/go-workflows/internal/task"
1415
"github.com/cschleiden/go-workflows/internal/workflow"
16+
"github.com/cschleiden/go-workflows/metrics"
1517
)
1618

1719
type ActivityWorker struct {
@@ -108,6 +110,14 @@ func (aw *ActivityWorker) runDispatcher(ctx context.Context) {
108110
}
109111

110112
func (aw *ActivityWorker) handleTask(ctx context.Context, task *task.Activity) {
113+
a := task.Event.Attributes.(*history.ActivityScheduledAttributes)
114+
ametrics := aw.backend.Metrics().WithTags(metrics.Tags{metrickeys.ActivityName: a.Name})
115+
116+
// Record how long this task was in the queue
117+
scheduledAt := task.Event.Timestamp
118+
timeInQueue := time.Since(scheduledAt)
119+
ametrics.Distribution(metrickeys.ActivityTaskDelay, metrics.Tags{}, float64(timeInQueue/time.Millisecond))
120+
111121
// Start heartbeat while activity is running
112122
heartbeatCtx, cancelHeartbeat := context.WithCancel(ctx)
113123
go func(ctx context.Context) {
@@ -126,6 +136,9 @@ func (aw *ActivityWorker) handleTask(ctx context.Context, task *task.Activity) {
126136
}
127137
}(heartbeatCtx)
128138

139+
timer := metrics.Timer(ametrics, metrickeys.ActivityTaskProcessed, metrics.Tags{})
140+
defer timer.Stop()
141+
129142
result, err := aw.activityTaskExecutor.ExecuteActivity(ctx, task)
130143

131144
cancelHeartbeat()

internal/worker/workflow.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ import (
99
"github.com/benbjohnson/clock"
1010
"github.com/cschleiden/go-workflows/backend"
1111
"github.com/cschleiden/go-workflows/internal/core"
12+
"github.com/cschleiden/go-workflows/internal/metrickeys"
1213
"github.com/cschleiden/go-workflows/internal/task"
1314
"github.com/cschleiden/go-workflows/internal/workflow"
1415
"github.com/cschleiden/go-workflows/internal/workflow/cache"
1516
"github.com/cschleiden/go-workflows/log"
17+
"github.com/cschleiden/go-workflows/metrics"
1618
)
1719

1820
type WorkflowWorker struct {
@@ -36,7 +38,7 @@ func NewWorkflowWorker(backend backend.Backend, registry *workflow.Registry, opt
3638
if options.WorkflowExecutorCache != nil {
3739
c = options.WorkflowExecutorCache
3840
} else {
39-
c = cache.NewWorkflowExecutorLRUCache(options.WorkflowExecutorCacheSize, options.WorkflowExecutorCacheTTL)
41+
c = cache.NewWorkflowExecutorLRUCache(backend.Metrics(), options.WorkflowExecutorCacheSize, options.WorkflowExecutorCacheTTL)
4042
}
4143

4244
return &WorkflowWorker{
@@ -123,6 +125,14 @@ func (ww *WorkflowWorker) runDispatcher() {
123125
}
124126

125127
func (ww *WorkflowWorker) handle(ctx context.Context, t *task.Workflow) {
128+
// Record how long this task was in the queue
129+
scheduledAt := t.NewEvents[0].Timestamp // Use the timestamp of the first event as the schedule time
130+
timeInQueue := time.Since(scheduledAt)
131+
ww.backend.Metrics().Distribution(metrickeys.WorkflowTaskDelay, metrics.Tags{}, float64(timeInQueue/time.Millisecond))
132+
133+
timer := metrics.Timer(ww.backend.Metrics(), metrickeys.WorkflowTaskProcessed, metrics.Tags{})
134+
defer timer.Stop()
135+
126136
result, err := ww.handleTask(ctx, t)
127137
if err != nil {
128138
ww.logger.Panic("could not handle workflow task", "error", err)
@@ -131,8 +141,12 @@ func (ww *WorkflowWorker) handle(ctx context.Context, t *task.Workflow) {
131141
state := core.WorkflowInstanceStateActive
132142
if result.Completed {
133143
state = core.WorkflowInstanceStateFinished
144+
145+
ww.backend.Metrics().Counter(metrickeys.WorkflowInstanceFinished, metrics.Tags{}, 1)
134146
}
135147

148+
ww.backend.Metrics().Counter(metrickeys.ActivityTaskScheduled, metrics.Tags{}, int64(len(result.ActivityEvents)))
149+
136150
if err := ww.backend.CompleteWorkflowTask(
137151
ctx, t, t.WorkflowInstance, state, result.Executed, result.ActivityEvents, result.TimerEvents, result.WorkflowEvents); err != nil {
138152
ww.logger.Panic("could not complete workflow task", "error", err)

internal/workflow/cache/cache.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,41 @@ import (
55
"time"
66

77
"github.com/cschleiden/go-workflows/internal/core"
8+
"github.com/cschleiden/go-workflows/internal/metrickeys"
89
"github.com/cschleiden/go-workflows/internal/workflow"
10+
"github.com/cschleiden/go-workflows/metrics"
911
"github.com/jellydator/ttlcache/v3"
1012
)
1113

1214
type LruCache struct {
13-
c *ttlcache.Cache[string, workflow.WorkflowExecutor]
15+
mc metrics.Client
16+
c *ttlcache.Cache[string, workflow.WorkflowExecutor]
1417
}
1518

16-
func NewWorkflowExecutorLRUCache(size int, expiration time.Duration) workflow.ExecutorCache {
19+
func NewWorkflowExecutorLRUCache(mc metrics.Client, size int, expiration time.Duration) workflow.ExecutorCache {
1720
c := ttlcache.New(
1821
ttlcache.WithCapacity[string, workflow.WorkflowExecutor](uint64(size)),
1922
ttlcache.WithTTL[string, workflow.WorkflowExecutor](expiration),
2023
)
2124

2225
c.OnEviction(func(ctx context.Context, er ttlcache.EvictionReason, i *ttlcache.Item[string, workflow.WorkflowExecutor]) {
26+
// Close the executor to allow it to clean up resources.
2327
i.Value().Close()
28+
29+
reason := ""
30+
switch er {
31+
case ttlcache.EvictionReasonExpired:
32+
reason = "expired"
33+
case ttlcache.EvictionReasonCapacityReached:
34+
reason = "capacity"
35+
}
36+
37+
mc.Counter(metrickeys.WorkflowInstanceCacheEviction, metrics.Tags{metrickeys.EvictionReason: reason}, 1)
2438
})
2539

2640
return &LruCache{
27-
c: c,
41+
mc: mc,
42+
c: c,
2843
}
2944
}
3045

@@ -40,6 +55,8 @@ func (lc *LruCache) Get(ctx context.Context, instance *core.WorkflowInstance) (w
4055
func (lc *LruCache) Store(ctx context.Context, instance *core.WorkflowInstance, executor workflow.WorkflowExecutor) error {
4156
lc.c.Set(getKey(instance), executor, ttlcache.DefaultTTL)
4257

58+
lc.mc.Gauge(metrickeys.WorkflowInstanceCacheSize, metrics.Tags{}, int64(lc.c.Len()))
59+
4360
return nil
4461
}
4562

internal/workflow/cache/cache_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@ import (
1111
"github.com/cschleiden/go-workflows/internal/core"
1212
"github.com/cschleiden/go-workflows/internal/history"
1313
"github.com/cschleiden/go-workflows/internal/logger"
14+
"github.com/cschleiden/go-workflows/internal/metrics"
1415
wf "github.com/cschleiden/go-workflows/internal/workflow"
1516
"github.com/cschleiden/go-workflows/workflow"
1617
"github.com/stretchr/testify/require"
1718
"go.opentelemetry.io/otel/trace"
1819
)
1920

2021
func Test_Cache_StoreAndGet(t *testing.T) {
21-
c := NewWorkflowExecutorLRUCache(1, time.Second*10)
22+
c := NewWorkflowExecutorLRUCache(metrics.NewNoopMetricsClient(), 1, time.Second*10)
2223

2324
r := wf.NewRegistry()
2425
r.RegisterWorkflow(workflowWithActivity)
@@ -51,7 +52,9 @@ func Test_Cache_StoreAndGet(t *testing.T) {
5152
}
5253

5354
func Test_Cache_Evict(t *testing.T) {
54-
c := NewWorkflowExecutorLRUCache(128,
55+
c := NewWorkflowExecutorLRUCache(
56+
metrics.NewNoopMetricsClient(),
57+
128,
5558
1, // Should evict immediately
5659
)
5760

internal/workflow/executor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
101101

102102
if t.WorkflowInstanceState == core.WorkflowInstanceStateFinished {
103103
// This should never happen. For now, log information and then panic.
104-
logger.Debug("Received workflow task for finished workflow instance, discarding events")
104+
logger.Error("Received workflow task for finished workflow instance, discarding events")
105105

106106
// Log events that caused this task to be scheduled
107107
for _, event := range t.NewEvents {

metrics/metrics.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ import "time"
55
type Tags map[string]string
66

77
type Client interface {
8-
Counter(name string, tags Tags, value float64)
8+
Counter(name string, tags Tags, value int64)
99

1010
Distribution(name string, tags Tags, value float64)
1111

12+
Gauge(name string, tags Tags, value int64)
13+
1214
Timing(name string, tags Tags, duration time.Duration)
1315

1416
WithTags(tags Tags) Client

metrics/timer.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package metrics
2+
3+
import (
4+
"time"
5+
)
6+
7+
type timer struct {
8+
client Client
9+
start time.Time
10+
name string
11+
tags Tags
12+
}
13+
14+
func Timer(client Client, name string, tags Tags) *timer {
15+
return &timer{
16+
client: client,
17+
start: time.Now(),
18+
name: name,
19+
tags: tags,
20+
}
21+
}
22+
23+
// Stop the timer and send the elapsed time as milliseconds as a distribution metric
24+
func (t *timer) Stop() {
25+
elapsed := time.Since(t.start)
26+
t.client.Distribution(t.name, t.tags, float64(elapsed/time.Millisecond))
27+
}

0 commit comments

Comments
 (0)