Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 65 additions & 4 deletions app/sse/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type listener struct {
chainReorgSubs []ChainReorgEventHandlerFunc
lastReorgEpoch eth2p0.Epoch

// blockGossipTimes stores timestamps of block gossip events per slot and beacon node address
// first key: slot, second key: beacon node address
blockGossipTimes map[uint64]map[string]time.Time

// immutable fields
genesisTime time.Time
slotDuration time.Duration
Expand All @@ -54,10 +58,11 @@ func StartListener(ctx context.Context, eth2Cl eth2wrap.Client, addresses, heade
}

l := &listener{
chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0),
genesisTime: genesisTime,
slotDuration: slotDuration,
slotsPerEpoch: slotsPerEpoch,
chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0),
blockGossipTimes: make(map[uint64]map[string]time.Time),
genesisTime: genesisTime,
slotDuration: slotDuration,
slotsPerEpoch: slotsPerEpoch,
}

parsedHeaders, err := eth2util.ParseHTTPHeaders(headers)
Expand Down Expand Up @@ -139,6 +144,9 @@ func (p *listener) handleHeadEvent(ctx context.Context, event *event, addr strin

sseHeadSlotGauge.WithLabelValues(addr).Set(float64(slot))

// Calculate and record block processing time
p.recordBlockProcessingTime(slot, addr, event.Timestamp)

log.Debug(ctx, "SSE head event",
z.U64("slot", slot),
z.Str("delay", delay.String()),
Expand Down Expand Up @@ -216,6 +224,8 @@ func (p *listener) handleBlockGossipEvent(ctx context.Context, event *event, add

sseBlockGossipHistogram.WithLabelValues(addr).Observe(delay.Seconds())

p.storeBlockGossipTime(slot, addr, event.Timestamp)

return nil
}

Expand Down Expand Up @@ -272,3 +282,54 @@ func (p *listener) computeDelay(slot uint64, eventTS time.Time, delayOKFunc func
// calculate time of receiving the event - the time of start of the slot
return delay, delayOKFunc(delay)
}

// storeBlockGossipTime records the wall-clock arrival time of a block gossip
// event for the given slot and beacon node address, so that a subsequent head
// event for the same slot/address can derive the block processing time.
func (p *listener) storeBlockGossipTime(slot uint64, addr string, timestamp time.Time) {
	p.Lock()
	defer p.Unlock()

	perAddr, ok := p.blockGossipTimes[slot]
	if !ok {
		perAddr = make(map[string]time.Time)
		p.blockGossipTimes[slot] = perAddr
	}
	perAddr[addr] = timestamp
}

// recordBlockProcessingTime calculates and records the time between block gossip
// and head events for the given slot and beacon node address, then deletes the
// consumed gossip entry.
//
// It also prunes gossip timestamps older than one epoch on every call, keeping
// the blockGossipTimes map bounded even when gossip events never receive a
// matching head event.
func (p *listener) recordBlockProcessingTime(slot uint64, addr string, headTimestamp time.Time) {
	p.Lock()
	defer p.Unlock()

	// Cleanup old entries (older than one epoch) BEFORE the early returns below.
	// If cleanup only ran after a successful match, head events for slots with
	// no stored gossip time would skip it entirely, and stale entries could
	// accumulate without bound.
	if slot > p.slotsPerEpoch {
		cutoff := slot - p.slotsPerEpoch
		for s := range p.blockGossipTimes {
			if s < cutoff {
				delete(p.blockGossipTimes, s)
			}
		}
	}

	addrMap, slotFound := p.blockGossipTimes[slot]
	if !slotFound {
		// Block gossip event not received yet or already cleaned up.
		return
	}

	gossipTime, addrFound := addrMap[addr]
	if !addrFound {
		// Block gossip event for this address not received yet or already cleaned up.
		return
	}

	// Only observe positive durations; a non-positive value means the event
	// timestamps were out of order and carries no useful signal.
	processingTime := headTimestamp.Sub(gossipTime)
	if processingTime > 0 {
		sseBlockProcessingTimeHistogram.WithLabelValues(addr).Observe(processingTime.Seconds())
	}

	// Clean up this entry as it's no longer needed.
	delete(addrMap, addr)
	if len(addrMap) == 0 {
		delete(p.blockGossipTimes, slot)
	}
}
75 changes: 75 additions & 0 deletions app/sse/listener_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,78 @@ func TestComputeDelay(t *testing.T) {
})
}
}

