Skip to content

Commit 163782f

Browse files
committed
p2p/enode: migrate nodedb from leveldb to pebble
1 parent 3954259 commit 163782f

File tree

1 file changed

+50
-51
lines changed

1 file changed

+50
-51
lines changed

p2p/enode/nodedb.go

Lines changed: 50 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,18 @@ import (
2020
"bytes"
2121
"crypto/rand"
2222
"encoding/binary"
23+
"errors"
2324
"fmt"
2425
"net/netip"
2526
"os"
2627
"sync"
2728
"time"
2829

30+
"github.com/ethereum/go-ethereum/ethdb"
31+
"github.com/ethereum/go-ethereum/ethdb/memorydb"
32+
"github.com/ethereum/go-ethereum/ethdb/pebble"
2933
"github.com/ethereum/go-ethereum/p2p/enr"
3034
"github.com/ethereum/go-ethereum/rlp"
31-
"github.com/syndtr/goleveldb/leveldb"
32-
"github.com/syndtr/goleveldb/leveldb/errors"
33-
"github.com/syndtr/goleveldb/leveldb/iterator"
34-
"github.com/syndtr/goleveldb/leveldb/opt"
35-
"github.com/syndtr/goleveldb/leveldb/storage"
36-
"github.com/syndtr/goleveldb/leveldb/util"
3735
)
3836

3937
// Keys in the node database.
@@ -71,9 +69,9 @@ var zeroIP = netip.IPv6Unspecified()
7169
// DB is the node database, storing previously seen nodes and any collected metadata about
7270
// them for QoS purposes.
7371
type DB struct {
74-
lvl *leveldb.DB // Interface to the database itself
75-
runner sync.Once // Ensures we can start at most one expirer
76-
quit chan struct{} // Channel to signal the expiring thread to stop
72+
db ethdb.KeyValueStore // Interface to the database itself
73+
runner sync.Once // Ensures we can start at most one expirer
74+
quit chan struct{} // Channel to signal the expiring thread to stop
7775
}
7876

7977
// OpenDB opens a node database for storing and retrieving infos about known peers in the
@@ -87,21 +85,14 @@ func OpenDB(path string) (*DB, error) {
8785

8886
// newMemoryDB creates a new in-memory node database without a persistent backend.
8987
func newMemoryDB() (*DB, error) {
90-
db, err := leveldb.Open(storage.NewMemStorage(), nil)
91-
if err != nil {
92-
return nil, err
93-
}
94-
return &DB{lvl: db, quit: make(chan struct{})}, nil
88+
db := memorydb.New()
89+
return &DB{db: db, quit: make(chan struct{})}, nil
9590
}
9691

9792
// newPersistentDB creates/opens a leveldb backed persistent node database,
9893
// also flushing its contents in case of a version mismatch.
9994
func newPersistentDB(path string) (*DB, error) {
100-
opts := &opt.Options{OpenFilesCacheCapacity: 5}
101-
db, err := leveldb.OpenFile(path, opts)
102-
if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
103-
db, err = leveldb.RecoverFile(path, nil)
104-
}
95+
db, err := pebble.New(path, 16, 16, "", false)
10596
if err != nil {
10697
return nil, err
10798
}
@@ -110,15 +101,8 @@ func newPersistentDB(path string) (*DB, error) {
110101
currentVer := make([]byte, binary.MaxVarintLen64)
111102
currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]
112103

113-
blob, err := db.Get([]byte(dbVersionKey), nil)
104+
blob, err := db.Get([]byte(dbVersionKey))
114105
switch err {
115-
case leveldb.ErrNotFound:
116-
// Version not found (i.e. empty cache), insert it
117-
if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
118-
db.Close()
119-
return nil, err
120-
}
121-
122106
case nil:
123107
// Version present, flush if different
124108
if !bytes.Equal(blob, currentVer) {
@@ -128,8 +112,16 @@ func newPersistentDB(path string) (*DB, error) {
128112
}
129113
return newPersistentDB(path)
130114
}
115+
116+
default:
117+
// Version not found (i.e. empty cache), insert it
118+
119+
if err := db.Put([]byte(dbVersionKey), currentVer); err != nil {
120+
db.Close()
121+
return nil, err
122+
}
131123
}
132-
return &DB{lvl: db, quit: make(chan struct{})}, nil
124+
return &DB{db: db, quit: make(chan struct{})}, nil
133125
}
134126

