Skip to content

Commit 2e58eb5

Browse files
committed
feat(sensor): use LRU with TTL for requests cache
1 parent e9389ae commit 2e58eb5

File tree

4 files changed

+183
-46
lines changed

4 files changed

+183
-46
lines changed

cmd/p2p/sensor/sensor.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ type (
6969
DiscoveryDNS string
7070
Database string
7171
NoDiscovery bool
72+
MaxRequests int
73+
RequestCacheTTL time.Duration
7274

7375
bootnodes []*enode.Node
7476
staticNodes []*enode.Node
@@ -193,17 +195,19 @@ var SensorCmd = &cobra.Command{
193195
conns := p2p.NewConns()
194196

195197
opts := p2p.EthProtocolOptions{
196-
Context: cmd.Context(),
197-
Database: db,
198-
GenesisHash: common.HexToHash(inputSensorParams.GenesisHash),
199-
RPC: inputSensorParams.RPC,
200-
SensorID: inputSensorParams.SensorID,
201-
NetworkID: inputSensorParams.NetworkID,
202-
Conns: conns,
203-
Head: &head,
204-
HeadMutex: &sync.RWMutex{},
205-
ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)},
206-
MsgCounter: msgCounter,
198+
Context: cmd.Context(),
199+
Database: db,
200+
GenesisHash: common.HexToHash(inputSensorParams.GenesisHash),
201+
RPC: inputSensorParams.RPC,
202+
SensorID: inputSensorParams.SensorID,
203+
NetworkID: inputSensorParams.NetworkID,
204+
Conns: conns,
205+
Head: &head,
206+
HeadMutex: &sync.RWMutex{},
207+
ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)},
208+
MsgCounter: msgCounter,
209+
MaxRequests: inputSensorParams.MaxRequests,
210+
RequestCacheTTL: inputSensorParams.RequestCacheTTL,
207211
}
208212

