
Commit 8d7e0ac

feat(sensor): use cache to track seen blocks (#747)
* feat(sensor): use LRU with TTL for requests cache
* docs: make gen-doc
* fix: rename requests cache variables
* fix: comment
* fix: requests cache opts name
* feat(sensor): use cache to track seen blocks
* docs: make gen-doc
* feat: global blocks cache
* fix: rename method
* fix: conditional sends?
* fix: flags
* chore: use conns opts
* fix: revert request spelling
* fix: remove wrong check
* fix: logic issues
* feat: store entire blocks
* chore: runAsync refactor
* feat: optimizations
1 parent 4bfd49f commit 8d7e0ac

6 files changed: 251 additions & 124 deletions


cmd/p2p/sensor/sensor.go

Lines changed: 9 additions & 1 deletion
```diff
@@ -71,6 +71,8 @@ type (
 		NoDiscovery      bool
 		MaxRequests      int
 		RequestsCacheTTL time.Duration
+		MaxBlocks        int
+		BlocksCacheTTL   time.Duration
 
 		bootnodes   []*enode.Node
 		staticNodes []*enode.Node
@@ -192,7 +194,11 @@ var SensorCmd = &cobra.Command{
 		}, []string{"message", "url", "name", "direction"})
 
 		// Create peer connection manager for broadcasting transactions
-		conns := p2p.NewConns()
+		// and managing the global blocks cache
+		conns := p2p.NewConns(p2p.ConnsOptions{
+			MaxBlocks:      inputSensorParams.MaxBlocks,
+			BlocksCacheTTL: inputSensorParams.BlocksCacheTTL,
+		})
 
 		opts := p2p.EthProtocolOptions{
 			Context: cmd.Context(),
@@ -482,4 +488,6 @@ will result in less chance of missing data but can significantly increase memory
 	f.BoolVar(&inputSensorParams.NoDiscovery, "no-discovery", false, "disable P2P peer discovery")
 	f.IntVar(&inputSensorParams.MaxRequests, "max-requests", 2048, "maximum request IDs to track per peer (0 for no limit)")
 	f.DurationVar(&inputSensorParams.RequestsCacheTTL, "requests-cache-ttl", 5*time.Minute, "time to live for requests cache entries (0 for no expiration)")
+	f.IntVar(&inputSensorParams.MaxBlocks, "max-blocks", 1024, "maximum blocks to track across all peers (0 for no limit)")
+	f.DurationVar(&inputSensorParams.BlocksCacheTTL, "blocks-cache-ttl", 10*time.Minute, "time to live for block cache entries (0 for no expiration)")
 }
```
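For context, the sketch below shows, under stated assumptions, how the two new flags could be bound and fed into the connection manager's options outside the sensor command. `connsOptions` is a local stand-in for `p2p.ConnsOptions`; the flag names, defaults, and help text mirror the diff above, and a value of 0 keeps the documented "no limit" / "no expiration" behavior.

```go
package main

import (
	"fmt"
	"time"

	flag "github.com/spf13/pflag"
)

// connsOptions is a local stand-in for p2p.ConnsOptions from the diff above.
type connsOptions struct {
	MaxBlocks      int           // 0 means no size limit
	BlocksCacheTTL time.Duration // 0 means entries never expire
}

func main() {
	var opts connsOptions

	// Same flag names, defaults, and help text as the sensor command above.
	flag.IntVar(&opts.MaxBlocks, "max-blocks", 1024, "maximum blocks to track across all peers (0 for no limit)")
	flag.DurationVar(&opts.BlocksCacheTTL, "blocks-cache-ttl", 10*time.Minute, "time to live for block cache entries (0 for no expiration)")
	flag.Parse()

	fmt.Printf("blocks cache config: %+v\n", opts)
}
```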

doc/polycli_p2p_sensor.md

Lines changed: 2 additions & 0 deletions
````diff
@@ -24,6 +24,7 @@ If no nodes.json file exists, it will be created.
 
 ```bash
       --api-port uint                 port API server will listen on (default 8080)
+      --blocks-cache-ttl duration     time to live for block cache entries (0 for no expiration) (default 10m0s)
   -b, --bootnodes string              comma separated nodes used for bootstrapping
       --database string               which database to persist data to, options are:
                                       - datastore (GCP Datastore)
@@ -38,6 +39,7 @@ If no nodes.json file exists, it will be created.
   -h, --help                          help for sensor
       --key string                    hex-encoded private key (cannot be set with --key-file)
   -k, --key-file string               private key file (cannot be set with --key)
+      --max-blocks int                maximum blocks to track across all peers (0 for no limit) (default 1024)
   -D, --max-db-concurrency int        maximum number of concurrent database operations to perform (increasing this
                                       will result in less chance of missing data but can significantly increase memory usage) (default 10000)
   -m, --max-peers int                 maximum number of peers to connect to (default 2000)
````

p2p/cache.go

Lines changed: 73 additions & 0 deletions
```diff
@@ -94,6 +94,79 @@ func (c *Cache[K, V]) Get(key K) (V, bool) {
 	return e.value, true
 }
 
+// Peek retrieves a value from the cache without updating LRU ordering.
+// Uses a read lock for better concurrency.
+func (c *Cache[K, V]) Peek(key K) (V, bool) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	elem, ok := c.items[key]
+	if !ok {
+		var zero V
+		return zero, false
+	}
+
+	e := elem.Value.(*entry[K, V])
+
+	if c.ttl > 0 && time.Now().After(e.expiresAt) {
+		var zero V
+		return zero, false
+	}
+
+	return e.value, true
+}
+
+// Update atomically updates a value in the cache using the provided update function.
+// The update function receives the current value (or zero value if not found) and
+// returns the new value to store. This is thread-safe and prevents race conditions
+// in get-modify-add patterns.
+func (c *Cache[K, V]) Update(key K, updateFn func(V) V) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	now := time.Now()
+	expiresAt := time.Time{}
+	if c.ttl > 0 {
+		expiresAt = now.Add(c.ttl)
+	}
+
+	var currentVal V
+	if elem, ok := c.items[key]; ok {
+		e := elem.Value.(*entry[K, V])
+		if c.ttl == 0 || !now.After(e.expiresAt) {
+			currentVal = e.value
+			// Update existing entry
+			c.list.MoveToFront(elem)
+			e.value = updateFn(currentVal)
+			e.expiresAt = expiresAt
+			return
+		}
+		// Entry expired, remove it
+		c.list.Remove(elem)
+		delete(c.items, key)
+	}
+
+	// Create new entry
+	newVal := updateFn(currentVal)
+	e := &entry[K, V]{
+		key:       key,
+		value:     newVal,
+		expiresAt: expiresAt,
+	}
+	elem := c.list.PushFront(e)
+	c.items[key] = elem
+
+	// Enforce size limit
+	if c.maxSize > 0 && c.list.Len() > c.maxSize {
+		back := c.list.Back()
+		if back != nil {
+			c.list.Remove(back)
+			e := back.Value.(*entry[K, V])
+			delete(c.items, e.key)
+		}
+	}
+}
+
 // Contains checks if a key exists in the cache and is not expired.
 // Uses a read lock and doesn't update LRU ordering.
 func (c *Cache[K, V]) Contains(key K) bool {
```
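To illustrate why `Update` exists, here is a hedged sketch assumed to sit alongside `p2p/cache.go` in the same package; the `seenBy` type and helper names are illustrative, not part of this commit. The whole read-modify-write runs inside the callback while the cache's write lock is held, whereas a separate get followed by a store could drop a concurrent writer's change.

```go
package p2p

import (
	"time"

	"github.com/ethereum/go-ethereum/common"
)

// seenBy is a hypothetical value type recording which peers announced a hash.
type seenBy map[string]struct{}

// newSeenCache builds a cache with the same kind of bounds the sensor uses:
// a maximum entry count and a TTL (either can be 0 to disable).
func newSeenCache() *Cache[common.Hash, seenBy] {
	return NewCache[common.Hash, seenBy](1024, 10*time.Minute)
}

// markSeen records that peerID announced hash. The read-modify-write happens
// entirely inside Update, so concurrent callers cannot overwrite each other.
func markSeen(cache *Cache[common.Hash, seenBy], hash common.Hash, peerID string) {
	cache.Update(hash, func(cur seenBy) seenBy {
		if cur == nil {
			cur = make(seenBy) // zero value means the key was absent or expired
		}
		cur[peerID] = struct{}{}
		return cur
	})
}
```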

p2p/conns.go

Lines changed: 23 additions & 3 deletions
```diff
@@ -4,22 +4,37 @@ import (
 	"sync"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/eth/protocols/eth"
 	ethp2p "github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 )
 
 // Conns manages a collection of active peer connections for transaction broadcasting.
+// It also maintains a global cache of blocks written to the database.
 type Conns struct {
 	conns map[string]*conn
 	mu    sync.RWMutex
+
+	// blocks tracks blocks written to the database across all peers
+	// to avoid duplicate writes and requests.
+	blocks *Cache[common.Hash, BlockCache]
+}
+
+// ConnsOptions contains configuration options for creating a new Conns manager.
+type ConnsOptions struct {
+	// MaxBlocks is the maximum number of blocks to track in the cache.
+	MaxBlocks int
+	// BlocksCacheTTL is the time to live for block cache entries.
+	BlocksCacheTTL time.Duration
 }
 
-// NewConns creates a new connection manager.
-func NewConns() *Conns {
+// NewConns creates a new connection manager with a blocks cache.
+func NewConns(opts ConnsOptions) *Conns {
 	return &Conns{
-		conns: make(map[string]*conn),
+		conns:  make(map[string]*conn),
+		blocks: NewCache[common.Hash, BlockCache](opts.MaxBlocks, opts.BlocksCacheTTL),
 	}
 }
 
@@ -91,3 +106,8 @@ func (c *Conns) GetPeerConnectedAt(peerID string) time.Time {
 
 	return time.Time{}
 }
+
+// Blocks returns the global blocks cache.
+func (c *Conns) Blocks() *Cache[common.Hash, BlockCache] {
+	return c.blocks
+}
```
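A hedged sketch of how callers might consult the shared cache through `Blocks()`; the helper names are illustrative, while `Conns`, `BlockCache`, `Contains`, and `Peek` come from the diffs above.

```go
package p2p

import "github.com/ethereum/go-ethereum/common"

// shouldRequestBlock reports whether a newly announced hash is still worth
// fetching. Contains only takes a read lock and leaves LRU order untouched,
// so it is cheap to call on every announcement.
func shouldRequestBlock(conns *Conns, hash common.Hash) bool {
	return !conns.Blocks().Contains(hash)
}

// cachedBlock returns a previously tracked block, if any, without promoting
// it in the LRU, via Peek.
func cachedBlock(conns *Conns, hash common.Hash) (BlockCache, bool) {
	return conns.Blocks().Peek(hash)
}
```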

p2p/database/datastore.go

Lines changed: 27 additions & 35 deletions
```diff
@@ -152,26 +152,32 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {
 	}
 }
 
+// runAsync executes the provided function asynchronously with concurrency control.
+// It uses the jobs channel as a semaphore to limit concurrent operations.
+func (d *Datastore) runAsync(fn func()) {
+	d.jobs <- struct{}{}
+	go func() {
+		fn()
+		<-d.jobs
+	}()
+}
+
 // WriteBlock writes the block and the block event to datastore.
 func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Block, td *big.Int, tfs time.Time) {
 	if d.client == nil {
 		return
 	}
 
 	if d.ShouldWriteBlockEvents() {
-		d.jobs <- struct{}{}
-		go func() {
+		d.runAsync(func() {
 			d.writeEvent(peer, BlockEventsKind, block.Hash(), BlocksKind, tfs)
-			<-d.jobs
-		}()
+		})
 	}
 
 	if d.ShouldWriteBlocks() {
-		d.jobs <- struct{}{}
-		go func() {
+		d.runAsync(func() {
 			d.writeBlock(ctx, block, td, tfs)
-			<-d.jobs
-		}()
+		})
 	}
 }
 
@@ -185,11 +191,9 @@ func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Head
 	}
 
 	for _, h := range headers {
-		d.jobs <- struct{}{}
-		go func(header *types.Header) {
-			d.writeBlockHeader(ctx, header, tfs)
-			<-d.jobs
-		}(h)
+		d.runAsync(func() {
+			d.writeBlockHeader(ctx, h, tfs)
+		})
 	}
 }
 
@@ -203,11 +207,9 @@ func (d *Datastore) WriteBlockBody(ctx context.Context, body *eth.BlockBody, has
 		return
 	}
 
-	d.jobs <- struct{}{}
-	go func() {
+	d.runAsync(func() {
 		d.writeBlockBody(ctx, body, hash, tfs)
-		<-d.jobs
-	}()
+	})
 }
 
 // WriteBlockHashes will write the block events to datastore.
@@ -216,11 +218,9 @@ func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hash
 		return
 	}
 
-	d.jobs <- struct{}{}
-	go func() {
+	d.runAsync(func() {
 		d.writeEvents(ctx, peer, BlockEventsKind, hashes, BlocksKind, tfs)
-		<-d.jobs
-	}()
+	})
 }
 
 // WriteTransactions will write the transactions and transaction events to datastore.
@@ -230,11 +230,9 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs
 	}
 
 	if d.ShouldWriteTransactions() {
-		d.jobs <- struct{}{}
-		go func() {
+		d.runAsync(func() {
 			d.writeTransactions(ctx, txs, tfs)
-			<-d.jobs
-		}()
+		})
 	}
 
 	if d.ShouldWriteTransactionEvents() {
@@ -243,11 +241,9 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs
 			hashes = append(hashes, tx.Hash())
 		}
 
-		d.jobs <- struct{}{}
-		go func() {
+		d.runAsync(func() {
 			d.writeEvents(ctx, peer, TransactionEventsKind, hashes, TransactionsKind, tfs)
-			<-d.jobs
-		}()
+		})
 	}
 }
 
@@ -257,9 +253,7 @@ func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer, tls time.
 		return
 	}
 
-	d.jobs <- struct{}{}
-	go func() {
-
+	d.runAsync(func() {
 		keys := make([]*datastore.Key, 0, len(peers))
 		dsPeers := make([]*DatastorePeer, 0, len(peers))
 
@@ -279,9 +273,7 @@ func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer, tls time.
 		if err != nil {
 			log.Error().Err(err).Msg("Failed to write peers")
 		}
-
-		<-d.jobs
-	}()
+	})
 }
 
 func (d *Datastore) MaxConcurrentWrites() int {
```
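The `runAsync` helper centralizes the buffered-channel semaphore pattern the removed code repeated inline. Below is a minimal standalone sketch of that pattern; the `limiter` type and the worker loop are illustrative, not the datastore's actual types, and the sketch releases its slot with `defer` purely for clarity.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// limiter bounds the number of goroutines in flight: the buffered channel's
// capacity is the concurrency limit, mirroring how Datastore uses d.jobs.
type limiter struct {
	jobs chan struct{}
}

// runAsync acquires a slot (blocking once the limit is reached), runs fn in
// a goroutine, and releases the slot when fn returns.
func (l *limiter) runAsync(fn func()) {
	l.jobs <- struct{}{}
	go func() {
		defer func() { <-l.jobs }()
		fn()
	}()
}

func main() {
	l := &limiter{jobs: make(chan struct{}, 2)} // at most 2 concurrent writes

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		i := i
		wg.Add(1)
		l.runAsync(func() {
			defer wg.Done()
			time.Sleep(50 * time.Millisecond) // stand-in for a datastore write
			fmt.Println("finished write", i)
		})
	}
	wg.Wait()
}
```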