135127
// nodeKey returns the database key for a node record.
@@ -196,7 +188,7 @@ func localItemKey(id ID, field string) []byte {
196188

197189
// fetchInt64 retrieves an integer associated with a particular key.
198190
func (db *DB) fetchInt64(key []byte) int64 {
199-
blob, err := db.lvl.Get(key, nil)
191+
blob, err := db.db.Get(key)
200192
if err != nil {
201193
return 0
202194
}
@@ -211,12 +203,12 @@ func (db *DB) fetchInt64(key []byte) int64 {
211203
func (db *DB) storeInt64(key []byte, n int64) error {
212204
blob := make([]byte, binary.MaxVarintLen64)
213205
blob = blob[:binary.PutVarint(blob, n)]
214-
return db.lvl.Put(key, blob, nil)
206+
return db.db.Put(key, blob)
215207
}
216208

217209
// fetchUint64 retrieves an integer associated with a particular key.
218210
func (db *DB) fetchUint64(key []byte) uint64 {
219-
blob, err := db.lvl.Get(key, nil)
211+
blob, err := db.db.Get(key)
220212
if err != nil {
221213
return 0
222214
}
@@ -228,12 +220,12 @@ func (db *DB) fetchUint64(key []byte) uint64 {
228220
func (db *DB) storeUint64(key []byte, n uint64) error {
229221
blob := make([]byte, binary.MaxVarintLen64)
230222
blob = blob[:binary.PutUvarint(blob, n)]
231-
return db.lvl.Put(key, blob, nil)
223+
return db.db.Put(key, blob)
232224
}
233225

234226
// Node retrieves a node with a given id from the database.
235227
func (db *DB) Node(id ID) *Node {
236-
blob, err := db.lvl.Get(nodeKey(id), nil)
228+
blob, err := db.db.Get(nodeKey(id))
237229
if err != nil {
238230
return nil
239231
}
@@ -260,7 +252,7 @@ func (db *DB) UpdateNode(node *Node) error {
260252
if err != nil {
261253
return err
262254
}
263-
if err := db.lvl.Put(nodeKey(node.ID()), blob, nil); err != nil {
255+
if err := db.db.Put(nodeKey(node.ID()), blob); err != nil {
264256
return err
265257
}
266258
return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq())
@@ -282,14 +274,14 @@ func (db *DB) Resolve(n *Node) *Node {
282274

283275
// DeleteNode deletes all information associated with a node.
284276
func (db *DB) DeleteNode(id ID) {
285-
deleteRange(db.lvl, nodeKey(id))
277+
deleteRange(db.db, nodeKey(id))
286278
}
287279

288-
func deleteRange(db *leveldb.DB, prefix []byte) {
289-
it := db.NewIterator(util.BytesPrefix(prefix), nil)
280+
func deleteRange(db ethdb.KeyValueStore, prefix []byte) {
281+
it := db.NewIterator(prefix, nil)
290282
defer it.Release()
291283
for it.Next() {
292-
db.Delete(it.Key(), nil)
284+
db.Delete(it.Key())
293285
}
294286
}
295287

@@ -324,7 +316,7 @@ func (db *DB) expirer() {
324316
// expireNodes iterates over the database and deletes all nodes that have not
325317
// been seen (i.e. received a pong from) for some time.
326318
func (db *DB) expireNodes() {
327-
it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)
319+
it := db.db.NewIterator([]byte(dbNodePrefix), nil)
328320
defer it.Release()
329321
if !it.Next() {
330322
return
@@ -344,7 +336,7 @@ func (db *DB) expireNodes() {
344336
}
345337
if time < threshold {
346338
// Last pong from this IP older than threshold, remove fields belonging to it.
347-
deleteRange(db.lvl, nodeItemKey(id, ip, ""))
339+
deleteRange(db.db, nodeItemKey(id, ip, ""))
348340
}
349341
}
350342
atEnd = !it.Next()
@@ -353,7 +345,7 @@ func (db *DB) expireNodes() {
353345
// We've moved beyond the last entry of the current ID.
354346
// Remove everything if there was no recent enough pong.
355347
if youngestPong > 0 && youngestPong < threshold {
356-
deleteRange(db.lvl, nodeKey(id))
348+
deleteRange(db.db, nodeKey(id))
357349
}
358350
youngestPong = 0
359351
}
@@ -448,10 +440,8 @@ func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
448440
var (
449441
now = time.Now()
450442
nodes = make([]*Node, 0, n)
451-
it = db.lvl.NewIterator(nil, nil)
452443
id ID
453444
)
454-
defer it.Release()
455445

456446
seek:
457447
for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
@@ -461,29 +451,38 @@ seek:
461451
ctr := id[0]
462452
rand.Read(id[:])
463453
id[0] = ctr + id[0]%16
464-
it.Seek(nodeKey(id))
465454

466-
n := nextNode(it)
467-
if n == nil {
455+
// Create iterator starting from the random node key
456+
// We use nil as the prefix to iterate over all keys, starting from
457+
// the random position. nextNode() filters for actual node entries.
458+
459+
startKey := nodeKey(id)
460+
it := db.db.NewIterator(nil, startKey)
461+
node := nextNode(it)
462+
it.Release()
463+
464+
if node == nil {
468465
id[0] = 0
469466
continue seek // iterator exhausted
470467
}
471-
if now.Sub(db.LastPongReceived(n.ID(), n.IPAddr())) > maxAge {
468+
if now.Sub(db.LastPongReceived(node.ID(), node.IPAddr())) > maxAge {
472469
continue seek
473470
}
474471
for i := range nodes {
475-
if nodes[i].ID() == n.ID() {
472+
if nodes[i].ID() == node.ID() {
476473
continue seek // duplicate
477474
}
478475
}
479-
nodes = append(nodes, n)
476+
nodes = append(nodes, node)
480477
}
481478
return nodes
482479
}
483480

484481
// reads the next node record from the iterator, skipping over other
485482
// database entries.
486-
func nextNode(it iterator.Iterator) *Node {
483+
//
484+
//nolint:unused
485+
func nextNode(it ethdb.Iterator) *Node {
487486
for end := false; !end; end = !it.Next() {
488487
id, rest := splitNodeKey(it.Key())
489488
if string(rest) != dbDiscoverRoot {
@@ -501,5 +500,5 @@ func (db *DB) Close() {
501500
default:
502501
close(db.quit)
503502
}
504-
db.lvl.Close()
503+
db.db.Close()
505504
}

0 commit comments

Comments
 (0)