Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ coverage.out
*.swo

wallets.json

*.key
90 changes: 74 additions & 16 deletions p2p/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type conn struct {
requests *list.List
requestNum uint64

// Linked list of seen block hashes with timestamps.
blockHashes *list.List

// oldestBlock stores the first block the sensor has seen so when fetching
// parent blocks, it does not request blocks older than this.
oldestBlock *types.Header
Expand Down Expand Up @@ -72,6 +75,14 @@ type HeadBlock struct {
Time uint64
}

type BlockHashEntry struct {
hash common.Hash
time time.Time
}

// blockHashTTL defines the time-to-live for block hash entries in blockHashes list.
var blockHashTTL = 10 * time.Minute

// NewEthProctocol creates the new eth protocol. This will handle writing the
// status exchange, message handling, and writing blocks/txs to the database.
func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
Expand All @@ -81,17 +92,18 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
Length: 17,
Run: func(p *ethp2p.Peer, rw ethp2p.MsgReadWriter) error {
c := conn{
sensorID: opts.SensorID,
node: p.Node(),
logger: log.With().Str("peer", p.Node().URLv4()).Logger(),
rw: rw,
db: opts.Database,
requests: list.New(),
requestNum: 0,
head: opts.Head,
headMutex: opts.HeadMutex,
counter: opts.MsgCounter,
name: p.Fullname(),
sensorID: opts.SensorID,
node: p.Node(),
logger: log.With().Str("peer", p.Node().URLv4()).Logger(),
rw: rw,
db: opts.Database,
requests: list.New(),
requestNum: 0,
head: opts.Head,
headMutex: opts.HeadMutex,
counter: opts.MsgCounter,
name: p.Fullname(),
blockHashes: list.New(),
}

c.headMutex.RLock()
Expand Down Expand Up @@ -304,19 +316,65 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {

c.counter.WithLabelValues(packet.Name(), c.node.URLv4(), c.name).Add(float64(len(packet)))

hashes := make([]common.Hash, 0, len(packet))
for _, hash := range packet {
hashes = append(hashes, hash.Hash)
if err := c.getBlockData(hash.Hash); err != nil {
// Collect unique hashes for database write.
uniqueHashes := make([]common.Hash, 0, len(packet))

for _, entry := range packet {
hash := entry.Hash

// Check if we've seen the hash and remove old entries
if c.hasSeenBlockHash(hash) {
continue
}

// Attempt to fetch block data first
if err := c.getBlockData(hash); err != nil {
return err
}

// Now that we've successfully fetched, record the new block hash
c.addBlockHash(hash)
uniqueHashes = append(uniqueHashes, hash)
}

c.db.WriteBlockHashes(ctx, c.node, hashes, tfs)
// Write only unique hashes to the database.
if len(uniqueHashes) > 0 {
c.db.WriteBlockHashes(ctx, c.node, uniqueHashes, tfs)
}

return nil
}

// addBlockHash adds a new block hash with a timestamp to the blockHashes list.
func (c *conn) addBlockHash(hash common.Hash) {
now := time.Now()

// Add the new block hash entry to the list.
c.blockHashes.PushBack(BlockHashEntry{
hash: hash,
time: now,
})
}

// Helper method to check if a block hash is already in blockHashes.
func (c *conn) hasSeenBlockHash(hash common.Hash) bool {
now := time.Now()
for e := c.blockHashes.Front(); e != nil; e = e.Next() {
entry := e.Value.(BlockHashEntry)
// Check if the hash matches. We can short circuit here because there will
// be block hashes that we haven't seen before, which will make a full
// iteration of the blockHashes linked list.
if entry.hash.Cmp(hash) == 0 {
return true
}
// Remove entries older than blockHashTTL.
if now.Sub(entry.time) > blockHashTTL {
c.blockHashes.Remove(e)
}
}
return false
}

func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error {
var txs eth.TransactionsPacket
if err := msg.Decode(&txs); err != nil {
Expand Down
Loading