From 163782fb55144da324d0870ee45cbc5cda2f9c72 Mon Sep 17 00:00:00 2001
From: Cherrypick14
Date: Sun, 16 Nov 2025 21:46:48 +0300
Subject: [PATCH] p2p/enode: migrate nodedb from leveldb to pebble

---
 p2p/enode/nodedb.go | 101 ++++++++++++++++++++++----------------------
 1 file changed, 50 insertions(+), 51 deletions(-)

diff --git a/p2p/enode/nodedb.go b/p2p/enode/nodedb.go
index 2cd211e2c20f..57af10b05946 100644
--- a/p2p/enode/nodedb.go
+++ b/p2p/enode/nodedb.go
@@ -20,20 +20,18 @@ import (
 	"bytes"
 	"crypto/rand"
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"net/netip"
 	"os"
 	"sync"
 	"time"
 
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/ethereum/go-ethereum/ethdb/memorydb"
+	"github.com/ethereum/go-ethereum/ethdb/pebble"
 	"github.com/ethereum/go-ethereum/p2p/enr"
 	"github.com/ethereum/go-ethereum/rlp"
-	"github.com/syndtr/goleveldb/leveldb"
-	"github.com/syndtr/goleveldb/leveldb/errors"
-	"github.com/syndtr/goleveldb/leveldb/iterator"
-	"github.com/syndtr/goleveldb/leveldb/opt"
-	"github.com/syndtr/goleveldb/leveldb/storage"
-	"github.com/syndtr/goleveldb/leveldb/util"
 )
 
 // Keys in the node database.
@@ -71,9 +69,9 @@ var zeroIP = netip.IPv6Unspecified()
 // DB is the node database, storing previously seen nodes and any collected metadata about
 // them for QoS purposes.
 type DB struct {
-	lvl    *leveldb.DB   // Interface to the database itself
-	runner sync.Once     // Ensures we can start at most one expirer
-	quit   chan struct{} // Channel to signal the expiring thread to stop
+	db     ethdb.KeyValueStore // Interface to the database itself
+	runner sync.Once           // Ensures we can start at most one expirer
+	quit   chan struct{}       // Channel to signal the expiring thread to stop
 }
 
 // OpenDB opens a node database for storing and retrieving infos about known peers in the
@@ -87,21 +85,14 @@ func OpenDB(path string) (*DB, error) {
 
 // newMemoryDB creates a new in-memory node database without a persistent backend.
 func newMemoryDB() (*DB, error) {
-	db, err := leveldb.Open(storage.NewMemStorage(), nil)
-	if err != nil {
-		return nil, err
-	}
-	return &DB{lvl: db, quit: make(chan struct{})}, nil
+	db := memorydb.New()
+	return &DB{db: db, quit: make(chan struct{})}, nil
 }
 
 // newPersistentDB creates/opens a leveldb backed persistent node database,
 // also flushing its contents in case of a version mismatch.
 func newPersistentDB(path string) (*DB, error) {
-	opts := &opt.Options{OpenFilesCacheCapacity: 5}
-	db, err := leveldb.OpenFile(path, opts)
-	if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
-		db, err = leveldb.RecoverFile(path, nil)
-	}
+	db, err := pebble.New(path, 16, 16, "", false)
 	if err != nil {
 		return nil, err
 	}
@@ -110,15 +101,8 @@ func newPersistentDB(path string) (*DB, error) {
 	currentVer := make([]byte, binary.MaxVarintLen64)
 	currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]
 
-	blob, err := db.Get([]byte(dbVersionKey), nil)
+	blob, err := db.Get([]byte(dbVersionKey))
 	switch err {
-	case leveldb.ErrNotFound:
-		// Version not found (i.e. empty cache), insert it
-		if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
-			db.Close()
-			return nil, err
-		}
-
 	case nil:
 		// Version present, flush if different
 		if !bytes.Equal(blob, currentVer) {
@@ -128,8 +112,16 @@ func newPersistentDB(path string) (*DB, error) {
 			}
 			return newPersistentDB(path)
 		}
+
+	default:
+		// Version not found (i.e. empty cache), insert it
+
+		if err := db.Put([]byte(dbVersionKey), currentVer); err != nil {
+			db.Close()
+			return nil, err
+		}
 	}
-	return &DB{lvl: db, quit: make(chan struct{})}, nil
+	return &DB{db: db, quit: make(chan struct{})}, nil
 }
 
 // nodeKey returns the database key for a node record.
@@ -196,7 +188,7 @@ func localItemKey(id ID, field string) []byte {
 
 // fetchInt64 retrieves an integer associated with a particular key.
 func (db *DB) fetchInt64(key []byte) int64 {
-	blob, err := db.lvl.Get(key, nil)
+	blob, err := db.db.Get(key)
 	if err != nil {
 		return 0
 	}
@@ -211,12 +203,12 @@ func (db *DB) storeInt64(key []byte, n int64) error {
 	blob := make([]byte, binary.MaxVarintLen64)
 	blob = blob[:binary.PutVarint(blob, n)]
-	return db.lvl.Put(key, blob, nil)
+	return db.db.Put(key, blob)
 }
 
 // fetchUint64 retrieves an integer associated with a particular key.
 func (db *DB) fetchUint64(key []byte) uint64 {
-	blob, err := db.lvl.Get(key, nil)
+	blob, err := db.db.Get(key)
 	if err != nil {
 		return 0
 	}
@@ -228,12 +220,12 @@ func (db *DB) storeUint64(key []byte, n uint64) error {
 	blob := make([]byte, binary.MaxVarintLen64)
 	blob = blob[:binary.PutUvarint(blob, n)]
-	return db.lvl.Put(key, blob, nil)
+	return db.db.Put(key, blob)
 }
 
 // Node retrieves a node with a given id from the database.
 func (db *DB) Node(id ID) *Node {
-	blob, err := db.lvl.Get(nodeKey(id), nil)
+	blob, err := db.db.Get(nodeKey(id))
 	if err != nil {
 		return nil
 	}
@@ -260,7 +252,7 @@ func (db *DB) UpdateNode(node *Node) error {
 	if err != nil {
 		return err
 	}
-	if err := db.lvl.Put(nodeKey(node.ID()), blob, nil); err != nil {
+	if err := db.db.Put(nodeKey(node.ID()), blob); err != nil {
 		return err
 	}
 	return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq())
@@ -282,14 +274,14 @@ func (db *DB) Resolve(n *Node) *Node {
 
 // DeleteNode deletes all information associated with a node.
 func (db *DB) DeleteNode(id ID) {
-	deleteRange(db.lvl, nodeKey(id))
+	deleteRange(db.db, nodeKey(id))
 }
 
-func deleteRange(db *leveldb.DB, prefix []byte) {
-	it := db.NewIterator(util.BytesPrefix(prefix), nil)
+func deleteRange(db ethdb.KeyValueStore, prefix []byte) {
+	it := db.NewIterator(prefix, nil)
 	defer it.Release()
 	for it.Next() {
-		db.Delete(it.Key(), nil)
+		db.Delete(it.Key())
 	}
 }
 
@@ -324,7 +316,7 @@ func (db *DB) expirer() {
 // expireNodes iterates over the database and deletes all nodes that have not
 // been seen (i.e. received a pong from) for some time.
 func (db *DB) expireNodes() {
-	it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)
+	it := db.db.NewIterator([]byte(dbNodePrefix), nil)
 	defer it.Release()
 	if !it.Next() {
 		return
 	}
@@ -344,7 +336,7 @@ func (db *DB) expireNodes() {
 			}
 			if time < threshold {
 				// Last pong from this IP older than threshold, remove fields belonging to it.
-				deleteRange(db.lvl, nodeItemKey(id, ip, ""))
+				deleteRange(db.db, nodeItemKey(id, ip, ""))
 			}
 		}
 		atEnd = !it.Next()
@@ -353,7 +345,7 @@ func (db *DB) expireNodes() {
 			// We've moved beyond the last entry of the current ID.
 			// Remove everything if there was no recent enough pong.
 			if youngestPong > 0 && youngestPong < threshold {
-				deleteRange(db.lvl, nodeKey(id))
+				deleteRange(db.db, nodeKey(id))
 			}
 			youngestPong = 0
 		}
@@ -448,10 +440,8 @@ func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
 	var (
 		now   = time.Now()
 		nodes = make([]*Node, 0, n)
-		it    = db.lvl.NewIterator(nil, nil)
 		id    ID
 	)
-	defer it.Release()
 
 seek:
 	for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
@@ -461,29 +451,38 @@ seek:
 		ctr := id[0]
 		rand.Read(id[:])
 		id[0] = ctr + id[0]%16
-		it.Seek(nodeKey(id))
-		n := nextNode(it)
-		if n == nil {
+		// Create iterator starting from the random node key.
+		// We use nil as the prefix to iterate over all keys, starting from
+		// the random position. nextNode() filters for actual node entries.
+
+		startKey := nodeKey(id)
+		it := db.db.NewIterator(nil, startKey)
+		node := nextNode(it)
+		it.Release()
+
+		if node == nil {
 			id[0] = 0
 			continue seek // iterator exhausted
 		}
-		if now.Sub(db.LastPongReceived(n.ID(), n.IPAddr())) > maxAge {
+		if now.Sub(db.LastPongReceived(node.ID(), node.IPAddr())) > maxAge {
 			continue seek
 		}
 		for i := range nodes {
-			if nodes[i].ID() == n.ID() {
+			if nodes[i].ID() == node.ID() {
 				continue seek // duplicate
 			}
 		}
-		nodes = append(nodes, n)
+		nodes = append(nodes, node)
 	}
 	return nodes
 }
 
 // reads the next node record from the iterator, skipping over other
 // database entries.
-func nextNode(it iterator.Iterator) *Node {
+//
+//nolint:unused
+func nextNode(it ethdb.Iterator) *Node {
 	for end := false; !end; end = !it.Next() {
 		id, rest := splitNodeKey(it.Key())
 		if string(rest) != dbDiscoverRoot {
@@ -501,5 +500,5 @@ func (db *DB) Close() {
 	default:
 		close(db.quit)
 	}
-	db.lvl.Close()
+	db.db.Close()
 }
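
Note on iterator semantics (not part of the patch): goleveldb exposes a reusable, seekable iterator (util.BytesPrefix plus Seek), while ethdb.KeyValueStore.NewIterator(prefix, start) fixes both the key prefix and the starting position at creation time and its Iterator has no Seek. That is why deleteRange now passes the prefix directly and QuerySeeds builds a fresh iterator per random seek and releases it right away. The sketch below exercises that interface against the in-memory backend used by newMemoryDB; the key strings and the small deleteRange copy are illustrative only, not taken from nodedb.go.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/ethdb/memorydb"
)

// deleteRange mirrors the patch's helper: walk every key under prefix and delete it.
func deleteRange(db ethdb.KeyValueStore, prefix []byte) {
	it := db.NewIterator(prefix, nil)
	defer it.Release()
	for it.Next() {
		db.Delete(it.Key())
	}
}

func main() {
	db := memorydb.New()
	defer db.Close()

	// Illustrative keys only; real node keys are built by nodeKey/nodeItemKey.
	for _, k := range []string{"n:aa:discover", "n:ab:discover", "field:x"} {
		db.Put([]byte(k), []byte{1})
	}

	// NewIterator(prefix, start): prefix bounds the walk, start positions the
	// cursor within that prefix. With a nil prefix and a concrete start key
	// this plays the role of goleveldb's Seek, which is what QuerySeeds uses.
	it := db.NewIterator([]byte("n:"), []byte("ab"))
	for it.Next() {
		fmt.Printf("visited %s\n", it.Key())
	}
	it.Release()

	deleteRange(db, []byte("n:"))
	fmt.Println("entries under n: removed")
}

Creating and releasing an iterator per seek costs a little more than reusing one seekable iterator, but each lookup then runs against a consistent snapshot and stays within the methods the ethdb interface actually offers.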
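A second note on the version check, also outside the patch: the old code matched leveldb.ErrNotFound explicitly, whereas the rewritten switch routes every non-nil Get error into the default branch, so a transient backend error is handled like an empty cache (and the newly added standard errors import is not used by any of the shown hunks). If that distinction matters, one way to keep it explicit while assuming only the Has/Get/Put methods of ethdb.KeyValueStore is sketched below; checkVersion and its parameters are hypothetical names, not code from nodedb.go, and the mismatch case in newPersistentDB would still flush with os.RemoveAll and reopen as the patch already does.

package main

import (
	"bytes"
	"fmt"

	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/ethdb/memorydb"
)

// checkVersion seeds the version marker on an empty cache, reports a mismatch
// on an existing one, and surfaces backend errors instead of masking them.
func checkVersion(db ethdb.KeyValueStore, key, want []byte) error {
	present, err := db.Has(key)
	if err != nil {
		return err // backend failure, not an empty cache
	}
	if !present {
		return db.Put(key, want) // empty cache: insert the current version
	}
	got, err := db.Get(key)
	if err != nil {
		return err
	}
	if !bytes.Equal(got, want) {
		return fmt.Errorf("node db version mismatch: have %x, want %x", got, want)
	}
	return nil
}

func main() {
	db := memorydb.New()
	defer db.Close()

	ver := []byte{9}
	fmt.Println(checkVersion(db, []byte("version"), ver)) // <nil>: marker inserted
	fmt.Println(checkVersion(db, []byte("version"), ver)) // <nil>: marker matches
}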