Skip to content

Commit 7c0b083

Browse files
authored
feat(sensor): track if blocks are fetched as parents (#759)
* feat(sensor): use LRU with TTL for requests cache
* docs: make gen-doc
* fix: rename requests cache variables
* fix: comment
* fix: requests cache opts name
* feat(sensor): use cache to track seen blocks
* docs: make gen-doc
* feat: global blocks cache
* fix: rename method
* fix: conditional sends?
* fix: flags
* chore: use conns opts
* fix: revert request spelling
* fix: remove wrong check
* fix: logic issues
* feat: store entire blocks
* chore: runAsync refactor
* feat: optimizations
* feat(sensor): track if block headers are fetched as parents
* docs: make gen-doc
* chore: refactor to use CacheOptions struct
1 parent c3e0312 commit 7c0b083

File tree

9 files changed

+93
-59
lines changed

9 files changed

+93
-59
lines changed

cmd/p2p/sensor/sensor.go

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,9 @@ type (
6969
DiscoveryDNS string
7070
Database string
7171
NoDiscovery bool
72-
MaxRequests int
73-
RequestsCacheTTL time.Duration
74-
MaxBlocks int
75-
BlocksCacheTTL time.Duration
72+
RequestsCache p2p.CacheOptions
73+
ParentsCache p2p.CacheOptions
74+
BlocksCache p2p.CacheOptions
7675

7776
bootnodes []*enode.Node
7877
staticNodes []*enode.Node
@@ -196,24 +195,23 @@ var SensorCmd = &cobra.Command{
196195
// Create peer connection manager for broadcasting transactions
197196
// and managing the global blocks cache
198197
conns := p2p.NewConns(p2p.ConnsOptions{
199-
MaxBlocks: inputSensorParams.MaxBlocks,
200-
BlocksCacheTTL: inputSensorParams.BlocksCacheTTL,
198+
BlocksCache: inputSensorParams.BlocksCache,
201199
})
202200

203201
opts := p2p.EthProtocolOptions{
204-
Context: cmd.Context(),
205-
Database: db,
206-
GenesisHash: common.HexToHash(inputSensorParams.GenesisHash),
207-
RPC: inputSensorParams.RPC,
208-
SensorID: inputSensorParams.SensorID,
209-
NetworkID: inputSensorParams.NetworkID,
210-
Conns: conns,
211-
Head: &head,
212-
HeadMutex: &sync.RWMutex{},
213-
ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)},
214-
MsgCounter: msgCounter,
215-
MaxRequests: inputSensorParams.MaxRequests,
216-
RequestsCacheTTL: inputSensorParams.RequestsCacheTTL,
202+
Context: cmd.Context(),
203+
Database: db,
204+
GenesisHash: common.HexToHash(inputSensorParams.GenesisHash),
205+
RPC: inputSensorParams.RPC,
206+
SensorID: inputSensorParams.SensorID,
207+
NetworkID: inputSensorParams.NetworkID,
208+
Conns: conns,
209+
Head: &head,
210+
HeadMutex: &sync.RWMutex{},
211+
ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)},
212+
MsgCounter: msgCounter,
213+
RequestsCache: inputSensorParams.RequestsCache,
214+
ParentsCache: inputSensorParams.ParentsCache,
217215
}
218216

