Skip to content

Commit 42a776a

Browse files
committed
feat(sensor): track if block headers are fetched as parents
1 parent 6e2de05 commit 42a776a

File tree

7 files changed

+60
-21
lines changed

7 files changed

+60
-21
lines changed

cmd/p2p/sensor/sensor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ type (
7171
NoDiscovery bool
7272
MaxRequests int
7373
RequestsCacheTTL time.Duration
74+
MaxParents int
75+
ParentsCacheTTL time.Duration
7476
MaxBlocks int
7577
BlocksCacheTTL time.Duration
7678

@@ -214,6 +216,8 @@ var SensorCmd = &cobra.Command{
214216
MsgCounter: msgCounter,
215217
MaxRequests: inputSensorParams.MaxRequests,
216218
RequestsCacheTTL: inputSensorParams.RequestsCacheTTL,
219+
MaxParents: inputSensorParams.MaxParents,
220+
ParentsCacheTTL: inputSensorParams.ParentsCacheTTL,
217221
}
218222

219223
config := ethp2p.Config{
@@ -488,6 +492,8 @@ will result in less chance of missing data but can significantly increase memory
488492
f.BoolVar(&inputSensorParams.NoDiscovery, "no-discovery", false, "disable P2P peer discovery")
489493
f.IntVar(&inputSensorParams.MaxRequests, "max-requests", 2048, "maximum request IDs to track per peer (0 for no limit)")
490494
f.DurationVar(&inputSensorParams.RequestsCacheTTL, "requests-cache-ttl", 5*time.Minute, "time to live for requests cache entries (0 for no expiration)")
495+
f.IntVar(&inputSensorParams.MaxParents, "max-parents", 1024, "maximum parent block hashes to track per peer (0 for no limit)")
496+
f.DurationVar(&inputSensorParams.ParentsCacheTTL, "parents-cache-ttl", 5*time.Minute, "time to live for parent hash cache entries (0 for no expiration)")
491497
f.IntVar(&inputSensorParams.MaxBlocks, "max-blocks", 1024, "maximum blocks to track across all peers (0 for no limit)")
492498
f.DurationVar(&inputSensorParams.BlocksCacheTTL, "blocks-cache-ttl", 10*time.Minute, "time to live for block cache entries (0 for no expiration)")
493499
}

p2p/cache.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,15 +187,20 @@ func (c *Cache[K, V]) Contains(key K) bool {
187187
return true
188188
}
189189

190-
// Remove removes a key from the cache.
191-
func (c *Cache[K, V]) Remove(key K) {
190+
// Remove removes a key from the cache and returns the value if it existed.
191+
func (c *Cache[K, V]) Remove(key K) (V, bool) {
192192
c.mu.Lock()
193193
defer c.mu.Unlock()
194194

195195
if elem, ok := c.items[key]; ok {
196+
e := elem.Value.(*entry[K, V])
196197
c.list.Remove(elem)
197198
delete(c.items, key)
199+
return e.value, true
198200
}
201+
202+
var zero V
203+
return zero, false
199204
}
200205

201206
// Len returns the number of items in the cache.

p2p/database/database.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ type Database interface {
2121
WriteBlock(context.Context, *enode.Node, *types.Block, *big.Int, time.Time)
2222

2323
// WriteBlockHeaders will write the block headers if ShouldWriteBlocks
24-
// returns true.
25-
WriteBlockHeaders(context.Context, []*types.Header, time.Time)
24+
// returns true. The isParent parameter indicates if these headers were
25+
// fetched as parent blocks.
26+
WriteBlockHeaders(context.Context, []*types.Header, time.Time, bool)
2627

2728
// WriteBlockHashes will write the block hashes if ShouldWriteBlockEvents
2829
// returns true.

p2p/database/datastore.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ type DatastoreHeader struct {
7575
BaseFee string
7676
TimeFirstSeen time.Time
7777
TTL time.Time
78+
IsParent bool
7879
}
7980

8081
// DatastoreBlock represents a block stored in datastore.
@@ -184,15 +185,16 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ
184185
// WriteBlockHeaders will write the block headers to datastore. It will not
185186
// write block events because headers will only be sent to the sensor when
186187
// requested. The block events will be written when the hash is received
187-
// instead.
188-
func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) {
188+
// instead. The isParent parameter indicates if these headers were fetched
189+
// as parent blocks.
190+
func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time, isParent bool) {
189191
if d.client == nil || !d.ShouldWriteBlocks() {
190192
return
191193
}
192194

193195
for _, h := range headers {
194196
d.runAsync(func() {
195-
d.writeBlockHeader(ctx, h, tfs)
197+
d.writeBlockHeader(ctx, h, tfs, isParent)
196198
})
197199
}
198200
}
@@ -314,7 +316,7 @@ func (d *Datastore) HasBlock(ctx context.Context, hash common.Hash) bool {
314316

315317
// newDatastoreHeader creates a DatastoreHeader from a types.Header. Some
316318
// values are converted into strings to prevent a loss of precision.
317-
func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time) *DatastoreHeader {
319+
func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isParent bool) *DatastoreHeader {
318320
return &DatastoreHeader{
319321
ParentHash: datastore.NameKey(BlocksKind, header.ParentHash.Hex(), nil),
320322
UncleHash: header.UncleHash.Hex(),
@@ -334,6 +336,7 @@ func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time) *Dat
334336
BaseFee: header.BaseFee.String(),
335337
TimeFirstSeen: tfs,
336338
TTL: tfs.Add(d.ttl),
339+
IsParent: isParent,
337340
}
338341
}
339342

@@ -390,7 +393,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
390393

391394
if dsBlock.DatastoreHeader == nil {
392395
shouldWrite = true
393-
dsBlock.DatastoreHeader = d.newDatastoreHeader(block.Header(), tfs)
396+
dsBlock.DatastoreHeader = d.newDatastoreHeader(block.Header(), tfs, false)
394397
}
395398

396399
if len(dsBlock.TotalDifficulty) == 0 {
@@ -414,7 +417,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
414417
shouldWrite = true
415418
dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles()))
416419
for _, uncle := range block.Uncles() {
417-
d.writeBlockHeader(ctx, uncle, tfs)
420+
d.writeBlockHeader(ctx, uncle, tfs, false)
418421
dsBlock.Uncles = append(dsBlock.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil))
419422
}
420423
}
@@ -475,8 +478,8 @@ func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind
475478
}
476479

