Skip to content

Commit a0c7e5e

Browse files
committed
p2p/enode: migrate nodedb from leveldb to pebble
1 parent 3954259 commit a0c7e5e

File tree

1 file changed

+60
-51
lines changed

1 file changed

+60
-51
lines changed

p2p/enode/nodedb.go

Lines changed: 60 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,20 @@ import (
2020
"bytes"
2121
"crypto/rand"
2222
"encoding/binary"
23+
"errors"
2324
"fmt"
2425
"net/netip"
2526
"os"
2627
"sync"
2728
"time"
2829

30+
31+
"github.com/ethereum/go-ethereum/ethdb"
32+
"github.com/ethereum/go-ethereum/ethdb/memorydb"
33+
"github.com/ethereum/go-ethereum/ethdb/pebble"
2934
"github.com/ethereum/go-ethereum/p2p/enr"
3035
"github.com/ethereum/go-ethereum/rlp"
31-
"github.com/syndtr/goleveldb/leveldb"
32-
"github.com/syndtr/goleveldb/leveldb/errors"
33-
"github.com/syndtr/goleveldb/leveldb/iterator"
34-
"github.com/syndtr/goleveldb/leveldb/opt"
35-
"github.com/syndtr/goleveldb/leveldb/storage"
36-
"github.com/syndtr/goleveldb/leveldb/util"
36+
3737
)
3838

3939
// Keys in the node database.
@@ -71,7 +71,7 @@ var zeroIP = netip.IPv6Unspecified()
7171
// DB is the node database, storing previously seen nodes and any collected metadata about
7272
// them for QoS purposes.
7373
type DB struct {
74-
lvl *leveldb.DB // Interface to the database itself
74+
db ethdb.KeyValueStore // Interface to the database itself
7575
runner sync.Once // Ensures we can start at most one expirer
7676
quit chan struct{} // Channel to signal the expiring thread to stop
7777
}
@@ -87,21 +87,14 @@ func OpenDB(path string) (*DB, error) {
8787

8888
// newMemoryDB creates a new in-memory node database without a persistent backend.
8989
func newMemoryDB() (*DB, error) {
90-
db, err := leveldb.Open(storage.NewMemStorage(), nil)
91-
if err != nil {
92-
return nil, err
93-
}
94-
return &DB{lvl: db, quit: make(chan struct{})}, nil
90+
db := memorydb.New()
91+
return &DB{db: db, quit: make(chan struct{})}, nil
9592
}
9693

9794
// newPersistentDB creates/opens a leveldb backed persistent node database,
9895
// also flushing its contents in case of a version mismatch.
9996
func newPersistentDB(path string) (*DB, error) {
100-
opts := &opt.Options{OpenFilesCacheCapacity: 5}
101-
db, err := leveldb.OpenFile(path, opts)
102-
if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
103-
db, err = leveldb.RecoverFile(path, nil)
104-
}
97+
db, err := pebble.New(path, 16, 16, "", false)
10598
if err != nil {
10699
return nil, err
107100
}
@@ -110,16 +103,9 @@ func newPersistentDB(path string) (*DB, error) {
110103
currentVer := make([]byte, binary.MaxVarintLen64)
111104
currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]
112105

113-
blob, err := db.Get([]byte(dbVersionKey), nil)
106+
blob, err := db.Get([]byte(dbVersionKey))
114107
switch err {
115-
case leveldb.ErrNotFound:
116-
// Version not found (i.e. empty cache), insert it
117-
if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
118-
db.Close()
119-
return nil, err
120-
}
121-
122-
case nil:
108+
case nil:
123109
// Version present, flush if different
124110
if !bytes.Equal(blob, currentVer) {
125111
db.Close()
@@ -128,8 +114,16 @@ func newPersistentDB(path string) (*DB, error) {
128114
}
129115
return newPersistentDB(path)
130116
}
117+
118+
default:
119+
// Version not found (i.e. empty cache), insert it
120+
121+
if err := db.Put([]byte(dbVersionKey), currentVer); err != nil {
122+
db.Close()
123+
return nil, err
124+
}
131125
}
132-
return &DB{lvl: db, quit: make(chan struct{})}, nil
126+
return &DB{db: db, quit: make(chan struct{})}, nil
133127
}
134128

