Skip to content

Commit fc6c95f

Browse files
committed
Add metrics client
1 parent d589943 commit fc6c95f

File tree

9 files changed

+110
-7
lines changed

9 files changed

+110
-7
lines changed

backend/backend.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/cschleiden/go-workflows/internal/history"
99
"github.com/cschleiden/go-workflows/internal/task"
1010
"github.com/cschleiden/go-workflows/log"
11+
"github.com/cschleiden/go-workflows/metrics"
1112
"github.com/cschleiden/go-workflows/workflow"
1213
"go.opentelemetry.io/otel/trace"
1314
)
@@ -62,6 +63,9 @@ type Backend interface {
6263
// Logger returns the configured logger for the backend
6364
Logger() log.Logger
6465

65-
// Tracer returns th configured trace provider for the backend
66+
// Tracer returns the configured trace provider for the backend
6667
Tracer() trace.Tracer
68+
69+
// Metrics returns the configured metrics client for the backend
70+
Metrics() metrics.Client
6771
}

backend/mock_Backend.go

Lines changed: 19 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ import (
1313
"github.com/cschleiden/go-workflows/backend"
1414
"github.com/cschleiden/go-workflows/internal/core"
1515
"github.com/cschleiden/go-workflows/internal/history"
16+
mi "github.com/cschleiden/go-workflows/internal/metrics"
1617
"github.com/cschleiden/go-workflows/internal/task"
1718
"github.com/cschleiden/go-workflows/log"
19+
"github.com/cschleiden/go-workflows/metrics"
1820
"github.com/cschleiden/go-workflows/workflow"
1921
_ "github.com/go-sql-driver/mysql"
2022
"github.com/google/uuid"
@@ -94,6 +96,10 @@ func (b *mysqlBackend) Tracer() trace.Tracer {
9496
return b.options.TracerProvider.Tracer(backend.TracerName)
9597
}
9698

99+
func (b *mysqlBackend) Metrics() metrics.Client {
100+
return b.options.Metrics.WithTags(metrics.Tags{mi.Backend: "mysql"})
101+
}
102+
97103
func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
98104
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
99105
Isolation: sql.LevelReadCommitted,

backend/options.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,17 @@ import (
44
"time"
55

66
"github.com/cschleiden/go-workflows/internal/logger"
7+
mi "github.com/cschleiden/go-workflows/internal/metrics"
78
"github.com/cschleiden/go-workflows/log"
9+
"github.com/cschleiden/go-workflows/metrics"
810
"go.opentelemetry.io/otel/trace"
911
)
1012

1113
type Options struct {
1214
Logger log.Logger
1315

16+
Metrics metrics.Client
17+
1418
TracerProvider trace.TracerProvider
1519

1620
StickyTimeout time.Duration
@@ -26,6 +30,7 @@ var DefaultOptions Options = Options{
2630
ActivityLockTimeout: time.Minute * 2,
2731

2832
Logger: logger.NewDefaultLogger(),
33+
Metrics: mi.NewNoopMetricsClient(),
2934
TracerProvider: trace.NewNoopTracerProvider(),
3035
}
3136

@@ -43,6 +48,12 @@ func WithLogger(logger log.Logger) BackendOption {
4348
}
4449
}
4550

51+
func WithMetrics(client metrics.Client) BackendOption {
52+
return func(o *Options) {
53+
o.Metrics = client
54+
}
55+
}
56+
4657
func WithTracerProvider(tp trace.TracerProvider) BackendOption {
4758
return func(o *Options) {
4859
o.TracerProvider = tp

backend/redis/redis.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
"github.com/cschleiden/go-workflows/backend"
99
"github.com/cschleiden/go-workflows/internal/core"
1010
"github.com/cschleiden/go-workflows/internal/history"
11+
mi "github.com/cschleiden/go-workflows/internal/metrics"
1112
"github.com/cschleiden/go-workflows/log"
13+
"github.com/cschleiden/go-workflows/metrics"
1214
"github.com/go-redis/redis/v8"
1315
"go.opentelemetry.io/otel/trace"
1416
)
@@ -106,6 +108,10 @@ func (rb *redisBackend) Logger() log.Logger {
106108
return rb.options.Logger
107109
}
108110

111+
func (rb *redisBackend) Metrics() metrics.Client {
112+
return rb.options.Metrics.WithTags(metrics.Tags{mi.Backend: "mysql"})
113+
}
114+
109115
func (rb *redisBackend) Tracer() trace.Tracer {
110116
return rb.options.TracerProvider.Tracer(backend.TracerName)
111117
}

backend/sqlite/sqlite.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ import (
1313
"github.com/cschleiden/go-workflows/backend"
1414
"github.com/cschleiden/go-workflows/internal/core"
1515
"github.com/cschleiden/go-workflows/internal/history"
16+
mi "github.com/cschleiden/go-workflows/internal/metrics"
1617
"github.com/cschleiden/go-workflows/internal/task"
1718
"github.com/cschleiden/go-workflows/log"
19+
"github.com/cschleiden/go-workflows/metrics"
1820
"github.com/cschleiden/go-workflows/workflow"
1921
"github.com/google/uuid"
2022
"go.opentelemetry.io/otel/trace"
@@ -65,6 +67,10 @@ func (sb *sqliteBackend) Logger() log.Logger {
6567
return sb.options.Logger
6668
}
6769

70+
func (sb *sqliteBackend) Metrics() metrics.Client {
71+
return sb.options.Metrics.WithTags(metrics.Tags{mi.Backend: "mysql"})
72+
}
73+
6874
func (sb *sqliteBackend) Tracer() trace.Tracer {
6975
return sb.options.TracerProvider.Tracer(backend.TracerName)
7076
}

internal/metrics/noop.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package metrics
2+
3+
import (
4+
"time"
5+
6+
m "github.com/cschleiden/go-workflows/metrics"
7+
)
8+
9+
type noopMetricsClient struct {
10+
}
11+
12+
func NewNoopMetricsClient() *noopMetricsClient {
13+
return &noopMetricsClient{}
14+
}
15+
16+
var _ m.Client = (*noopMetricsClient)(nil)
17+
18+
func (*noopMetricsClient) Counter(name string, tags m.Tags, value float64) {
19+
}
20+
21+
// Distribution implements metrics.Client
22+
func (*noopMetricsClient) Distribution(name string, tags m.Tags, value float64) {
23+
}
24+
25+
// Timing implements metrics.Client
26+
func (*noopMetricsClient) Timing(name string, tags m.Tags, duration time.Duration) {
27+
}
28+
29+
// WithTags implements metrics.Client
30+
func (nmc *noopMetricsClient) WithTags(tags m.Tags) m.Client {
31+
return nmc
32+
}

metrics/metrics.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package metrics
2+
3+
import "time"
4+
5+
type Tags map[string]string
6+
7+
type Client interface {
8+
Counter(name string, tags Tags, value float64)
9+
10+
Distribution(name string, tags Tags, value float64)
11+
12+
Timing(name string, tags Tags, duration time.Duration)
13+
14+
WithTags(tags Tags) Client
15+
}

worker/worker.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package worker
22

33
import (
44
"context"
5+
"fmt"
56
"sync"
67

78
"github.com/benbjohnson/clock"
@@ -45,8 +46,8 @@ type worker struct {
4546

4647
registry *workflowinternal.Registry
4748

48-
workflowWorker internal.WorkflowWorker
49-
activityWorker internal.ActivityWorker
49+
workflowWorker *internal.WorkflowWorker
50+
activityWorker *internal.ActivityWorker
5051

5152
workflows map[string]interface{}
5253
activities map[string]interface{}
@@ -85,8 +86,13 @@ func New(backend backend.Backend, options *Options) Worker {
8586
}
8687

8788
func (w *worker) Start(ctx context.Context) error {
88-
w.workflowWorker.Start(ctx)
89-
w.activityWorker.Start(ctx)
89+
if err := w.workflowWorker.Start(ctx); err != nil {
90+
return fmt.Errorf("starting workflow worker: %w", err)
91+
}
92+
93+
if err := w.activityWorker.Start(ctx); err != nil {
94+
return fmt.Errorf("starting activity worker: %w", err)
95+
}
9096

9197
return nil
9298
}

0 commit comments

Comments
 (0)