Skip to content

Commit fe16cad

Browse files
committed
core/services/promotel: add Forwarder service
1 parent fb20ee8 commit fe16cad

File tree

2 files changed

+86
-0
lines changed

2 files changed

+86
-0
lines changed

core/services/chainlink/application.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/grafana/pyroscope-go"
1616
"github.com/jonboulle/clockwork"
1717
"github.com/pkg/errors"
18+
"github.com/prometheus/client_golang/prometheus"
1819
"go.opentelemetry.io/otel"
1920
"go.opentelemetry.io/otel/attribute"
2021
"go.opentelemetry.io/otel/trace"
@@ -66,6 +67,7 @@ import (
6667
externalp2p "github.com/smartcontractkit/chainlink/v2/core/services/p2p/wrapper"
6768
"github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup"
6869
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
70+
"github.com/smartcontractkit/chainlink/v2/core/services/promotel"
6971
"github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer"
7072
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury"
7173
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
@@ -277,6 +279,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
277279
restrictedHTTPClient := opts.RestrictedHTTPClient
278280
unrestrictedHTTPClient := opts.UnrestrictedHTTPClient
279281

282+
promForwarder := promotel.NewForwarder(globalLogger, prometheus.DefaultGatherer, otel.GetMeterProvider())
283+
srvcs = append(srvcs, promForwarder)
284+
280285
if opts.CapabilitiesRegistry == nil {
281286
// for tests only, in prod Registry should always be set at this point
282287
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger)

core/services/promotel/promotel.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package promotel
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
"go.opentelemetry.io/otel/metric"
9+
10+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
11+
"github.com/smartcontractkit/chainlink-common/pkg/services"
12+
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
13+
)
14+
15+
const period = 15 * time.Second
16+
17+
type Forwarder struct {
18+
services.StateMachine
19+
lggr logger.Logger
20+
gatherer prometheus.Gatherer
21+
meterProvider metric.MeterProvider
22+
stopCh services.StopChan
23+
done chan struct{}
24+
}
25+
26+
func NewForwarder(lggr logger.Logger, gatherer prometheus.Gatherer, meterProvider metric.MeterProvider) *Forwarder {
27+
return &Forwarder{
28+
lggr: logger.Named(lggr, "PromOTELForwarder"),
29+
gatherer: gatherer,
30+
meterProvider: meterProvider,
31+
stopCh: make(chan struct{}),
32+
done: make(chan struct{}),
33+
}
34+
}
35+
36+
func (f *Forwarder) HealthReport() map[string]error { return map[string]error{f.Name(): f.Healthy()} }
37+
38+
func (f *Forwarder) Name() string { return f.lggr.Name() }
39+
40+
func (f *Forwarder) Start(context.Context) error {
41+
go f.run()
42+
return nil
43+
}
44+
45+
func (f *Forwarder) run() {
46+
defer close(f.done)
47+
ctx, cancel := f.stopCh.NewCtx()
48+
defer cancel()
49+
ticker := timeutil.NewTicker(func() time.Duration { return period })
50+
defer ticker.Stop()
51+
for {
52+
select {
53+
case <-ctx.Done():
54+
return
55+
case <-ticker.C:
56+
f.forward(ctx)
57+
}
58+
}
59+
}
60+
61+
func (f *Forwarder) forward(ctx context.Context) {
62+
mfs, err := f.gatherer.Gather()
63+
if err != nil {
64+
f.lggr.Errorw("Failed to gather prometheus metrics", "err", err)
65+
}
66+
for _, mf := range mfs {
67+
for range mf.Metric {
68+
if ctx.Err() != nil {
69+
return
70+
}
71+
72+
//TODO f.meterProvider.Meter()
73+
}
74+
}
75+
}
76+
77+
func (f *Forwarder) Close() error {
78+
close(f.stopCh)
79+
<-f.done
80+
return nil
81+
}

0 commit comments

Comments
 (0)