135129
// nodeKey returns the database key for a node record.
@@ -196,7 +190,7 @@ func localItemKey(id ID, field string) []byte {
196190

197191
// fetchInt64 retrieves an integer associated with a particular key.
198192
func (db *DB) fetchInt64(key []byte) int64 {
199-
blob, err := db.lvl.Get(key, nil)
193+
blob, err := db.db.Get(key)
200194
if err != nil {
201195
return 0
202196
}
@@ -211,12 +205,12 @@ func (db *DB) fetchInt64(key []byte) int64 {
211205
func (db *DB) storeInt64(key []byte, n int64) error {
212206
blob := make([]byte, binary.MaxVarintLen64)
213207
blob = blob[:binary.PutVarint(blob, n)]
214-
return db.lvl.Put(key, blob, nil)
208+
return db.db.Put(key, blob)
215209
}
216210

217211
// fetchUint64 retrieves an integer associated with a particular key.
218212
func (db *DB) fetchUint64(key []byte) uint64 {
219-
blob, err := db.lvl.Get(key, nil)
213+
blob, err := db.db.Get(key)
220214
if err != nil {
221215
return 0
222216
}
@@ -228,12 +222,12 @@ func (db *DB) fetchUint64(key []byte) uint64 {
228222
func (db *DB) storeUint64(key []byte, n uint64) error {
229223
blob := make([]byte, binary.MaxVarintLen64)
230224
blob = blob[:binary.PutUvarint(blob, n)]
231-
return db.lvl.Put(key, blob, nil)
225+
return db.db.Put(key, blob)
232226
}
233227

234228
// Node retrieves a node with a given id from the database.
235229
func (db *DB) Node(id ID) *Node {
236-
blob, err := db.lvl.Get(nodeKey(id), nil)
230+
blob, err := db.db.Get(nodeKey(id))
237231
if err != nil {
238232
return nil
239233
}
@@ -260,7 +254,7 @@ func (db *DB) UpdateNode(node *Node) error {
260254
if err != nil {
261255
return err
262256
}
263-
if err := db.lvl.Put(nodeKey(node.ID()), blob, nil); err != nil {
257+
if err := db.db.Put(nodeKey(node.ID()), blob); err != nil {
264258
return err
265259
}
266260
return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq())
@@ -282,14 +276,14 @@ func (db *DB) Resolve(n *Node) *Node {
282276

283277
// DeleteNode deletes all information associated with a node.
284278
func (db *DB) DeleteNode(id ID) {
285-
deleteRange(db.lvl, nodeKey(id))
279+
deleteRange(db.db, nodeKey(id))
286280
}
287281

288-
func deleteRange(db *leveldb.DB, prefix []byte) {
289-
it := db.NewIterator(util.BytesPrefix(prefix), nil)
282+
func deleteRange(db ethdb.KeyValueStore, prefix []byte) {
283+
it := db.NewIterator(prefix, nil)
290284
defer it.Release()
291285
for it.Next() {
292-
db.Delete(it.Key(), nil)
286+
db.Delete(it.Key())
293287
}
294288
}
295289

@@ -324,7 +318,7 @@ func (db *DB) expirer() {
324318
// expireNodes iterates over the database and deletes all nodes that have not
325319
// been seen (i.e. received a pong from) for some time.
326320
func (db *DB) expireNodes() {
327-
it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)
321+
it := db.db.NewIterator([]byte(dbNodePrefix), nil)
328322
defer it.Release()
329323
if !it.Next() {
330324
return
@@ -344,7 +338,7 @@ func (db *DB) expireNodes() {
344338
}
345339
if time < threshold {
346340
// Last pong from this IP older than threshold, remove fields belonging to it.
347-
deleteRange(db.lvl, nodeItemKey(id, ip, ""))
341+
deleteRange(db.db, nodeItemKey(id, ip, ""))
348342
}
349343
}
350344
atEnd = !it.Next()
@@ -353,7 +347,7 @@ func (db *DB) expireNodes() {
353347
// We've moved beyond the last entry of the current ID.
354348
// Remove everything if there was no recent enough pong.
355349
if youngestPong > 0 && youngestPong < threshold {
356-
deleteRange(db.lvl, nodeKey(id))
350+
deleteRange(db.db, nodeKey(id))
357351
}
358352
youngestPong = 0
359353
}
@@ -448,11 +442,9 @@ func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
448442
var (
449443
now = time.Now()
450444
nodes = make([]*Node, 0, n)
451-
it = db.lvl.NewIterator(nil, nil)
452445
id ID
453446
)
454-
defer it.Release()
455-
447+
456448
seek:
457449
for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
458450
// Seek to a random entry. The first byte is incremented by a
@@ -461,29 +453,46 @@ seek:
461453
ctr := id[0]
462454
rand.Read(id[:])
463455
id[0] = ctr + id[0]%16
464-
it.Seek(nodeKey(id))
465456

466-
n := nextNode(it)
467-
if n == nil {
457+
// Create iterator starting from the random node key
458+
targetKey := nodeKey(id)
459+
it := db.db.NewIterator([]byte(dbNodePrefix),nil)
460+
461+
// Manually advance to find nodes at or after the target key
462+
var node *Node
463+
for it.Next() {
464+
// Check if we've reached or passed our target
465+
if bytes.Compare(it.Key(), targetKey) >= 0 {
466+
// Try to parse this as a node
467+
nodeID, rest := splitNodeKey(it.Key())
468+
if string(rest) == dbDiscoverRoot {
469+
node = mustDecodeNode(nodeID[:], it.Value())
470+
break
471+
}
472+
}
473+
}
474+
it.Release()
475+
476+
if node == nil {
468477
id[0] = 0
469478
continue seek // iterator exhausted
470479
}
471-
if now.Sub(db.LastPongReceived(n.ID(), n.IPAddr())) > maxAge {
480+
if now.Sub(db.LastPongReceived(node.ID(), node.IPAddr())) > maxAge {
472481
continue seek
473482
}
474483
for i := range nodes {
475-
if nodes[i].ID() == n.ID() {
484+
if nodes[i].ID() == node.ID() {
476485
continue seek // duplicate
477486
}
478487
}
479-
nodes = append(nodes, n)
488+
nodes = append(nodes, node)
480489
}
481490
return nodes
482491
}
483492

484493
// reads the next node record from the iterator, skipping over other
485494
// database entries.
486-
func nextNode(it iterator.Iterator) *Node {
495+
func nextNode(it ethdb.Iterator) *Node {
487496
for end := false; !end; end = !it.Next() {
488497
id, rest := splitNodeKey(it.Key())
489498
if string(rest) != dbDiscoverRoot {
@@ -501,5 +510,5 @@ func (db *DB) Close() {
501510
default:
502511
close(db.quit)
503512
}
504-
db.lvl.Close()
513+
db.db.Close()
505514
}

0 commit comments

Comments
 (0)