Skip to content

Commit 70bf492

Browse files
committed
core/services/promotel: add Forwarder service
1 parent f9fe452 commit 70bf492

File tree

3 files changed

+89
-3
lines changed

3 files changed

+89
-3
lines changed

core/cmd/shell.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ import (
6161

6262
var (
6363
initGlobalsOnce sync.Once
64-
prometheus *ginprom.Prometheus
64+
ginPrometheus *ginprom.Prometheus
6565
grpcOpts loop.GRPCOpts
6666
)
6767

@@ -70,7 +70,7 @@ func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTeleme
7070
var err error
7171
initGlobalsOnce.Do(func() {
7272
err = func() error {
73-
prometheus = ginprom.New(ginprom.Namespace("service"), ginprom.Token(cfgProm.AuthToken()))
73+
ginPrometheus = ginprom.New(ginprom.Namespace("service"), ginprom.Token(cfgProm.AuthToken()))
7474
grpcOpts = loop.NewGRPCOpts(nil) // default prometheus.Registerer
7575

7676
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
@@ -400,7 +400,7 @@ func (n ChainlinkRunner) Run(ctx context.Context, app chainlink.Application) err
400400
return errors.New("You must specify at least one port to listen on")
401401
}
402402

403-
handler, err := web.NewRouter(app, prometheus)
403+
handler, err := web.NewRouter(app, ginPrometheus)
404404
if err != nil {
405405
return errors.Wrap(err, "failed to create web router")
406406
}

core/services/chainlink/application.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/grafana/pyroscope-go"
1515
"github.com/jonboulle/clockwork"
1616
"github.com/pkg/errors"
17+
"github.com/prometheus/client_golang/prometheus"
1718
"go.opentelemetry.io/otel"
1819
"go.opentelemetry.io/otel/attribute"
1920
"go.opentelemetry.io/otel/trace"
@@ -26,6 +27,7 @@ import (
2627
"github.com/smartcontractkit/chainlink-common/pkg/utils"
2728
"github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable"
2829
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"
30+
"github.com/smartcontractkit/chainlink/v2/core/services/promotel"
2931

3032
"github.com/smartcontractkit/chainlink/v2/core/bridges"
3133
"github.com/smartcontractkit/chainlink/v2/core/build"
@@ -205,6 +207,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
205207
restrictedHTTPClient := opts.RestrictedHTTPClient
206208
unrestrictedHTTPClient := opts.UnrestrictedHTTPClient
207209

210+
promForwarder := promotel.NewForwarder(globalLogger, prometheus.DefaultGatherer, otel.GetMeterProvider())
211+
srvcs = append(srvcs, promForwarder)
212+
208213
if opts.CapabilitiesRegistry == nil {
209214
// for tests only, in prod Registry should always be set at this point
210215
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger)

core/services/promotel/promotel.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package promotel
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
"go.opentelemetry.io/otel/metric"
9+
10+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
11+
"github.com/smartcontractkit/chainlink-common/pkg/services"
12+
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
13+
)
14+
15+
const period = 15 * time.Second
16+
17+
type Forwarder struct {
18+
services.StateMachine
19+
lggr logger.Logger
20+
gatherer prometheus.Gatherer
21+
meterProvider metric.MeterProvider
22+
stopCh services.StopChan
23+
done chan struct{}
24+
}
25+
26+
func NewForwarder(lggr logger.Logger, gatherer prometheus.Gatherer, meterProvider metric.MeterProvider) *Forwarder {
27+
return &Forwarder{
28+
lggr: logger.Named(lggr, "PromOTELForwarder"),
29+
gatherer: gatherer,
30+
meterProvider: meterProvider,
31+
stopCh: make(chan struct{}),
32+
done: make(chan struct{}),
33+
}
34+
}
35+
36+
func (f *Forwarder) HealthReport() map[string]error { return map[string]error{f.Name(): f.Healthy()} }
37+
38+
func (f *Forwarder) Name() string { return f.lggr.Name() }
39+
40+
func (f *Forwarder) Start(context.Context) error {
41+
go f.run()
42+
return nil
43+
}
44+
45+
func (f *Forwarder) run() {
46+
defer close(f.done)
47+
ctx, cancel := f.stopCh.NewCtx()
48+
defer cancel()
49+
ticker := timeutil.NewTicker(func() time.Duration { return period })
50+
defer ticker.Stop()
51+
for {
52+
select {
53+
case <-ctx.Done():
54+
return
55+
case <-ticker.C:
56+
f.forward(ctx)
57+
}
58+
}
59+
}
60+
61+
func (f *Forwarder) forward(ctx context.Context) {
62+
mfs, err := f.gatherer.Gather()
63+
if err != nil {
64+
f.lggr.Errorw("Failed to gather prometheus metrics", "err", err)
65+
}
66+
for _, mf := range mfs {
67+
for range mf.Metric {
68+
if ctx.Err() != nil {
69+
return
70+
}
71+
72+
//TODO f.meterProvider.Meter()
73+
}
74+
}
75+
}
76+
77+
func (f *Forwarder) Close() error {
78+
close(f.stopCh)
79+
<-f.done
80+
return nil
81+
}

0 commit comments

Comments
 (0)