Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 50 additions & 51 deletions p2p/enode/nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,18 @@ import (
"bytes"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"net/netip"
"os"
"sync"
"time"

"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/ethdb/pebble"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/rlp"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)

// Keys in the node database.
Expand Down Expand Up @@ -71,9 +69,9 @@ var zeroIP = netip.IPv6Unspecified()
// DB is the node database, storing previously seen nodes and any collected metadata about
// them for QoS purposes.
type DB struct {
lvl *leveldb.DB // Interface to the database itself
runner sync.Once // Ensures we can start at most one expirer
quit chan struct{} // Channel to signal the expiring thread to stop
db ethdb.KeyValueStore // Interface to the database itself
runner sync.Once // Ensures we can start at most one expirer
quit chan struct{} // Channel to signal the expiring thread to stop
}

// OpenDB opens a node database for storing and retrieving infos about known peers in the
Expand All @@ -87,21 +85,14 @@ func OpenDB(path string) (*DB, error) {

// newMemoryDB creates a new in-memory node database without a persistent backend.
func newMemoryDB() (*DB, error) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
if err != nil {
return nil, err
}
return &DB{lvl: db, quit: make(chan struct{})}, nil
db := memorydb.New()
return &DB{db: db, quit: make(chan struct{})}, nil
}

// newPersistentDB creates/opens a leveldb backed persistent node database,
// also flushing its contents in case of a version mismatch.
func newPersistentDB(path string) (*DB, error) {
opts := &opt.Options{OpenFilesCacheCapacity: 5}
db, err := leveldb.OpenFile(path, opts)
if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
db, err = leveldb.RecoverFile(path, nil)
}
db, err := pebble.New(path, 16, 16, "", false)
if err != nil {
return nil, err
}
Expand All @@ -110,15 +101,8 @@ func newPersistentDB(path string) (*DB, error) {
currentVer := make([]byte, binary.MaxVarintLen64)
currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]

blob, err := db.Get([]byte(dbVersionKey), nil)
blob, err := db.Get([]byte(dbVersionKey))
switch err {
case leveldb.ErrNotFound:
// Version not found (i.e. empty cache), insert it
if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
db.Close()
return nil, err
}

case nil:
// Version present, flush if different
if !bytes.Equal(blob, currentVer) {
Expand All @@ -128,8 +112,16 @@ func newPersistentDB(path string) (*DB, error) {
}
return newPersistentDB(path)
}

default:
// Version not found (i.e. empty cache), insert it

if err := db.Put([]byte(dbVersionKey), currentVer); err != nil {
db.Close()
return nil, err
}
}
return &DB{lvl: db, quit: make(chan struct{})}, nil
return &DB{db: db, quit: make(chan struct{})}, nil
}

