50 changes: 25 additions & 25 deletions dht.go
@@ -746,39 +746,39 @@ func (dht *IpfsDHT) FindLocal(ctx context.Context, id peer.ID) peer.AddrInfo {
 	return peer.AddrInfo{}
 }
 
-// nearestPeersToQuery returns the routing tables closest peers.
-func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
-	closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
-	return closer
-}
-
-// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
-func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
-	closer := dht.nearestPeersToQuery(pmes, count)
-
-	// no node? nil
-	if closer == nil {
+// closestPeersToQuery returns the closest peers to the target key from the
+// local routing table, filtering out self and the requester's peer ID.
+//
+// Per the IPFS Kademlia DHT spec, servers SHOULD NOT include themselves or the
+// requester in FIND_NODE responses (except for FIND_PEER when the target is
+// self or the requester, which is handled separately in handleFindPeer).
+func (dht *IpfsDHT) closestPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
+	// Get count+1 closest peers to target key, so that we can filter out 'from'
+	// (and potentially 'self' if it somehow appears) and still return 'count' peers.
+	closestPeers := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count+1)
+
+	if len(closestPeers) == 0 {
 		logger.Infow("no closer peers to send", from)
 		return nil
 	}
 
-	filtered := make([]peer.ID, 0, len(closer))
-	for _, clp := range closer {
-
-		// == to self? thats bad
-		if clp == dht.self {
-			logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
-			return nil
+	filtered := make([]peer.ID, 0, min(len(closestPeers), count))
+	for _, p := range closestPeers {
+		// Per spec: don't include self in responses. This should never happen since
+		// self should not be in the routing table, but check defensively.
+		if p == dht.self {
+			logger.Debugw("self found in routing table, skipping", "key", string(pmes.GetKey()))
+			continue
 		}
-
-		// Dont send a peer back themselves
-		if clp == from {
+		// Per spec: don't include requester in responses (exception handled in handleFindPeer).
+		if p == from {
 			continue
 		}
 
-		filtered = append(filtered, clp)
+		filtered = append(filtered, p)
+		if len(filtered) >= count {
+			break
+		}
 	}
 
-	// ok seems like closer nodes
 	return filtered
 }
 
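The behaviour introduced in dht.go above can be illustrated with a small standalone sketch of the same filter-and-cap pattern. This is not code from the PR: plain strings stand in for peer.IDs, filterClosest is a hypothetical helper, and the real function additionally fetches its count+1 candidates from the routing table via NearestPeers. It needs Go 1.21+ for the built-in min, which the PR code also relies on.

package main

import "fmt"

// filterClosest mirrors the filtering pattern of the new closestPeersToQuery:
// the caller fetches count+1 candidates so that dropping 'self' and 'from'
// can still leave up to 'count' results. Hypothetical sketch, not PR code.
// Requires Go 1.21+ for the built-in min.
func filterClosest(candidates []string, self, from string, count int) []string {
	filtered := make([]string, 0, min(len(candidates), count))
	for _, p := range candidates {
		if p == self || p == from {
			continue // per spec: never return self or the requester
		}
		filtered = append(filtered, p)
		if len(filtered) >= count {
			break
		}
	}
	return filtered
}

func main() {
	// count = 3, so the routing table would have been asked for 4 candidates.
	candidates := []string{"peerA", "requester", "peerB", "peerC"}
	fmt.Println(filterClosest(candidates, "self", "requester", 3))
	// Output: [peerA peerB peerC]
}

Asking for one extra candidate up front is what lets the function still return a full set of count peers after the requester has been dropped.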
62 changes: 22 additions & 40 deletions handlers.go
@@ -7,7 +7,7 @@ import (
 	"time"
 
 	"github.com/libp2p/go-libp2p/core/peer"
-	pstore "github.com/libp2p/go-libp2p/p2p/host/peerstore"
+	"github.com/libp2p/go-libp2p/core/peerstore"
 
 	ds "github.com/ipfs/go-datastore"
 	"github.com/libp2p/go-libp2p-kad-dht/internal"
@@ -66,12 +66,11 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
 	resp.Record = rec
 
 	// Find closest peer on given cluster to desired key and reply with that info
-	closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
-	if len(closer) > 0 {
-		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
-		closerinfos := pstore.PeerInfos(dht.peerstore, closer)
-		for _, pi := range closerinfos {
-			logger.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
+	closestPeers := dht.closestPeersToQuery(pmes, p, dht.bucketSize)
+	if len(closestPeers) > 0 {
+		closestInfos := peerstore.AddrInfos(dht.peerstore, closestPeers)
+		for _, pi := range closestInfos {
+			logger.Debugf("handleGetValue returning closest peer: '%s'", pi.ID)
 			if len(pi.Addrs) < 1 {
 				logger.Warnw("no addresses on peer being sent",
 					"local", dht.self,
@@ -81,7 +80,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
 			}
 		}
 
-		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos)
+		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closestInfos)
 	}
 
 	return resp, nil
@@ -252,42 +251,26 @@ func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (
 
 func (dht *IpfsDHT) handleFindPeer(ctx context.Context, from peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) {
 	resp := pb.NewMessage(pmes.GetType(), nil, pmes.GetClusterLevel())
-	var closest []peer.ID
 
 	if len(pmes.GetKey()) == 0 {
 		return nil, errors.New("handleFindPeer with empty key")
 	}
 
-	// if looking for self... special case where we send it on CloserPeers.
 	targetPid := peer.ID(pmes.GetKey())
-	closest = dht.betterPeersToQuery(pmes, from, dht.bucketSize)
-
-	// Never tell a peer about itself.
-	if targetPid != from {
-		// Add the target peer to the set of closest peers if
-		// not already present in our routing table.
-		//
-		// Later, when we lookup known addresses for all peers
-		// in this set, we'll prune this peer if we don't
-		// _actually_ know where it is.
-		found := false
-		for _, p := range closest {
-			if targetPid == p {
-				found = true
-				break
-			}
-		}
-		if !found {
-			closest = append(closest, targetPid)
-		}
-	}
+	closest := dht.closestPeersToQuery(pmes, from, dht.bucketSize)
 
-	if closest == nil {
-		return resp, nil
+	// Prepend targetPid to the front of the list if not already present.
+	// targetPid is always the closest key to itself.
+	//
+	// Per IPFS Kademlia DHT spec: FIND_PEER has a special exception where the
+	// target peer MUST be included in the response (if present in peerstore),
+	// even if it is self, the requester, or not a DHT server. This allows peers
+	// to discover multiaddresses for any peer, not just DHT servers.
+	if len(closest) == 0 || closest[0] != targetPid {
+		closest = append([]peer.ID{targetPid}, closest...)
 	}
 
-	// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
-	closestinfos := pstore.PeerInfos(dht.peerstore, closest)
+	closestinfos := peerstore.AddrInfos(dht.peerstore, closest)
 	// possibly an over-allocation but this array is temporary anyways.
 	withAddresses := make([]peer.AddrInfo, 0, len(closestinfos))
 	for _, pi := range closestinfos {
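The FIND_PEER exception documented in the comment above amounts to unconditionally placing the target at the head of the list before its addresses are looked up in the peerstore. Below is a minimal sketch of just that step, with a hypothetical prependTarget helper and plain strings instead of peer.IDs; it is illustrative, not code from the PR.

package main

import "fmt"

// prependTarget mirrors the prepend step in the updated handleFindPeer: the
// target is always placed at the front of the closest-peers list, even if it
// is the requester or not a DHT server, so that its addresses (if known) can
// be returned. Hypothetical sketch, not PR code.
func prependTarget(closest []string, target string) []string {
	// The target key is closest to itself, so if the routing-table result
	// contains it at all it can only be at index 0; checking the head is
	// enough to avoid duplicating it.
	if len(closest) == 0 || closest[0] != target {
		closest = append([]string{target}, closest...)
	}
	return closest
}

func main() {
	fmt.Println(prependTarget([]string{"peerA", "peerB"}, "target"))  // [target peerA peerB]
	fmt.Println(prependTarget([]string{"target", "peerA"}, "target")) // already first: unchanged
}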
@@ -326,11 +309,10 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
 
 	resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), filtered)
 
-	// Also send closer peers.
-	closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
-	if closer != nil {
-		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
-		infos := pstore.PeerInfos(dht.peerstore, closer)
+	// Also send closest dht servers we know about.
+	closestPeers := dht.closestPeersToQuery(pmes, p, dht.bucketSize)
+	if closestPeers != nil {
+		infos := peerstore.AddrInfos(dht.peerstore, closestPeers)
 		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
 	}
 