Skip to content

Commit aadc07e

Browse files
authored
app/sse: add block processing time metric (#4212)
Add new `sseBlockProcessingTimeHistogram` metric to track difference between `block gossip` and `head` events. <img width="2865" height="892" alt="image" src="https://github.com/user-attachments/assets/b43d4247-c2e9-4c3d-a3cf-5614d7cf7cfb" /> category: feature ticket: none
1 parent 8b04352 commit aadc07e

File tree

4 files changed

+149
-4
lines changed

4 files changed

+149
-4
lines changed

app/sse/listener.go

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ type listener struct {
3232
chainReorgSubs []ChainReorgEventHandlerFunc
3333
lastReorgEpoch eth2p0.Epoch
3434

35+
// blockGossipTimes stores timestamps of block gossip events per slot and beacon node address
36+
// first key: slot, second key: beacon node address
37+
blockGossipTimes map[uint64]map[string]time.Time
38+
3539
// immutable fields
3640
genesisTime time.Time
3741
slotDuration time.Duration
@@ -54,10 +58,11 @@ func StartListener(ctx context.Context, eth2Cl eth2wrap.Client, addresses, heade
5458
}
5559

5660
l := &listener{
57-
chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0),
58-
genesisTime: genesisTime,
59-
slotDuration: slotDuration,
60-
slotsPerEpoch: slotsPerEpoch,
61+
chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0),
62+
blockGossipTimes: make(map[uint64]map[string]time.Time),
63+
genesisTime: genesisTime,
64+
slotDuration: slotDuration,
65+
slotsPerEpoch: slotsPerEpoch,
6166
}
6267

6368
parsedHeaders, err := eth2util.ParseHTTPHeaders(headers)
@@ -139,6 +144,9 @@ func (p *listener) handleHeadEvent(ctx context.Context, event *event, addr strin
139144

140145
sseHeadSlotGauge.WithLabelValues(addr).Set(float64(slot))
141146

147+
// Calculate and record block processing time
148+
p.recordBlockProcessingTime(slot, addr, event.Timestamp)
149+
142150
log.Debug(ctx, "SSE head event",
143151
z.U64("slot", slot),
144152
z.Str("delay", delay.String()),
@@ -216,6 +224,8 @@ func (p *listener) handleBlockGossipEvent(ctx context.Context, event *event, add
216224

217225
sseBlockGossipHistogram.WithLabelValues(addr).Observe(delay.Seconds())
218226

227+
p.storeBlockGossipTime(slot, addr, event.Timestamp)
228+
219229
return nil
220230
}
221231

@@ -272,3 +282,54 @@ func (p *listener) computeDelay(slot uint64, eventTS time.Time, delayOKFunc func
272282
// calculate time of receiving the event - the time of start of the slot
273283
return delay, delayOKFunc(delay)
274284
}
285+
286+
// storeBlockGossipTime stores the timestamp of a block gossip event for later comparison with head event.
287+
func (p *listener) storeBlockGossipTime(slot uint64, addr string, timestamp time.Time) {
288+
p.Lock()
289+
defer p.Unlock()
290+
291+
if p.blockGossipTimes[slot] == nil {
292+
p.blockGossipTimes[slot] = make(map[string]time.Time)
293+
}
294+
p.blockGossipTimes[slot][addr] = timestamp
295+
}
296+
297+
// recordBlockProcessingTime calculates and records the time between block gossip and head events.
298+
func (p *listener) recordBlockProcessingTime(slot uint64, addr string, headTimestamp time.Time) {
299+
p.Lock()
300+
defer p.Unlock()
301+
302+
addrMap, slotFound := p.blockGossipTimes[slot]
303+
if !slotFound {
304+
// Block gossip event not received yet or already cleaned up
305+
return
306+
}
307+
308+
gossipTime, addrFound := addrMap[addr]
309+
if !addrFound {
310+
// Block gossip event for this address not received yet or already cleaned up
311+
return
312+
}
313+
314+
processingTime := headTimestamp.Sub(gossipTime)
315+
if processingTime > 0 {
316+
sseBlockProcessingTimeHistogram.WithLabelValues(addr).Observe(processingTime.Seconds())
317+
}
318+
319+
// Clean up this entry as it's no longer needed
320+
delete(addrMap, addr)
321+
if len(addrMap) == 0 {
322+
delete(p.blockGossipTimes, slot)
323+
}
324+
325+
// Cleanup old entries (older than one epoch)
326+
// This handles cases where gossip events don't get matching head events
327+
if slot > p.slotsPerEpoch {
328+
cutoff := slot - p.slotsPerEpoch
329+
for s := range p.blockGossipTimes {
330+
if s < cutoff {
331+
delete(p.blockGossipTimes, s)
332+
}
333+
}
334+
}
335+
}

app/sse/listener_internal_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,3 +179,78 @@ func TestComputeDelay(t *testing.T) {
179179
})
180180
}
181181
}
182+
183+
func TestBlockProcessingTime(t *testing.T) {
184+
l := &listener{
185+
blockGossipTimes: make(map[uint64]map[string]time.Time),
186+
genesisTime: time.Now().Add(-10 * 12 * time.Second),
187+
slotDuration: 12 * time.Second,
188+
slotsPerEpoch: 32,
189+
}
190+
191+
addr := "test-beacon-node"
192+
slot := uint64(10)
193+
194+
// Simulate block gossip event
195+
gossipTime := time.Now()
196+
l.storeBlockGossipTime(slot, addr, gossipTime)
197+
198+
// Verify the timestamp was stored
199+
addrMap, found := l.blockGossipTimes[slot]
200+
require.True(t, found)
201+
storedTime, found := addrMap[addr]
202+
require.True(t, found)
203+
require.Equal(t, gossipTime, storedTime)
204+
205+
// Simulate head event 400ms later
206+
headTime := gossipTime.Add(400 * time.Millisecond)
207+
l.recordBlockProcessingTime(slot, addr, headTime)
208+
209+
// Verify the entry was cleaned up after recording
210+
_, found = l.blockGossipTimes[slot]
211+
require.False(t, found)
212+
}
213+
214+
func TestBlockProcessingTimeCleanup(t *testing.T) {
215+
l := &listener{
216+
blockGossipTimes: make(map[uint64]map[string]time.Time),
217+
genesisTime: time.Now().Add(-200 * 12 * time.Second),
218+
slotDuration: 12 * time.Second,
219+
slotsPerEpoch: 32,
220+
}
221+
222+
addr := "test-addr"
223+
baseTime := time.Now()
224+
225+
// Add entries for 150 slots and process them to trigger cleanup
226+
for i := uint64(1); i <= 150; i++ {
227+
gossipTime := baseTime.Add(time.Duration(i) * time.Millisecond)
228+
l.storeBlockGossipTime(i, addr, gossipTime)
229+
230+
// Process every other slot to trigger cleanup
231+
if i%2 == 0 {
232+
headTime := gossipTime.Add(100 * time.Millisecond)
233+
l.recordBlockProcessingTime(i, addr, headTime)
234+
}
235+
}
236+
237+
// Cleanup happens on every record call and removes entries older than 1 epoch
238+
// After processing slot 150, entries older than (150 - 32) = 118 are removed
239+
// Remaining entries: odd slots from 119-149 (never processed) = 16 entries
240+
// Even slots are immediately deleted after processing
241+
require.Equal(t, 16, len(l.blockGossipTimes))
242+
243+
// Verify recent unprocessed entries are still there (odd slots from end)
244+
addrMap, found := l.blockGossipTimes[149]
245+
require.True(t, found)
246+
_, found = addrMap[addr]
247+
require.True(t, found)
248+
249+
// Verify old entries were cleaned up
250+
_, found = l.blockGossipTimes[1]
251+
require.False(t, found)
252+
253+
// Verify processed entries were immediately cleaned up
254+
_, found = l.blockGossipTimes[150]
255+
require.False(t, found)
256+
}