477480
// writeBlockHeader will write the block header to datastore if it doesn't
478-
// exist.
479-
func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, tfs time.Time) {
481+
// exist. The isParent parameter indicates if this block was fetched as a parent block.
482+
func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, tfs time.Time, isParent bool) {
480483
key := datastore.NameKey(BlocksKind, header.Hash().Hex(), nil)
481484

482485
_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
@@ -485,7 +488,7 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header,
485488
return nil
486489
}
487490

488-
block.DatastoreHeader = d.newDatastoreHeader(header, tfs)
491+
block.DatastoreHeader = d.newDatastoreHeader(header, tfs, isParent)
489492
_, err := tx.Put(key, &block)
490493
return err
491494
}, datastore.MaxAttempts(MaxAttempts))
@@ -522,7 +525,7 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
522525
shouldWrite = true
523526
block.Uncles = make([]*datastore.Key, 0, len(body.Uncles))
524527
for _, uncle := range body.Uncles {
525-
d.writeBlockHeader(ctx, uncle, tfs)
528+
d.writeBlockHeader(ctx, uncle, tfs, false)
526529
block.Uncles = append(block.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil))
527530
}
528531
}

p2p/database/json.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ type JSONBlock struct {
6969
TxCount int `json:"tx_count"`
7070
UncleCount int `json:"uncle_count"`
7171
TimeFirstSeen time.Time `json:"time_first_seen"`
72+
IsParent bool `json:"is_parent"`
7273
}
7374

7475
// JSONBlockEvent represents a block event in JSON format.
@@ -179,7 +180,8 @@ func (j *JSONDatabase) writeBlock(block *types.Block, td *big.Int, tfs time.Time
179180
}
180181

181182
// WriteBlockHeaders writes the block headers as JSON.
182-
func (j *JSONDatabase) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) {
183+
// The isParent parameter indicates if these headers were fetched as parent blocks.
184+
func (j *JSONDatabase) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time, isParent bool) {
183185
if !j.ShouldWriteBlocks() {
184186
return
185187
}
@@ -196,6 +198,7 @@ func (j *JSONDatabase) WriteBlockHeaders(ctx context.Context, headers []*types.H
196198
GasUsed: header.GasUsed,
197199
Difficulty: header.Difficulty.String(),
198200
TimeFirstSeen: tfs,
201+
IsParent: isParent,
199202
}
200203

201204
if header.BaseFee != nil {

p2p/database/nodb.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func (n *nodb) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Bl
2626
}
2727

2828
// WriteBlockHeaders does nothing.
29-
func (n *nodb) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) {
29+
func (n *nodb) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time, isParent bool) {
3030
}
3131