func TestBlockProcessingTime(t *testing.T) {
	l := &listener{
		blockGossipTimes: make(map[uint64]map[string]time.Time),
		genesisTime:      time.Now().Add(-10 * 12 * time.Second),
		slotDuration:     12 * time.Second,
		slotsPerEpoch:    32,
	}

	const addr = "test-beacon-node"
	slot := uint64(10)

	// Record a gossip event and check the timestamp is retrievable from the map.
	gossipAt := time.Now()
	l.storeBlockGossipTime(slot, addr, gossipAt)

	perAddr, ok := l.blockGossipTimes[slot]
	require.True(t, ok)
	got, ok := perAddr[addr]
	require.True(t, ok)
	require.Equal(t, gossipAt, got)

	// A matching head event 400ms later consumes (deletes) the stored entry.
	l.recordBlockProcessingTime(slot, addr, gossipAt.Add(400*time.Millisecond))

	_, ok = l.blockGossipTimes[slot]
	require.False(t, ok)
}

func TestBlockProcessingTimeCleanup(t *testing.T) {
	l := &listener{
		blockGossipTimes: make(map[uint64]map[string]time.Time),
		genesisTime:      time.Now().Add(-200 * 12 * time.Second),
		slotDuration:     12 * time.Second,
		slotsPerEpoch:    32,
	}

	const addr = "test-addr"
	base := time.Now()

	// Store gossip times for slots 1..150 and feed a matching head event for
	// every even slot; each record call consumes that slot's entry and also
	// triggers the one-epoch cleanup.
	for slot := uint64(1); slot <= 150; slot++ {
		gossipAt := base.Add(time.Duration(slot) * time.Millisecond)
		l.storeBlockGossipTime(slot, addr, gossipAt)

		if slot%2 == 0 {
			l.recordBlockProcessingTime(slot, addr, gossipAt.Add(100*time.Millisecond))
		}
	}

	// The last cleanup (at slot 150) removed everything below 150-32=118.
	// Even slots were consumed as they were processed, so only the odd,
	// never-processed slots 119..149 remain: 16 entries.
	require.Len(t, l.blockGossipTimes, 16)

	// A recent unprocessed (odd) slot is still present.
	perAddr, ok := l.blockGossipTimes[149]
	require.True(t, ok)
	_, ok = perAddr[addr]
	require.True(t, ok)

	// Entries older than one epoch were cleaned up.
	_, ok = l.blockGossipTimes[1]
	require.False(t, ok)

	// Processed slots were deleted immediately after recording.
	_, ok = l.blockGossipTimes[150]
	require.False(t, ok)
}
8 changes: 8 additions & 0 deletions app/sse/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,12 @@ var (
Help: "Block imported into fork choice delay, supplied by beacon node's SSE endpoint. Values between 0s and 4s for Ethereum mainnet are considered safe",
Buckets: []float64{0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 6, 8, 10, 12},
}, []string{"addr"})

// sseBlockProcessingTimeHistogram observes, per beacon node address, the gap
// between a block's gossip event and its head event for the same slot — a
// proxy for how long the node took to process the block.
sseBlockProcessingTimeHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
	Namespace: "app",
	Subsystem: "beacon_node",
	Name:      "sse_block_processing_time",
	Help:      "Time in seconds between block gossip and head events, indicating block processing time. Lower values indicate better CPU/disk/RAM performance.",
	Buckets:   []float64{0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 2, 4},
}, []string{"addr"})
)
1 change: 1 addition & 0 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ when storing metrics from multiple nodes or clusters in one Prometheus instance.
| `app_beacon_node_peers` | Gauge | Gauge set to the peer count of the upstream beacon node | |
| `app_beacon_node_sse_block` | Histogram | Block imported into fork choice delay, supplied by beacon node`s SSE endpoint. Values between 0s and 4s for Ethereum mainnet are considered safe | `addr` |
| `app_beacon_node_sse_block_gossip` | Histogram | Block reception via gossip delay, supplied by beacon node`s SSE endpoint. Values between 0s and 4s for Ethereum mainnet are considered safe | `addr` |
| `app_beacon_node_sse_block_processing_time` | Histogram | Time in seconds between block gossip and head events, indicating block processing time. Lower values indicate better CPU/disk/RAM performance. | `addr` |
| `app_beacon_node_sse_chain_reorg_depth` | Histogram | Chain reorg depth, supplied by beacon node`s SSE endpoint | `addr` |
| `app_beacon_node_sse_head_delay` | Histogram | Delay in seconds between slot start and head update, supplied by beacon node`s SSE endpoint. Values between 8s and 12s for Ethereum mainnet are considered safe. | `addr` |
| `app_beacon_node_sse_head_slot` | Gauge | Current beacon node head slot, supplied by beacon node`s SSE endpoint | `addr` |
Expand Down
Loading