app/sse/metrics.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,12 @@ var (
4747
Help: "Block imported into fork choice delay, supplied by beacon node's SSE endpoint. Values between 0s and 4s for Ethereum mainnet are considered safe",
4848
Buckets: []float64{0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 6, 8, 10, 12},
4949
}, []string{"addr"})
50+
51+
sseBlockProcessingTimeHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
52+
Namespace: "app",
53+
Subsystem: "beacon_node",
54+
Name: "sse_block_processing_time",
55+
Help: "Time in seconds between block gossip and head events, indicating block processing time. Lower values indicate better CPU/disk/RAM performance.",
56+
Buckets: []float64{0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 2, 4},
57+
}, []string{"addr"})
5058
)

docs/metrics.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ when storing metrics from multiple nodes or clusters in one Prometheus instance.
1616
| `app_beacon_node_peers` | Gauge | Gauge set to the peer count of the upstream beacon node | |
1717
| `app_beacon_node_sse_block` | Histogram | Block imported into fork choice delay, supplied by beacon node`s SSE endpoint. Values between 0s and 4s for Ethereum mainnet are considered safe | `addr` |
1818
| `app_beacon_node_sse_block_gossip` | Histogram | Block reception via gossip delay, supplied by beacon node`s SSE endpoint. Values between 0s and 4s for Ethereum mainnet are considered safe | `addr` |
19+
| `app_beacon_node_sse_block_processing_time` | Histogram | Time in seconds between block gossip and head events, indicating block processing time. Lower values indicate better CPU/disk/RAM performance. | `addr` |
1920
| `app_beacon_node_sse_chain_reorg_depth` | Histogram | Chain reorg depth, supplied by beacon node`s SSE endpoint | `addr` |
2021
| `app_beacon_node_sse_head_delay` | Histogram | Delay in seconds between slot start and head update, supplied by beacon node`s SSE endpoint. Values between 8s and 12s for Ethereum mainnet are considered safe. | `addr` |
2122
| `app_beacon_node_sse_head_slot` | Gauge | Current beacon node head slot, supplied by beacon node`s SSE endpoint | `addr` |

0 commit comments

Comments
 (0)