Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
RetirementReportCache: retirementReportCache,
LLOTransmissionReaper: lloReaper,
CapabilitiesRegistry: capabilitiesRegistry,
Registerer: appRegisterer,
})
}

Expand Down
201 changes: 145 additions & 56 deletions core/scripts/go.mod

Large diffs are not rendered by default.

584 changes: 461 additions & 123 deletions core/scripts/go.sum

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ type ApplicationOpts struct {
NewOracleFactoryFn standardcapabilities.NewOracleFactoryFn
FetcherFunc syncer.FetcherFunc
FetcherFactoryFn compute.FetcherFactory
Registerer prometheus.Registerer
}

type Heartbeat struct {
Expand Down Expand Up @@ -279,8 +280,17 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
restrictedHTTPClient := opts.RestrictedHTTPClient
unrestrictedHTTPClient := opts.UnrestrictedHTTPClient

promForwarder := promotel.NewForwarder(globalLogger, prometheus.DefaultGatherer, otel.GetMeterProvider())
srvcs = append(srvcs, promForwarder)
if beholderClient := beholder.GetClient(); beholderClient != nil {
forwarderOpts := promotel.DefaultOptions()
forwarderOpts.Endpoint = beholderClient.Config.OtelExporterGRPCEndpoint
forwarderOpts.TLSInsecure = beholderClient.Config.InsecureConnection
forwarderOpts.AuthHeaders = beholderClient.Config.AuthHeaders
promForwarder, err := promotel.NewForwarderService(prometheus.DefaultGatherer, opts.Registerer, globalLogger, forwarderOpts)
if err != nil {
return nil, fmt.Errorf("could not create prometheus forwarder: %w", err)
}
srvcs = append(srvcs, promForwarder)
}

if opts.CapabilitiesRegistry == nil {
// for tests only, in prod Registry should always be set at this point
Expand Down
86 changes: 30 additions & 56 deletions core/services/promotel/promotel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,80 +2,54 @@ package promotel

import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"

promotelcommon "github.com/smartcontractkit/chainlink-common/pkg/promotel"
)

const period = 15 * time.Second
const (
name = "PromOTELForwarder"
)

type Forwarder struct {
type Options = promotelcommon.ForwarderOptions
type ForwarderService struct {
services.StateMachine
lggr logger.Logger
gatherer prometheus.Gatherer
meterProvider metric.MeterProvider
stopCh services.StopChan
done chan struct{}
lggr logger.Logger
forwarder *promotelcommon.Forwarder
}

func NewForwarder(lggr logger.Logger, gatherer prometheus.Gatherer, meterProvider metric.MeterProvider) *Forwarder {
return &Forwarder{
lggr: logger.Named(lggr, "PromOTELForwarder"),
gatherer: gatherer,
meterProvider: meterProvider,
stopCh: make(chan struct{}),
done: make(chan struct{}),
func NewForwarderService(g prometheus.Gatherer, r prometheus.Registerer, lggr logger.Logger, opts Options) (*ForwarderService, error) {
l := logger.Named(lggr, name)
forwarder, err := promotelcommon.NewForwarder(g, r, l, opts)
if err != nil {
return nil, err
}
return &ForwarderService{
lggr: l,
forwarder: forwarder,
}, nil
}

func (f *Forwarder) HealthReport() map[string]error { return map[string]error{f.Name(): f.Healthy()} }

func (f *Forwarder) Name() string { return f.lggr.Name() }

func (f *Forwarder) Start(context.Context) error {
go f.run()
return nil
func (f *ForwarderService) HealthReport() map[string]error {
return map[string]error{f.Name(): f.Healthy()}
}

func (f *Forwarder) run() {
defer close(f.done)
ctx, cancel := f.stopCh.NewCtx()
defer cancel()
ticker := timeutil.NewTicker(func() time.Duration { return period })
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
f.forward(ctx)
}
}
}
func (f *ForwarderService) Name() string { return f.lggr.Name() }

func (f *Forwarder) forward(ctx context.Context) {
mfs, err := f.gatherer.Gather()
if err != nil {
f.lggr.Errorw("Failed to gather prometheus metrics", "err", err)
}
for _, mf := range mfs {
for range mf.Metric {
if ctx.Err() != nil {
return
}
func (f *ForwarderService) Start(ctx context.Context) error {
return f.StartOnce(name, func() error {
return f.forwarder.Start(ctx)
})
}

//TODO f.meterProvider.Meter()
}
}
func (f *ForwarderService) Close() error {
return f.StopOnce(name, f.forwarder.Close)
}

func (f *Forwarder) Close() error {
close(f.stopCh)
<-f.done
return nil
func DefaultOptions() Options {
return promotelcommon.DefaultForwarderOptions()
}
76 changes: 76 additions & 0 deletions core/services/promotel/promotel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package promotel

import (
"context"
"strings"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/smartcontractkit/wsrpc/logger"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestForwarder(t *testing.T) {
var (
g = prometheus.DefaultGatherer
r = prometheus.DefaultRegisterer
lggr, observed = logger.TestObserved(t, zap.DebugLevel)
testMetricName = t.Name() + "_test_counter_metric"
interval = 10 * time.Millisecond
)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

go reportTestMetrics(ctx, r, testMetricName)

doneCh := make(chan struct{})
go func() {
for {
select {
case <-ctx.Done():
return
default:
for _, l := range observed.All() {
metricName, ok := l.ContextMap()["name"].(string)
if ok && strings.Contains(metricName, testMetricName) {
doneCh <- struct{}{}
}
}
time.Sleep(1 * time.Second)
}
}
}()

forwarder, err := NewForwarderService(g, r, lggr, Options{
Endpoint: "localhost:4317",
TLSInsecure: true,
Interval: interval,
Verbose: true,
})
require.NoError(t, err)
require.NoError(t, forwarder.Start(ctx))
defer forwarder.Close()

select {
case <-ctx.Done():
t.Fatal("Test timed out. Expected metric not found")
case <-doneCh:
t.Log("Found metric.")
}
}

func reportTestMetrics(ctx context.Context, reg prometheus.Registerer, metricName string) {
m := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: metricName})
for {
select {
case <-ctx.Done():
return
default:
m.Inc()
time.Sleep(1 * time.Second)
}
}
}
Loading
Loading