diff --git a/aggregator/pkg/common/metrics.go b/aggregator/pkg/common/metrics.go index affe0e30e..e2c14f1a6 100644 --- a/aggregator/pkg/common/metrics.go +++ b/aggregator/pkg/common/metrics.go @@ -48,4 +48,14 @@ type AggregatorMetricLabeler interface { IncrementOrphanRecoveryErrors(ctx context.Context) // IncrementPanics increments the counter for panics recovered by background workers. IncrementPanics(ctx context.Context) + // SetVerifierHeartbeatScore sets the adaptive heartbeat score gauge for a verifier on a specific chain. + SetVerifierHeartbeatScore(ctx context.Context, score float64) + // SetVerifierLastHeartbeatTimestamp sets the timestamp gauge of the last heartbeat from a verifier. + SetVerifierLastHeartbeatTimestamp(ctx context.Context, timestamp int64) + // IncrementVerifierHeartbeatsTotal increments the total number of heartbeats received. + IncrementVerifierHeartbeatsTotal(ctx context.Context) + // SetVerifierHeartbeatChainHeads sets the block height gauge for a verifier on a specific chain. + SetVerifierHeartbeatChainHeads(ctx context.Context, blockHeight uint64) + // IncrementVerificationsTotal increments the total number of commit verifications received. + IncrementVerificationsTotal(ctx context.Context) } diff --git a/aggregator/pkg/handlers/batch_write_commit_verifier_node_result_test.go b/aggregator/pkg/handlers/batch_write_commit_verifier_node_result_test.go index 2796c0ea8..a52e995ed 100644 --- a/aggregator/pkg/handlers/batch_write_commit_verifier_node_result_test.go +++ b/aggregator/pkg/handlers/batch_write_commit_verifier_node_result_test.go @@ -79,7 +79,13 @@ func TestBatchWriteCommitCCVNodeDataHandler_BatchSizeValidation(t *testing.T) { store.EXPECT().SaveCommitVerification(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() } - writeHandler := NewWriteCommitCCVNodeDataHandler(store, agg, lggr, sig, time.Millisecond) + mon := mocks.NewMockAggregatorMonitoring(t) + labeler := mocks.NewMockAggregatorMetricLabeler(t) + mon.EXPECT().Metrics().Return(labeler).Maybe() + labeler.EXPECT().With(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(labeler).Maybe() + labeler.EXPECT().IncrementVerificationsTotal(mock.Anything).Maybe() + + writeHandler := NewWriteCommitCCVNodeDataHandler(store, agg, mon, lggr, sig, time.Millisecond) batchHandler := NewBatchWriteCommitVerifierNodeResultHandler(writeHandler, tc.maxBatchSize) requests := make([]*committeepb.WriteCommitteeVerifierNodeResultRequest, tc.numRequests) @@ -129,7 +135,13 @@ func TestBatchWriteCommitCCVNodeDataHandler_MixedSuccessAndInvalidArgument(t *te store.EXPECT().SaveCommitVerification(mock.Anything, mock.Anything, mock.Anything).Return(nil) - writeHandler := NewWriteCommitCCVNodeDataHandler(store, agg, lggr, sig, time.Millisecond) + mon := mocks.NewMockAggregatorMonitoring(t) + labeler := mocks.NewMockAggregatorMetricLabeler(t) + mon.EXPECT().Metrics().Return(labeler).Maybe() + labeler.EXPECT().With(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(labeler).Maybe() + labeler.EXPECT().IncrementVerificationsTotal(mock.Anything).Maybe() + + writeHandler := NewWriteCommitCCVNodeDataHandler(store, agg, mon, lggr, sig, time.Millisecond) batchHandler := NewBatchWriteCommitVerifierNodeResultHandler(writeHandler, 10) validReq := makeValidProtoRequest() diff --git a/aggregator/pkg/handlers/heartbeat.go b/aggregator/pkg/handlers/heartbeat.go new file mode 100644 index 000000000..45fb9fb8f --- /dev/null +++ b/aggregator/pkg/handlers/heartbeat.go @@ -0,0 +1,201 @@ +package handlers + 
+import ( + "context" + "fmt" + "math" + "slices" + "sort" + "strconv" + + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/auth" + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/common" + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/heartbeat" + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/model" + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/scope" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + heartbeatpb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat/v1" +) + +type HeartbeatHandler struct { + storage heartbeat.Storage + aggregatorID string + committee *model.Committee + l logger.SugaredLogger + m common.AggregatorMonitoring +} + +func (h *HeartbeatHandler) logger(ctx context.Context) logger.SugaredLogger { + return scope.AugmentLogger(ctx, h.l) +} + +// Handle processes the POST request with heartbeat data and returns current benchmark score per chain. +func (h *HeartbeatHandler) Handle(ctx context.Context, req *heartbeatpb.HeartbeatRequest) (*heartbeatpb.HeartbeatResponse, error) { + identity, ok := auth.IdentityFromContext(ctx) + if !ok { + return nil, fmt.Errorf("no caller identity in context") + } + + callerID := identity.CallerID + h.logger(ctx).Infof("Received HeartbeatRequest from caller: %s", callerID) + + // get Allowed chains from committee config + allowedChains := make([]uint64, 0, len(h.committee.QuorumConfigs)) + + for chainSelector := range h.committee.QuorumConfigs { + chainSelectorUint, err := strconv.ParseUint(chainSelector, 10, 64) + if err != nil { + h.logger(ctx).Warnf("Error parsing chain selector '%s': %v", chainSelector, err) + continue + } + allowedChains = append(allowedChains, chainSelectorUint) + } + + // filter for allowed chains only + chainDetails := filterHeartbeatChainDetails(req.ChainDetails, allowedChains) + if chainDetails == nil || len(chainDetails.BlockHeightsByChain) == 0 { + h.logger(ctx).Info("No valid chain details provided in heartbeat request") + return &heartbeatpb.HeartbeatResponse{ + AggregatorId: h.aggregatorID, + Timestamp: req.SendTimestamp, + ChainBenchmarks: make(map[uint64]*heartbeatpb.ChainBenchmark), + }, nil + } + + // Store the block heights from the incoming request + for chainSelector, blockHeight := range chainDetails.BlockHeightsByChain { + if err := h.storage.StoreBlockHeight(ctx, callerID, chainSelector, blockHeight); err != nil { + h.logger(ctx).Warnf("Failed to store block height for chain %d: %v", chainSelector, err) + } + } + + // Get the list of chain selectors to query + chainSelectors := make([]uint64, 0, len(chainDetails.BlockHeightsByChain)) + for chainSelector := range chainDetails.BlockHeightsByChain { + chainSelectors = append(chainSelectors, chainSelector) + } + + maxBlockHeights, err := h.storage.GetMaxBlockHeights(ctx, chainSelectors) + h.logger(ctx).Infof("Max block heights across all callers: %+v", maxBlockHeights) + if err != nil { + h.logger(ctx).Errorf("Failed to get max block heights: %v", err) + maxBlockHeights = make(map[uint64]uint64) + } + + // Create chain benchmarks based on max block heights + chainBenchmarks := make(map[uint64]*heartbeatpb.ChainBenchmark) + + for chainSelector, maxBlockHeight := range maxBlockHeights { + // Collect all block heights for this chain across all callers + headsAcrossCallers, err := h.storage.GetBlockHeights(ctx, chainSelector) + if err != nil { + h.logger(ctx).Warnf("Failed to get block heights for chain %d: %v", chainSelector, err) + continue + } + + headsFlat := 
make([]int64, 0, len(headsAcrossCallers)) + for _, height := range headsAcrossCallers { + headsFlat = append(headsFlat, int64(height)) // #nosec G115 -- block heights are within int64 range + } + + // Calculate adaptive score + currentHeight := req.ChainDetails.BlockHeightsByChain[chainSelector] + score := CalculateAdaptiveScore(int64(currentHeight), headsFlat) // #nosec G115 -- block heights are within int64 range + chainBenchmarks[chainSelector] = &heartbeatpb.ChainBenchmark{ + BlockHeight: maxBlockHeight, + Score: float32(score), + } + } + + metrics := h.m.Metrics().With("caller_id", callerID) + metrics.SetVerifierLastHeartbeatTimestamp(ctx, req.SendTimestamp) + metrics.IncrementVerifierHeartbeatsTotal(ctx) + + // Record per-chain metrics + for chainSelector, benchmark := range chainBenchmarks { + chainMetrics := metrics.With("chain_selector", fmt.Sprintf("%d", chainSelector)) + chainMetrics.SetVerifierHeartbeatScore(ctx, float64(benchmark.Score)) + chainMetrics.SetVerifierHeartbeatChainHeads(ctx, benchmark.BlockHeight) + } + + return &heartbeatpb.HeartbeatResponse{ + AggregatorId: h.aggregatorID, + Timestamp: req.SendTimestamp, + ChainBenchmarks: chainBenchmarks, + }, nil +} + +// NewHeartbeatHandler creates a new instance of HeartbeatHandler. +func NewHeartbeatHandler(storage heartbeat.Storage, aggregatorID string, committee *model.Committee, l logger.SugaredLogger, m common.AggregatorMonitoring) *HeartbeatHandler { + return &HeartbeatHandler{ + storage: storage, + aggregatorID: aggregatorID, + committee: committee, + l: l, + m: m, + } +} + +// CalculateAdaptiveScore computes the adaptive score based on the provided block height and all block heights. +// The score reflects how far behind the provided block height is compared to others using Median Absolute Deviation (MAD). +// Using MAD is much more robust to outliers compared to standard deviation as it uses median instead of mean. +// This helps prevent a few nodes with very low block heights from skewing the score for everyone else. +// Example scores +// 1.0 -> Leading. +// 2.0 -> 1 MAD behind. +// 4.0 -> 3 MADs behind. +func CalculateAdaptiveScore(scoreBlock int64, allBlocks []int64) float64 { + n := len(allBlocks) + if n == 0 { + return 1.0 // Default to baseline if no data + } + + // 1. Find Median + sorted := make([]int64, n) + copy(sorted, allBlocks) + slices.Sort(sorted) + median := sorted[n/2] + + // 2. Find MAD (Median Absolute Deviation). + // This calculates the median of gaps from the median. + deviations := make([]float64, 0, n) + for _, b := range sorted { + dev := math.Abs(float64(b - median)) + deviations = append(deviations, dev) + } + sort.Float64s(deviations) + mad := deviations[n/2] + + // Safety: Assume a minimum deviation of 1 block to avoid divide-by-zero + if mad < 1.0 { + mad = 1.0 + } + + // 3. Calculate Lag for the scoreBlock + lag := float64(median - scoreBlock) + if lag < 0 { + lag = 0 // Being ahead is treated as leading (Score 1.0) + } + + // 4. 
Calculate Divergence Index + // Formula: 1 + (Lag / MAD) + return 1.0 + (lag / mad) +} + +func filterHeartbeatChainDetails(details *heartbeatpb.ChainHealthDetails, allowedChains []uint64) *heartbeatpb.ChainHealthDetails { + if details == nil { + return nil + } + + filtered := &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: make(map[uint64]uint64), + } + + for chainSelector, blockHeight := range details.BlockHeightsByChain { + if slices.Contains(allowedChains, chainSelector) { + filtered.BlockHeightsByChain[chainSelector] = blockHeight + } + } + + return filtered +} diff --git a/aggregator/pkg/handlers/heartbeat_test.go b/aggregator/pkg/handlers/heartbeat_test.go new file mode 100644 index 000000000..1adb14735 --- /dev/null +++ b/aggregator/pkg/handlers/heartbeat_test.go @@ -0,0 +1,491 @@ +package handlers + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/auth" + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/heartbeat" + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/model" + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/monitoring" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + heartbeatpb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat/v1" +) + +func createTestCommittee(chainSelectors ...uint64) *model.Committee { + quorumConfigs := make(map[string]*model.QuorumConfig) + for _, chainSelector := range chainSelectors { + quorumConfigs[fmt.Sprintf("%d", chainSelector)] = &model.QuorumConfig{ + Threshold: 1, + Signers: []model.Signer{}, + } + } + return &model.Committee{ + QuorumConfigs: quorumConfigs, + } +} + +func TestHeartbeatHandler_Handle_NoIdentityInContext(t *testing.T) { + t.Parallel() + + storage := heartbeat.NewInMemoryStorage() + lggr := logger.TestSugared(t) + monitoring := &monitoring.NoopAggregatorMonitoring{} + committee := createTestCommittee(1) + handler := NewHeartbeatHandler(storage, "mock-aggregator-001", committee, lggr, monitoring) + + req := &heartbeatpb.HeartbeatRequest{ + SendTimestamp: 1768392197, + ChainDetails: &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: map[uint64]uint64{ + 1: 1000000, + }, + }, + } + + resp, err := handler.Handle(context.Background(), req) + + require.Error(t, err) + require.Nil(t, resp) + assert.Contains(t, err.Error(), "no caller identity in context") +} + +func TestHeartbeatHandler_Handle_SingleCaller(t *testing.T) { + t.Parallel() + + storage := heartbeat.NewInMemoryStorage() + lggr := logger.TestSugared(t) + monitoring := &monitoring.NoopAggregatorMonitoring{} + committee := createTestCommittee(1, 137) + handler := NewHeartbeatHandler(storage, "mock-aggregator-001", committee, lggr, monitoring) + + identity := auth.CreateCallerIdentity("caller-1", false) + ctx := auth.ToContext(context.Background(), identity) + + req := &heartbeatpb.HeartbeatRequest{ + SendTimestamp: 1768392197, + ChainDetails: &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: map[uint64]uint64{ + 1: 1000000, + 137: 2000000, + }, + }, + } + + resp, err := handler.Handle(ctx, req) + + require.NoError(t, err) + require.NotNil(t, resp) + assert.Equal(t, "mock-aggregator-001", resp.AggregatorId) + assert.Equal(t, int64(1768392197), resp.Timestamp) + assert.Len(t, resp.ChainBenchmarks, 2) + + // Caller is at max, so score should be 1.0 + assert.Equal(t, uint64(1000000), resp.ChainBenchmarks[1].BlockHeight) + assert.Equal(t, float32(1.0), 
resp.ChainBenchmarks[1].Score) + + assert.Equal(t, uint64(2000000), resp.ChainBenchmarks[137].BlockHeight) + assert.Equal(t, float32(1.0), resp.ChainBenchmarks[137].Score) +} + +func TestHeartbeatHandler_Handle_MultipleCallers_ReturnsMaxBlockHeights(t *testing.T) { + t.Parallel() + + storage := heartbeat.NewInMemoryStorage() + lggr := logger.TestSugared(t) + monitoring := &monitoring.NoopAggregatorMonitoring{} + committee := createTestCommittee(1, 137) + handler := NewHeartbeatHandler(storage, "mock-aggregator-001", committee, lggr, monitoring) + + // First caller with higher block heights + caller1Identity := auth.CreateCallerIdentity("caller-1", false) + ctx1 := auth.ToContext(context.Background(), caller1Identity) + + req1 := &heartbeatpb.HeartbeatRequest{ + SendTimestamp: 1768392197, + ChainDetails: &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: map[uint64]uint64{ + 1: 1000000, + 137: 2000000, + }, + }, + } + + resp1, err := handler.Handle(ctx1, req1) + require.NoError(t, err) + require.NotNil(t, resp1) + + // Second caller with mixed block heights (higher on chain 1, lower on chain 137) + caller2Identity := auth.CreateCallerIdentity("caller-2", false) + ctx2 := auth.ToContext(context.Background(), caller2Identity) + + req2 := &heartbeatpb.HeartbeatRequest{ + SendTimestamp: 1768392198, + ChainDetails: &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: map[uint64]uint64{ + 1: 1000100, // Higher than caller-1 + 137: 1999950, // Lower than caller-1 + }, + }, + } + + resp2, err := handler.Handle(ctx2, req2) + require.NoError(t, err) + require.NotNil(t, resp2) + + // Verify that max block heights are returned + assert.Equal(t, uint64(1000100), resp2.ChainBenchmarks[1].BlockHeight) // caller-2's height + assert.Equal(t, uint64(2000000), resp2.ChainBenchmarks[137].BlockHeight) // caller-1's height + + assert.Equal(t, float32(1.0), resp2.ChainBenchmarks[1].Score) // At max + + assert.Equal(t, float32(2), resp2.ChainBenchmarks[137].Score) +} + +func TestHeartbeatHandler_Handle_NilChainDetails(t *testing.T) { + t.Parallel() + + storage := heartbeat.NewInMemoryStorage() + lggr := logger.TestSugared(t) + monitoring := &monitoring.NoopAggregatorMonitoring{} + committee := createTestCommittee(1) + handler := NewHeartbeatHandler(storage, "mock-aggregator-001", committee, lggr, monitoring) + + identity := auth.CreateCallerIdentity("caller-1", false) + ctx := auth.ToContext(context.Background(), identity) + + req := &heartbeatpb.HeartbeatRequest{ + SendTimestamp: 1768392197, + ChainDetails: nil, + } + + resp, err := handler.Handle(ctx, req) + + require.NoError(t, err) + require.NotNil(t, resp) + assert.Equal(t, "mock-aggregator-001", resp.AggregatorId) + assert.Len(t, resp.ChainBenchmarks, 0) +} + +func TestHeartbeatHandler_Handle_EmptyChainDetails(t *testing.T) { + t.Parallel() + + storage := heartbeat.NewInMemoryStorage() + lggr := logger.TestSugared(t) + monitoring := &monitoring.NoopAggregatorMonitoring{} + committee := createTestCommittee(1) + handler := NewHeartbeatHandler(storage, "mock-aggregator-001", committee, lggr, monitoring) + + identity := auth.CreateCallerIdentity("caller-1", false) + ctx := auth.ToContext(context.Background(), identity) + + req := &heartbeatpb.HeartbeatRequest{ + SendTimestamp: 1768392197, + ChainDetails: &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: map[uint64]uint64{}, + }, + } + + resp, err := handler.Handle(ctx, req) + + require.NoError(t, err) + require.NotNil(t, resp) + assert.Len(t, resp.ChainBenchmarks, 0) +} + +func 
TestHeartbeatHandler_Handle_StorageFailureDoesNotFailRequest(t *testing.T) { + t.Parallel() + + // Using NoopStorage which returns empty results + storage := heartbeat.NewNoopStorage() + lggr := logger.TestSugared(t) + monitoring := &monitoring.NoopAggregatorMonitoring{} + committee := createTestCommittee(1) + handler := NewHeartbeatHandler(storage, "mock-aggregator-001", committee, lggr, monitoring) + + identity := auth.CreateCallerIdentity("caller-1", false) + ctx := auth.ToContext(context.Background(), identity) + + req := &heartbeatpb.HeartbeatRequest{ + SendTimestamp: 1768392197, + ChainDetails: &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: map[uint64]uint64{ + 1: 1000000, + }, + }, + } + + resp, err := handler.Handle(ctx, req) + + // Should not fail even with noop storage + require.NoError(t, err) + require.NotNil(t, resp) +} + +func TestCalculateAdaptiveScore(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + scoreBlock int64 + allBlocks []int64 + expected float64 + }{ + { + name: "no blocks returns baseline", + scoreBlock: 100, + allBlocks: []int64{}, + expected: 1.0, + }, + { + name: "at median gets score of 1.0", + scoreBlock: 100, + allBlocks: []int64{90, 95, 100, 105, 110}, + expected: 1.0, + }, + { + name: "ahead of median gets score of 1.0", + scoreBlock: 110, + allBlocks: []int64{90, 95, 100, 105, 110}, + expected: 1.0, + }, + { + name: "one MAD behind gets score of 2.0", + scoreBlock: 95, + allBlocks: []int64{90, 95, 100, 105, 110}, + expected: 2.0, // median=100, MAD=5, lag=5, score=1+(5/5)=2.0 + }, + { + name: "two MADs behind gets score of 3.0", + scoreBlock: 90, + allBlocks: []int64{90, 95, 100, 105, 110}, + expected: 3.0, // median=100, MAD=5, lag=10, score=1+(10/5)=3.0 + }, + { + name: "uniform blocks with small MAD", + scoreBlock: 100, + allBlocks: []int64{100, 100, 100, 100, 100}, + expected: 1.0, // All same, at median + }, + { + name: "behind with uniform blocks uses minimum MAD", + scoreBlock: 90, + allBlocks: []int64{100, 100, 100, 100, 100}, + expected: 11.0, // median=100, MAD=1 (min), lag=10, score=1+(10/1)=11.0 + }, + { + name: "half similar half lagging - average best node", + scoreBlock: 1010, + allBlocks: []int64{900, 910, 920, 930, 940, 1000, 1005, 1010, 1015, 1020}, + expected: 1.0, // At the median of the similar group, should score well + }, + { + name: "half similar half lagging - lagging node", + scoreBlock: 920, + allBlocks: []int64{900, 910, 920, 930, 940, 1000, 1005, 1010, 1015, 1020}, + expected: 2.25, // median=970 (avg of 940 and 1000), MAD≈40, lag=50, score≈1+(50/40)=2.25 + }, + { + name: "half similar half lagging - worst lagging node", + scoreBlock: 900, + allBlocks: []int64{900, 910, 920, 930, 940, 1000, 1005, 1010, 1015, 1020}, + expected: 2.75, // median=970, MAD≈40, lag=70, score≈1+(70/40)=2.75 + }, + { + name: "half similar half lagging - best node from leading group", + scoreBlock: 1020, + allBlocks: []int64{900, 910, 920, 930, 940, 1000, 1005, 1010, 1015, 1020}, + expected: 1.0, // Ahead of median, gets baseline score + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + score := CalculateAdaptiveScore(tt.scoreBlock, tt.allBlocks) + assert.InDelta(t, tt.expected, score, 0.1, "score mismatch for scoreBlock=%d, allBlocks=%v", tt.scoreBlock, tt.allBlocks) + }) + } +} + +func TestHeartbeatHandler_Handle_FiltersByAllowedChains(t *testing.T) { + t.Parallel() + + storage := heartbeat.NewInMemoryStorage() + lggr := logger.TestSugared(t) + monitoring := 
&monitoring.NoopAggregatorMonitoring{} + // Only allow chains 1 and 137 + committee := createTestCommittee(1, 137) + handler := NewHeartbeatHandler(storage, "mock-aggregator-001", committee, lggr, monitoring) + + identity := auth.CreateCallerIdentity("caller-1", false) + ctx := auth.ToContext(context.Background(), identity) + + req := &heartbeatpb.HeartbeatRequest{ + SendTimestamp: 1768392197, + ChainDetails: &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: map[uint64]uint64{ + 1: 1000000, // Allowed + 137: 2000000, // Allowed + 42161: 3000000, // Not allowed (Arbitrum) + 10: 4000000, // Not allowed (Optimism) + }, + }, + } + + resp, err := handler.Handle(ctx, req) + + require.NoError(t, err) + require.NotNil(t, resp) + assert.Equal(t, "mock-aggregator-001", resp.AggregatorId) + // Should only return benchmarks for allowed chains (1 and 137) + assert.Len(t, resp.ChainBenchmarks, 2) + assert.Contains(t, resp.ChainBenchmarks, uint64(1)) + assert.Contains(t, resp.ChainBenchmarks, uint64(137)) + assert.NotContains(t, resp.ChainBenchmarks, uint64(42161)) + assert.NotContains(t, resp.ChainBenchmarks, uint64(10)) +} + +func TestHeartbeatHandler_Handle_AllChainsDisallowed(t *testing.T) { + t.Parallel() + + storage := heartbeat.NewInMemoryStorage() + lggr := logger.TestSugared(t) + monitoring := &monitoring.NoopAggregatorMonitoring{} + // Only allow chain 1 + committee := createTestCommittee(1) + handler := NewHeartbeatHandler(storage, "mock-aggregator-001", committee, lggr, monitoring) + + identity := auth.CreateCallerIdentity("caller-1", false) + ctx := auth.ToContext(context.Background(), identity) + + req := &heartbeatpb.HeartbeatRequest{ + SendTimestamp: 1768392197, + ChainDetails: &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: map[uint64]uint64{ + 42161: 3000000, // Not allowed (Arbitrum) + 10: 4000000, // Not allowed (Optimism) + }, + }, + } + + resp, err := handler.Handle(ctx, req) + + require.NoError(t, err) + require.NotNil(t, resp) + // No valid chain details after filtering, should return empty benchmarks + assert.Len(t, resp.ChainBenchmarks, 0) +} + +func TestHeartbeatHandler_Handle_NoCommitteeChains(t *testing.T) { + t.Parallel() + + storage := heartbeat.NewInMemoryStorage() + lggr := logger.TestSugared(t) + monitoring := &monitoring.NoopAggregatorMonitoring{} + // Empty committee with no chains + committee := createTestCommittee() + handler := NewHeartbeatHandler(storage, "mock-aggregator-001", committee, lggr, monitoring) + + identity := auth.CreateCallerIdentity("caller-1", false) + ctx := auth.ToContext(context.Background(), identity) + + req := &heartbeatpb.HeartbeatRequest{ + SendTimestamp: 1768392197, + ChainDetails: &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: map[uint64]uint64{ + 1: 1000000, + 137: 2000000, + }, + }, + } + + resp, err := handler.Handle(ctx, req) + + require.NoError(t, err) + require.NotNil(t, resp) + // No allowed chains, should return empty benchmarks + assert.Len(t, resp.ChainBenchmarks, 0) +} + +func TestHeartbeatHandler_Handle_SingleChainAllowed(t *testing.T) { + t.Parallel() + + storage := heartbeat.NewInMemoryStorage() + lggr := logger.TestSugared(t) + monitoring := &monitoring.NoopAggregatorMonitoring{} + // Only allow chain 137 + committee := createTestCommittee(137) + handler := NewHeartbeatHandler(storage, "mock-aggregator-001", committee, lggr, monitoring) + + identity := auth.CreateCallerIdentity("caller-1", false) + ctx := auth.ToContext(context.Background(), identity) + + req := &heartbeatpb.HeartbeatRequest{ + 
SendTimestamp: 1768392197, + ChainDetails: &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: map[uint64]uint64{ + 1: 1000000, // Not allowed + 137: 2000000, // Allowed + }, + }, + } + + resp, err := handler.Handle(ctx, req) + + require.NoError(t, err) + require.NotNil(t, resp) + // Should only return benchmarks for chain 137 + assert.Len(t, resp.ChainBenchmarks, 1) + assert.Contains(t, resp.ChainBenchmarks, uint64(137)) + assert.NotContains(t, resp.ChainBenchmarks, uint64(1)) + assert.Equal(t, uint64(2000000), resp.ChainBenchmarks[137].BlockHeight) +} + +func TestHeartbeatHandler_Handle_ManyChains(t *testing.T) { + t.Parallel() + + storage := heartbeat.NewInMemoryStorage() + lggr := logger.TestSugared(t) + monitoring := &monitoring.NoopAggregatorMonitoring{} + // Allow multiple chains + committee := createTestCommittee(1, 137, 42161, 10, 43114, 8453) + handler := NewHeartbeatHandler(storage, "mock-aggregator-001", committee, lggr, monitoring) + + identity := auth.CreateCallerIdentity("caller-1", false) + ctx := auth.ToContext(context.Background(), identity) + + req := &heartbeatpb.HeartbeatRequest{ + SendTimestamp: 1768392197, + ChainDetails: &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: map[uint64]uint64{ + 1: 1000000, // Ethereum + 137: 2000000, // Polygon + 42161: 3000000, // Arbitrum + 10: 4000000, // Optimism + 43114: 5000000, // Avalanche + 8453: 6000000, // Base + 999: 7000000, // Not allowed + }, + }, + } + + resp, err := handler.Handle(ctx, req) + + require.NoError(t, err) + require.NotNil(t, resp) + // Should return benchmarks for all allowed chains (6 out of 7) + assert.Len(t, resp.ChainBenchmarks, 6) + assert.Contains(t, resp.ChainBenchmarks, uint64(1)) + assert.Contains(t, resp.ChainBenchmarks, uint64(137)) + assert.Contains(t, resp.ChainBenchmarks, uint64(42161)) + assert.Contains(t, resp.ChainBenchmarks, uint64(10)) + assert.Contains(t, resp.ChainBenchmarks, uint64(43114)) + assert.Contains(t, resp.ChainBenchmarks, uint64(8453)) + assert.NotContains(t, resp.ChainBenchmarks, uint64(999)) +} diff --git a/aggregator/pkg/handlers/write_commit_verifier_node_result.go b/aggregator/pkg/handlers/write_commit_verifier_node_result.go index 3a5de65e8..54ab50cd8 100644 --- a/aggregator/pkg/handlers/write_commit_verifier_node_result.go +++ b/aggregator/pkg/handlers/write_commit_verifier_node_result.go @@ -2,6 +2,7 @@ package handlers import ( "context" + "strconv" "time" "google.golang.org/grpc/codes" @@ -33,6 +34,7 @@ type AggregationTriggerer interface { type WriteCommitVerifierNodeResultHandler struct { storage common.CommitVerificationStore aggregator AggregationTriggerer + m common.AggregatorMonitoring l logger.SugaredLogger signatureValidator SignatureValidator checkAggregationTimeout time.Duration @@ -100,6 +102,12 @@ func (h *WriteCommitVerifierNodeResultHandler) Handle(ctx context.Context, req * } h.logger(signerCtx).Infof("Successfully saved commit verification record") + metrics := h.m.Metrics().With( + "caller_id", identity.CallerID, + "chainselector", strconv.FormatUint(uint64(record.Message.SourceChainSelector), 10), + ) + metrics.IncrementVerificationsTotal(ctx) + if err := h.aggregator.CheckAggregation(record.MessageID, aggregationKey, model.ChannelKey(identity.CallerID), h.checkAggregationTimeout); err != nil { if err == common.ErrAggregationChannelFull { reqLogger.Errorf("Aggregation channel is full") @@ -121,10 +129,11 @@ func (h *WriteCommitVerifierNodeResultHandler) Handle(ctx context.Context, req * } // NewWriteCommitCCVNodeDataHandler creates a new 
instance of WriteCommitCCVNodeDataHandler. -func NewWriteCommitCCVNodeDataHandler(store common.CommitVerificationStore, aggregator AggregationTriggerer, l logger.SugaredLogger, signatureValidator SignatureValidator, checkAggregationTimeout time.Duration) *WriteCommitVerifierNodeResultHandler { +func NewWriteCommitCCVNodeDataHandler(store common.CommitVerificationStore, aggregator AggregationTriggerer, m common.AggregatorMonitoring, l logger.SugaredLogger, signatureValidator SignatureValidator, checkAggregationTimeout time.Duration) *WriteCommitVerifierNodeResultHandler { return &WriteCommitVerifierNodeResultHandler{ storage: store, aggregator: aggregator, + m: m, l: l, signatureValidator: signatureValidator, checkAggregationTimeout: checkAggregationTimeout, diff --git a/aggregator/pkg/handlers/write_commit_verifier_node_result_test.go b/aggregator/pkg/handlers/write_commit_verifier_node_result_test.go index 2a42db3a8..747629d28 100644 --- a/aggregator/pkg/handlers/write_commit_verifier_node_result_test.go +++ b/aggregator/pkg/handlers/write_commit_verifier_node_result_test.go @@ -183,7 +183,13 @@ func TestWriteCommitCCVNodeDataHandler_Handle_Table(t *testing.T) { agg.EXPECT().CheckAggregation(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe() } - handler := NewWriteCommitCCVNodeDataHandler(store, agg, lggr, sig, time.Millisecond) + mon := mocks.NewMockAggregatorMonitoring(t) + labeler := mocks.NewMockAggregatorMetricLabeler(t) + mon.EXPECT().Metrics().Return(labeler).Maybe() + labeler.EXPECT().With(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(labeler).Maybe() + labeler.EXPECT().IncrementVerificationsTotal(mock.Anything).Maybe() + + handler := NewWriteCommitCCVNodeDataHandler(store, agg, mon, lggr, sig, time.Millisecond) ctx := auth.ToContext(context.Background(), auth.CreateCallerIdentity(testCallerID, false)) resp, err := handler.Handle(ctx, tc.req) diff --git a/aggregator/pkg/heartbeat/storage.go b/aggregator/pkg/heartbeat/storage.go new file mode 100644 index 000000000..6e84720f3 --- /dev/null +++ b/aggregator/pkg/heartbeat/storage.go @@ -0,0 +1,323 @@ +package heartbeat + +import ( + "context" + "fmt" + "strconv" + "sync" + "time" + + "github.com/redis/go-redis/v9" +) + +const ( + // DefaultKeyPrefix is the default Redis key prefix for heartbeat data. + DefaultKeyPrefix = "heartbeat" + // DefaultTTL is the default TTL for heartbeat data (7 days). + DefaultTTL = 7 * 24 * time.Hour +) + +// Storage defines the interface for storing and retrieving heartbeat data. +type Storage interface { + // StoreBlockHeight stores the block height for a caller on a specific chain. + StoreBlockHeight(ctx context.Context, callerID string, chainSelector, blockHeight uint64) error + // GetBlockHeights returns the block heights for all callers on a specific chain. + GetBlockHeights(ctx context.Context, chainSelector uint64) (map[string]uint64, error) + // GetMaxBlockHeight returns the maximum block height across all callers for a specific chain. + GetMaxBlockHeight(ctx context.Context, chainSelector uint64) (uint64, error) + // GetMaxBlockHeights returns the maximum block heights across all callers for multiple chains. + GetMaxBlockHeights(ctx context.Context, chainSelectors []uint64) (map[uint64]uint64, error) +} + +// RedisStorage implements Storage using Redis. +type RedisStorage struct { + client *redis.Client + keyPrefix string + ttl time.Duration +} + +// NewRedisStorage creates a new Redis-backed heartbeat storage. 
+func NewRedisStorage(client *redis.Client, keyPrefix string, ttl time.Duration) *RedisStorage { + if keyPrefix == "" { + keyPrefix = DefaultKeyPrefix + } + if ttl == 0 { + ttl = DefaultTTL + } + return &RedisStorage{ + client: client, + keyPrefix: keyPrefix, + ttl: ttl, + } +} + +// StoreBlockHeight stores the block height for a caller on a specific chain. +func (s *RedisStorage) StoreBlockHeight(ctx context.Context, callerID string, chainSelector, blockHeight uint64) error { + key := s.buildKey(callerID, chainSelector) + err := s.client.Set(ctx, key, blockHeight, s.ttl).Err() + if err != nil { + return fmt.Errorf("failed to store block height for caller %s chain %d: %w", callerID, chainSelector, err) + } + return nil +} + +// GetBlockHeights returns the block heights for all callers on a specific chain. +func (s *RedisStorage) GetBlockHeights(ctx context.Context, chainSelector uint64) (map[string]uint64, error) { + pattern := s.buildPattern(chainSelector) + result := make(map[string]uint64) + var cursor uint64 + + for { + // Scan for keys matching the pattern + keys, nextCursor, err := s.client.Scan(ctx, cursor, pattern, 100).Result() + if err != nil { + return nil, fmt.Errorf("failed to scan keys for chain %d: %w", chainSelector, err) + } + + // Get all values for the found keys + if len(keys) > 0 { //nolint:nestif // Reasonable complexity for Redis scan pattern + values, err := s.client.MGet(ctx, keys...).Result() + if err != nil { + return nil, fmt.Errorf("failed to get values for chain %d: %w", chainSelector, err) + } + + // Parse keys to extract caller IDs and map to block heights + for i, key := range keys { + if values[i] == nil { + continue + } + heightStr, ok := values[i].(string) + if !ok { + continue + } + height, err := strconv.ParseUint(heightStr, 10, 64) + if err != nil { + continue + } + + // Extract caller ID from key (format: prefix:callerID:chainSelector) + callerID := s.extractCallerID(key) + if callerID != "" { + result[callerID] = height + } + } + } + + cursor = nextCursor + if cursor == 0 { + break + } + } + + return result, nil +} + +// GetMaxBlockHeight returns the maximum block height across all callers for a specific chain. +func (s *RedisStorage) GetMaxBlockHeight(ctx context.Context, chainSelector uint64) (uint64, error) { + pattern := s.buildPattern(chainSelector) + + var maxHeight uint64 + var cursor uint64 + + for { + // Scan for keys matching the pattern + keys, nextCursor, err := s.client.Scan(ctx, cursor, pattern, 100).Result() + if err != nil { + return 0, fmt.Errorf("failed to scan keys for chain %d: %w", chainSelector, err) + } + + // Get all values for the found keys + if len(keys) > 0 { //nolint:nestif // Reasonable complexity + values, err := s.client.MGet(ctx, keys...).Result() + if err != nil { + return 0, fmt.Errorf("failed to get values for chain %d: %w", chainSelector, err) + } + + // Find the maximum value + for _, val := range values { + if val == nil { + continue + } + heightStr, ok := val.(string) + if !ok { + continue + } + height, err := strconv.ParseUint(heightStr, 10, 64) + if err != nil { + continue + } + if height > maxHeight { + maxHeight = height + } + } + } + + cursor = nextCursor + if cursor == 0 { + break + } + } + + return maxHeight, nil +} + +// GetMaxBlockHeights returns the maximum block heights across all callers for multiple chains. 
+func (s *RedisStorage) GetMaxBlockHeights(ctx context.Context, chainSelectors []uint64) (map[uint64]uint64, error) {
+	result := make(map[uint64]uint64, len(chainSelectors))
+
+	for _, chainSelector := range chainSelectors {
+		maxHeight, err := s.GetMaxBlockHeight(ctx, chainSelector)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get max block height for chain %d: %w", chainSelector, err)
+		}
+		if maxHeight > 0 {
+			result[chainSelector] = maxHeight
+		}
+	}
+
+	return result, nil
+}
+
+// buildKey creates a Redis key for a specific caller and chain.
+// Format: <keyPrefix>:<callerID>:<chainSelector>.
+func (s *RedisStorage) buildKey(callerID string, chainSelector uint64) string {
+	return fmt.Sprintf("%s:%s:%d", s.keyPrefix, callerID, chainSelector)
+}
+
+// buildPattern creates a Redis key pattern for scanning all callers on a specific chain.
+// Format: <keyPrefix>:*:<chainSelector>.
+func (s *RedisStorage) buildPattern(chainSelector uint64) string {
+	return fmt.Sprintf("%s:*:%d", s.keyPrefix, chainSelector)
+}
+
+// extractCallerID extracts the caller ID from a Redis key.
+// Key format: <keyPrefix>:<callerID>:<chainSelector>.
+func (s *RedisStorage) extractCallerID(key string) string {
+	// Remove prefix
+	prefixLen := len(s.keyPrefix) + 1 // +1 for the colon
+	if len(key) <= prefixLen {
+		return ""
+	}
+	remainder := key[prefixLen:]
+
+	// Find the last colon to separate caller ID from chain selector
+	lastColon := -1
+	for i := len(remainder) - 1; i >= 0; i-- {
+		if remainder[i] == ':' {
+			lastColon = i
+			break
+		}
+	}
+
+	if lastColon == -1 {
+		return ""
+	}
+
+	return remainder[:lastColon]
+}
+
+// InMemoryStorage implements Storage using in-memory maps with thread-safety.
+type InMemoryStorage struct {
+	mu sync.RWMutex
+	// data maps "callerID:chainSelector" -> blockHeight
+	data map[string]uint64
+}
+
+// NewInMemoryStorage creates a new in-memory heartbeat storage.
+func NewInMemoryStorage() *InMemoryStorage {
+	return &InMemoryStorage{
+		data: make(map[string]uint64),
+	}
+}
+
+// StoreBlockHeight stores the block height for a caller on a specific chain.
+func (s *InMemoryStorage) StoreBlockHeight(ctx context.Context, callerID string, chainSelector, blockHeight uint64) error {
+	key := fmt.Sprintf("%s:%d", callerID, chainSelector)
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.data[key] = blockHeight
+	return nil
+}
+
+// GetBlockHeights returns the block heights for all callers on a specific chain.
+func (s *InMemoryStorage) GetBlockHeights(ctx context.Context, chainSelector uint64) (map[string]uint64, error) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	result := make(map[string]uint64)
+	suffix := fmt.Sprintf(":%d", chainSelector)
+
+	for key, height := range s.data {
+		// Check if this key belongs to the requested chain
+		if len(key) >= len(suffix) && key[len(key)-len(suffix):] == suffix {
+			// Extract caller ID (everything before the suffix)
+			callerID := key[:len(key)-len(suffix)]
+			result[callerID] = height
+		}
+	}
+
+	return result, nil
+}
+
+// GetMaxBlockHeight returns the maximum block height across all callers for a specific chain.
+func (s *InMemoryStorage) GetMaxBlockHeight(ctx context.Context, chainSelector uint64) (uint64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var maxHeight uint64 + suffix := fmt.Sprintf(":%d", chainSelector) + + for key, height := range s.data { + // Check if this key belongs to the requested chain + if len(key) >= len(suffix) && key[len(key)-len(suffix):] == suffix { + if height > maxHeight { + maxHeight = height + } + } + } + + return maxHeight, nil +} + +// GetMaxBlockHeights returns the maximum block heights across all callers for multiple chains. +func (s *InMemoryStorage) GetMaxBlockHeights(ctx context.Context, chainSelectors []uint64) (map[uint64]uint64, error) { + result := make(map[uint64]uint64, len(chainSelectors)) + + for _, chainSelector := range chainSelectors { + maxHeight, err := s.GetMaxBlockHeight(ctx, chainSelector) + if err != nil { + return nil, fmt.Errorf("failed to get max block height for chain %d: %w", chainSelector, err) + } + if maxHeight > 0 { + result[chainSelector] = maxHeight + } + } + + return result, nil +} + +// NoopStorage is a no-op implementation of Storage. +type NoopStorage struct{} + +// NewNoopStorage creates a new no-op storage. +func NewNoopStorage() *NoopStorage { + return &NoopStorage{} +} + +func (n *NoopStorage) StoreBlockHeight(ctx context.Context, callerID string, chainSelector, blockHeight uint64) error { + return nil +} + +func (n *NoopStorage) GetBlockHeights(ctx context.Context, chainSelector uint64) (map[string]uint64, error) { + return make(map[string]uint64), nil +} + +func (n *NoopStorage) GetMaxBlockHeight(ctx context.Context, chainSelector uint64) (uint64, error) { + return 0, nil +} + +func (n *NoopStorage) GetMaxBlockHeights(ctx context.Context, chainSelectors []uint64) (map[uint64]uint64, error) { + return make(map[uint64]uint64), nil +} diff --git a/aggregator/pkg/model/config.go b/aggregator/pkg/model/config.go index ef4f8373a..e5afcf7cc 100644 --- a/aggregator/pkg/model/config.go +++ b/aggregator/pkg/model/config.go @@ -302,6 +302,7 @@ type BeholderConfig struct { // AggregatorConfig is the root configuration for the pb. type AggregatorConfig struct { + AggregatorID string `toml:"aggregatorID"` GeneratedConfigPath string `toml:"generatedConfigPath"` Committee *Committee `toml:"committee"` Server ServerConfig `toml:"server"` @@ -407,6 +408,14 @@ func (c *AggregatorConfig) GetClientByClientID(clientID string) (auth.ClientConf // SetDefaults sets default values for the configuration. 
func (c *AggregatorConfig) SetDefaults() { + // AggregatorID defaults to hostname if not set + if c.AggregatorID == "" { + hostname, err := os.Hostname() + if err != nil { + hostname = "unknown" + } + c.AggregatorID = hostname + } // Batch verifier result defaults if c.MaxMessageIDsPerBatch == 0 { c.MaxMessageIDsPerBatch = 100 diff --git a/aggregator/pkg/monitoring/metrics.go b/aggregator/pkg/monitoring/metrics.go index bbeecc8f5..df8432148 100644 --- a/aggregator/pkg/monitoring/metrics.go +++ b/aggregator/pkg/monitoring/metrics.go @@ -36,6 +36,15 @@ type AggregatorMetrics struct { // Worker health metrics panics metric.Int64Counter + + // Verifiers health metrics + verifierHeartbeatTimestamp metric.Float64Gauge + verifierHeartbeatsTotal metric.Int64Counter + verifierHeartbeatChainHeads metric.Int64Gauge + verifierHeartbeatScore metric.Float64Gauge + + // Participation metrics + verificationsTotal metric.Int64Counter } func MetricViews() []sdkmetric.View { @@ -188,6 +197,46 @@ func InitMetrics() (am *AggregatorMetrics, err error) { return nil, fmt.Errorf("failed to register panics counter: %w", err) } + am.verifierHeartbeatTimestamp, err = beholder.GetMeter().Float64Gauge( + "aggregator_heartbeat_verifier_heartbeat_timestamp", + metric.WithDescription("Timestamp of the last heartbeat received from verifiers"), + ) + if err != nil { + return nil, fmt.Errorf("failed to register verifier heartbeat timestamp gauge: %w", err) + } + + am.verifierHeartbeatsTotal, err = beholder.GetMeter().Int64Counter( + "aggregator_heartbeat_verifier_heartbeats_total", + metric.WithDescription("Total number of verifier heartbeats received"), + ) + if err != nil { + return nil, fmt.Errorf("failed to register verifier heartbeats total counter: %w", err) + } + + am.verifierHeartbeatChainHeads, err = beholder.GetMeter().Int64Gauge( + "aggregator_heartbeat_verifier_chain_heads", + metric.WithDescription("Latest chain head reported by verifiers"), + ) + if err != nil { + return nil, fmt.Errorf("failed to register verifier heartbeat chain heads gauge: %w", err) + } + + am.verifierHeartbeatScore, err = beholder.GetMeter().Float64Gauge( + "aggregator_heartbeat_verifier_score", + metric.WithDescription("Health score of verifiers based on heartbeats"), + ) + if err != nil { + return nil, fmt.Errorf("failed to register verifier heartbeat score gauge: %w", err) + } + + am.verificationsTotal, err = beholder.GetMeter().Int64Counter( + "aggregator_verifications_total", + metric.WithDescription("Total number of verification records processed"), + ) + if err != nil { + return nil, fmt.Errorf("failed to register verifications total counter: %w", err) + } + return am, nil } @@ -286,3 +335,28 @@ func (c *AggregatorMetricLabeler) IncrementPanics(ctx context.Context) { otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes() c.am.panics.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } + +func (c *AggregatorMetricLabeler) IncrementVerifierHeartbeatsTotal(ctx context.Context) { + otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes() + c.am.verifierHeartbeatsTotal.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (c *AggregatorMetricLabeler) SetVerifierHeartbeatChainHeads(ctx context.Context, blockHeight uint64) { + otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes() + c.am.verifierHeartbeatChainHeads.Record(ctx, int64(blockHeight), metric.WithAttributes(otelLabels...)) // #nosec G115 -- block heights are within int64 range +} + +func (c *AggregatorMetricLabeler) 
SetVerifierLastHeartbeatTimestamp(ctx context.Context, timestamp int64) { + otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes() + c.am.verifierHeartbeatTimestamp.Record(ctx, float64(timestamp), metric.WithAttributes(otelLabels...)) +} + +func (c *AggregatorMetricLabeler) SetVerifierHeartbeatScore(ctx context.Context, score float64) { + otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes() + c.am.verifierHeartbeatScore.Record(ctx, score, metric.WithAttributes(otelLabels...)) +} + +func (c *AggregatorMetricLabeler) IncrementVerificationsTotal(ctx context.Context) { + otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes() + c.am.verificationsTotal.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} diff --git a/aggregator/pkg/monitoring/noop.go b/aggregator/pkg/monitoring/noop.go index c5f874033..1e0b188dd 100644 --- a/aggregator/pkg/monitoring/noop.go +++ b/aggregator/pkg/monitoring/noop.go @@ -90,3 +90,23 @@ func (c *NoopAggregatorMetricLabeler) IncrementOrphanRecoveryErrors(ctx context. func (c *NoopAggregatorMetricLabeler) IncrementPanics(ctx context.Context) { // No-op } + +func (c *NoopAggregatorMetricLabeler) SetVerifierHeartbeatScore(ctx context.Context, score float64) { + // No-op +} + +func (c *NoopAggregatorMetricLabeler) SetVerifierLastHeartbeatTimestamp(ctx context.Context, timestamp int64) { + // No-op +} + +func (c *NoopAggregatorMetricLabeler) IncrementVerifierHeartbeatsTotal(ctx context.Context) { + // No-op +} + +func (c *NoopAggregatorMetricLabeler) SetVerifierHeartbeatChainHeads(ctx context.Context, blockHeight uint64) { + // No-op +} + +func (c *NoopAggregatorMetricLabeler) IncrementVerificationsTotal(ctx context.Context) { + // No-op +} diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index 6223f94a5..6e3d0e2ef 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -28,6 +28,7 @@ import ( "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/common" "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/handlers" "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/health" + "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/heartbeat" "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/middlewares" "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/model" "github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/monitoring" @@ -37,6 +38,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" committeepb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier/v1" + heartbeatpb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat/v1" msgdiscoverypb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/message-discovery/v1" verifierpb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier/v1" ) @@ -46,6 +48,7 @@ type Server struct { committeepb.UnimplementedCommitteeVerifierServer verifierpb.UnimplementedVerifierServer msgdiscoverypb.UnimplementedMessageDiscoveryServer + heartbeatpb.UnimplementedHeartbeatServiceServer l logger.SugaredLogger config *model.AggregatorConfig @@ -56,6 +59,7 @@ type Server struct { writeCommitVerifierNodeResultHandler *handlers.WriteCommitVerifierNodeResultHandler getMessagesSinceHandler *handlers.GetMessagesSinceHandler getVerifierResultsForMessageHandler *handlers.GetVerifierResultsForMessageHandler + heartbeatHandler *handlers.HeartbeatHandler grpcServer *grpc.Server batchWriteCommitVerifierNodeResultHandler 
*handlers.BatchWriteCommitVerifierNodeResultHandler httpHealthServer *health.HTTPHealthServer @@ -88,6 +92,10 @@ func (s *Server) GetMessagesSince(ctx context.Context, req *msgdiscoverypb.GetMe return s.getMessagesSinceHandler.Handle(ctx, req) } +func (s *Server) SendHeartbeat(ctx context.Context, req *heartbeatpb.HeartbeatRequest) (*heartbeatpb.HeartbeatResponse, error) { + return s.heartbeatHandler.Handle(ctx, req) +} + func (s *Server) Start(lis net.Listener) error { s.mu.Lock() defer s.mu.Unlock() @@ -304,12 +312,19 @@ func NewServer(l logger.SugaredLogger, config *model.AggregatorConfig) *Server { agg := createAggregator(store, store, store, validator, config, l, aggMonitoring) - writeCommitVerifierNodeResultHandler := handlers.NewWriteCommitCCVNodeDataHandler(store, agg, l, validator, config.Aggregation.CheckAggregationTimeout) + writeCommitVerifierNodeResultHandler := handlers.NewWriteCommitCCVNodeDataHandler(store, agg, aggMonitoring, l, validator, config.Aggregation.CheckAggregationTimeout) readCommitVerifierNodeResultHandler := handlers.NewReadCommitVerifierNodeResultHandler(store, l) getMessagesSinceHandler := handlers.NewGetMessagesSinceHandler(store, config.Committee, l, aggMonitoring) getVerifierResultsForMessageHandler := handlers.NewGetVerifierResultsForMessageHandler(store, config.Committee, config.MaxMessageIDsPerBatch, l) batchWriteCommitVerifierNodeResultHandler := handlers.NewBatchWriteCommitVerifierNodeResultHandler(writeCommitVerifierNodeResultHandler, config.MaxCommitVerifierNodeResultRequestsPerBatch) + // Create in-memory heartbeat storage and handler + // TODO: switch to Redis-based storage when available + heartbeatStorage := heartbeat.NewInMemoryStorage() + heartbeatHandler := handlers.NewHeartbeatHandler(heartbeatStorage, config.AggregatorID, config.Committee, l, aggMonitoring) + + l.Info("Using in-memory heartbeat storage") + // Initialize middlewares loggingMiddleware := middlewares.NewLoggingMiddleware(l) metricsMiddleware := middlewares.NewMetricMiddleware(aggMonitoring) @@ -401,6 +416,7 @@ func NewServer(l logger.SugaredLogger, config *model.AggregatorConfig) *Server { getMessagesSinceHandler: getMessagesSinceHandler, getVerifierResultsForMessageHandler: getVerifierResultsForMessageHandler, batchWriteCommitVerifierNodeResultHandler: batchWriteCommitVerifierNodeResultHandler, + heartbeatHandler: heartbeatHandler, httpHealthServer: httpHealthServer, healthManager: healthManager, grpcServer: grpcServer, @@ -411,6 +427,7 @@ func NewServer(l logger.SugaredLogger, config *model.AggregatorConfig) *Server { verifierpb.RegisterVerifierServer(grpcServer, server) msgdiscoverypb.RegisterMessageDiscoveryServer(grpcServer, server) committeepb.RegisterCommitteeVerifierServer(grpcServer, server) + heartbeatpb.RegisterHeartbeatServiceServer(grpcServer, server) if os.Getenv("AGGREGATOR_GRPC_REFLECTION_ENABLED") == "true" { reflection.Register(grpcServer) diff --git a/build/devenv/fakes/go.sum b/build/devenv/fakes/go.sum index e17020dd4..861016187 100644 --- a/build/devenv/fakes/go.sum +++ b/build/devenv/fakes/go.sum @@ -266,12 +266,12 @@ golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= -google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 h1:BIRfGDEjiHRrk0QKZe3Xv2ieMhtgRGeLcZQ0mIVn4EY= 
-google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5/go.mod h1:j3QtIyytwqGr1JUDtYXwtMXWPKsEa5LtzIFN1Wn5WvE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 h1:eaY8u2EuxbRv7c3NiGK0/NedzVsCcV6hDuU5qPX5EGE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5/go.mod h1:M4/wBTSeyLxupu3W3tJtOgB14jILAS/XWPSSa3TAlJc= -google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A= -google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c= +google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda h1:+2XxjfsAu6vqFxwGBRcHiMaDCuZiqXGDUDVWVtrFAnE= +google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda/go.mod h1:fDMmzKV90WSg1NbozdqrE64fkuTv6mlq2zxo9ad+3yo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda h1:i/Q+bfisr7gq6feoJnS/DlpdwEL4ihp41fvRiM3Ork0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= +google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= +google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/build/devenv/go.mod b/build/devenv/go.mod index 035f667c7..73c08ae3a 100644 --- a/build/devenv/go.mod +++ b/build/devenv/go.mod @@ -44,7 +44,7 @@ require ( github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d github.com/smartcontractkit/chainlink-testing-framework/wasp v1.51.1 github.com/spf13/cobra v1.8.1 - google.golang.org/grpc v1.77.0 + google.golang.org/grpc v1.78.0 ) require ( @@ -53,6 +53,7 @@ require ( github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/oapi-codegen/runtime v1.1.2 // indirect + github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat v0.0.0-20260115142640-f6b99095c12e // indirect github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b // indirect github.com/spf13/pflag v1.0.10 // indirect ) @@ -431,8 +432,8 @@ require ( golang.org/x/time v0.12.0 // indirect golang.org/x/tools v0.39.0 // indirect google.golang.org/api v0.221.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/guregu/null.v4 v4.0.0 // indirect diff --git a/build/devenv/go.sum b/build/devenv/go.sum index d79efdbf3..c302d1c68 100644 --- a/build/devenv/go.sum +++ b/build/devenv/go.sum @@ -1091,6 +1091,8 @@ github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20251222115927-36a github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20251222115927-36a18321243c/go.mod h1:oyfOm4k0uqmgZIfxk1elI/59B02shbbJQiiUdPdbMgI= 
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d h1:VYoBBNnQpZ5p+enPTl8SkKBRaubqyGpO0ul3B1np++I=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d/go.mod h1:oNFoKHRIerxuaANa8ASNejtHrdsG26LqGtQ2XhSac2g=
+github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat v0.0.0-20260115142640-f6b99095c12e h1:c7vgdeidC0LMtV1a01B/rPL4fEC/cnPanRDflRijXCM=
+github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat v0.0.0-20260115142640-f6b99095c12e/go.mod h1:rZV/gLc1wlSp2r5oXN09iOrlyZPFX4iK+cqoSW2k5dc=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/message-discovery v0.0.0-20251211142334-5c3421fe2c8d h1:pKCyW7BYzO5GThFNlXZY0Azx/yOnI4b5GeuLeU23ie0=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/message-discovery v0.0.0-20251211142334-5c3421fe2c8d/go.mod h1:ATjAPIVJibHRcIfiG47rEQkUIOoYa6KDvWj3zwCAw6g=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d h1:AJy55QJ/pBhXkZjc7N+ATnWfxrcjq9BI9DmdtdjwDUQ=
@@ -1606,10 +1608,10 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
google.golang.org/genproto v0.0.0-20200324203455-a04cca1dde73/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A=
-google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8 h1:mepRgnBZa07I4TRuomDE4sTIYieg/osKmzIf4USdWS4=
-google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8/go.mod h1:fDMmzKV90WSg1NbozdqrE64fkuTv6mlq2zxo9ad+3yo=
-google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 h1:M1rk8KBnUsBDg1oPGHNCxG4vc1f49epmTO7xscSajMk=
-google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
+google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda h1:+2XxjfsAu6vqFxwGBRcHiMaDCuZiqXGDUDVWVtrFAnE=
+google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda/go.mod h1:fDMmzKV90WSg1NbozdqrE64fkuTv6mlq2zxo9ad+3yo=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda h1:i/Q+bfisr7gq6feoJnS/DlpdwEL4ihp41fvRiM3Ork0=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
@@ -1617,8 +1619,8 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
-google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM=
-google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig=
+google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
+google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
diff --git a/build/devenv/services/aggregator.template.toml b/build/devenv/services/aggregator.template.toml
index 882943e04..3a03469e2 100644
--- a/build/devenv/services/aggregator.template.toml
+++ b/build/devenv/services/aggregator.template.toml
@@ -42,11 +42,13 @@ key_prefix = "ratelimit"
[rateLimiting.defaultLimits]
"/chainlink_ccv.message_discovery.v1.MessageDiscovery/GetMessagesSince" = { limit_per_minute = 1200 }
"/chainlink_ccv.verifier.v1.Verifier/GetVerifierResultsForMessage" = { limit_per_minute = 6000 }
+"/chainlink_ccv.heartbeat.v1.HeartbeatService/SendHeartbeat" = { limit_per_minute = 6000 }

# Group-based rate limits (more restrictive than defaults for specific groups)
[rateLimiting.groupLimits.verifiers]
"/chainlink_ccv.committee_verifier.v1.CommitteeVerifier/WriteCommitteeVerifierNodeResult" = { limit_per_minute = 6000 }
"/chainlink_ccv.committee_verifier.v1.CommitteeVerifier/BatchWriteCommitteeVerifierNodeResult" = { limit_per_minute = 150 }
+"/chainlink_ccv.heartbeat.v1.HeartbeatService/SendHeartbeat" = { limit_per_minute = 6000 }

# Low rate limit group for service tests
[rateLimiting.groupLimits.service-tests]
diff --git a/go.mod b/go.mod
index d3ec68d02..87206a491 100644
--- a/go.mod
+++ b/go.mod
@@ -38,8 +38,8 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20250711185948-6ae5c78190dc // indirect
golang.org/x/sync v0.18.0
- google.golang.org/grpc v1.76.0
- google.golang.org/protobuf v1.36.9
+ google.golang.org/grpc v1.78.0
+ google.golang.org/protobuf v1.36.11
modernc.org/sqlite v1.39.0 // indirect
)
@@ -53,6 +53,7 @@ require (
github.com/smartcontractkit/chainlink-common/keystore v0.1.1-0.20260108190337-7f47b3ae8537
github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20251210101658-1c5c8e4c4f15
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d
+ github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat v0.0.0-20260115142640-f6b99095c12e
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/message-discovery v0.0.0-20251211142334-5c3421fe2c8d
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d
github.com/smartcontractkit/chainlink-testing-framework/framework v0.12.5
@@ -61,7 +62,7 @@ require (
github.com/testcontainers/testcontainers-go/modules/postgres v0.39.0
golang.org/x/crypto v0.45.0
golang.org/x/time v0.12.0
- google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5
+ google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda
)

require (
@@ -274,7 +275,8 @@ require (
golang.org/x/net v0.47.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/text v0.31.0 // indirect
- google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 // indirect
+ golang.org/x/tools v0.38.0 // indirect
+ google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda // indirect
gopkg.in/guregu/null.v4 v4.0.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
diff --git a/go.sum b/go.sum
index c0a99ab41..c50e2dae0 100644
--- a/go.sum
+++ b/go.sum
@@ -624,6 +624,8 @@ github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20251021173435-
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20251021173435-e86785845942/go.mod h1:2JTBNp3FlRdO/nHc4dsc9bfxxMClMO1Qt8sLJgtreBY=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d h1:VYoBBNnQpZ5p+enPTl8SkKBRaubqyGpO0ul3B1np++I=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d/go.mod h1:oNFoKHRIerxuaANa8ASNejtHrdsG26LqGtQ2XhSac2g=
+github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat v0.0.0-20260115142640-f6b99095c12e h1:c7vgdeidC0LMtV1a01B/rPL4fEC/cnPanRDflRijXCM=
+github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat v0.0.0-20260115142640-f6b99095c12e/go.mod h1:rZV/gLc1wlSp2r5oXN09iOrlyZPFX4iK+cqoSW2k5dc=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/message-discovery v0.0.0-20251211142334-5c3421fe2c8d h1:pKCyW7BYzO5GThFNlXZY0Azx/yOnI4b5GeuLeU23ie0=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/message-discovery v0.0.0-20251211142334-5c3421fe2c8d/go.mod h1:ATjAPIVJibHRcIfiG47rEQkUIOoYa6KDvWj3zwCAw6g=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d h1:AJy55QJ/pBhXkZjc7N+ATnWfxrcjq9BI9DmdtdjwDUQ=
@@ -976,18 +978,18 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
google.golang.org/genproto v0.0.0-20200324203455-a04cca1dde73/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A=
-google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 h1:BIRfGDEjiHRrk0QKZe3Xv2ieMhtgRGeLcZQ0mIVn4EY=
-google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5/go.mod h1:j3QtIyytwqGr1JUDtYXwtMXWPKsEa5LtzIFN1Wn5WvE=
-google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 h1:eaY8u2EuxbRv7c3NiGK0/NedzVsCcV6hDuU5qPX5EGE=
-google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5/go.mod h1:M4/wBTSeyLxupu3W3tJtOgB14jILAS/XWPSSa3TAlJc=
+google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda h1:+2XxjfsAu6vqFxwGBRcHiMaDCuZiqXGDUDVWVtrFAnE=
+google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda/go.mod h1:fDMmzKV90WSg1NbozdqrE64fkuTv6mlq2zxo9ad+3yo=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda h1:i/Q+bfisr7gq6feoJnS/DlpdwEL4ihp41fvRiM3Ork0=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
-google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
-google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
+google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
+google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -999,8 +1001,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
-google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
+google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
+google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
diff --git a/indexer/cmd/oapigen/go.mod b/indexer/cmd/oapigen/go.mod
index 6ba3ae89a..26f0af220 100644
--- a/indexer/cmd/oapigen/go.mod
+++ b/indexer/cmd/oapigen/go.mod
@@ -68,7 +68,7 @@ require (
golang.org/x/sys v0.39.0 // indirect
golang.org/x/telemetry v0.0.0-20251222180846-3f2a21fb04ff // indirect
golang.org/x/text v0.31.0 // indirect
- google.golang.org/protobuf v1.36.9 // indirect
+ google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
diff --git a/indexer/cmd/oapigen/go.sum b/indexer/cmd/oapigen/go.sum
index fd458e2b6..50255c8ea 100644
--- a/indexer/cmd/oapigen/go.sum
+++ b/indexer/cmd/oapigen/go.sum
@@ -428,14 +428,14 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY=
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
-google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 h1:BIRfGDEjiHRrk0QKZe3Xv2ieMhtgRGeLcZQ0mIVn4EY=
-google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5/go.mod h1:j3QtIyytwqGr1JUDtYXwtMXWPKsEa5LtzIFN1Wn5WvE=
-google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 h1:eaY8u2EuxbRv7c3NiGK0/NedzVsCcV6hDuU5qPX5EGE=
-google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5/go.mod h1:M4/wBTSeyLxupu3W3tJtOgB14jILAS/XWPSSa3TAlJc=
-google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
-google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
-google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
-google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
+google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda h1:+2XxjfsAu6vqFxwGBRcHiMaDCuZiqXGDUDVWVtrFAnE=
+google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda/go.mod h1:fDMmzKV90WSg1NbozdqrE64fkuTv6mlq2zxo9ad+3yo=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda h1:i/Q+bfisr7gq6feoJnS/DlpdwEL4ihp41fvRiM3Ork0=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
+google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
+google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
+google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
+google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
diff --git a/internal/mocks/mock_AggregatorMetricLabeler.go b/internal/mocks/mock_AggregatorMetricLabeler.go
index 9cb6385fb..ecbf46f10 100644
--- a/internal/mocks/mock_AggregatorMetricLabeler.go
+++ b/internal/mocks/mock_AggregatorMetricLabeler.go
@@ -324,6 +324,72 @@ func (_c *MockAggregatorMetricLabeler_IncrementStorageError_Call) RunAndReturn(r
	return _c
}

+// IncrementVerificationsTotal provides a mock function with given fields: ctx
+func (_m *MockAggregatorMetricLabeler) IncrementVerificationsTotal(ctx context.Context) {
+	_m.Called(ctx)
+}
+
+// MockAggregatorMetricLabeler_IncrementVerificationsTotal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IncrementVerificationsTotal'
+type MockAggregatorMetricLabeler_IncrementVerificationsTotal_Call struct {
+	*mock.Call
+}
+
+// IncrementVerificationsTotal is a helper method to define mock.On call
+// - ctx context.Context
+func (_e *MockAggregatorMetricLabeler_Expecter) IncrementVerificationsTotal(ctx interface{}) *MockAggregatorMetricLabeler_IncrementVerificationsTotal_Call {
+	return &MockAggregatorMetricLabeler_IncrementVerificationsTotal_Call{Call: _e.mock.On("IncrementVerificationsTotal", ctx)}
+}
+
+func (_c *MockAggregatorMetricLabeler_IncrementVerificationsTotal_Call) Run(run func(ctx context.Context)) *MockAggregatorMetricLabeler_IncrementVerificationsTotal_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context))
+	})
+	return _c
+}
+
+func (_c *MockAggregatorMetricLabeler_IncrementVerificationsTotal_Call) Return() *MockAggregatorMetricLabeler_IncrementVerificationsTotal_Call {
+	_c.Call.Return()
+	return _c
+}
+
+func (_c *MockAggregatorMetricLabeler_IncrementVerificationsTotal_Call) RunAndReturn(run func(context.Context)) *MockAggregatorMetricLabeler_IncrementVerificationsTotal_Call {
+	_c.Run(run)
+	return _c
+}
+
+// IncrementVerifierHeartbeatsTotal provides a mock function with given fields: ctx
+func (_m *MockAggregatorMetricLabeler) IncrementVerifierHeartbeatsTotal(ctx context.Context) {
+	_m.Called(ctx)
+}
+
+// MockAggregatorMetricLabeler_IncrementVerifierHeartbeatsTotal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IncrementVerifierHeartbeatsTotal'
+type MockAggregatorMetricLabeler_IncrementVerifierHeartbeatsTotal_Call struct {
+	*mock.Call
+}
+
+// IncrementVerifierHeartbeatsTotal is a helper method to define mock.On call
+// - ctx context.Context
+func (_e *MockAggregatorMetricLabeler_Expecter) IncrementVerifierHeartbeatsTotal(ctx interface{}) *MockAggregatorMetricLabeler_IncrementVerifierHeartbeatsTotal_Call {
+	return &MockAggregatorMetricLabeler_IncrementVerifierHeartbeatsTotal_Call{Call: _e.mock.On("IncrementVerifierHeartbeatsTotal", ctx)}
+}
+
+func (_c *MockAggregatorMetricLabeler_IncrementVerifierHeartbeatsTotal_Call) Run(run func(ctx context.Context)) *MockAggregatorMetricLabeler_IncrementVerifierHeartbeatsTotal_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context))
+	})
+	return _c
+}
+
+func (_c *MockAggregatorMetricLabeler_IncrementVerifierHeartbeatsTotal_Call) Return() *MockAggregatorMetricLabeler_IncrementVerifierHeartbeatsTotal_Call {
+	_c.Call.Return()
+	return _c
+}
+
+func (_c *MockAggregatorMetricLabeler_IncrementVerifierHeartbeatsTotal_Call) RunAndReturn(run func(context.Context)) *MockAggregatorMetricLabeler_IncrementVerifierHeartbeatsTotal_Call {
+	_c.Run(run)
+	return _c
+}
+
// RecordAPIRequestDuration provides a mock function with given fields: ctx, duration
func (_m *MockAggregatorMetricLabeler) RecordAPIRequestDuration(ctx context.Context, duration time.Duration) {
	_m.Called(ctx, duration)
@@ -562,6 +628,108 @@ func (_c *MockAggregatorMetricLabeler_SetOrphanExpiredBacklog_Call) RunAndReturn
	return _c
}

+// SetVerifierHeartbeatChainHeads provides a mock function with given fields: ctx, blockHeight
+func (_m *MockAggregatorMetricLabeler) SetVerifierHeartbeatChainHeads(ctx context.Context, blockHeight uint64) {
+	_m.Called(ctx, blockHeight)
+}
+
+// MockAggregatorMetricLabeler_SetVerifierHeartbeatChainHeads_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetVerifierHeartbeatChainHeads'
+type MockAggregatorMetricLabeler_SetVerifierHeartbeatChainHeads_Call struct {
+	*mock.Call
+}
+
+// SetVerifierHeartbeatChainHeads is a helper method to define mock.On call
+// - ctx context.Context
+// - blockHeight uint64
+func (_e *MockAggregatorMetricLabeler_Expecter) SetVerifierHeartbeatChainHeads(ctx interface{}, blockHeight interface{}) *MockAggregatorMetricLabeler_SetVerifierHeartbeatChainHeads_Call {
+	return &MockAggregatorMetricLabeler_SetVerifierHeartbeatChainHeads_Call{Call: _e.mock.On("SetVerifierHeartbeatChainHeads", ctx, blockHeight)}
+}
+
+func (_c *MockAggregatorMetricLabeler_SetVerifierHeartbeatChainHeads_Call) Run(run func(ctx context.Context, blockHeight uint64)) *MockAggregatorMetricLabeler_SetVerifierHeartbeatChainHeads_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(uint64))
+	})
+	return _c
+}
+
+func (_c *MockAggregatorMetricLabeler_SetVerifierHeartbeatChainHeads_Call) Return() *MockAggregatorMetricLabeler_SetVerifierHeartbeatChainHeads_Call {
+	_c.Call.Return()
+	return _c
+}
+
+func (_c *MockAggregatorMetricLabeler_SetVerifierHeartbeatChainHeads_Call) RunAndReturn(run func(context.Context, uint64)) *MockAggregatorMetricLabeler_SetVerifierHeartbeatChainHeads_Call {
+	_c.Run(run)
+	return _c
+}
+
+// SetVerifierHeartbeatScore provides a mock function with given fields: ctx, score
+func (_m *MockAggregatorMetricLabeler) SetVerifierHeartbeatScore(ctx context.Context, score float64) {
+	_m.Called(ctx, score)
+}
+
+// MockAggregatorMetricLabeler_SetVerifierHeartbeatScore_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetVerifierHeartbeatScore'
+type MockAggregatorMetricLabeler_SetVerifierHeartbeatScore_Call struct {
+	*mock.Call
+}
+
+// SetVerifierHeartbeatScore is a helper method to define mock.On call
+// - ctx context.Context
+// - score float64
+func (_e *MockAggregatorMetricLabeler_Expecter) SetVerifierHeartbeatScore(ctx interface{}, score interface{}) *MockAggregatorMetricLabeler_SetVerifierHeartbeatScore_Call {
+	return &MockAggregatorMetricLabeler_SetVerifierHeartbeatScore_Call{Call: _e.mock.On("SetVerifierHeartbeatScore", ctx, score)}
+}
+
+func (_c *MockAggregatorMetricLabeler_SetVerifierHeartbeatScore_Call) Run(run func(ctx context.Context, score float64)) *MockAggregatorMetricLabeler_SetVerifierHeartbeatScore_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(float64))
+	})
+	return _c
+}
+
+func (_c *MockAggregatorMetricLabeler_SetVerifierHeartbeatScore_Call) Return() *MockAggregatorMetricLabeler_SetVerifierHeartbeatScore_Call {
+	_c.Call.Return()
+	return _c
+}
+
+func (_c *MockAggregatorMetricLabeler_SetVerifierHeartbeatScore_Call) RunAndReturn(run func(context.Context, float64)) *MockAggregatorMetricLabeler_SetVerifierHeartbeatScore_Call {
+	_c.Run(run)
+	return _c
+}
+
+// SetVerifierLastHeartbeatTimestamp provides a mock function with given fields: ctx, timestamp
+func (_m *MockAggregatorMetricLabeler) SetVerifierLastHeartbeatTimestamp(ctx context.Context, timestamp int64) {
+	_m.Called(ctx, timestamp)
+}
+
+// MockAggregatorMetricLabeler_SetVerifierLastHeartbeatTimestamp_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetVerifierLastHeartbeatTimestamp'
+type MockAggregatorMetricLabeler_SetVerifierLastHeartbeatTimestamp_Call struct {
+	*mock.Call
+}
+
+// SetVerifierLastHeartbeatTimestamp is a helper method to define mock.On call
+// - ctx context.Context
+// - timestamp int64
+func (_e *MockAggregatorMetricLabeler_Expecter) SetVerifierLastHeartbeatTimestamp(ctx interface{}, timestamp interface{}) *MockAggregatorMetricLabeler_SetVerifierLastHeartbeatTimestamp_Call {
+	return &MockAggregatorMetricLabeler_SetVerifierLastHeartbeatTimestamp_Call{Call: _e.mock.On("SetVerifierLastHeartbeatTimestamp", ctx, timestamp)}
+}
+
+func (_c *MockAggregatorMetricLabeler_SetVerifierLastHeartbeatTimestamp_Call) Run(run func(ctx context.Context, timestamp int64)) *MockAggregatorMetricLabeler_SetVerifierLastHeartbeatTimestamp_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(int64))
+	})
+	return _c
+}
+
+func (_c *MockAggregatorMetricLabeler_SetVerifierLastHeartbeatTimestamp_Call) Return() *MockAggregatorMetricLabeler_SetVerifierLastHeartbeatTimestamp_Call {
+	_c.Call.Return()
+	return _c
+}
+
+func (_c *MockAggregatorMetricLabeler_SetVerifierLastHeartbeatTimestamp_Call) RunAndReturn(run func(context.Context, int64)) *MockAggregatorMetricLabeler_SetVerifierLastHeartbeatTimestamp_Call {
+	_c.Run(run)
+	return _c
+}
+
// With provides a mock function with given fields: keyValues
func (_m *MockAggregatorMetricLabeler) With(keyValues ...string) common.AggregatorMetricLabeler {
	_va := make([]interface{}, len(keyValues))