Skip to content

Commit b3b9797

Browse files
authored
Add heartbeat endpoint for aggregator (#550)
* [CCIP-8200] Add aggregator heartbeat endpoint * Add lag score calculation logic * Add metrics for aggregator heartbeats * Adjust heartbeat metrics names * Add config to set aggregator ID * Cap heartbeat chain details to max configured in committee * Add basic participation metric * Update go.mod * Fixes * Update go.mod and linter fixes * Fix go.mod
1 parent 5ce3860 commit b3b9797

File tree

21 files changed

+1391
-43
lines changed

21 files changed

+1391
-43
lines changed

aggregator/pkg/common/metrics.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,14 @@ type AggregatorMetricLabeler interface {
4848
IncrementOrphanRecoveryErrors(ctx context.Context)
4949
// IncrementPanics increments the counter for panics recovered by background workers.
5050
IncrementPanics(ctx context.Context)
51+
// SetVerifierHeartbeatScore sets the adaptive heartbeat score gauge for a verifier on a specific chain.
52+
SetVerifierHeartbeatScore(ctx context.Context, score float64)
53+
// SetVerifierLastHeartbeatTimestamp sets the timestamp gauge of the last heartbeat from a verifier.
54+
SetVerifierLastHeartbeatTimestamp(ctx context.Context, timestamp int64)
55+
// IncrementVerifierHeartbeatsTotal increments the total number of heartbeats received.
56+
IncrementVerifierHeartbeatsTotal(ctx context.Context)
57+
// SetVerifierHeartbeatChainHeads sets the block height gauge for a verifier on a specific chain.
58+
SetVerifierHeartbeatChainHeads(ctx context.Context, blockHeight uint64)
59+
// IncrementVerificationsTotal increments the total number of commit verifications received.
60+
IncrementVerificationsTotal(ctx context.Context)
5161
}

aggregator/pkg/handlers/batch_write_commit_verifier_node_result_test.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,13 @@ func TestBatchWriteCommitCCVNodeDataHandler_BatchSizeValidation(t *testing.T) {
7979
store.EXPECT().SaveCommitVerification(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
8080
}
8181

82-
writeHandler := NewWriteCommitCCVNodeDataHandler(store, agg, lggr, sig, time.Millisecond)
82+
mon := mocks.NewMockAggregatorMonitoring(t)
83+
labeler := mocks.NewMockAggregatorMetricLabeler(t)
84+
mon.EXPECT().Metrics().Return(labeler).Maybe()
85+
labeler.EXPECT().With(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(labeler).Maybe()
86+
labeler.EXPECT().IncrementVerificationsTotal(mock.Anything).Maybe()
87+
88+
writeHandler := NewWriteCommitCCVNodeDataHandler(store, agg, mon, lggr, sig, time.Millisecond)
8389
batchHandler := NewBatchWriteCommitVerifierNodeResultHandler(writeHandler, tc.maxBatchSize)
8490

8591
requests := make([]*committeepb.WriteCommitteeVerifierNodeResultRequest, tc.numRequests)
@@ -129,7 +135,13 @@ func TestBatchWriteCommitCCVNodeDataHandler_MixedSuccessAndInvalidArgument(t *te
129135

130136
store.EXPECT().SaveCommitVerification(mock.Anything, mock.Anything, mock.Anything).Return(nil)
131137

132-
writeHandler := NewWriteCommitCCVNodeDataHandler(store, agg, lggr, sig, time.Millisecond)
138+
mon := mocks.NewMockAggregatorMonitoring(t)
139+
labeler := mocks.NewMockAggregatorMetricLabeler(t)
140+
mon.EXPECT().Metrics().Return(labeler).Maybe()
141+
labeler.EXPECT().With(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(labeler).Maybe()
142+
labeler.EXPECT().IncrementVerificationsTotal(mock.Anything).Maybe()
143+
144+
writeHandler := NewWriteCommitCCVNodeDataHandler(store, agg, mon, lggr, sig, time.Millisecond)
133145
batchHandler := NewBatchWriteCommitVerifierNodeResultHandler(writeHandler, 10)
134146

135147
validReq := makeValidProtoRequest()
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package handlers
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math"
7+
"slices"
8+
"sort"
9+
"strconv"
10+
11+
"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/auth"
12+
"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/common"
13+
"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/heartbeat"
14+
"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/model"
15+
"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/scope"
16+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
17+
heartbeatpb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat/v1"
18+
)
19+
20+
type HeartbeatHandler struct {
21+
storage heartbeat.Storage
22+
aggregatorID string
23+
committee *model.Committee
24+
l logger.SugaredLogger
25+
m common.AggregatorMonitoring
26+
}
27+
28+
func (h *HeartbeatHandler) logger(ctx context.Context) logger.SugaredLogger {
29+
return scope.AugmentLogger(ctx, h.l)
30+
}
31+
32+
// Handle processes the POST request with heartbeat data and returns current benchmark score per chain.
33+
func (h *HeartbeatHandler) Handle(ctx context.Context, req *heartbeatpb.HeartbeatRequest) (*heartbeatpb.HeartbeatResponse, error) {
34+
identity, ok := auth.IdentityFromContext(ctx)
35+
if !ok {
36+
return nil, fmt.Errorf("no caller identity in context")
37+
}
38+
39+
callerID := identity.CallerID
40+
h.logger(ctx).Infof("Received HeartbeatRequest from caller: %s", callerID)
41+
42+
// get Allowed chains from committee config
43+
allowedChains := make([]uint64, 0, len(h.committee.QuorumConfigs))
44+
45+
for chainSelector := range h.committee.QuorumConfigs {
46+
chainSelectorUint, err := strconv.ParseUint(chainSelector, 10, 64)
47+
if err != nil {
48+
h.logger(ctx).Warnf("Error parsing chain selector '%s': %v", chainSelector, err)
49+
continue
50+
}
51+
allowedChains = append(allowedChains, chainSelectorUint)
52+
}
53+
54+
// filter for allowed chains only
55+
chainDetails := filterHeartbeatChainDetails(req.ChainDetails, allowedChains)
56+
if chainDetails == nil || len(chainDetails.BlockHeightsByChain) == 0 {
57+
h.logger(ctx).Info("No valid chain details provided in heartbeat request")
58+
return &heartbeatpb.HeartbeatResponse{
59+
AggregatorId: h.aggregatorID,
60+
Timestamp: req.SendTimestamp,
61+
ChainBenchmarks: make(map[uint64]*heartbeatpb.ChainBenchmark),
62+
}, nil
63+
}
64+
65+
// Store the block heights from the incoming request
66+
for chainSelector, blockHeight := range chainDetails.BlockHeightsByChain {
67+
if err := h.storage.StoreBlockHeight(ctx, callerID, chainSelector, blockHeight); err != nil {
68+
h.logger(ctx).Warnf("Failed to store block height for chain %d: %v", chainSelector, err)
69+
}
70+
}
71+
72+
// Get the list of chain selectors to query
73+
chainSelectors := make([]uint64, 0, len(chainDetails.BlockHeightsByChain))
74+
for chainSelector := range chainDetails.BlockHeightsByChain {
75+
chainSelectors = append(chainSelectors, chainSelector)
76+
}
77+
78+
maxBlockHeights, err := h.storage.GetMaxBlockHeights(ctx, chainSelectors)
79+
h.logger(ctx).Infof("Max block heights across all callers: %+v", maxBlockHeights)
80+
if err != nil {
81+
h.logger(ctx).Errorf("Failed to get max block heights: %v", err)
82+
maxBlockHeights = make(map[uint64]uint64)
83+
}
84+
85+
// Create chain benchmarks based on max block heights
86+
chainBenchmarks := make(map[uint64]*heartbeatpb.ChainBenchmark)
87+
88+
for chainSelector, maxBlockHeight := range maxBlockHeights {
89+
// Collect all block heights for this chain across all callers
90+
headsAcrossCallers, err := h.storage.GetBlockHeights(ctx, chainSelector)
91+
if err != nil {
92+
h.logger(ctx).Warnf("Failed to get block heights for chain %d: %v", chainSelector, err)
93+
continue
94+
}
95+
96+
headsFlat := make([]int64, 0, len(headsAcrossCallers))
97+
for _, height := range headsAcrossCallers {
98+
headsFlat = append(headsFlat, int64(height)) // #nosec G115 -- block heights are within int64 range
99+
}
100+
101+
// Calculate adaptive score
102+
currentHeight := req.ChainDetails.BlockHeightsByChain[chainSelector]
103+
score := CalculateAdaptiveScore(int64(currentHeight), headsFlat) // #nosec G115 -- block heights are within int64 range
104+
chainBenchmarks[chainSelector] = &heartbeatpb.ChainBenchmark{
105+
BlockHeight: maxBlockHeight,
106+
Score: float32(score),
107+
}
108+
}
109+
110+
metrics := h.m.Metrics().With("caller_id", callerID)
111+
metrics.SetVerifierLastHeartbeatTimestamp(ctx, req.SendTimestamp)
112+
metrics.IncrementVerifierHeartbeatsTotal(ctx)
113+
114+
// Record per-chain metrics
115+
for chainSelector, benchmark := range chainBenchmarks {
116+
chainMetrics := metrics.With("chain_selector", fmt.Sprintf("%d", chainSelector))
117+
chainMetrics.SetVerifierHeartbeatScore(ctx, float64(benchmark.Score))
118+
chainMetrics.SetVerifierHeartbeatChainHeads(ctx, benchmark.BlockHeight)
119+
}
120+
121+
return &heartbeatpb.HeartbeatResponse{
122+
AggregatorId: h.aggregatorID,
123+
Timestamp: req.SendTimestamp,
124+
ChainBenchmarks: chainBenchmarks,
125+
}, nil
126+
}
127+
128+
// NewHeartbeatHandler creates a new instance of HeartbeatHandler.
129+
func NewHeartbeatHandler(storage heartbeat.Storage, aggregatorID string, committee *model.Committee, l logger.SugaredLogger, m common.AggregatorMonitoring) *HeartbeatHandler {
130+
return &HeartbeatHandler{
131+
storage: storage,
132+
aggregatorID: aggregatorID,
133+
committee: committee,
134+
l: l,
135+
m: m,
136+
}
137+
}
138+
139+
// CalculateAdaptiveScore rates how far scoreBlock lags behind the block heights
// in allBlocks, measured in units of the Median Absolute Deviation (MAD).
//
// MAD is built on medians rather than means, so a handful of nodes with very
// low heights cannot skew the score for everyone else. Both the median and the
// MAD are taken as the element at index len/2 of the sorted values, i.e. the
// upper middle element when the count is even. A MAD below 1 is raised to 1,
// which also rules out a division by zero. allBlocks is not modified.
//
// The result is 1 + lag/MAD, where lag is median - scoreBlock floored at 0:
//
//	1.0 -> at or ahead of the median (also returned when allBlocks is empty).
//	2.0 -> 1 MAD behind.
//	4.0 -> 3 MADs behind.
func CalculateAdaptiveScore(scoreBlock int64, allBlocks []int64) float64 {
	if len(allBlocks) == 0 {
		// No data to compare against: report the baseline.
		return 1.0
	}
	mid := len(allBlocks) / 2

	// Median of the heights, computed on a private copy.
	ordered := append([]int64(nil), allBlocks...)
	slices.Sort(ordered)
	center := ordered[mid]

	// MAD: the median of each height's absolute distance from the median.
	gaps := make([]float64, len(ordered))
	for i, height := range ordered {
		gaps[i] = math.Abs(float64(height - center))
	}
	sort.Float64s(gaps)
	spread := math.Max(gaps[mid], 1.0)

	// Being ahead of the median counts as zero lag.
	behind := math.Max(float64(center-scoreBlock), 0)

	return 1.0 + behind/spread
}
184+
185+
func filterHeartbeatChainDetails(details *heartbeatpb.ChainHealthDetails, allowedChains []uint64) *heartbeatpb.ChainHealthDetails {
186+
if details == nil {
187+
return nil
188+
}
189+
190+
filtered := &heartbeatpb.ChainHealthDetails{
191+
BlockHeightsByChain: make(map[uint64]uint64),
192+
}
193+
194+
for chainSelector, blockHeight := range details.BlockHeightsByChain {
195+
if slices.Contains(allowedChains, chainSelector) {
196+
filtered.BlockHeightsByChain[chainSelector] = blockHeight
197+
}
198+
}
199+
200+
return filtered
201+
}

0 commit comments

Comments
 (0)