Skip to content

Commit 2953761

Browse files
authored
fix: block_event dedupe using linked list (#423)
* dedupe writeEvents * go fmt * proper mem cleanup * add shard size logging * adjust TTL config * linked list dedupe strategy * make TTL 10 min * pr refactors * consolidate remove stale entries loop * final cleanup fmt * remove noisy log
1 parent f77444a commit 2953761

File tree

2 files changed

+76
-16
lines changed

2 files changed

+76
-16
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ coverage.out
99
*.swo
1010

1111
wallets.json
12+
13+
*.key

p2p/protocol.go

Lines changed: 74 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ type conn struct {
4141
requests *list.List
4242
requestNum uint64
4343

44+
// Linked list of seen block hashes with timestamps.
45+
blockHashes *list.List
46+
4447
// oldestBlock stores the first block the sensor has seen so when fetching
4548
// parent blocks, it does not request blocks older than this.
4649
oldestBlock *types.Header
@@ -72,6 +75,14 @@ type HeadBlock struct {
7275
Time uint64
7376
}
7477

78+
// BlockHashEntry pairs a block hash with a timestamp. Values of this type are
// what addBlockHash pushes onto a conn's blockHashes list and what
// hasSeenBlockHash reads back when scanning it.
type BlockHashEntry struct {
79+
// hash is the block hash that was recorded.
hash common.Hash
80+
// time is when the entry was added (addBlockHash sets it to time.Now()).
time time.Time
81+
}
82+
83+
// blockHashTTL defines the time-to-live for block hash entries in the
// blockHashes list. hasSeenBlockHash removes entries older than this while it
// scans the list.
84+
var blockHashTTL = 10 * time.Minute
85+
7586
// NewEthProtocol creates the new eth protocol. This will handle writing the
7687
// status exchange, message handling, and writing blocks/txs to the database.
7788
func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
@@ -81,17 +92,18 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
8192
Length: 17,
8293
Run: func(p *ethp2p.Peer, rw ethp2p.MsgReadWriter) error {
8394
c := conn{
84-
sensorID: opts.SensorID,
85-
node: p.Node(),
86-
logger: log.With().Str("peer", p.Node().URLv4()).Logger(),
87-
rw: rw,
88-
db: opts.Database,
89-
requests: list.New(),
90-
requestNum: 0,
91-
head: opts.Head,
92-
headMutex: opts.HeadMutex,
93-
counter: opts.MsgCounter,
94-
name: p.Fullname(),
95+
sensorID: opts.SensorID,
96+
node: p.Node(),
97+
logger: log.With().Str("peer", p.Node().URLv4()).Logger(),
98+
rw: rw,
99+
db: opts.Database,
100+
requests: list.New(),
101+
requestNum: 0,
102+
head: opts.Head,
103+
headMutex: opts.HeadMutex,
104+
counter: opts.MsgCounter,
105+
name: p.Fullname(),
106+
blockHashes: list.New(),
95107
}
96108

97109
c.headMutex.RLock()
@@ -304,19 +316,65 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
304316

305317
c.counter.WithLabelValues(packet.Name(), c.node.URLv4(), c.name).Add(float64(len(packet)))
306318

307-
hashes := make([]common.Hash, 0, len(packet))
308-
for _, hash := range packet {
309-
hashes = append(hashes, hash.Hash)
310-
if err := c.getBlockData(hash.Hash); err != nil {
319+
// Collect unique hashes for database write.
320+
uniqueHashes := make([]common.Hash, 0, len(packet))
321+
322+
for _, entry := range packet {
323+
hash := entry.Hash
324+
325+
// Check if we've seen the hash and remove old entries
326+
if c.hasSeenBlockHash(hash) {
327+
continue
328+
}
329+
330+
// Attempt to fetch block data first
331+
if err := c.getBlockData(hash); err != nil {
311332
return err
312333
}
334+
335+
// Now that we've successfully fetched, record the new block hash
336+
c.addBlockHash(hash)
337+
uniqueHashes = append(uniqueHashes, hash)
313338
}
314339

315-
c.db.WriteBlockHashes(ctx, c.node, hashes, tfs)
340+
// Write only unique hashes to the database.
341+
if len(uniqueHashes) > 0 {
342+
c.db.WriteBlockHashes(ctx, c.node, uniqueHashes, tfs)
343+
}
316344

317345
return nil
318346
}
319347

348+
// addBlockHash adds a new block hash with a timestamp to the blockHashes list.
349+
func (c *conn) addBlockHash(hash common.Hash) {
350+
now := time.Now()
351+
352+
// Add the new block hash entry to the list.
353+
c.blockHashes.PushBack(BlockHashEntry{
354+
hash: hash,
355+
time: now,
356+
})
357+
}
358+
359+
// Helper method to check if a block hash is already in blockHashes.
360+
func (c *conn) hasSeenBlockHash(hash common.Hash) bool {
361+
now := time.Now()
362+
for e := c.blockHashes.Front(); e != nil; e = e.Next() {
363+
entry := e.Value.(BlockHashEntry)
364+
// Check if the hash matches. We can short circuit here because there will
365+
// be block hashes that we haven't seen before, which will make a full
366+
// iteration of the blockHashes linked list.
367+
if entry.hash.Cmp(hash) == 0 {
368+
return true
369+
}
370+
// Remove entries older than blockHashTTL.
371+
if now.Sub(entry.time) > blockHashTTL {
372+
c.blockHashes.Remove(e)
373+
}
374+
}
375+
return false
376+
}
377+
320378
func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error {
321379
var txs eth.TransactionsPacket
322380
if err := msg.Decode(&txs); err != nil {

0 commit comments

Comments
 (0)