Skip to content

Commit 97b9531

Browse files
committed
feat: global blocks cache
1 parent afb6777 commit 97b9531

File tree

3 files changed

+69
-18
lines changed

3 files changed

+69
-18
lines changed

cmd/p2p/sensor/sensor.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@ var SensorCmd = &cobra.Command{
194194
}, []string{"message", "url", "name"})
195195

196196
// Create peer connection manager for broadcasting transactions
197-
conns := p2p.NewConns()
197+
// and managing the global blocks cache
198+
conns := p2p.NewConns(inputSensorParams.MaxBlocks, inputSensorParams.BlocksCacheTTL)
198199

199200
opts := p2p.EthProtocolOptions{
200201
Context: cmd.Context(),
@@ -210,8 +211,6 @@ var SensorCmd = &cobra.Command{
210211
MsgCounter: msgCounter,
211212
MaxRequests: inputSensorParams.MaxRequests,
212213
RequestsCacheTTL: inputSensorParams.RequestsCacheTTL,
213-
MaxBlocks: inputSensorParams.MaxBlocks,
214-
BlocksCacheTTL: inputSensorParams.BlocksCacheTTL,
215214
}
216215

217216
config := ethp2p.Config{

p2p/conns.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,31 @@ package p2p
22

33
import (
44
"sync"
5+
"time"
56

7+
"github.com/ethereum/go-ethereum/common"
68
"github.com/ethereum/go-ethereum/core/types"
79
"github.com/ethereum/go-ethereum/eth/protocols/eth"
810
ethp2p "github.com/ethereum/go-ethereum/p2p"
911
"github.com/ethereum/go-ethereum/p2p/enode"
1012
)
1113

1214
// Conns manages a collection of active peer connections for transaction broadcasting.
15+
// It also maintains a global cache of blocks written to the database.
1316
type Conns struct {
1417
conns map[string]*conn
1518
mu sync.RWMutex
19+
20+
// BlocksCache tracks blocks written to the database across all peers
21+
// to avoid duplicate writes and requests.
22+
BlocksCache *Cache[common.Hash, BlockWriteState]
1623
}
1724

18-
// NewConns creates a new connection manager.
19-
func NewConns() *Conns {
25+
// NewConns creates a new connection manager with a blocks cache.
26+
func NewConns(maxBlocks int, blocksCacheTTL time.Duration) *Conns {
2027
return &Conns{
21-
conns: make(map[string]*conn),
28+
conns: make(map[string]*conn),
29+
BlocksCache: NewCache[common.Hash, BlockWriteState](maxBlocks, blocksCacheTTL),
2230
}
2331
}
2432

p2p/protocol.go

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ import (
2222
"github.com/0xPolygon/polygon-cli/p2p/database"
2323
)
2424

25+
// BlockWriteState tracks what parts of a block have been written to the database.
26+
type BlockWriteState struct {
27+
HasHeader bool
28+
HasBody bool
29+
}
30+
2531
// conn represents an individual connection with a peer.
2632
type conn struct {
2733
sensorID string
@@ -40,8 +46,9 @@ type conn struct {
4046
requests *Cache[uint64, common.Hash]
4147
requestNum uint64
4248

43-
// blocks caches seen block hashes to avoid duplicate processing.
44-
blocks *Cache[common.Hash, struct{}]
49+
// conns provides access to the global connection manager, which includes
50+
// the blocks cache shared across all peers.
51+
conns *Conns
4552

4653
// oldestBlock stores the first block the sensor has seen so when fetching
4754
// parent blocks, it does not request blocks older than this.
@@ -68,10 +75,6 @@ type EthProtocolOptions struct {
6875
// Requests cache configuration
6976
MaxRequests int
7077
RequestsCacheTTL time.Duration
71-
72-
// Blocks cache configuration
73-
MaxBlocks int
74-
BlocksCacheTTL time.Duration
7578
}
7679

7780
// HeadBlock contains the necessary head block data for the status message.
@@ -102,7 +105,7 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
102105
headMutex: opts.HeadMutex,
103106
counter: opts.MsgCounter,
104107
peer: p,
105-
blocks: NewCache[common.Hash, struct{}](opts.MaxBlocks, opts.BlocksCacheTTL),
108+
conns: opts.Conns,
106109
}
107110

108111
c.headMutex.RLock()
@@ -287,6 +290,11 @@ func (c *conn) getParentBlock(ctx context.Context, header *types.Header) error {
287290
return nil
288291
}
289292

293+
// Check cache first before querying the database
294+
if state, ok := c.conns.BlocksCache.Get(header.ParentHash); ok && state.HasHeader {
295+
return nil
296+
}
297+
290298
if c.db.HasBlock(ctx, header.ParentHash) || header.Number.Cmp(c.oldestBlock.Number) != 1 {
291299
return nil
292300
}
@@ -315,8 +323,8 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
315323
for _, entry := range packet {
316324
hash := entry.Hash
317325

318-
// Check if we've seen the hash
319-
if c.blocks.Contains(hash) {
326+
// Check if we've already seen this block in the cache
327+
if c.conns.BlocksCache.Contains(hash) {
320328
continue
321329
}
322330

@@ -325,8 +333,9 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
325333
return err
326334
}
327335

328-
// Now that we've successfully fetched, record the new block hash
329-
c.blocks.Add(hash, struct{}{})
336+
// Mark that we've requested this block (header and body will be marked
337+
// when they're actually written to the database)
338+
c.conns.BlocksCache.Add(hash, BlockWriteState{HasHeader: false, HasBody: false})
330339
uniqueHashes = append(uniqueHashes, hash)
331340
}
332341

@@ -380,12 +389,28 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error {
380389
c.AddCount(packet.Name(), float64(len(headers)))
381390

382391
for _, header := range headers {
392+
hash := header.Hash()
393+
394+
// Check if we already have the header in the cache
395+
if state, ok := c.conns.BlocksCache.Get(hash); ok && state.HasHeader {
396+
continue
397+
}
398+
383399
if err := c.getParentBlock(ctx, header); err != nil {
384400
return err
385401
}
386402
}
387403

388404
c.db.WriteBlockHeaders(ctx, headers, tfs)
405+
406+
// Update cache to mark headers as written
407+
for _, header := range headers {
408+
hash := header.Hash()
409+
state, _ := c.conns.BlocksCache.Get(hash)
410+
state.HasHeader = true
411+
c.conns.BlocksCache.Add(hash, state)
412+
}
413+
389414
return nil
390415
}
391416

@@ -425,8 +450,18 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error {
425450
}
426451
c.requests.Remove(packet.RequestId)
427452

453+
// Check if we already have the body in the cache
454+
if state, ok := c.conns.BlocksCache.Get(hash); ok && state.HasBody {
455+
return nil
456+
}
457+
428458
c.db.WriteBlockBody(ctx, packet.BlockBodiesResponse[0], hash, tfs)
429459

460+
// Update cache to mark body as written
461+
state, _ := c.conns.BlocksCache.Get(hash)
462+
state.HasBody = true
463+
c.conns.BlocksCache.Add(hash, state)
464+
430465
return nil
431466
}
432467

@@ -437,14 +472,15 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error {
437472
}
438473

439474
tfs := time.Now()
475+
hash := block.Block.Hash()
440476

441477
c.AddCount(block.Name(), 1)
442478

443479
// Set the head block if newer.
444480
c.headMutex.Lock()
445481
if block.Block.Number().Uint64() > c.head.Number && block.TD.Cmp(c.head.TotalDifficulty) == 1 {
446482
*c.head = HeadBlock{
447-
Hash: block.Block.Hash(),
483+
Hash: hash,
448484
TotalDifficulty: block.TD,
449485
Number: block.Block.Number().Uint64(),
450486
Time: block.Block.Time(),
@@ -453,12 +489,20 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error {
453489
}
454490
c.headMutex.Unlock()
455491

492+
// Check if we already have the full block in the cache
493+
if state, ok := c.conns.BlocksCache.Get(hash); ok && state.HasHeader && state.HasBody {
494+
return nil
495+
}
496+
456497
if err := c.getParentBlock(ctx, block.Block.Header()); err != nil {
457498
return err
458499
}
459500

460501
c.db.WriteBlock(ctx, c.node, block.Block, block.TD, tfs)
461502

503+
// Update cache to mark both header and body as written
504+
c.conns.BlocksCache.Add(hash, BlockWriteState{HasHeader: true, HasBody: true})
505+
462506
return nil
463507
}
464508

0 commit comments

Comments
 (0)