219217
config := ethp2p.Config{
@@ -486,8 +484,10 @@ will result in less chance of missing data but can significantly increase memory
486484
- json (output to stdout)
487485
- none (no persistence)`)
488486
f.BoolVar(&inputSensorParams.NoDiscovery, "no-discovery", false, "disable P2P peer discovery")
489-
f.IntVar(&inputSensorParams.MaxRequests, "max-requests", 2048, "maximum request IDs to track per peer (0 for no limit)")
490-
f.DurationVar(&inputSensorParams.RequestsCacheTTL, "requests-cache-ttl", 5*time.Minute, "time to live for requests cache entries (0 for no expiration)")
491-
f.IntVar(&inputSensorParams.MaxBlocks, "max-blocks", 1024, "maximum blocks to track across all peers (0 for no limit)")
492-
f.DurationVar(&inputSensorParams.BlocksCacheTTL, "blocks-cache-ttl", 10*time.Minute, "time to live for block cache entries (0 for no expiration)")
487+
f.IntVar(&inputSensorParams.RequestsCache.MaxSize, "max-requests", 2048, "maximum request IDs to track per peer (0 for no limit)")
488+
f.DurationVar(&inputSensorParams.RequestsCache.TTL, "requests-cache-ttl", 5*time.Minute, "time to live for requests cache entries (0 for no expiration)")
489+
f.IntVar(&inputSensorParams.ParentsCache.MaxSize, "max-parents", 1024, "maximum parent block hashes to track per peer (0 for no limit)")
490+
f.DurationVar(&inputSensorParams.ParentsCache.TTL, "parents-cache-ttl", 5*time.Minute, "time to live for parent hash cache entries (0 for no expiration)")
491+
f.IntVar(&inputSensorParams.BlocksCache.MaxSize, "max-blocks", 1024, "maximum blocks to track across all peers (0 for no limit)")
492+
f.DurationVar(&inputSensorParams.BlocksCache.TTL, "blocks-cache-ttl", 10*time.Minute, "time to live for block cache entries (0 for no expiration)")
493493
}

doc/polycli_p2p_sensor.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@ If no nodes.json file exists, it will be created.
4242
--max-blocks int maximum blocks to track across all peers (0 for no limit) (default 1024)
4343
-D, --max-db-concurrency int maximum number of concurrent database operations to perform (increasing this
4444
will result in less chance of missing data but can significantly increase memory usage) (default 10000)
45+
--max-parents int maximum parent block hashes to track per peer (0 for no limit) (default 1024)
4546
-m, --max-peers int maximum number of peers to connect to (default 2000)
4647
--max-requests int maximum request IDs to track per peer (0 for no limit) (default 2048)
4748
--nat string NAT port mapping mechanism (any|none|upnp|pmp|pmp:<IP>|extip:<IP>) (default "any")
4849
-n, --network-id uint filter discovered nodes by this network ID
4950
--no-discovery disable P2P peer discovery
51+
--parents-cache-ttl duration time to live for parent hash cache entries (0 for no expiration) (default 5m0s)
5052
--port int TCP network listening port (default 30303)
5153
--pprof run pprof server
5254
--pprof-port uint port pprof runs on (default 6060)

p2p/cache.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ import (
66
"time"
77
)
88

9+
// CacheOptions contains configuration for LRU caches with TTL.
10+
type CacheOptions struct {
11+
MaxSize int
12+
TTL time.Duration
13+
}
14+
915
// Cache is a thread-safe LRU cache with optional TTL-based expiration.
1016
type Cache[K comparable, V any] struct {
1117
mu sync.RWMutex
@@ -21,13 +27,13 @@ type entry[K comparable, V any] struct {
2127
expiresAt time.Time
2228
}
2329

24-
// NewCache creates a new cache with the given max size and optional TTL.
25-
// If maxSize <= 0, the cache has no size limit.
26-
// If ttl is 0, entries never expire based on time.
27-
func NewCache[K comparable, V any](maxSize int, ttl time.Duration) *Cache[K, V] {
30+
// NewCache creates a new cache with the given options.
31+
// If opts.MaxSize <= 0, the cache has no size limit.
32+
// If opts.TTL is 0, entries never expire based on time.
33+
func NewCache[K comparable, V any](opts CacheOptions) *Cache[K, V] {
2834
return &Cache[K, V]{
29-
maxSize: maxSize,
30-
ttl: ttl,
35+
maxSize: opts.MaxSize,
36+
ttl: opts.TTL,
3137
items: make(map[K]*list.Element),
3238
list: list.New(),
3339
}
@@ -187,15 +193,20 @@ func (c *Cache[K, V]) Contains(key K) bool {
187193
return true
188194
}
189195

190-
// Remove removes a key from the cache.
191-
func (c *Cache[K, V]) Remove(key K) {
196+
// Remove removes a key from the cache and returns the value if it existed.
197+
func (c *Cache[K, V]) Remove(key K) (V, bool) {
192198
c.mu.Lock()
193199
defer c.mu.Unlock()
194200

195201
if elem, ok := c.items[key]; ok {
202+
e := elem.Value.(*entry[K, V])
196203
c.list.Remove(elem)
197204
delete(c.items, key)
205+
return e.value, true
198206
}
207+
208+
var zero V
209+
return zero, false
199210
}
200211

201212
// Len returns the number of items in the cache.

p2p/conns.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,14 @@ type Conns struct {
2424

2525
// ConnsOptions contains configuration options for creating a new Conns manager.
2626
type ConnsOptions struct {
27-
// MaxBlocks is the maximum number of blocks to track in the cache.
28-
MaxBlocks int
29-
// BlocksCacheTTL is the time to live for block cache entries.
30-
BlocksCacheTTL time.Duration
27+
BlocksCache CacheOptions
3128
}
3229

3330
// NewConns creates a new connection manager with a blocks cache.
3431
func NewConns(opts ConnsOptions) *Conns {
3532
return &Conns{
3633
conns: make(map[string]*conn),
37-
blocks: NewCache[common.Hash, BlockCache](opts.MaxBlocks, opts.BlocksCacheTTL),
34+
blocks: NewCache[common.Hash, BlockCache](opts.BlocksCache),
3835
}
3936
}
4037

p2p/database/database.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ type Database interface {
2121
WriteBlock(context.Context, *enode.Node, *types.Block, *big.Int, time.Time)
2222

2323
// WriteBlockHeaders will write the block headers if ShouldWriteBlocks
24-
// returns true.
25-
WriteBlockHeaders(context.Context, []*types.Header, time.Time)
24+
// returns true. The isParent parameter indicates if these headers were
25+
// fetched as parent blocks.
26+
WriteBlockHeaders(context.Context, []*types.Header, time.Time, bool)
2627

2728
// WriteBlockHashes will write the block hashes if ShouldWriteBlockEvents
2829
// returns true.

p2p/database/datastore.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ type DatastoreHeader struct {
7575
BaseFee string
7676
TimeFirstSeen time.Time
7777
TTL time.Time
78+
IsParent bool
7879
}
7980

8081
// DatastoreBlock represents a block stored in datastore.
@@ -184,15 +185,16 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ
184185
// WriteBlockHeaders will write the block headers to datastore. It will not
185186
// write block events because headers will only be sent to the sensor when
186187
// requested. The block events will be written when the hash is received
187-
// instead.
188-
func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) {
188+
// instead. The isParent parameter indicates if these headers were fetched
189+
// as parent blocks.
190+
func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time, isParent bool) {
189191
if d.client == nil || !d.ShouldWriteBlocks() {
190192
return
191193
}
192194

193195
for _, h := range headers {
194196
d.runAsync(func() {
195-
d.writeBlockHeader(ctx, h, tfs)
197+
d.writeBlockHeader(ctx, h, tfs, isParent)
196198
})
197199
}
198200
}
@@ -314,7 +316,7 @@ func (d *Datastore) HasBlock(ctx context.Context, hash common.Hash) bool {
314316

315317
// newDatastoreHeader creates a DatastoreHeader from a types.Header. Some
316318
// values are converted into strings to prevent a loss of precision.
317-
func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time) *DatastoreHeader {
319+
func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isParent bool) *DatastoreHeader {
318320
return &DatastoreHeader{
319321
ParentHash: datastore.NameKey(BlocksKind, header.ParentHash.Hex(), nil),
320322
UncleHash: header.UncleHash.Hex(),
@@ -334,6 +336,7 @@ func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time) *Dat
334336
BaseFee: header.BaseFee.String(),
335337
TimeFirstSeen: tfs,
336338
TTL: tfs.Add(d.ttl),
339+
IsParent: isParent,
337340
}
338341
}
339342

@@ -390,7 +393,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
390393

391394
if dsBlock.DatastoreHeader == nil {
392395
shouldWrite = true
393-
dsBlock.DatastoreHeader = d.newDatastoreHeader(block.Header(), tfs)
396+
dsBlock.DatastoreHeader = d.newDatastoreHeader(block.Header(), tfs, false)
394397
}
395398

396399
if len(dsBlock.TotalDifficulty) == 0 {
@@ -414,7 +417,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
414417
shouldWrite = true
415418
dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles()))
416419
for _, uncle := range block.Uncles() {
417-
d.writeBlockHeader(ctx, uncle, tfs)
420+
d.writeBlockHeader(ctx, uncle, tfs, false)
418421
dsBlock.Uncles = append(dsBlock.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil))
419422
}
420423
}
@@ -475,8 +478,8 @@ func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind
475478
}
476479

477480
// writeBlockHeader will write the block header to datastore if it doesn't
478-
// exist.
479-
func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, tfs time.Time) {
481+
// exist. The isParent parameter indicates if this block was fetched as a parent block.
482+
func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, tfs time.Time, isParent bool) {
480483
key := datastore.NameKey(BlocksKind, header.Hash().Hex(), nil)
481484

482485
_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
@@ -485,7 +488,7 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header,
485488
return nil
486489
}
487490

488-
block.DatastoreHeader = d.newDatastoreHeader(header, tfs)
491+
block.DatastoreHeader = d.newDatastoreHeader(header, tfs, isParent)
489492
_, err := tx.Put(key, &block)
490493
return err
491494
}, datastore.MaxAttempts(MaxAttempts))
@@ -522,7 +525,7 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
522525
shouldWrite = true
523526
block.Uncles = make([]*datastore.Key, 0, len(body.Uncles))
524527
for _, uncle := range body.Uncles {
525-
d.writeBlockHeader(ctx, uncle, tfs)
528+
d.writeBlockHeader(ctx, uncle, tfs, false)
526529
block.Uncles = append(block.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil))
527530
}
528531
}

p2p/database/json.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ type JSONBlock struct {
6969
TxCount int `json:"tx_count"`
7070
UncleCount int `json:"uncle_count"`
7171
TimeFirstSeen time.Time `json:"time_first_seen"`
72+
IsParent bool `json:"is_parent"`
7273
}
7374

7475
// JSONBlockEvent represents a block event in JSON format.
@@ -179,7 +180,8 @@ func (j *JSONDatabase) writeBlock(block *types.Block, td *big.Int, tfs time.Time
179180
}
180181

181182
// WriteBlockHeaders writes the block headers as JSON.
182-
func (j *JSONDatabase) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) {
183+
// The isParent parameter indicates if these headers were fetched as parent blocks.
184+
func (j *JSONDatabase) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time, isParent bool) {
183185
if !j.ShouldWriteBlocks() {
184186
return
185187
}
@@ -196,6 +198,7 @@ func (j *JSONDatabase) WriteBlockHeaders(ctx context.Context, headers []*types.H
196198
GasUsed: header.GasUsed,
197199
Difficulty: header.Difficulty.String(),
198200
TimeFirstSeen: tfs,
201+
IsParent: isParent,
199202
}
200203

201204
if header.BaseFee != nil {

p2p/database/nodb.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func (n *nodb) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Bl
2626
}
2727

2828
// WriteBlockHeaders does nothing.
29-
func (n *nodb) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) {
29+
func (n *nodb) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time, isParent bool) {
3030
}
3131

3232
// WriteBlockHashes does nothing.

p2p/protocol.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ type conn struct {
4747
requests *Cache[uint64, common.Hash]
4848
requestNum uint64
4949

50+
// parents tracks hashes of blocks requested as parents to mark them
51+
// with IsParent=true when writing to the database.
52+
parents *Cache[common.Hash, struct{}]
53+
5054
// conns provides access to the global connection manager, which includes
5155
// the blocks cache shared across all peers.
5256
conns *Conns
@@ -80,9 +84,9 @@ type EthProtocolOptions struct {
8084
Head *HeadBlock
8185
HeadMutex *sync.RWMutex
8286

83-
// Requests cache configuration
84-
MaxRequests int
85-
RequestsCacheTTL time.Duration
87+
// Cache configurations
88+
RequestsCache CacheOptions
89+
ParentsCache CacheOptions
8690
}
8791

8892
// HeadBlock contains the necessary head block data for the status message.
@@ -108,8 +112,9 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
108112
logger: log.With().Str("peer", peerURL).Logger(),
109113
rw: rw,
110114
db: opts.Database,
111-
requests: NewCache[uint64, common.Hash](opts.MaxRequests, opts.RequestsCacheTTL),
115+
requests: NewCache[uint64, common.Hash](opts.RequestsCache),
112116
requestNum: 0,
117+
parents: NewCache[common.Hash, struct{}](opts.ParentsCache),
113118
head: opts.Head,
114119
headMutex: opts.HeadMutex,
115120
counter: opts.MsgCounter,
@@ -277,8 +282,9 @@ func (c *conn) readStatus(packet *eth.StatusPacket) error {
277282

278283
// getBlockData will send GetBlockHeaders and/or GetBlockBodies requests to the
279284
// peer based on what parts of the block we already have. It will return an error
280-
// if sending either of the requests failed.
281-
func (c *conn) getBlockData(hash common.Hash, cache BlockCache) error {
285+
// if sending either of the requests failed. The isParent parameter indicates if
286+
// this block is being fetched as a parent block.
287+
func (c *conn) getBlockData(hash common.Hash, cache BlockCache, isParent bool) error {
282288
// Only request header if we don't have it
283289
if cache.Header == nil {
284290
headersRequest := &GetBlockHeaders{
@@ -290,6 +296,10 @@ func (c *conn) getBlockData(hash common.Hash, cache BlockCache) error {
290296
},
291297
}
292298

299+
if isParent {
300+
c.parents.Add(hash, struct{}{})
301+
}
302+
293303
c.countMsgSent(headersRequest.Name(), 1)
294304
if err := ethp2p.Send(c.rw, eth.GetBlockHeadersMsg, headersRequest); err != nil {
295305
return err
@@ -343,7 +353,7 @@ func (c *conn) getParentBlock(ctx context.Context, header *types.Header) error {
343353
Str("number", new(big.Int).Sub(header.Number, big.NewInt(1)).String()).
344354
Msg("Fetching missing parent block")
345355

346-
return c.getBlockData(header.ParentHash, cache)
356+
return c.getBlockData(header.ParentHash, cache, true)
347357
}
348358

349359
func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
@@ -369,7 +379,7 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
369379
}
370380

371381
// Request only the parts we don't have
372-
if err := c.getBlockData(hash, cache); err != nil {
382+
if err := c.getBlockData(hash, cache, false); err != nil {
373383
return err
374384
}
375385

@@ -422,6 +432,10 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error {
422432
tfs := time.Now()
423433

424434
headers := packet.BlockHeadersRequest
435+
if len(headers) == 0 {
436+
return nil
437+
}
438+
425439
c.countMsgReceived(packet.Name(), float64(len(headers)))
426440

427441
for _, header := range headers {
@@ -430,7 +444,10 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error {
430444
}
431445
}
432446

433-
c.db.WriteBlockHeaders(ctx, headers, tfs)
447+
// Check if any of these headers were requested as parent blocks
448+
_, isParent := c.parents.Remove(headers[0].Hash())
449+
450+
c.db.WriteBlockHeaders(ctx, headers, tfs, isParent)
434451

435452
// Update cache to store headers
436453
for _, header := range headers {

0 commit comments

Comments
 (0)