
Commit f4d73db

fix: cache
1 parent c816221 commit f4d73db

4 files changed: +60 −44 lines changed


cmd/p2p/sensor/sensor.go

Lines changed: 3 additions & 0 deletions

@@ -58,6 +58,7 @@ type (
 	MaxCachedBlocks int
 	MaxKnownTxs     int
 	MaxKnownBlocks  int
+	MaxRequests     int
 	CacheTTL        time.Duration
 	PeerCacheTTL    time.Duration
 	ShouldRunPprof  bool
@@ -235,6 +236,7 @@ var SensorCmd = &cobra.Command{
 		ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes,
 		MaxKnownTxs:                inputSensorParams.MaxKnownTxs,
 		MaxKnownBlocks:             inputSensorParams.MaxKnownBlocks,
+		MaxRequests:                inputSensorParams.MaxRequests,
 		PeerCacheTTL:               inputSensorParams.PeerCacheTTL,
 	}

@@ -483,6 +485,7 @@ will result in less chance of missing data but can significantly increase memory
 	f.DurationVar(&inputSensorParams.CacheTTL, "cache-ttl", 10*time.Minute, "time to live for cached transactions and blocks")
 	f.IntVar(&inputSensorParams.MaxKnownTxs, "max-known-txs", 8192, "maximum transaction hashes to track per peer")
 	f.IntVar(&inputSensorParams.MaxKnownBlocks, "max-known-blocks", 1024, "maximum block hashes to track per peer")
+	f.IntVar(&inputSensorParams.MaxRequests, "max-requests", 2048, "maximum request IDs to track per peer")
 	f.DurationVar(&inputSensorParams.PeerCacheTTL, "peer-cache-ttl", 5*time.Minute, "time to live for per-peer caches (known tx/block hashes and requests)")
 	f.BoolVar(&inputSensorParams.ShouldRunPprof, "pprof", false, "run pprof server")
 	f.UintVar(&inputSensorParams.PprofPort, "pprof-port", 6060, "port pprof runs on")

doc/polycli_p2p_sensor.md

Lines changed: 1 addition & 0 deletions

@@ -50,6 +50,7 @@ If no nodes.json file exists, it will be created.
       --max-known-blocks int    maximum block hashes to track per peer (default 1024)
       --max-known-txs int       maximum transaction hashes to track per peer (default 8192)
   -m, --max-peers int           maximum number of peers to connect to (default 2000)
+      --max-requests int        maximum request IDs to track per peer (default 2048)
       --nat string              NAT port mapping mechanism (any|none|upnp|pmp|pmp:<IP>|extip:<IP>) (default "any")
   -n, --network-id uint         filter discovered nodes by this network ID
       --no-discovery            disable P2P peer discovery
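
For reference, a hedged example of passing the new flag alongside the existing per-peer cache flags; the nodes.json argument and flag values simply restate the documented defaults:

    polycli p2p sensor nodes.json --max-requests 2048 --max-known-txs 8192 --max-known-blocks 1024 --peer-cache-ttl 5m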

p2p/cache.go

Lines changed: 54 additions & 41 deletions

@@ -6,27 +6,29 @@ import (
 	"time"
 )

+// Cache is a thread-safe LRU cache with optional TTL-based expiration.
 type Cache[K comparable, V any] struct {
 	mu      sync.RWMutex
 	maxSize int
 	ttl     time.Duration
+	items   map[K]*list.Element
 	list    *list.List
 }

-// entry represents an entry in the cache.
 type entry[K comparable, V any] struct {
 	key       K
 	value     V
 	expiresAt time.Time
 }

 // NewCache creates a new cache with the given max size and optional TTL.
-// If maxSize is 0 or negative, the cache has no size limit (only TTL eviction).
+// If maxSize <= 0, the cache has no size limit.
 // If ttl is 0, entries never expire based on time.
 func NewCache[K comparable, V any](maxSize int, ttl time.Duration) *Cache[K, V] {
 	return &Cache[K, V]{
 		maxSize: maxSize,
 		ttl:     ttl,
+		items:   make(map[K]*list.Element),
 		list:    list.New(),
 	}
 }
@@ -42,74 +44,84 @@ func (c *Cache[K, V]) Add(key K, value V) {
 		expiresAt = now.Add(c.ttl)
 	}

-	// Check if key exists, update it and move to front
-	for elem := c.list.Front(); elem != nil; elem = elem.Next() {
+	if elem, ok := c.items[key]; ok {
+		c.list.MoveToFront(elem)
 		e := elem.Value.(*entry[K, V])
-		if e.key == key {
-			c.list.MoveToFront(elem)
-			e.value = value
-			e.expiresAt = expiresAt
-			return
-		}
+		e.value = value
+		e.expiresAt = expiresAt
+		return
 	}

-	// Add new entry at front
 	e := &entry[K, V]{
 		key:       key,
 		value:     value,
 		expiresAt: expiresAt,
 	}
-	c.list.PushFront(e)
+	elem := c.list.PushFront(e)
+	c.items[key] = elem

-	// Evict oldest if over max size (only if maxSize is set)
 	if c.maxSize > 0 && c.list.Len() > c.maxSize {
-		c.list.Remove(c.list.Back())
+		back := c.list.Back()
+		if back != nil {
+			c.list.Remove(back)
+			e := back.Value.(*entry[K, V])
+			delete(c.items, e.key)
+		}
 	}
 }

-// Get retrieves a value from the cache.
-// Returns the value and true if found and not expired, otherwise zero value and false.
+// Get retrieves a value from the cache and updates LRU ordering.
 func (c *Cache[K, V]) Get(key K) (V, bool) {
 	c.mu.Lock()
 	defer c.mu.Unlock()

-	now := time.Now()
-	for elem := c.list.Front(); elem != nil; elem = elem.Next() {
-		e := elem.Value.(*entry[K, V])
-		if e.key == key {
-			// Check if expired
-			if c.ttl > 0 && now.After(e.expiresAt) {
-				c.list.Remove(elem)
-				var zero V
-				return zero, false
-			}
-			// Move to front (LRU)
-			c.list.MoveToFront(elem)
-			return e.value, true
-		}
+	elem, ok := c.items[key]
+	if !ok {
+		var zero V
+		return zero, false
 	}

-	var zero V
-	return zero, false
+	e := elem.Value.(*entry[K, V])
+
+	if c.ttl > 0 && time.Now().After(e.expiresAt) {
+		c.list.Remove(elem)
+		delete(c.items, key)
+		var zero V
+		return zero, false
+	}
+
+	c.list.MoveToFront(elem)
+	return e.value, true
 }

 // Contains checks if a key exists in the cache and is not expired.
+// Uses a read lock and doesn't update LRU ordering.
 func (c *Cache[K, V]) Contains(key K) bool {
-	_, ok := c.Get(key)
-	return ok
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	elem, ok := c.items[key]
+	if !ok {
+		return false
+	}
+
+	e := elem.Value.(*entry[K, V])
+
+	if c.ttl > 0 && time.Now().After(e.expiresAt) {
+		return false
+	}
+
+	return true
 }

 // Remove removes a key from the cache.
 func (c *Cache[K, V]) Remove(key K) {
 	c.mu.Lock()
 	defer c.mu.Unlock()

-	for elem := c.list.Front(); elem != nil; elem = elem.Next() {
-		e := elem.Value.(*entry[K, V])
-		if e.key == key {
-			c.list.Remove(elem)
-			return
-		}
+	if elem, ok := c.items[key]; ok {
+		c.list.Remove(elem)
+		delete(c.items, key)
 	}
 }

@@ -125,10 +137,11 @@ func (c *Cache[K, V]) Purge() {
 	c.mu.Lock()
 	defer c.mu.Unlock()

+	c.items = make(map[K]*list.Element)
 	c.list.Init()
 }

-// Keys returns all keys in the cache (including potentially expired ones).
+// Keys returns all keys in the cache.
 func (c *Cache[K, V]) Keys() []K {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
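
The cache.go changes above replace the O(n) linked-list scans with an items map keyed by K while keeping the list only for LRU ordering, so Add, Get, Contains, and Remove become O(1); every eviction path now also deletes the map entry to keep the two structures in sync. Below is a minimal usage sketch of the resulting API. The import path is a placeholder (substitute this repository's actual module path), and the behavior shown follows the Add/Get/Contains semantics in the diff:

    package main

    import (
        "fmt"
        "time"

        // Placeholder import path; substitute this repository's module path for the p2p package.
        "github.com/example/polycli/p2p"
    )

    func main() {
        // A tiny cache: at most 2 entries, each expiring 100ms after it is added or refreshed.
        c := p2p.NewCache[string, int](2, 100*time.Millisecond)

        c.Add("a", 1)
        c.Add("b", 2)
        c.Add("c", 3) // exceeds maxSize, so the least recently used entry ("a") is evicted

        _, ok := c.Get("a")
        fmt.Println("a present after size eviction:", ok) // false

        if v, ok := c.Get("b"); ok {
            fmt.Println("b is still cached:", v) // 2
        }

        time.Sleep(150 * time.Millisecond)

        // Once the TTL elapses, lookups report a miss for the expired entry.
        fmt.Println("b present after TTL:", c.Contains("b")) // false
    }

Note the locking split in the diff: Get takes the write lock because it mutates LRU order, while Contains only takes the read lock, which is why it no longer delegates to Get.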

p2p/protocol.go

Lines changed: 2 additions & 3 deletions

@@ -87,6 +87,7 @@ type EthProtocolOptions struct {
 	// Cache sizes for known tx/block tracking per peer
 	MaxKnownTxs    int
 	MaxKnownBlocks int
+	MaxRequests    int
 	PeerCacheTTL   time.Duration
 }

@@ -126,11 +127,9 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
 		connectedAt: time.Now(),
 	}

-	// Initialize per-peer caches with configured TTL
 	c.knownTxs = NewCache[common.Hash, struct{}](opts.MaxKnownTxs, opts.PeerCacheTTL)
 	c.knownBlocks = NewCache[common.Hash, struct{}](opts.MaxKnownBlocks, opts.PeerCacheTTL)
-	// Initialize requests cache with no size limit, uses peer cache TTL
-	c.requests = NewCache[uint64, common.Hash](0, opts.PeerCacheTTL)
+	c.requests = NewCache[uint64, common.Hash](opts.MaxRequests, opts.PeerCacheTTL)

 	c.headMutex.RLock()
 	status := eth.StatusPacket{
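
With this change the per-peer requests cache gets the same size bound as the known tx/block caches instead of relying on TTL alone, which caps memory per peer under request-heavy workloads. A short sketch of the intended pattern, assuming go-ethereum's common.Hash and the Cache type from p2p/cache.go; the import path, request ID, and hash are illustrative placeholders:

    package main

    import (
        "fmt"
        "time"

        "github.com/ethereum/go-ethereum/common"

        // Placeholder import path for the Cache type shown in p2p/cache.go.
        "github.com/example/polycli/p2p"
    )

    func main() {
        maxRequests := 2048             // mirrors the --max-requests default
        peerCacheTTL := 5 * time.Minute // mirrors the --peer-cache-ttl default

        // Map outstanding request IDs to the block hash they were issued for.
        requests := p2p.NewCache[uint64, common.Hash](maxRequests, peerCacheTTL)

        var requestID uint64 = 1
        hash := common.HexToHash("0x01")
        requests.Add(requestID, hash) // remember the request when it is sent

        // When the response arrives, recover the hash by request ID.
        if h, ok := requests.Get(requestID); ok {
            fmt.Println("response corresponds to block", h.Hex())
        }

        // Entries beyond maxRequests are evicted LRU-first, and entries older
        // than peerCacheTTL expire, so the cache stays bounded per peer.
    }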