209213
config := ethp2p.Config{
@@ -476,4 +480,6 @@ will result in less chance of missing data but can significantly increase memory
476480
- json (output to stdout)
477481
- none (no persistence)`)
478482
f.BoolVar(&inputSensorParams.NoDiscovery, "no-discovery", false, "disable P2P peer discovery")
483+
f.IntVar(&inputSensorParams.MaxRequests, "max-requests", 2048, "maximum request IDs to track per peer (0 for no limit)")
484+
f.DurationVar(&inputSensorParams.RequestCacheTTL, "requests-cache-ttl", 5*time.Minute, "time to live for requests cache entries (0 for no expiration)")
479485
}

p2p/cache.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package p2p
2+
3+
import (
4+
"container/list"
5+
"sync"
6+
"time"
7+
)
8+
9+
// Cache is a thread-safe LRU cache with optional TTL-based expiration.
10+
type Cache[K comparable, V any] struct {
11+
mu sync.RWMutex
12+
maxSize int
13+
ttl time.Duration
14+
items map[K]*list.Element
15+
list *list.List
16+
}
17+
18+
type entry[K comparable, V any] struct {
19+
key K
20+
value V
21+
expiresAt time.Time
22+
}
23+
24+
// NewCache creates a new cache with the given max size and optional TTL.
25+
// If maxSize <= 0, the cache has no size limit.
26+
// If ttl is 0, entries never expire based on time.
27+
func NewCache[K comparable, V any](maxSize int, ttl time.Duration) *Cache[K, V] {
28+
return &Cache[K, V]{
29+
maxSize: maxSize,
30+
ttl: ttl,
31+
items: make(map[K]*list.Element),
32+
list: list.New(),
33+
}
34+
}
35+
36+
// Add adds or updates a value in the cache.
37+
func (c *Cache[K, V]) Add(key K, value V) {
38+
c.mu.Lock()
39+
defer c.mu.Unlock()
40+
41+
now := time.Now()
42+
expiresAt := time.Time{}
43+
if c.ttl > 0 {
44+
expiresAt = now.Add(c.ttl)
45+
}
46+
47+
if elem, ok := c.items[key]; ok {
48+
c.list.MoveToFront(elem)
49+
e := elem.Value.(*entry[K, V])
50+
e.value = value
51+
e.expiresAt = expiresAt
52+
return
53+
}
54+
55+
e := &entry[K, V]{
56+
key: key,
57+
value: value,
58+
expiresAt: expiresAt,
59+
}
60+
elem := c.list.PushFront(e)
61+
c.items[key] = elem
62+
63+
if c.maxSize > 0 && c.list.Len() > c.maxSize {
64+
back := c.list.Back()
65+
if back != nil {
66+
c.list.Remove(back)
67+
e := back.Value.(*entry[K, V])
68+
delete(c.items, e.key)
69+
}
70+
}
71+
}
72+
73+
// Get retrieves a value from the cache and updates LRU ordering.
74+
func (c *Cache[K, V]) Get(key K) (V, bool) {
75+
c.mu.Lock()
76+
defer c.mu.Unlock()
77+
78+
elem, ok := c.items[key]
79+
if !ok {
80+
var zero V
81+
return zero, false
82+
}
83+
84+
e := elem.Value.(*entry[K, V])
85+
86+
if c.ttl > 0 && time.Now().After(e.expiresAt) {
87+
c.list.Remove(elem)
88+
delete(c.items, key)
89+
var zero V
90+
return zero, false
91+
}
92+
93+
c.list.MoveToFront(elem)
94+
return e.value, true
95+
}
96+
97+
// Contains checks if a key exists in the cache and is not expired.
98+
// Uses a read lock and doesn't update LRU ordering.
99+
func (c *Cache[K, V]) Contains(key K) bool {
100+
c.mu.RLock()
101+
defer c.mu.RUnlock()
102+
103+
elem, ok := c.items[key]
104+
if !ok {
105+
return false
106+
}
107+
108+
e := elem.Value.(*entry[K, V])
109+
110+
if c.ttl > 0 && time.Now().After(e.expiresAt) {
111+
return false
112+
}
113+
114+
return true
115+
}
116+
117+
// Remove removes a key from the cache.
118+
func (c *Cache[K, V]) Remove(key K) {
119+
c.mu.Lock()
120+
defer c.mu.Unlock()
121+
122+
if elem, ok := c.items[key]; ok {
123+
c.list.Remove(elem)
124+
delete(c.items, key)
125+
}
126+
}
127+
128+
// Len returns the number of items in the cache.
129+
func (c *Cache[K, V]) Len() int {
130+
c.mu.RLock()
131+
defer c.mu.RUnlock()
132+
return c.list.Len()
133+
}
134+
135+
// Purge clears all items from the cache.
136+
func (c *Cache[K, V]) Purge() {
137+
c.mu.Lock()
138+
defer c.mu.Unlock()
139+
140+
c.items = make(map[K]*list.Element)
141+
c.list.Init()
142+
}
143+
144+
// Keys returns all keys in the cache.
145+
func (c *Cache[K, V]) Keys() []K {
146+
c.mu.RLock()
147+
defer c.mu.RUnlock()
148+
149+
keys := make([]K, 0, c.list.Len())
150+
for elem := c.list.Front(); elem != nil; elem = elem.Next() {
151+
e := elem.Value.(*entry[K, V])
152+
keys = append(keys, e.key)
153+
}
154+
return keys
155+
}

p2p/protocol.go

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type conn struct {
3838
// requests is used to store the request ID and the block hash. This is used
3939
// when fetching block bodies because the eth protocol block bodies do not
4040
// contain information about the block hash.
41-
requests *list.List
41+
requests *Cache[uint64, common.Hash]
4242
requestNum uint64
4343

4444
// Linked list of seen block hashes with timestamps.
@@ -65,6 +65,10 @@ type EthProtocolOptions struct {
6565
// when doing the status exchange.
6666
Head *HeadBlock
6767
HeadMutex *sync.RWMutex
68+
69+
// Request cache configuration
70+
MaxRequests int
71+
RequestCacheTTL time.Duration
6872
}
6973

7074
// HeadBlock contains the necessary head block data for the status message.
@@ -97,7 +101,7 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
97101
logger: log.With().Str("peer", p.Node().URLv4()).Logger(),
98102
rw: rw,
99103
db: opts.Database,
100-
requests: list.New(),
104+
requests: NewCache[uint64, common.Hash](opts.MaxRequests, opts.RequestCacheTTL),
101105
requestNum: 0,
102106
head: opts.Head,
103107
headMutex: opts.HeadMutex,
@@ -264,20 +268,8 @@ func (c *conn) getBlockData(hash common.Hash) error {
264268
return err
265269
}
266270

267-
for e := c.requests.Front(); e != nil; e = e.Next() {
268-
r := e.Value.(request)
269-
270-
if time.Since(r.time).Minutes() > 10 {
271-
c.requests.Remove(e)
272-
}
273-
}
274-
275271
c.requestNum++
276-
c.requests.PushBack(request{
277-
requestID: c.requestNum,
278-
hash: hash,
279-
time: time.Now(),
280-
})
272+
c.requests.Add(c.requestNum, hash)
281273

282274
bodiesRequest := &GetBlockBodies{
283275
RequestId: c.requestNum,
@@ -461,23 +453,14 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error {
461453

462454
c.AddCount(packet.Name(), float64(len(packet.BlockBodiesResponse)))
463455

464-
var hash *common.Hash
465-
for e := c.requests.Front(); e != nil; e = e.Next() {
466-
r := e.Value.(request)
467-
468-
if r.requestID == packet.RequestId {
469-
hash = &r.hash
470-
c.requests.Remove(e)
471-
break
472-
}
473-
}
474-
475-
if hash == nil {
456+
hash, ok := c.requests.Get(packet.RequestId)
457+
if !ok {
476458
c.logger.Warn().Msg("No block hash found for block body")
477459
return nil
478460
}
461+
c.requests.Remove(packet.RequestId)
479462

480-
c.db.WriteBlockBody(ctx, packet.BlockBodiesResponse[0], *hash, tfs)
463+
c.db.WriteBlockBody(ctx, packet.BlockBodiesResponse[0], hash, tfs)
481464

482465
return nil
483466
}

p2p/rlpx.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,6 @@ loop:
167167
return status, nil
168168
}
169169

170-
// request stores the request ID and the block's hash.
171-
type request struct {
172-
requestID uint64
173-
hash common.Hash
174-
time time.Time
175-
}
176-
177170
// ReadAndServe reads messages from peers and writes it to a database.
178171
func (c *rlpxConn) ReadAndServe(count *MessageCount) error {
179172
for {

0 commit comments

Comments
 (0)