Skip to content

Commit d9e771f

Browse files
committed
Move generic chain-agnostic balance monitor to cl-common
1 parent 9720931 commit d9e771f

File tree

3 files changed

+341
-0
lines changed

3 files changed

+341
-0
lines changed
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
package balance
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/smartcontractkit/chainlink-common/pkg/config"
9+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
10+
"github.com/smartcontractkit/chainlink-common/pkg/services"
11+
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
12+
"github.com/smartcontractkit/chainlink-common/pkg/utils"
13+
)
14+
15+
// Config defines the balance monitor configuration.
16+
type GenericBalanceConfig struct {
17+
BalancePollPeriod config.Duration
18+
}
19+
20+
// GenericBalanceClient defines the interface for getting account balances.
21+
type GenericBalanceClient interface {
22+
GetAccountBalance(addr string) (float64, error)
23+
}
24+
25+
// GenericBalanceMonitorOpts contains the options for creating a new balance monitor.
26+
type GenericBalanceMonitorOpts struct {
27+
ChainInfo ChainInfo
28+
ChainNativeCurrency string
29+
30+
Config GenericBalanceConfig
31+
Logger logger.Logger
32+
Keystore core.Keystore
33+
NewGenericBalanceClient func() (GenericBalanceClient, error)
34+
35+
// Maps a public key to an account address (optional, can return key as is)
36+
KeyToAccountMapper func(context.Context, string) (string, error)
37+
}
38+
39+
// ChainInfo contains information about the blockchain network.
40+
type ChainInfo struct {
41+
ChainFamilyName string
42+
ChainID string
43+
NetworkName string
44+
NetworkNameFull string
45+
}
46+
47+
// NewGenericBalanceMonitor returns a balance monitoring services.Service which reports the balance of all Keystore accounts.
48+
func NewGenericBalanceMonitor(opts GenericBalanceMonitorOpts) (services.Service, error) {
49+
// Try to create a new gauge for account balance
50+
gauge, err := NewGaugeAccBalance(opts.ChainNativeCurrency)
51+
if err != nil {
52+
return nil, fmt.Errorf("failed to create gauge: %w", err)
53+
}
54+
55+
lggr := logger.Named(opts.Logger, "BalanceMonitor")
56+
return &genericBalanceMonitor{
57+
cfg: opts.Config,
58+
lggr: lggr,
59+
ks: opts.Keystore,
60+
61+
newReader: opts.NewGenericBalanceClient,
62+
keyToAccountMapper: opts.KeyToAccountMapper,
63+
updateFn: func(ctx context.Context, acc string, balance float64) {
64+
lggr.Infow("Account balance updated", "unit", opts.ChainNativeCurrency, "account", acc, "balance", balance)
65+
gauge.Record(ctx, balance, acc, opts.ChainInfo)
66+
},
67+
68+
stop: make(chan struct{}),
69+
done: make(chan struct{}),
70+
}, nil
71+
}
72+
73+
type genericBalanceMonitor struct {
74+
services.StateMachine
75+
cfg GenericBalanceConfig
76+
lggr logger.Logger
77+
ks core.Keystore
78+
79+
// Returns a new GenericBalanceClient
80+
newReader func() (GenericBalanceClient, error)
81+
// Maps a public key to an account address (optional, can return key as is)
82+
keyToAccountMapper func(context.Context, string) (string, error)
83+
// Updates the balance metric
84+
updateFn func(ctx context.Context, acc string, balance float64) // overridable for testing
85+
86+
// Cached instance, intermitently reset to nil.
87+
reader GenericBalanceClient
88+
89+
stop services.StopChan
90+
done chan struct{}
91+
}
92+
93+
func (m *genericBalanceMonitor) Name() string {
94+
return m.lggr.Name()
95+
}
96+
97+
func (m *genericBalanceMonitor) Start(context.Context) error {
98+
return m.StartOnce(m.Name(), func() error {
99+
go m.start()
100+
return nil
101+
})
102+
}
103+
104+
func (m *genericBalanceMonitor) Close() error {
105+
return m.StopOnce(m.Name(), func() error {
106+
close(m.stop)
107+
<-m.done
108+
return nil
109+
})
110+
}
111+
112+
func (m *genericBalanceMonitor) HealthReport() map[string]error {
113+
return map[string]error{m.Name(): m.Healthy()}
114+
}
115+
116+
// monitor fn continously updates balances, until stop signal is received.
117+
func (m *genericBalanceMonitor) start() {
118+
defer close(m.done)
119+
ctx, cancel := m.stop.NewCtx()
120+
defer cancel()
121+
122+
period := m.cfg.BalancePollPeriod.Duration()
123+
tick := time.After(utils.WithJitter(period))
124+
for {
125+
select {
126+
case <-m.stop:
127+
return
128+
case <-tick:
129+
m.updateBalances(ctx)
130+
tick = time.After(utils.WithJitter(period))
131+
}
132+
}
133+
}
134+
135+
// getReader returns the stored GenericBalanceClient, creating a new one if necessary.
136+
func (m *genericBalanceMonitor) getReader() (GenericBalanceClient, error) {
137+
if m.reader == nil {
138+
var err error
139+
m.reader, err = m.newReader()
140+
if err != nil {
141+
return nil, err
142+
}
143+
}
144+
return m.reader, nil
145+
}
146+
147+
// updateBalances updates the balances of all accounts in the keystore, using the provided GenericBalanceClient and the updateFn.
148+
func (m *genericBalanceMonitor) updateBalances(ctx context.Context) {
149+
m.lggr.Debug("Updating account balances")
150+
keys, err := m.ks.Accounts(ctx)
151+
if err != nil {
152+
m.lggr.Errorw("Failed to get keys", "err", err)
153+
return
154+
}
155+
if len(keys) == 0 {
156+
return
157+
}
158+
reader, err := m.getReader()
159+
if err != nil {
160+
m.lggr.Errorw("Failed to get client", "err", err)
161+
return
162+
}
163+
164+
var gotSomeBals bool
165+
for _, pk := range keys {
166+
// Check for shutdown signal, since Balance blocks and may be slow.
167+
select {
168+
case <-m.stop:
169+
return
170+
default:
171+
}
172+
173+
// Account address can always be derived from the public key currently
174+
// TODO: if we need to support key rotation, the keystore should store the address explicitly
175+
// Notice: this is chain-specific key to account mapping injected (e.g., relevant for Aptos key management)
176+
accAddr, err := m.keyToAccountMapper(ctx, pk)
177+
if err != nil {
178+
m.lggr.Errorw("Failed to convert public key to account address", "err", err)
179+
continue
180+
}
181+
182+
balance, err := reader.GetAccountBalance(accAddr)
183+
if err != nil {
184+
m.lggr.Errorw("Failed to get balance", "account", accAddr, "err", err)
185+
continue
186+
}
187+
gotSomeBals = true
188+
m.updateFn(ctx, accAddr, balance)
189+
}
190+
191+
// Try a new client next time. // TODO: This is for multinode
192+
if !gotSomeBals {
193+
m.reader = nil
194+
}
195+
}

