Skip to content

Commit debc92f

Browse files
authored
Port generic chain-agnostic balance monitor to cl-common (#1728)
* Move generic chain-agnostic balance monitor to cl-common * gomods tidy * lint * lint * fix typo
1 parent df30cf5 commit debc92f

File tree

5 files changed

+374
-2
lines changed

5 files changed

+374
-2
lines changed
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
// Package balance provides a generic chain-agnostic balance monitoring service
2+
// that tracks account balances across different blockchain networks.
3+
package balance
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"time"
9+
10+
"github.com/smartcontractkit/chainlink-common/pkg/config"
11+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
12+
"github.com/smartcontractkit/chainlink-common/pkg/services"
13+
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
14+
"github.com/smartcontractkit/chainlink-common/pkg/utils"
15+
)
16+
17+
// Config defines the balance monitor configuration.
18+
type GenericBalanceConfig struct {
19+
BalancePollPeriod config.Duration
20+
}
21+
22+
// GenericBalanceClient defines the interface for getting account balances.
23+
type GenericBalanceClient interface {
24+
GetAccountBalance(addr string) (float64, error)
25+
}
26+
27+
// GenericBalanceMonitorOpts contains the options for creating a new balance monitor.
28+
type GenericBalanceMonitorOpts struct {
29+
ChainInfo ChainInfo
30+
ChainNativeCurrency string
31+
32+
Config GenericBalanceConfig
33+
Logger logger.Logger
34+
Keystore core.Keystore
35+
NewGenericBalanceClient func() (GenericBalanceClient, error)
36+
37+
// Maps a public key to an account address (optional, can return key as is)
38+
KeyToAccountMapper func(context.Context, string) (string, error)
39+
}
40+
41+
// ChainInfo contains information about the blockchain network.
42+
type ChainInfo struct {
43+
ChainFamilyName string
44+
ChainID string
45+
NetworkName string
46+
NetworkNameFull string
47+
}
48+
49+
// NewGenericBalanceMonitor returns a balance monitoring services.Service which reports the balance of all Keystore accounts.
50+
func NewGenericBalanceMonitor(opts GenericBalanceMonitorOpts) (services.Service, error) {
51+
// Try to create a new gauge for account balance
52+
gauge, err := NewGaugeAccBalance(opts.ChainNativeCurrency)
53+
if err != nil {
54+
return nil, fmt.Errorf("failed to create gauge: %w", err)
55+
}
56+
57+
lggr := logger.Named(opts.Logger, "BalanceMonitor")
58+
return &genericBalanceMonitor{
59+
cfg: opts.Config,
60+
lggr: lggr,
61+
ks: opts.Keystore,
62+
63+
newReader: opts.NewGenericBalanceClient,
64+
keyToAccountMapper: opts.KeyToAccountMapper,
65+
updateFn: func(ctx context.Context, acc string, balance float64) {
66+
lggr.Infow("Account balance updated", "unit", opts.ChainNativeCurrency, "account", acc, "balance", balance)
67+
gauge.Record(ctx, balance, acc, opts.ChainInfo)
68+
},
69+
70+
stop: make(chan struct{}),
71+
done: make(chan struct{}),
72+
}, nil
73+
}
74+
75+
type genericBalanceMonitor struct {
76+
services.StateMachine
77+
cfg GenericBalanceConfig
78+
lggr logger.Logger
79+
ks core.Keystore
80+
81+
// Returns a new GenericBalanceClient
82+
newReader func() (GenericBalanceClient, error)
83+
// Maps a public key to an account address (optional, can return key as is)
84+
keyToAccountMapper func(context.Context, string) (string, error)
85+
// Updates the balance metric
86+
updateFn func(ctx context.Context, acc string, balance float64) // overridable for testing
87+
88+
// Cached instance, intermittently reset to nil.
89+
reader GenericBalanceClient
90+
91+
stop services.StopChan
92+
done chan struct{}
93+
}
94+
95+
func (m *genericBalanceMonitor) Name() string {
96+
return m.lggr.Name()
97+
}
98+
99+
func (m *genericBalanceMonitor) Start(context.Context) error {
100+
return m.StartOnce(m.Name(), func() error {
101+
go m.start()
102+
return nil
103+
})
104+
}
105+
106+
func (m *genericBalanceMonitor) Close() error {
107+
return m.StopOnce(m.Name(), func() error {
108+
close(m.stop)
109+
<-m.done
110+
return nil
111+
})
112+
}
113+
114+
func (m *genericBalanceMonitor) HealthReport() map[string]error {
115+
return map[string]error{m.Name(): m.Healthy()}
116+
}
117+
118+
// monitor fn continuously updates balances, until stop signal is received.
119+
func (m *genericBalanceMonitor) start() {
120+
defer close(m.done)
121+
ctx, cancel := m.stop.NewCtx()
122+
defer cancel()
123+
124+
period := m.cfg.BalancePollPeriod.Duration()
125+
tick := time.After(utils.WithJitter(period))
126+
for {
127+
select {
128+
case <-m.stop:
129+
return
130+
case <-tick:
131+
m.updateBalances(ctx)
132+
tick = time.After(utils.WithJitter(period))
133+
}
134+
}
135+
}
136+
137+
// getReader returns the stored GenericBalanceClient, creating a new one if necessary.
138+
func (m *genericBalanceMonitor) getReader() (GenericBalanceClient, error) {
139+
if m.reader == nil {
140+
var err error
141+
m.reader, err = m.newReader()
142+
if err != nil {
143+
return nil, err
144+
}
145+
}
146+
return m.reader, nil
147+
}
148+
149+
// updateBalances updates the balances of all accounts in the keystore, using the provided GenericBalanceClient and the updateFn.
150+
func (m *genericBalanceMonitor) updateBalances(ctx context.Context) {
151+
m.lggr.Debug("Updating account balances")
152+
keys, err := m.ks.Accounts(ctx)
153+
if err != nil {
154+
m.lggr.Errorw("Failed to get keys", "err", err)
155+
return
156+
}
157+
if len(keys) == 0 {
158+
return
159+
}
160+
reader, err := m.getReader()
161+
if err != nil {
162+
m.lggr.Errorw("Failed to get client", "err", err)
163+
return
164+
}
165+
166+
var gotSomeBals bool
167+
for _, pk := range keys {
168+
// Check for shutdown signal, since Balance blocks and may be slow.
169+
select {
170+
case <-m.stop:
171+
return
172+
default:
173+
}
174+
175+
// Account address can always be derived from the public key currently
176+
// TODO: if we need to support key rotation, the keystore should store the address explicitly
177+
// Notice: this is chain-specific key to account mapping injected (e.g., relevant for Aptos key management)
178+
accAddr, err := m.keyToAccountMapper(ctx, pk)
179+
if err != nil {
180+
m.lggr.Errorw("Failed to convert public key to account address", "err", err)
181+
continue
182+
}
183+
184+
balance, err := reader.GetAccountBalance(accAddr)
185+
if err != nil {
186+
m.lggr.Errorw("Failed to get balance", "account", accAddr, "err", err)
187+
continue
188+
}
189+
gotSomeBals = true
190+
m.updateFn(ctx, accAddr, balance)
191+
}
192+
193+
// Try a new client next time. // TODO: This is for multinode
194+
if !gotSomeBals {
195+
m.reader = nil
196+
}
197+
}