// nodeKey returns the database key for a node record.
Expand Down Expand Up @@ -196,7 +188,7 @@ func localItemKey(id ID, field string) []byte {

// fetchInt64 retrieves an integer associated with a particular key.
func (db *DB) fetchInt64(key []byte) int64 {
blob, err := db.lvl.Get(key, nil)
blob, err := db.db.Get(key)
if err != nil {
return 0
}
Expand All @@ -211,12 +203,12 @@ func (db *DB) fetchInt64(key []byte) int64 {
func (db *DB) storeInt64(key []byte, n int64) error {
blob := make([]byte, binary.MaxVarintLen64)
blob = blob[:binary.PutVarint(blob, n)]
return db.lvl.Put(key, blob, nil)
return db.db.Put(key, blob)
}

// fetchUint64 retrieves an integer associated with a particular key.
func (db *DB) fetchUint64(key []byte) uint64 {
blob, err := db.lvl.Get(key, nil)
blob, err := db.db.Get(key)
if err != nil {
return 0
}
Expand All @@ -228,12 +220,12 @@ func (db *DB) fetchUint64(key []byte) uint64 {
func (db *DB) storeUint64(key []byte, n uint64) error {
blob := make([]byte, binary.MaxVarintLen64)
blob = blob[:binary.PutUvarint(blob, n)]
return db.lvl.Put(key, blob, nil)
return db.db.Put(key, blob)
}

// Node retrieves a node with a given id from the database.
func (db *DB) Node(id ID) *Node {
blob, err := db.lvl.Get(nodeKey(id), nil)
blob, err := db.db.Get(nodeKey(id))
if err != nil {
return nil
}
Expand All @@ -260,7 +252,7 @@ func (db *DB) UpdateNode(node *Node) error {
if err != nil {
return err
}
if err := db.lvl.Put(nodeKey(node.ID()), blob, nil); err != nil {
if err := db.db.Put(nodeKey(node.ID()), blob); err != nil {
return err
}
return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq())
Expand All @@ -282,14 +274,14 @@ func (db *DB) Resolve(n *Node) *Node {

// DeleteNode deletes all information associated with a node.
func (db *DB) DeleteNode(id ID) {
deleteRange(db.lvl, nodeKey(id))
deleteRange(db.db, nodeKey(id))
}

func deleteRange(db *leveldb.DB, prefix []byte) {
it := db.NewIterator(util.BytesPrefix(prefix), nil)
func deleteRange(db ethdb.KeyValueStore, prefix []byte) {
it := db.NewIterator(prefix, nil)
defer it.Release()
for it.Next() {
db.Delete(it.Key(), nil)
db.Delete(it.Key())
}
}

Expand Down Expand Up @@ -324,7 +316,7 @@ func (db *DB) expirer() {
// expireNodes iterates over the database and deletes all nodes that have not
// been seen (i.e. received a pong from) for some time.
func (db *DB) expireNodes() {
it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)
it := db.db.NewIterator([]byte(dbNodePrefix), nil)
defer it.Release()
if !it.Next() {
return
Expand All @@ -344,7 +336,7 @@ func (db *DB) expireNodes() {
}
if time < threshold {
// Last pong from this IP older than threshold, remove fields belonging to it.
deleteRange(db.lvl, nodeItemKey(id, ip, ""))
deleteRange(db.db, nodeItemKey(id, ip, ""))
}
}
atEnd = !it.Next()
Expand All @@ -353,7 +345,7 @@ func (db *DB) expireNodes() {
// We've moved beyond the last entry of the current ID.
// Remove everything if there was no recent enough pong.
if youngestPong > 0 && youngestPong < threshold {
deleteRange(db.lvl, nodeKey(id))
deleteRange(db.db, nodeKey(id))
}
youngestPong = 0
}
Expand Down Expand Up @@ -448,10 +440,8 @@ func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
var (
now = time.Now()
nodes = make([]*Node, 0, n)
it = db.lvl.NewIterator(nil, nil)
id ID
)
defer it.Release()

seek:
for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
Expand All @@ -461,29 +451,38 @@ seek:
ctr := id[0]
rand.Read(id[:])
id[0] = ctr + id[0]%16
it.Seek(nodeKey(id))

n := nextNode(it)
if n == nil {
// Create iterator starting from the random node key
// We use nil as the prefix to iterate over all keys, starting from
// the random position. nextNode() filters for actual node entries.

startKey := nodeKey(id)
it := db.db.NewIterator(nil, startKey)
node := nextNode(it)
it.Release()

if node == nil {
id[0] = 0
continue seek // iterator exhausted
}
if now.Sub(db.LastPongReceived(n.ID(), n.IPAddr())) > maxAge {
if now.Sub(db.LastPongReceived(node.ID(), node.IPAddr())) > maxAge {
continue seek
}
for i := range nodes {
if nodes[i].ID() == n.ID() {
if nodes[i].ID() == node.ID() {
continue seek // duplicate
}
}
nodes = append(nodes, n)
nodes = append(nodes, node)
}
return nodes
}

// reads the next node record from the iterator, skipping over other
// database entries.
func nextNode(it iterator.Iterator) *Node {
//
//nolint:unused
func nextNode(it ethdb.Iterator) *Node {
for end := false; !end; end = !it.Next() {
id, rest := splitNodeKey(it.Key())
if string(rest) != dbDiscoverRoot {
Expand All @@ -501,5 +500,5 @@ func (db *DB) Close() {
default:
close(db.quit)
}
db.lvl.Close()
db.db.Close()
}