pkg/monitoring/balance/metadata.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package balance
2+
3+
import (
4+
"encoding/hex"
5+
6+
"go.opentelemetry.io/otel/attribute"
7+
)
8+
9+
const (
10+
// WorkflowExecutionIDShortLen is the length of the short version of the WorkflowExecutionId (label)
11+
WorkflowExecutionIDShortLen = 3 // first 3 characters, 16^3 = 4.096 possibilities (mid-high cardinality)
12+
)
13+
14+
// TODO: Refactor as a proto referenced from the other proto files (telemetry messages)
15+
type ExecutionMetadata struct {
16+
// Execution Context - Source
17+
SourceId string
18+
// Execution Context - Chain
19+
ChainFamilyName string
20+
ChainId string
21+
NetworkName string
22+
NetworkNameFull string
23+
// Execution Context - Workflow (capabilities.RequestMetadata)
24+
WorkflowId string
25+
WorkflowOwner string
26+
WorkflowExecutionId string
27+
WorkflowName string
28+
WorkflowDonId uint32
29+
WorkflowDonConfigVersion uint32
30+
ReferenceId string
31+
// Execution Context - Capability
32+
CapabilityType string
33+
CapabilityId string
34+
CapabilityTimestampStart uint32
35+
CapabilityTimestampEmit uint32
36+
}
37+
38+
// Attributes returns common attributes used for metrics
39+
func (m ExecutionMetadata) Attributes() []attribute.KeyValue {
40+
// Decode workflow name attribute for output
41+
workflowName := m.decodeWorkflowName()
42+
43+
return []attribute.KeyValue{
44+
// Execution Context - Source
45+
attribute.String("source_id", ValOrUnknown(m.SourceId)),
46+
// Execution Context - Chain
47+
attribute.String("chain_family_name", ValOrUnknown(m.ChainFamilyName)),
48+
attribute.String("chain_id", ValOrUnknown(m.ChainId)),
49+
attribute.String("network_name", ValOrUnknown(m.NetworkName)),
50+
attribute.String("network_name_full", ValOrUnknown(m.NetworkNameFull)),
51+
// Execution Context - Workflow (capabilities.RequestMetadata)
52+
attribute.String("workflow_id", ValOrUnknown(m.WorkflowId)),
53+
attribute.String("workflow_owner", ValOrUnknown(m.WorkflowOwner)),
54+
// Notice: We lower the cardinality on the WorkflowExecutionId so it can be used by metrics
55+
// This label has good chances to be unique per workflow, in a reasonable bounded time window
56+
// TODO: enable this when sufficiently tested (PromQL queries like alerts might need to change if this is used)
57+
// attribute.String("workflow_execution_id_short", ValShortOrUnknown(m.WorkflowExecutionId, WorkflowExecutionIDShortLen)),
58+
attribute.String("workflow_name", ValOrUnknown(workflowName)),
59+
attribute.Int64("workflow_don_id", int64(m.WorkflowDonId)),
60+
attribute.Int64("workflow_don_config_version", int64(m.WorkflowDonConfigVersion)),
61+
attribute.String("reference_id", ValOrUnknown(m.ReferenceId)),
62+
// Execution Context - Capability
63+
attribute.String("capability_type", ValOrUnknown(m.CapabilityType)),
64+
attribute.String("capability_id", ValOrUnknown(m.CapabilityId)),
65+
// Notice: we don't include the timestamps here (high cardinality)
66+
}
67+
}
68+
69+
// decodeWorkflowName decodes the workflow name from hex string to raw string (underlying, output)
70+
func (m ExecutionMetadata) decodeWorkflowName() string {
71+
bytes, err := hex.DecodeString(m.WorkflowName)
72+
if err != nil {
73+
// This should never happen
74+
bytes = []byte("unknown-decode-error")
75+
}
76+
return string(bytes)
77+
}
78+
79+
// This is needed to avoid issues during exporting OTel metrics to Prometheus
80+
// For more details see https://smartcontract-it.atlassian.net/browse/INFOPLAT-1349
81+
// ValOrUnknown returns the value if it is not empty, otherwise it returns "unknown"
82+
func ValOrUnknown(val string) string {
83+
if val == "" {
84+
return "unknown"
85+
}
86+
return val
87+
}
88+
89+
// ValShortOrUnknown returns the short len value if not empty or available, otherwise it returns "unknown"
90+
func ValShortOrUnknown(val string, _len int) string {
91+
if val == "" || _len <= 0 {
92+
return "unknown"
93+
}
94+
if _len > len(val) {
95+
return val
96+
}
97+
return val[:_len]
98+
}