3232
// WriteBlockHashes does nothing.

p2p/protocol.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ type conn struct {
4747
requests *Cache[uint64, common.Hash]
4848
requestNum uint64
4949

50+
// parents tracks hashes of blocks requested as parents to mark them
51+
// with IsParent=true when writing to the database.
52+
parents *Cache[common.Hash, struct{}]
53+
5054
// conns provides access to the global connection manager, which includes
5155
// the blocks cache shared across all peers.
5256
conns *Conns
@@ -83,6 +87,10 @@ type EthProtocolOptions struct {
8387
// Requests cache configuration
8488
MaxRequests int
8589
RequestsCacheTTL time.Duration
90+
91+
// Parent hash tracking cache configuration
92+
MaxParents int
93+
ParentsCacheTTL time.Duration
8694
}
8795

8896
// HeadBlock contains the necessary head block data for the status message.
@@ -110,6 +118,7 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
110118
db: opts.Database,
111119
requests: NewCache[uint64, common.Hash](opts.MaxRequests, opts.RequestsCacheTTL),
112120
requestNum: 0,
121+
parents: NewCache[common.Hash, struct{}](opts.MaxParents, opts.ParentsCacheTTL),
113122
head: opts.Head,
114123
headMutex: opts.HeadMutex,
115124
counter: opts.MsgCounter,
@@ -277,8 +286,9 @@ func (c *conn) readStatus(packet *eth.StatusPacket) error {
277286

278287
// getBlockData will send GetBlockHeaders and/or GetBlockBodies requests to the
279288
// peer based on what parts of the block we already have. It will return an error
280-
// if sending either of the requests failed.
281-
func (c *conn) getBlockData(hash common.Hash, cache BlockCache) error {
289+
// if sending either of the requests failed. The isParent parameter indicates if
290+
// this block is being fetched as a parent block.
291+
func (c *conn) getBlockData(hash common.Hash, cache BlockCache, isParent bool) error {
282292
// Only request header if we don't have it
283293
if cache.Header == nil {
284294
headersRequest := &GetBlockHeaders{
@@ -290,6 +300,10 @@ func (c *conn) getBlockData(hash common.Hash, cache BlockCache) error {
290300
},
291301
}
292302

303+
if isParent {
304+
c.parents.Add(hash, struct{}{})
305+
}
306+
293307
c.countMsgSent(headersRequest.Name(), 1)
294308
if err := ethp2p.Send(c.rw, eth.GetBlockHeadersMsg, headersRequest); err != nil {
295309
return err
@@ -343,7 +357,7 @@ func (c *conn) getParentBlock(ctx context.Context, header *types.Header) error {
343357
Str("number", new(big.Int).Sub(header.Number, big.NewInt(1)).String()).
344358
Msg("Fetching missing parent block")
345359

346-
return c.getBlockData(header.ParentHash, cache)
360+
return c.getBlockData(header.ParentHash, cache, true)
347361
}
348362

349363
func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
@@ -369,7 +383,7 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
369383
}
370384

371385
// Request only the parts we don't have
372-
if err := c.getBlockData(hash, cache); err != nil {
386+
if err := c.getBlockData(hash, cache, false); err != nil {
373387
return err
374388
}
375389

@@ -422,6 +436,10 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error {
422436
tfs := time.Now()
423437

424438
headers := packet.BlockHeadersRequest
439+
if len(headers) == 0 {
440+
return nil
441+
}
442+
425443
c.countMsgReceived(packet.Name(), float64(len(headers)))
426444

427445
for _, header := range headers {
@@ -430,7 +448,10 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error {
430448
}
431449
}
432450

433-
c.db.WriteBlockHeaders(ctx, headers, tfs)
451+
// Check if any of these headers were requested as parent blocks
452+
_, isParent := c.parents.Remove(headers[0].Hash())
453+
454+
c.db.WriteBlockHeaders(ctx, headers, tfs, isParent)
434455

435456
// Update cache to store headers
436457
for _, header := range headers {

0 commit comments

Comments
 (0)