pkg/monitoring/balance/metadata.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Package balance provides a generic chain-agnostic balance monitoring service
2+
// that tracks account balances across different blockchain networks.
3+
package balance
4+
5+
import (
6+
"encoding/hex"
7+
8+
"go.opentelemetry.io/otel/attribute"
9+
)
10+
11+
const (
12+
// WorkflowExecutionIDShortLen is the length of the short version of the WorkflowExecutionId (label)
13+
WorkflowExecutionIDShortLen = 3 // first 3 characters, 16^3 = 4.096 possibilities (mid-high cardinality)
14+
)
15+
16+
// TODO: Refactor as a proto referenced from the other proto files (telemetry messages)
17+
type ExecutionMetadata struct {
18+
// Execution Context - Source
19+
SourceID string
20+
// Execution Context - Chain
21+
ChainFamilyName string
22+
ChainID string
23+
NetworkName string
24+
NetworkNameFull string
25+
// Execution Context - Workflow (capabilities.RequestMetadata)
26+
WorkflowID string
27+
WorkflowOwner string
28+
WorkflowExecutionID string
29+
WorkflowName string
30+
WorkflowDonID uint32
31+
WorkflowDonConfigVersion uint32
32+
ReferenceID string
33+
// Execution Context - Capability
34+
CapabilityType string
35+
CapabilityID string
36+
CapabilityTimestampStart uint32
37+
CapabilityTimestampEmit uint32
38+
}
39+
40+
// Attributes returns common attributes used for metrics
41+
func (m ExecutionMetadata) Attributes() []attribute.KeyValue {
42+
// Decode workflow name attribute for output
43+
workflowName := m.decodeWorkflowName()
44+
45+
return []attribute.KeyValue{
46+
// Execution Context - Source
47+
attribute.String("source_id", ValOrUnknown(m.SourceID)),
48+
// Execution Context - Chain
49+
attribute.String("chain_family_name", ValOrUnknown(m.ChainFamilyName)),
50+
attribute.String("chain_id", ValOrUnknown(m.ChainID)),
51+
attribute.String("network_name", ValOrUnknown(m.NetworkName)),
52+
attribute.String("network_name_full", ValOrUnknown(m.NetworkNameFull)),
53+
// Execution Context - Workflow (capabilities.RequestMetadata)
54+
attribute.String("workflow_id", ValOrUnknown(m.WorkflowID)),
55+
attribute.String("workflow_owner", ValOrUnknown(m.WorkflowOwner)),
56+
// Notice: We lower the cardinality on the WorkflowExecutionID so it can be used by metrics
57+
// This label has good chances to be unique per workflow, in a reasonable bounded time window
58+
// TODO: enable this when sufficiently tested (PromQL queries like alerts might need to change if this is used)
59+
// attribute.String("workflow_execution_id_short", ValShortOrUnknown(m.WorkflowExecutionID, WorkflowExecutionIDShortLen)),
60+
attribute.String("workflow_name", ValOrUnknown(workflowName)),
61+
attribute.Int64("workflow_don_id", int64(m.WorkflowDonID)),
62+
attribute.Int64("workflow_don_config_version", int64(m.WorkflowDonConfigVersion)),
63+
attribute.String("reference_id", ValOrUnknown(m.ReferenceID)),
64+
// Execution Context - Capability
65+
attribute.String("capability_type", ValOrUnknown(m.CapabilityType)),
66+
attribute.String("capability_id", ValOrUnknown(m.CapabilityID)),
67+
// Notice: we don't include the timestamps here (high cardinality)
68+
}
69+
}
70+
71+
// decodeWorkflowName decodes the workflow name from hex string to raw string (underlying, output)
72+
func (m ExecutionMetadata) decodeWorkflowName() string {
73+
bytes, err := hex.DecodeString(m.WorkflowName)
74+
if err != nil {
75+
// This should never happen
76+
bytes = []byte("unknown-decode-error")
77+
}
78+
return string(bytes)
79+
}
80+
81+
// ValOrUnknown returns the value if it is not empty, otherwise it returns "unknown"
82+
// This is needed to avoid issues during exporting OTel metrics to Prometheus
83+
// For more details see https://smartcontract-it.atlassian.net/browse/INFOPLAT-1349
84+
func ValOrUnknown(val string) string {
85+
if val == "" {
86+
return "unknown"
87+
}
88+
return val
89+
}
90+
91+
// ValShortOrUnknown returns the short len value if not empty or available, otherwise it returns "unknown"
92+
func ValShortOrUnknown(val string, maxLen int) string {
93+
if val == "" || maxLen <= 0 {
94+
return "unknown"
95+
}
96+
if maxLen > len(val) {
97+
return val
98+
}
99+
return val[:maxLen]
100+
}