pkg/monitoring/balance/metrics.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package balance
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"go.opentelemetry.io/otel/attribute"
8+
"go.opentelemetry.io/otel/metric"
9+
10+
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
11+
)
12+
13+
// GaugeAccBalance defines a new gauge metric for account balance
14+
type GaugeAccBalance struct {
15+
// account_balance
16+
gauge metric.Float64Gauge
17+
}
18+
19+
func NewGaugeAccBalance(unitStr string) (*GaugeAccBalance, error) {
20+
name := "account_balance"
21+
description := "Balance for configured WT account"
22+
gauge, err := beholder.GetMeter().Float64Gauge(name, metric.WithUnit(unitStr), metric.WithDescription(description))
23+
if err != nil {
24+
return nil, fmt.Errorf("failed to create new gauge %s: %+w", name, err)
25+
}
26+
return &GaugeAccBalance{gauge}, nil
27+
}
28+
29+
func (g *GaugeAccBalance) Record(ctx context.Context, balance float64, account string, chainInfo ChainInfo) {
30+
oAttrs := metric.WithAttributeSet(g.GetAttributes(account, chainInfo))
31+
g.gauge.Record(ctx, balance, oAttrs)
32+
33+
// TODO: consider also recording record in Prom for availability to NOPs
34+
}
35+
36+
func (g *GaugeAccBalance) GetAttributes(account string, chainInfo ChainInfo) attribute.Set {
37+
return attribute.NewSet(
38+
attribute.String("account", account),
39+
40+
// Execution Context - Source
41+
attribute.String("source_id", ValOrUnknown(account)), // reusing account as source_id
42+
// Execution Context - Chain
43+
attribute.String("chain_family_name", ValOrUnknown(chainInfo.ChainFamilyName)),
44+
attribute.String("chain_id", ValOrUnknown(chainInfo.ChainID)),
45+
attribute.String("network_name", ValOrUnknown(chainInfo.NetworkName)),
46+
attribute.String("network_name_full", ValOrUnknown(chainInfo.NetworkNameFull)),
47+
)
48+
}

0 commit comments

Comments
 (0)