Skip to content

Commit a5f7b54

Browse files
authored
Merge pull request #133 from cschleiden/metrics
Metrics
2 parents 42c303d + 626b33c commit a5f7b54

File tree

17 files changed

+251
-49
lines changed

17 files changed

+251
-49
lines changed

backend/backend.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/cschleiden/go-workflows/internal/history"
99
"github.com/cschleiden/go-workflows/internal/task"
1010
"github.com/cschleiden/go-workflows/log"
11+
"github.com/cschleiden/go-workflows/metrics"
1112
"github.com/cschleiden/go-workflows/workflow"
1213
"go.opentelemetry.io/otel/trace"
1314
)
@@ -62,6 +63,9 @@ type Backend interface {
6263
// Logger returns the configured logger for the backend
6364
Logger() log.Logger
6465

65-
// Tracer returns th configured trace provider for the backend
66+
// Tracer returns the configured trace provider for the backend
6667
Tracer() trace.Tracer
68+
69+
// Metrics returns the configured metrics client for the backend
70+
Metrics() metrics.Client
6771
}

backend/mock_Backend.go

Lines changed: 19 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ import (
1313
"github.com/cschleiden/go-workflows/backend"
1414
"github.com/cschleiden/go-workflows/internal/core"
1515
"github.com/cschleiden/go-workflows/internal/history"
16+
"github.com/cschleiden/go-workflows/internal/metrickeys"
1617
"github.com/cschleiden/go-workflows/internal/task"
1718
"github.com/cschleiden/go-workflows/log"
19+
"github.com/cschleiden/go-workflows/metrics"
1820
"github.com/cschleiden/go-workflows/workflow"
1921
_ "github.com/go-sql-driver/mysql"
2022
"github.com/google/uuid"
@@ -94,6 +96,10 @@ func (b *mysqlBackend) Tracer() trace.Tracer {
9496
return b.options.TracerProvider.Tracer(backend.TracerName)
9597
}
9698

99+
func (b *mysqlBackend) Metrics() metrics.Client {
100+
return b.options.Metrics.WithTags(metrics.Tags{metrickeys.Backend: "mysql"})
101+
}
102+
97103
func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
98104
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
99105
Isolation: sql.LevelReadCommitted,

backend/options.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,17 @@ import (
44
"time"
55

66
"github.com/cschleiden/go-workflows/internal/logger"
7+
mi "github.com/cschleiden/go-workflows/internal/metrics"
78
"github.com/cschleiden/go-workflows/log"
9+
"github.com/cschleiden/go-workflows/metrics"
810
"go.opentelemetry.io/otel/trace"
911
)
1012

1113
type Options struct {
1214
Logger log.Logger
1315

16+
Metrics metrics.Client
17+
1418
TracerProvider trace.TracerProvider
1519

1620
StickyTimeout time.Duration
@@ -26,6 +30,7 @@ var DefaultOptions Options = Options{
2630
ActivityLockTimeout: time.Minute * 2,
2731

2832
Logger: logger.NewDefaultLogger(),
33+
Metrics: mi.NewNoopMetricsClient(),
2934
TracerProvider: trace.NewNoopTracerProvider(),
3035
}
3136

@@ -43,6 +48,12 @@ func WithLogger(logger log.Logger) BackendOption {
4348
}
4449
}
4550

51+
func WithMetrics(client metrics.Client) BackendOption {
52+
return func(o *Options) {
53+
o.Metrics = client
54+
}
55+
}
56+
4657
func WithTracerProvider(tp trace.TracerProvider) BackendOption {
4758
return func(o *Options) {
4859
o.TracerProvider = tp

backend/redis/redis.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
"github.com/cschleiden/go-workflows/backend"
99
"github.com/cschleiden/go-workflows/internal/core"
1010
"github.com/cschleiden/go-workflows/internal/history"
11+
"github.com/cschleiden/go-workflows/internal/metrickeys"
1112
"github.com/cschleiden/go-workflows/log"
13+
"github.com/cschleiden/go-workflows/metrics"
1214
"github.com/go-redis/redis/v8"
1315
"go.opentelemetry.io/otel/trace"
1416
)
@@ -106,6 +108,10 @@ func (rb *redisBackend) Logger() log.Logger {
106108
return rb.options.Logger
107109
}
108110

111+
func (rb *redisBackend) Metrics() metrics.Client {
112+
return rb.options.Metrics.WithTags(metrics.Tags{metrickeys.Backend: "redis"})
113+
}
114+
109115
func (rb *redisBackend) Tracer() trace.Tracer {
110116
return rb.options.TracerProvider.Tracer(backend.TracerName)
111117
}

backend/sqlite/sqlite.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ import (
1313
"github.com/cschleiden/go-workflows/backend"
1414
"github.com/cschleiden/go-workflows/internal/core"
1515
"github.com/cschleiden/go-workflows/internal/history"
16+
"github.com/cschleiden/go-workflows/internal/metrickeys"
1617
"github.com/cschleiden/go-workflows/internal/task"
1718
"github.com/cschleiden/go-workflows/log"
19+
"github.com/cschleiden/go-workflows/metrics"
1820
"github.com/cschleiden/go-workflows/workflow"
1921
"github.com/google/uuid"
2022
"go.opentelemetry.io/otel/trace"
@@ -65,6 +67,10 @@ func (sb *sqliteBackend) Logger() log.Logger {
6567
return sb.options.Logger
6668
}
6769

70+
func (sb *sqliteBackend) Metrics() metrics.Client {
71+
return sb.options.Metrics.WithTags(metrics.Tags{metrickeys.Backend: "sqlite"})
72+
}
73+
6874
func (sb *sqliteBackend) Tracer() trace.Tracer {
6975
return sb.options.TracerProvider.Tracer(backend.TracerName)
7076
}

client/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import (
1414
"github.com/cschleiden/go-workflows/internal/core"
1515
"github.com/cschleiden/go-workflows/internal/fn"
1616
"github.com/cschleiden/go-workflows/internal/history"
17+
"github.com/cschleiden/go-workflows/internal/metrickeys"
1718
"github.com/cschleiden/go-workflows/internal/tracing"
19+
"github.com/cschleiden/go-workflows/metrics"
1820
"github.com/cschleiden/go-workflows/workflow"
1921
"github.com/google/uuid"
2022
"go.opentelemetry.io/otel/attribute"
@@ -88,6 +90,8 @@ func (c *client) CreateWorkflowInstance(ctx context.Context, options WorkflowIns
8890

8991
c.backend.Logger().Debug("Created workflow instance", "instance_id", wfi.InstanceID, "execution_id", wfi.ExecutionID)
9092

93+
c.backend.Metrics().Counter(metrickeys.WorkflowInstanceCreated, metrics.Tags{}, 1)
94+
9195
return wfi, nil
9296
}
9397

internal/metrickeys/metrickeys.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package metrickeys
2+
3+
const (
4+
Prefix = "workflows."
5+
6+
// Workflows
7+
WorkflowInstanceCreated = Prefix + "workflow.created"
8+
WorkflowInstanceFinished = Prefix + "workflow.finished"
9+
10+
WorkflowTaskScheduled = Prefix + "workflow.task.scheduled"
11+
WorkflowTaskProcessed = Prefix + "workflow.task.processed"
12+
WorkflowTaskDelay = Prefix + "workflow.task.time_in_queue"
13+
14+
WorkflowInstanceCacheSize = Prefix + "workflow.cache.size"
15+
WorkflowInstanceCacheEviction = Prefix + "workflow.cache.eviction"
16+
17+
// Activities
18+
ActivityTaskScheduled = Prefix + "activity.task.scheduled"
19+
ActivityTaskProcessed = Prefix + "activity.task.processed"
20+
ActivityTaskDelay = Prefix + "activity.task.time_in_queue"
21+
)
22+
23+
// Tag names
24+
const (
25+
// Backend being used
26+
Backend = "backend"
27+
28+
// Reason for evicting an entry from the workflow instance cache
29+
EvictionReason = "reason"
30+
31+
ActivityName = "activity"
32+
)

internal/metrics/noop.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package metrics
2+
3+
import (
4+
"time"
5+
6+
"github.com/cschleiden/go-workflows/metrics"
7+
)
8+
9+
type noopMetricsClient struct {
10+
}
11+
12+
func NewNoopMetricsClient() *noopMetricsClient {
13+
return &noopMetricsClient{}
14+
}
15+
16+
var _ metrics.Client = (*noopMetricsClient)(nil)
17+
18+
func (*noopMetricsClient) Counter(name string, tags metrics.Tags, value int64) {
19+
}
20+
21+
func (*noopMetricsClient) Distribution(name string, tags metrics.Tags, value float64) {
22+
}
23+
24+
func (*noopMetricsClient) Gauge(name string, tags metrics.Tags, value int64) {
25+
26+
}
27+
28+
func (*noopMetricsClient) Timing(name string, tags metrics.Tags, duration time.Duration) {
29+
}
30+
31+
func (nmc *noopMetricsClient) WithTags(tags metrics.Tags) metrics.Client {
32+
return nmc
33+
}

internal/worker/activity.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,13 @@ import (
1010
"github.com/cschleiden/go-workflows/backend"
1111
"github.com/cschleiden/go-workflows/internal/activity"
1212
"github.com/cschleiden/go-workflows/internal/history"
13+
"github.com/cschleiden/go-workflows/internal/metrickeys"
1314
"github.com/cschleiden/go-workflows/internal/task"
1415
"github.com/cschleiden/go-workflows/internal/workflow"
16+
"github.com/cschleiden/go-workflows/metrics"
1517
)
1618

17-
type ActivityWorker interface {
18-
Start(context.Context) error
19-
WaitForCompletion() error
20-
}
21-
22-
type activityWorker struct {
19+
type ActivityWorker struct {
2320
backend backend.Backend
2421

2522
options *Options
@@ -32,8 +29,8 @@ type activityWorker struct {
3229
clock clock.Clock
3330
}
3431

35-
func NewActivityWorker(backend backend.Backend, registry *workflow.Registry, clock clock.Clock, options *Options) ActivityWorker {
36-
return &activityWorker{
32+
func NewActivityWorker(backend backend.Backend, registry *workflow.Registry, clock clock.Clock, options *Options) *ActivityWorker {
33+
return &ActivityWorker{
3734
backend: backend,
3835

3936
options: options,
@@ -47,7 +44,7 @@ func NewActivityWorker(backend backend.Backend, registry *workflow.Registry, clo
4744
}
4845
}
4946

50-
func (aw *activityWorker) Start(ctx context.Context) error {
47+
func (aw *ActivityWorker) Start(ctx context.Context) error {
5148
for i := 0; i <= aw.options.ActivityPollers; i++ {
5249
go aw.runPoll(ctx)
5350
}
@@ -57,15 +54,15 @@ func (aw *activityWorker) Start(ctx context.Context) error {
5754
return nil
5855
}
5956

60-
func (aw *activityWorker) WaitForCompletion() error {
57+
func (aw *ActivityWorker) WaitForCompletion() error {
6158
close(aw.activityTaskQueue)
6259

6360
aw.wg.Wait()
6461

6562
return nil
6663
}
6764

68-
func (aw *activityWorker) runPoll(ctx context.Context) {
65+
func (aw *ActivityWorker) runPoll(ctx context.Context) {
6966
for {
7067
select {
7168
case <-ctx.Done():
@@ -84,7 +81,7 @@ func (aw *activityWorker) runPoll(ctx context.Context) {
8481
}
8582
}
8683

87-
func (aw *activityWorker) runDispatcher(ctx context.Context) {
84+
func (aw *ActivityWorker) runDispatcher(ctx context.Context) {
8885
var sem chan struct{}
8986
if aw.options.MaxParallelActivityTasks > 0 {
9087
sem = make(chan struct{}, aw.options.MaxParallelActivityTasks)
@@ -112,7 +109,15 @@ func (aw *activityWorker) runDispatcher(ctx context.Context) {
112109
}
113110
}
114111

115-
func (aw *activityWorker) handleTask(ctx context.Context, task *task.Activity) {
112+
func (aw *ActivityWorker) handleTask(ctx context.Context, task *task.Activity) {
113+
a := task.Event.Attributes.(*history.ActivityScheduledAttributes)
114+
ametrics := aw.backend.Metrics().WithTags(metrics.Tags{metrickeys.ActivityName: a.Name})
115+
116+
// Record how long this task was in the queue
117+
scheduledAt := task.Event.Timestamp
118+
timeInQueue := time.Since(scheduledAt)
119+
ametrics.Distribution(metrickeys.ActivityTaskDelay, metrics.Tags{}, float64(timeInQueue/time.Millisecond))
120+
116121
// Start heartbeat while activity is running
117122
heartbeatCtx, cancelHeartbeat := context.WithCancel(ctx)
118123
go func(ctx context.Context) {
@@ -131,6 +136,9 @@ func (aw *activityWorker) handleTask(ctx context.Context, task *task.Activity) {
131136
}
132137
}(heartbeatCtx)
133138

139+
timer := metrics.Timer(ametrics, metrickeys.ActivityTaskProcessed, metrics.Tags{})
140+
defer timer.Stop()
141+
134142
result, err := aw.activityTaskExecutor.ExecuteActivity(ctx, task)
135143

136144
cancelHeartbeat()
@@ -161,7 +169,7 @@ func (aw *activityWorker) handleTask(ctx context.Context, task *task.Activity) {
161169
}
162170
}
163171

164-
func (aw *activityWorker) poll(ctx context.Context, timeout time.Duration) (*task.Activity, error) {
172+
func (aw *ActivityWorker) poll(ctx context.Context, timeout time.Duration) (*task.Activity, error) {
165173
if timeout == 0 {
166174
timeout = 30 * time.Second
167175
}

0 commit comments

Comments
 (0)