pkg/monitoring/balance/metrics.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Package balance provides a generic chain-agnostic balance monitoring service
2+
// that tracks account balances across different blockchain networks.
3+
package balance
4+
5+
import (
6+
"context"
7+
"fmt"
8+
9+
"go.opentelemetry.io/otel/attribute"
10+
"go.opentelemetry.io/otel/metric"
11+
12+
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
13+
)
14+
15+
// GaugeAccBalance defines a new gauge metric for account balance
16+
type GaugeAccBalance struct {
17+
// account_balance
18+
gauge metric.Float64Gauge
19+
}
20+
21+
func NewGaugeAccBalance(unitStr string) (*GaugeAccBalance, error) {
22+
name := "account_balance"
23+
description := "Balance for configured WT account"
24+
gauge, err := beholder.GetMeter().Float64Gauge(name, metric.WithUnit(unitStr), metric.WithDescription(description))
25+
if err != nil {
26+
return nil, fmt.Errorf("failed to create new gauge %s: %+w", name, err)
27+
}
28+
return &GaugeAccBalance{gauge}, nil
29+
}
30+
31+
func (g *GaugeAccBalance) Record(ctx context.Context, balance float64, account string, chainInfo ChainInfo) {
32+
oAttrs := metric.WithAttributeSet(g.GetAttributes(account, chainInfo))
33+
g.gauge.Record(ctx, balance, oAttrs)
34+
35+
// TODO: consider also recording record in Prom for availability to NOPs
36+
}
37+
38+
func (g *GaugeAccBalance) GetAttributes(account string, chainInfo ChainInfo) attribute.Set {
39+
return attribute.NewSet(
40+
attribute.String("account", account),
41+
42+
// Execution Context - Source
43+
attribute.String("source_id", ValOrUnknown(account)), // reusing account as source_id
44+
// Execution Context - Chain
45+
attribute.String("chain_family_name", ValOrUnknown(chainInfo.ChainFamilyName)),
46+
attribute.String("chain_id", ValOrUnknown(chainInfo.ChainID)),
47+
attribute.String("network_name", ValOrUnknown(chainInfo.NetworkName)),
48+
attribute.String("network_name_full", ValOrUnknown(chainInfo.NetworkNameFull)),
49+
)
50+
}

0 commit comments

Comments
 (0)