diff --git a/app/sse/listener.go b/app/sse/listener.go index ad89f5b2e..89f7e65b8 100644 --- a/app/sse/listener.go +++ b/app/sse/listener.go @@ -32,6 +32,10 @@ type listener struct { chainReorgSubs []ChainReorgEventHandlerFunc lastReorgEpoch eth2p0.Epoch + // blockGossipTimes stores timestamps of block gossip events per slot and beacon node address + // first key: slot, second key: beacon node address + blockGossipTimes map[uint64]map[string]time.Time + // immutable fields genesisTime time.Time slotDuration time.Duration @@ -54,10 +58,11 @@ func StartListener(ctx context.Context, eth2Cl eth2wrap.Client, addresses, heade } l := &listener{ - chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0), - genesisTime: genesisTime, - slotDuration: slotDuration, - slotsPerEpoch: slotsPerEpoch, + chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0), + blockGossipTimes: make(map[uint64]map[string]time.Time), + genesisTime: genesisTime, + slotDuration: slotDuration, + slotsPerEpoch: slotsPerEpoch, } parsedHeaders, err := eth2util.ParseHTTPHeaders(headers) @@ -139,6 +144,9 @@ func (p *listener) handleHeadEvent(ctx context.Context, event *event, addr strin sseHeadSlotGauge.WithLabelValues(addr).Set(float64(slot)) + // Calculate and record block processing time + p.recordBlockProcessingTime(slot, addr, event.Timestamp) + log.Debug(ctx, "SSE head event", z.U64("slot", slot), z.Str("delay", delay.String()), @@ -216,6 +224,8 @@ func (p *listener) handleBlockGossipEvent(ctx context.Context, event *event, add sseBlockGossipHistogram.WithLabelValues(addr).Observe(delay.Seconds()) + p.storeBlockGossipTime(slot, addr, event.Timestamp) + return nil } @@ -272,3 +282,54 @@ func (p *listener) computeDelay(slot uint64, eventTS time.Time, delayOKFunc func // calculate time of receiving the event - the time of start of the slot return delay, delayOKFunc(delay) } + +// storeBlockGossipTime stores the timestamp of a block gossip event for later comparison with head event. +func (p *listener) storeBlockGossipTime(slot uint64, addr string, timestamp time.Time) { + p.Lock() + defer p.Unlock() + + if p.blockGossipTimes[slot] == nil { + p.blockGossipTimes[slot] = make(map[string]time.Time) + } + p.blockGossipTimes[slot][addr] = timestamp +} + +// recordBlockProcessingTime calculates and records the time between block gossip and head events. +func (p *listener) recordBlockProcessingTime(slot uint64, addr string, headTimestamp time.Time) { + p.Lock() + defer p.Unlock() + + addrMap, slotFound := p.blockGossipTimes[slot] + if !slotFound { + // Block gossip event not received yet or already cleaned up + return + } + + gossipTime, addrFound := addrMap[addr] + if !addrFound { + // Block gossip event for this address not received yet or already cleaned up + return + } + + processingTime := headTimestamp.Sub(gossipTime) + if processingTime > 0 { + sseBlockProcessingTimeHistogram.WithLabelValues(addr).Observe(processingTime.Seconds()) + } + + // Clean up this entry as it's no longer needed + delete(addrMap, addr) + if len(addrMap) == 0 { + delete(p.blockGossipTimes, slot) + } + + // Cleanup old entries (older than one epoch) + // This handles cases where gossip events don't get matching head events + if slot > p.slotsPerEpoch { + cutoff := slot - p.slotsPerEpoch + for s := range p.blockGossipTimes { + if s < cutoff { + delete(p.blockGossipTimes, s) + } + } + } +} diff --git a/app/sse/listener_internal_test.go b/app/sse/listener_internal_test.go index 117e6f8d7..8ca0bd590 100644 --- a/app/sse/listener_internal_test.go +++ b/app/sse/listener_internal_test.go @@ -179,3 +179,78 @@ func TestComputeDelay(t *testing.T) { }) } } + +func TestBlockProcessingTime(t *testing.T) { + l := &listener{ + blockGossipTimes: make(map[uint64]map[string]time.Time), + genesisTime: time.Now().Add(-10 * 12 * time.Second), + slotDuration: 12 * time.Second, + slotsPerEpoch: 32, + } + + addr := "test-beacon-node" + slot := uint64(10) + + // Simulate block gossip event + gossipTime := time.Now() + l.storeBlockGossipTime(slot, addr, gossipTime) + + // Verify the timestamp was stored + addrMap, found := l.blockGossipTimes[slot] + require.True(t, found) + storedTime, found := addrMap[addr] + require.True(t, found) + require.Equal(t, gossipTime, storedTime) + + // Simulate head event 400ms later + headTime := gossipTime.Add(400 * time.Millisecond) + l.recordBlockProcessingTime(slot, addr, headTime) + + // Verify the entry was cleaned up after recording + _, found = l.blockGossipTimes[slot] + require.False(t, found) +} + +func TestBlockProcessingTimeCleanup(t *testing.T) { + l := &listener{ + blockGossipTimes: make(map[uint64]map[string]time.Time), + genesisTime: time.Now().Add(-200 * 12 * time.Second), + slotDuration: 12 * time.Second, + slotsPerEpoch: 32, + } + + addr := "test-addr" + baseTime := time.Now() + + // Add entries for 150 slots and process them to trigger cleanup + for i := uint64(1); i <= 150; i++ { + gossipTime := baseTime.Add(time.Duration(i) * time.Millisecond) + l.storeBlockGossipTime(i, addr, gossipTime) + + // Process every other slot to trigger cleanup + if i%2 == 0 { + headTime := gossipTime.Add(100 * time.Millisecond) + l.recordBlockProcessingTime(i, addr, headTime) + } + } + + // Cleanup happens on every record call and removes entries older than 1 epoch + // After processing slot 150, entries older than (150 - 32) = 118 are removed + // Remaining entries: odd slots from 119-149 (never processed) = 16 entries + // Even slots are immediately deleted after processing + require.Equal(t, 16, len(l.blockGossipTimes)) + + // Verify recent unprocessed entries are still there (odd slots from end) + addrMap, found := l.blockGossipTimes[149] + require.True(t, found) + _, found = addrMap[addr] + require.True(t, found) + + // Verify old entries were cleaned up + _, found = l.blockGossipTimes[1] + require.False(t, found) + + // Verify processed entries were immediately cleaned up + _, found = l.blockGossipTimes[150] + require.False(t, found) +} diff --git a/app/sse/metrics.go b/app/sse/metrics.go index 73bc8cc7a..a869f137e 100644 --- a/app/sse/metrics.go +++ b/app/sse/metrics.go @@ -47,4 +47,12 @@ var ( Help: "Block imported into fork choice delay, supplied by beacon node's SSE endpoint. Values between 0s and 4s for Ethereum mainnet are considered safe", Buckets: []float64{0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 6, 8, 10, 12}, }, []string{"addr"}) + + sseBlockProcessingTimeHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "app", + Subsystem: "beacon_node", + Name: "sse_block_processing_time", + Help: "Time in seconds between block gossip and head events, indicating block processing time. Lower values indicate better CPU/disk/RAM performance.", + Buckets: []float64{0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 2, 4}, + }, []string{"addr"}) ) diff --git a/docs/metrics.md b/docs/metrics.md index 4dcd87fda..feaea2eac 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -16,6 +16,7 @@ when storing metrics from multiple nodes or clusters in one Prometheus instance. | `app_beacon_node_peers` | Gauge | Gauge set to the peer count of the upstream beacon node | | | `app_beacon_node_sse_block` | Histogram | Block imported into fork choice delay, supplied by beacon node`s SSE endpoint. Values between 0s and 4s for Ethereum mainnet are considered safe | `addr` | | `app_beacon_node_sse_block_gossip` | Histogram | Block reception via gossip delay, supplied by beacon node`s SSE endpoint. Values between 0s and 4s for Ethereum mainnet are considered safe | `addr` | +| `app_beacon_node_sse_block_processing_time` | Histogram | Time in seconds between block gossip and head events, indicating block processing time. Lower values indicate better CPU/disk/RAM performance. | `addr` | | `app_beacon_node_sse_chain_reorg_depth` | Histogram | Chain reorg depth, supplied by beacon node`s SSE endpoint | `addr` | | `app_beacon_node_sse_head_delay` | Histogram | Delay in seconds between slot start and head update, supplied by beacon node`s SSE endpoint. Values between 8s and 12s for Ethereum mainnet are considered safe. | `addr` | | `app_beacon_node_sse_head_slot` | Gauge | Current beacon node head slot, supplied by beacon node`s SSE endpoint | `addr` |