diff --git a/amino/defaults.go b/amino/defaults.go index c28f672ee..a979183e2 100644 --- a/amino/defaults.go +++ b/amino/defaults.go @@ -47,6 +47,18 @@ const ( // After it expires, the returned records will require an extra lookup, to // find the multiaddress associated with the returned peer id. DefaultProviderAddrTTL = 24 * time.Hour + + // DefaultMaxPeersPerIPGroup is the maximal number of peers with addresses in + // the same IP group allowed in the routing table. Once this limit is + // reached, newly discovered peers with addresses in the same IP group will + // not be added to the routing table. + DefaultMaxPeersPerIPGroup = 3 + + // DefaultMaxPeersPerIPGroupPerCpl is maximal number of peers with addresses + // in the same IP group allowed in each routing table bucket, defined by its + // common prefix length to self peer id. + // also see: `DefaultMaxPeersPerIPGroup`. + DefaultMaxPeersPerIPGroupPerCpl = 2 ) // Protocols is a slice containing all supported protocol IDs for Amino DHT. diff --git a/dual/dual.go b/dual/dual.go index a86d9294d..ae7c6547e 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -9,6 +9,7 @@ import ( "github.com/ipfs/go-cid" dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p-kad-dht/amino" "github.com/libp2p/go-libp2p-kad-dht/internal" kb "github.com/libp2p/go-libp2p-kbucket" "github.com/libp2p/go-libp2p-kbucket/peerdiversity" @@ -49,11 +50,6 @@ var ( _ routing.ValueStore = (*DHT)(nil) ) -var ( - maxPrefixCountPerCpl = 2 - maxPrefixCount = 3 -) - type config struct { wan, lan []dht.Option } @@ -106,7 +102,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*DHT, error) { WanDHTOption( dht.QueryFilter(dht.PublicQueryFilter), dht.RoutingTableFilter(dht.PublicRoutingTableFilter), - dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, maxPrefixCountPerCpl, maxPrefixCount)), + dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, amino.DefaultMaxPeersPerIPGroupPerCpl, amino.DefaultMaxPeersPerIPGroup)), // filter out all private addresses dht.AddressFilter(func(addrs []ma.Multiaddr) []ma.Multiaddr { return ma.FilterAddrs(addrs, manet.IsPublicAddr) }), ), diff --git a/fullrt/dht.go b/fullrt/dht.go index 070ec63ae..3ac3a65db 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "maps" "math/rand" "sync" "sync/atomic" @@ -12,8 +13,10 @@ import ( "github.com/multiformats/go-base32" ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" "github.com/multiformats/go-multihash" + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" "github.com/libp2p/go-libp2p-routing-helpers/tracing" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" @@ -32,6 +35,7 @@ import ( "google.golang.org/protobuf/proto" kaddht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p-kad-dht/amino" "github.com/libp2p/go-libp2p-kad-dht/crawler" "github.com/libp2p/go-libp2p-kad-dht/internal" internalConfig "github.com/libp2p/go-libp2p-kad-dht/internal/config" @@ -113,6 +117,8 @@ type FullRT struct { self peer.ID peerConnectednessSubscriber event.Subscription + + ipDiversityFilterLimit int } // NewFullRT creates a DHT client that tracks the full network. It takes a protocol prefix for the given network, @@ -127,10 +133,11 @@ type FullRT struct { // bootstrap peers). func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*FullRT, error) { fullrtcfg := config{ - crawlInterval: time.Hour, - bulkSendParallelism: 20, - waitFrac: 0.3, - timeoutPerOp: 5 * time.Second, + crawlInterval: time.Hour, + bulkSendParallelism: 20, + waitFrac: 0.3, + timeoutPerOp: 5 * time.Second, + ipDiversityFilterLimit: amino.DefaultMaxPeersPerIPGroup, } if err := fullrtcfg.apply(options...); err != nil { return nil, err @@ -156,7 +163,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful return nil, err } - ms := net.NewMessageSenderImpl(h, []protocol.ID{dhtcfg.ProtocolPrefix + "/kad/1.0.0"}) + ms := net.NewMessageSenderImpl(h, amino.Protocols) protoMessenger, err := dht_pb.NewProtocolMessenger(ms) if err != nil { return nil, err @@ -266,14 +273,9 @@ func (dht *FullRT) TriggerRefresh(ctx context.Context) error { } func (dht *FullRT) Stat() map[string]peer.ID { - newMap := make(map[string]peer.ID) - dht.kMapLk.RLock() - for k, v := range dht.keyToPeerMap { - newMap[k] = v - } - dht.kMapLk.RUnlock() - return newMap + defer dht.kMapLk.RUnlock() + return maps.Clone(dht.keyToPeerMap) } // Ready indicates that the routing table has been refreshed recently. It is recommended to be used for operations where @@ -449,7 +451,7 @@ func (dht *FullRT) CheckPeers(ctx context.Context, peers ...peer.ID) (int, int) func workers(numWorkers int, fn func(peer.AddrInfo), inputs <-chan peer.AddrInfo) { jobs := make(chan peer.AddrInfo) defer close(jobs) - for i := 0; i < numWorkers; i++ { + for range numWorkers { go func() { for j := range jobs { fn(j) @@ -461,30 +463,78 @@ func workers(numWorkers int, fn func(peer.AddrInfo), inputs <-chan peer.AddrInfo } } +// GetClosestPeers tries to return the `dht.bucketSize` closest known peers to +// the given key. +// +// If the IP diversity filter limit is set, the returned peers will contain at +// most `dht.ipDiversityFilterLimit` peers sharing the same IP group. Hence, +// the peers may not be the absolute closest peers to the given key, but they +// will be more diverse in terms of IP addresses. func (dht *FullRT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) { _, span := internal.StartSpan(ctx, "FullRT.GetClosestPeers", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) defer span.End() kbID := kb.ConvertKey(key) kadKey := kadkey.KbucketIDToKey(kbID) - dht.rtLk.RLock() - closestKeys := kademlia.ClosestN(kadKey, dht.rt, dht.bucketSize) - dht.rtLk.RUnlock() - peers := make([]peer.ID, 0, len(closestKeys)) - for _, k := range closestKeys { - dht.kMapLk.RLock() - p, ok := dht.keyToPeerMap[string(k)] - if !ok { - logger.Errorf("key not found in map") - } - dht.kMapLk.RUnlock() - dht.peerAddrsLk.RLock() - peerAddrs := dht.peerAddrs[p] - dht.peerAddrsLk.RUnlock() + ipGroupCounts := make(map[peerdiversity.PeerIPGroupKey]map[peer.ID]struct{}) + peers := make([]peer.ID, 0, dht.bucketSize) + + // If ipDiversityFilterLimit is non-zero, the step is slightly larger than + // the bucket size, allowing to have a few backup peers in case some are + // filtered out by the diversity filter. Multiple calls to ClosestN are + // expensive, but increasing the `count` parameter is cheap. + step := dht.bucketSize + 2*dht.ipDiversityFilterLimit + for nClosest := 0; nClosest < dht.rt.Size(); nClosest += step { + dht.rtLk.RLock() + // Get the last `step` closest peers, because we already tried the `nClosest` closest peers + closestKeys := kademlia.ClosestN(kadKey, dht.rt, nClosest+step)[nClosest:] + dht.rtLk.RUnlock() + + PeersLoop: + for _, k := range closestKeys { + dht.kMapLk.RLock() + // Recover the peer ID from the key + p, ok := dht.keyToPeerMap[string(k)] + if !ok { + logger.Errorf("key not found in map") + continue + } + dht.kMapLk.RUnlock() + dht.peerAddrsLk.RLock() + peerAddrs := dht.peerAddrs[p] + dht.peerAddrsLk.RUnlock() + + if dht.ipDiversityFilterLimit > 0 { + for _, addr := range peerAddrs { + ip, err := manet.ToIP(addr) + if err != nil { + continue + } + ipGroup := peerdiversity.IPGroupKey(ip) + if len(ipGroup) == 0 { + continue + } + if _, ok := ipGroupCounts[ipGroup]; !ok { + ipGroupCounts[ipGroup] = make(map[peer.ID]struct{}) + } + if len(ipGroupCounts[ipGroup]) >= dht.ipDiversityFilterLimit { + // This ip group is already overrepresented, skip this peer + continue PeersLoop + } + ipGroupCounts[ipGroup][p] = struct{}{} + } + } + + // Add the peer's known addresses to the peerstore so that it can be + // dialed by the caller. + dht.h.Peerstore().AddAddrs(p, peerAddrs, peerstore.TempAddrTTL) + peers = append(peers, p) - dht.h.Peerstore().AddAddrs(p, peerAddrs, peerstore.TempAddrTTL) - peers = append(peers, p) + if len(peers) == dht.bucketSize { + return peers, nil + } + } } return peers, nil } @@ -615,7 +665,7 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing. } stopCh := make(chan struct{}) - valCh, lookupRes := dht.getValues(ctx, key, stopCh) + valCh, lookupRes := dht.getValues(ctx, key) out := make(chan []byte) go func() { @@ -743,7 +793,7 @@ type lookupWithFollowupResult struct { peers []peer.ID // the top K not unreachable peers at the end of the query } -func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) { +func (dht *FullRT) getValues(ctx context.Context, key string) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) { valCh := make(chan RecvdVal, 1) lookupResCh := make(chan *lookupWithFollowupResult, 1) @@ -1004,7 +1054,7 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash) keysAsPeerIDs = append(keysAsPeerIDs, peer.ID(k)) } - return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn, true) + return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn) } func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte) error { @@ -1035,10 +1085,10 @@ func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte) return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr])) } - return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn, false) + return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn) } -func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, target, k peer.ID) error, isProvRec bool) error { +func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, target, k peer.ID) error) error { ctx, span := internal.StartSpan(ctx, "FullRT.BulkMessageSend") defer span.End() @@ -1089,7 +1139,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( workCh := make(chan workMessage, 1) wg := sync.WaitGroup{} wg.Add(dht.bulkSendParallelism) - for i := 0; i < dht.bulkSendParallelism; i++ { + for range dht.bulkSendParallelism { go func() { defer wg.Done() defer logger.Debugf("bulk send goroutine done") diff --git a/fullrt/dht_test.go b/fullrt/dht_test.go index 23eef30b5..bad72d0b9 100644 --- a/fullrt/dht_test.go +++ b/fullrt/dht_test.go @@ -1,15 +1,24 @@ package fullrt import ( + "context" + "crypto/rand" "strconv" "testing" + "github.com/libp2p/go-libp2p" + dht "github.com/libp2p/go-libp2p-kad-dht" + kadkey "github.com/libp2p/go-libp2p-xor/key" + "github.com/libp2p/go-libp2p-xor/trie" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" ) func TestDivideByChunkSize(t *testing.T) { var keys []peer.ID - for i := 0; i < 10; i++ { + for i := range 10 { keys = append(keys, peer.ID(strconv.Itoa(i))) } @@ -68,7 +77,7 @@ func TestDivideByChunkSize(t *testing.T) { if len(gr) != 10 { t.Fatal("incorrect number of groups") } - for i := 0; i < 10; i++ { + for i := range 10 { if g, expected := convertToStrings(gr[i]), []string{strconv.Itoa(i)}; !pidsEquals(g, expected) { t.Fatalf("expected %v, got %v", expected, g) } @@ -84,3 +93,199 @@ func TestDivideByChunkSize(t *testing.T) { } }) } + +func TestIPDiversityFilter(t *testing.T) { + ctx := context.Background() + h, err := libp2p.New() + require.NoError(t, err) + dht, err := NewFullRT(h, "", DHTOption(dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...))) + require.NoError(t, err) + + dht.bucketSize = 3 + dht.ipDiversityFilterLimit = 1 + + // peer id whose kadid starts with 15 0's + target, err := peer.Decode("QmNLfyis4M4iAWth8ApJwbCfuQaaaXKWECGAHQfXKUG6C7") + require.NoError(t, err) + + type addr struct { + ipv6 bool + addr string + } + // setDhtPeers replaces the dht's routing table with the provided addresses + // assigned with random new peer ids. The provided order of addresses is also + // the kademlia distance order to the key requested later. + setDhtPeers := func(peerMaddrs ...[]addr) []peer.ID { + newTrie := trie.New() + peerAddrs := make(map[peer.ID][]ma.Multiaddr) + kPeerMap := make(map[string]peer.ID) + pids := make([]peer.ID, 0, len(peerMaddrs)) + for i, ips := range peerMaddrs { + _, pubKey, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(t, err) + pid, err := peer.IDFromPublicKey(pubKey) + require.NoError(t, err) + p := &peer.AddrInfo{ID: pid, Addrs: make([]ma.Multiaddr, 0, len(ips))} + for _, ip := range ips { + var a ma.Multiaddr + var err error + if ip.ipv6 { + a, err = ma.NewMultiaddr("/ip6/" + ip.addr + "/tcp/4001") + } else { + a, err = ma.NewMultiaddr("/ip4/" + ip.addr + "/tcp/4001") + } + require.NoError(t, err) + p.Addrs = append(p.Addrs, a) + } + k := [32]byte{} + k[0] = byte(i) + kadKey := kadkey.Key(k[:]) + _, ok := newTrie.Add(kadKey) + require.True(t, ok) + kPeerMap[string(kadKey)] = p.ID + peerAddrs[p.ID] = p.Addrs + pids = append(pids, p.ID) + } + dht.rt = newTrie + dht.peerAddrsLk.Lock() + dht.peerAddrs = peerAddrs + dht.peerAddrsLk.Unlock() + dht.kMapLk.Lock() + dht.keyToPeerMap = kPeerMap + dht.kMapLk.Unlock() + return pids + } + + t.Run("Different IPv4 blocks", func(t *testing.T) { + pids := setDhtPeers([][]addr{ + {{ipv6: false, addr: "1.1.1.1"}}, + {{ipv6: false, addr: "2.2.2.2"}}, + {{ipv6: false, addr: "3.3.3.3"}}, + {{ipv6: false, addr: "4.4.4.4"}}, + }...) + cp, err := dht.GetClosestPeers(ctx, string(target)) + require.NoError(t, err) + require.Len(t, cp, dht.bucketSize) + require.Equal(t, pids[:dht.bucketSize], cp) + }) + + t.Run("Duplicate address from IPv4 block", func(t *testing.T) { + pids := setDhtPeers([][]addr{ + {{ipv6: false, addr: "1.1.1.1"}}, + {{ipv6: false, addr: "1.1.2.2"}}, + {{ipv6: false, addr: "3.3.3.3"}}, + {{ipv6: false, addr: "4.4.4.4"}}, + }...) + cp, err := dht.GetClosestPeers(ctx, string(target)) + require.NoError(t, err) + require.Len(t, cp, dht.bucketSize) + require.Contains(t, cp, pids[0]) + require.Contains(t, cp, pids[2]) + require.Contains(t, cp, pids[3]) + }) + + t.Run("Duplicate address from 2 IPv4 blocks", func(t *testing.T) { + pids := setDhtPeers([][]addr{ + {{ipv6: false, addr: "1.1.1.1"}}, + {{ipv6: false, addr: "1.1.2.2"}}, + {{ipv6: false, addr: "3.3.3.3"}}, + {{ipv6: false, addr: "3.3.4.4"}}, + {{ipv6: false, addr: "5.5.5.5"}}, + }...) + cp, err := dht.GetClosestPeers(ctx, string(target)) + require.NoError(t, err) + require.Len(t, cp, dht.bucketSize) + require.Contains(t, cp, pids[0]) + require.Contains(t, cp, pids[2]) + require.Contains(t, cp, pids[4]) + }) + + t.Run("Different IPv6 blocks", func(t *testing.T) { + pids := setDhtPeers([][]addr{ + {{ipv6: true, addr: "2001:4860:4860::1"}}, + {{ipv6: true, addr: "2606:4700:4700::2"}}, + {{ipv6: true, addr: "2620:fe::3"}}, + {{ipv6: true, addr: "2a02:6b8::4"}}, + }...) + cp, err := dht.GetClosestPeers(ctx, string(target)) + require.NoError(t, err) + require.Len(t, cp, dht.bucketSize) + require.Equal(t, pids[:dht.bucketSize], cp) + }) + + t.Run("Duplicate address from IPv6 block", func(t *testing.T) { + pids := setDhtPeers([][]addr{ + {{ipv6: true, addr: "2001:4860:4860::1"}}, + {{ipv6: true, addr: "2001:4860:4860::2"}}, + {{ipv6: true, addr: "2620:fe::3"}}, + {{ipv6: true, addr: "2a02:6b8::4"}}, + }...) + cp, err := dht.GetClosestPeers(ctx, string(target)) + require.NoError(t, err) + require.Len(t, cp, dht.bucketSize) + require.Contains(t, cp, pids[0]) + require.Contains(t, cp, pids[2]) + require.Contains(t, cp, pids[3]) + }) + + t.Run("Duplicate address from 2 IPv6 blocks", func(t *testing.T) { + pids := setDhtPeers([][]addr{ + {{ipv6: true, addr: "2001:4860:4860::1"}}, + {{ipv6: true, addr: "2001:4860:4860::2"}}, + {{ipv6: true, addr: "2606:4700:4700::3"}}, + {{ipv6: true, addr: "2606:4700:4700::4"}}, + {{ipv6: true, addr: "2620:fe::5"}}, + }...) + cp, err := dht.GetClosestPeers(ctx, string(target)) + require.NoError(t, err) + require.Len(t, cp, dht.bucketSize) + require.Contains(t, cp, pids[0]) + require.Contains(t, cp, pids[2]) + require.Contains(t, cp, pids[4]) + }) + + t.Run("IPv4+IPv6 acceptable representation", func(t *testing.T) { + pids := setDhtPeers([][]addr{ + {{ipv6: false, addr: "1.1.1.1"}, {ipv6: true, addr: "2001:4860:4860::1"}}, + {{ipv6: false, addr: "2.2.2.2"}, {ipv6: true, addr: "2606:4700:4700::2"}}, + {{ipv6: false, addr: "3.3.3.3"}, {ipv6: true, addr: "2620:fe::3"}}, + {{ipv6: false, addr: "4.4.4.4"}, {ipv6: true, addr: "2a02:6b8::4"}}, + {{ipv6: false, addr: "5.5.5.5"}, {ipv6: true, addr: "2620:fe::5"}}, + }...) + cp, err := dht.GetClosestPeers(ctx, string(target)) + require.NoError(t, err) + require.Len(t, cp, dht.bucketSize) + require.Equal(t, pids[:dht.bucketSize], cp) + }) + + t.Run("IPv4+IPv6 overrepresentation", func(t *testing.T) { + pids := setDhtPeers([][]addr{ + {{ipv6: false, addr: "1.1.1.1"}, {ipv6: true, addr: "2001:4860:4860::1"}}, + {{ipv6: false, addr: "1.1.2.2"}, {ipv6: true, addr: "2606:4700:4700::2"}}, + {{ipv6: false, addr: "3.3.3.3"}, {ipv6: true, addr: "2606:4700:4700::3"}}, + {{ipv6: false, addr: "4.4.4.4"}, {ipv6: true, addr: "2001:4860:4860::4"}}, + {{ipv6: false, addr: "5.5.5.5"}, {ipv6: true, addr: "2620:fe::5"}}, + }...) + cp, err := dht.GetClosestPeers(ctx, string(target)) + require.NoError(t, err) + require.Len(t, cp, dht.bucketSize) + require.Contains(t, cp, pids[0]) + require.Contains(t, cp, pids[2]) + require.Contains(t, cp, pids[4]) + }) + + dht.ipDiversityFilterLimit = 0 + t.Run("Disabled IP Diversity Filter", func(t *testing.T) { + pids := setDhtPeers([][]addr{ + {{ipv6: false, addr: "1.1.1.1"}, {ipv6: true, addr: "2606:4700:4700::1"}}, + {{ipv6: false, addr: "1.1.2.2"}, {ipv6: true, addr: "2606:4700:4700::2"}}, + {{ipv6: false, addr: "1.1.3.3"}, {ipv6: true, addr: "2606:4700:4700::3"}}, + {{ipv6: false, addr: "1.1.4.4"}, {ipv6: true, addr: "2606:4700:4700::4"}}, + {{ipv6: false, addr: "1.1.5.5"}, {ipv6: true, addr: "2606:4700:4700::5"}}, + }...) + cp, err := dht.GetClosestPeers(ctx, string(target)) + require.NoError(t, err) + require.Len(t, cp, dht.bucketSize) + require.Equal(t, pids[:dht.bucketSize], cp) + }) +} diff --git a/fullrt/options.go b/fullrt/options.go index 275792f9e..e367866dd 100644 --- a/fullrt/options.go +++ b/fullrt/options.go @@ -12,12 +12,13 @@ import ( type config struct { dhtOpts []kaddht.Option - crawlInterval time.Duration - waitFrac float64 - bulkSendParallelism int - timeoutPerOp time.Duration - crawler crawler.Crawler - pmOpts []providers.Option + crawlInterval time.Duration + waitFrac float64 + bulkSendParallelism int + timeoutPerOp time.Duration + crawler crawler.Crawler + pmOpts []providers.Option + ipDiversityFilterLimit int } func (cfg *config) apply(opts ...Option) error { @@ -47,8 +48,8 @@ func WithCrawler(c crawler.Crawler) Option { } } -// WithCrawlInterval sets the interval at which the DHT is crawled to refresh peer store. -// Defaults to 1 hour if unspecified. +// WithCrawlInterval sets the interval at which the DHT is crawled to refresh +// peer store. Defaults to 1 hour if unspecified. func WithCrawlInterval(i time.Duration) Option { return func(opt *config) error { opt.crawlInterval = i @@ -56,7 +57,8 @@ func WithCrawlInterval(i time.Duration) Option { } } -// WithSuccessWaitFraction sets the fraction of peers to wait for before considering an operation a success defined as a number between (0, 1]. +// WithSuccessWaitFraction sets the fraction of peers to wait for before +// considering an operation a success defined as a number between (0, 1]. // Defaults to 30% if unspecified. func WithSuccessWaitFraction(f float64) Option { return func(opt *config) error { @@ -68,8 +70,9 @@ func WithSuccessWaitFraction(f float64) Option { } } -// WithBulkSendParallelism sets the maximum degree of parallelism at which messages are sent to other peers. It must be at least 1. -// Defaults to 20 if unspecified. +// WithBulkSendParallelism sets the maximum degree of parallelism at which +// messages are sent to other peers. It must be at least 1. Defaults to 20 if +// unspecified. func WithBulkSendParallelism(b int) Option { return func(opt *config) error { if b < 1 { @@ -80,8 +83,9 @@ func WithBulkSendParallelism(b int) Option { } } -// WithTimeoutPerOperation sets the timeout per operation, where operations include putting providers and querying the DHT. -// Defaults to 5 seconds if unspecified. +// WithTimeoutPerOperation sets the timeout per operation, where operations +// include putting providers and querying the DHT. Defaults to 5 seconds if +// unspecified. func WithTimeoutPerOperation(t time.Duration) Option { return func(opt *config) error { opt.timeoutPerOp = t @@ -89,10 +93,20 @@ func WithTimeoutPerOperation(t time.Duration) Option { } } -// WithProviderManagerOptions sets the options to use when instantiating providers.ProviderManager. +// WithProviderManagerOptions sets the options to use when instantiating +// providers.ProviderManager. func WithProviderManagerOptions(pmOpts ...providers.Option) Option { return func(opt *config) error { opt.pmOpts = pmOpts return nil } } + +// WithIPDiversityFilterLimit sets the maximum number of peers with addresses +// in the same IP group returned by GetClosestPeers. +func WithIPDiversityFilterLimit(ipDiversityFilterLimit int) Option { + return func(opt *config) error { + opt.ipDiversityFilterLimit = ipDiversityFilterLimit + return nil + } +} diff --git a/go.mod b/go.mod index 01bef9988..977a501b1 100644 --- a/go.mod +++ b/go.mod @@ -6,15 +6,15 @@ require ( github.com/google/gopacket v1.1.19 github.com/google/uuid v1.6.0 github.com/hashicorp/golang-lru v1.0.2 - github.com/ipfs/boxo v0.28.0 + github.com/ipfs/boxo v0.29.1 github.com/ipfs/go-cid v0.5.0 github.com/ipfs/go-datastore v0.8.2 github.com/ipfs/go-detect-race v0.0.1 github.com/ipfs/go-log v1.0.5 github.com/ipfs/go-log/v2 v2.5.1 github.com/ipfs/go-test v0.2.1 - github.com/libp2p/go-libp2p v0.41.0 - github.com/libp2p/go-libp2p-kbucket v0.6.5 + github.com/libp2p/go-libp2p v0.41.1 + github.com/libp2p/go-libp2p-kbucket v0.7.0 github.com/libp2p/go-libp2p-record v0.3.1 github.com/libp2p/go-libp2p-routing-helpers v0.7.5 github.com/libp2p/go-libp2p-testing v0.12.0 @@ -34,7 +34,7 @@ require ( go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 gonum.org/v1/gonum v0.15.1 - google.golang.org/protobuf v1.36.5 + google.golang.org/protobuf v1.36.6 ) require ( @@ -93,14 +93,14 @@ require ( github.com/pion/datachannel v1.5.10 // indirect github.com/pion/dtls/v2 v2.2.12 // indirect github.com/pion/dtls/v3 v3.0.4 // indirect - github.com/pion/ice/v4 v4.0.6 // indirect + github.com/pion/ice/v4 v4.0.8 // indirect github.com/pion/interceptor v0.1.37 // indirect github.com/pion/logging v0.2.3 // indirect github.com/pion/mdns/v2 v2.0.7 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/rtcp v1.2.15 // indirect github.com/pion/rtp v1.8.11 // indirect - github.com/pion/sctp v1.8.36 // indirect + github.com/pion/sctp v1.8.37 // indirect github.com/pion/sdp/v3 v3.0.10 // indirect github.com/pion/srtp/v3 v3.0.4 // indirect github.com/pion/stun v0.6.1 // indirect @@ -112,12 +112,12 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/polydawn/refmt v0.89.0 // indirect - github.com/prometheus/client_golang v1.21.0 // indirect + github.com/prometheus/client_golang v1.21.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/quic-go/qpack v0.5.1 // indirect - github.com/quic-go/quic-go v0.50.0 // indirect + github.com/quic-go/quic-go v0.50.1 // indirect github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect @@ -126,14 +126,14 @@ require ( go.uber.org/dig v1.18.0 // indirect go.uber.org/fx v1.23.0 // indirect go.uber.org/mock v0.5.0 // indirect - golang.org/x/crypto v0.35.0 // indirect - golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa // indirect - golang.org/x/mod v0.23.0 // indirect - golang.org/x/net v0.36.0 // indirect - golang.org/x/sync v0.11.0 // indirect - golang.org/x/sys v0.30.0 // indirect - golang.org/x/text v0.22.0 // indirect - golang.org/x/tools v0.30.0 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 // indirect + golang.org/x/mod v0.24.0 // indirect + golang.org/x/net v0.37.0 // indirect + golang.org/x/sync v0.12.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.23.0 // indirect + golang.org/x/tools v0.31.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.4.0 // indirect ) diff --git a/go.sum b/go.sum index 1cb8631d9..235baa58f 100644 --- a/go.sum +++ b/go.sum @@ -144,8 +144,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/ipfs/boxo v0.28.0 h1:io6nXqN8XMOstB7dQGG5GWnMk4WssoMvva9OADErZdI= -github.com/ipfs/boxo v0.28.0/go.mod h1:eY9w3iTpmZGKzDfEYjm3oK8f+xjv8KJhhNXJwicmd3I= +github.com/ipfs/boxo v0.29.1 h1:z61ZT4YDfTHLjXTsu/+3wvJ8aJlExthDSOCpx6Nh8xc= +github.com/ipfs/boxo v0.29.1/go.mod h1:MkDJStXiJS9U99cbAijHdcmwNfVn5DKYBmQCOgjY2NU= github.com/ipfs/go-block-format v0.2.0 h1:ZqrkxBA2ICbDRbK8KJs/u0O3dlp6gmAuuXUJNiW1Ycs= github.com/ipfs/go-block-format v0.2.0/go.mod h1:+jpL11nFx5A/SPpsoBn6Bzkra/zaArfSmsknbPMYgzM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= @@ -220,15 +220,15 @@ github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZ github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= github.com/libp2p/go-flow-metrics v0.2.0 h1:EIZzjmeOE6c8Dav0sNv35vhZxATIXWZg6j/C08XmmDw= github.com/libp2p/go-flow-metrics v0.2.0/go.mod h1:st3qqfu8+pMfh+9Mzqb2GTiwrAGjIPszEjZmtksN8Jc= -github.com/libp2p/go-libp2p v0.41.0 h1:JRaD39dqf/tBBGapJ0T38N73vOaDCsWgcx3mE6HgXWk= -github.com/libp2p/go-libp2p v0.41.0/go.mod h1:Be8QYqC4JW6Xq8buukNeoZJjyT1XUDcGoIooCHm1ye4= +github.com/libp2p/go-libp2p v0.41.1 h1:8ecNQVT5ev/jqALTvisSJeVNvXYJyK4NhQx1nNRXQZE= +github.com/libp2p/go-libp2p v0.41.1/go.mod h1:DcGTovJzQl/I7HMrby5ZRjeD0kQkGiy+9w6aEkSZpRI= github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94= github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8= github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g= github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw= github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio= -github.com/libp2p/go-libp2p-kbucket v0.6.5 h1:Fsl1YvZcMwqrR4DYrTO02yo9PGYs2HBQIT3lGXFMTxg= -github.com/libp2p/go-libp2p-kbucket v0.6.5/go.mod h1:U6WOd0BvnSp03IQSrjgM54tg7zh1UUNsXLJqAQzClTA= +github.com/libp2p/go-libp2p-kbucket v0.7.0 h1:vYDvRjkyJPeWunQXqcW2Z6E93Ywx7fX0jgzb/dGOKCs= +github.com/libp2p/go-libp2p-kbucket v0.7.0/go.mod h1:blOINGIj1yiPYlVEX0Rj9QwEkmVnz3EP8LK1dRKBC6g= github.com/libp2p/go-libp2p-peerstore v0.1.4/go.mod h1:+4BDbDiiKf4PzpANZDAT+knVdLxvqh7hXOujessqdzs= github.com/libp2p/go-libp2p-record v0.3.1 h1:cly48Xi5GjNw5Wq+7gmjfBiG9HCzQVkiZOUZ8kUl+Fg= github.com/libp2p/go-libp2p-record v0.3.1/go.mod h1:T8itUkLcWQLCYMqtX7Th6r7SexyUJpIyPgks757td/E= @@ -348,8 +348,8 @@ github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk= github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U= github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg= -github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM= -github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= +github.com/pion/ice/v4 v4.0.8 h1:ajNx0idNG+S+v9Phu4LSn2cs8JEfTsA1/tEjkkAVpFY= +github.com/pion/ice/v4 v4.0.8/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI= github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= @@ -363,8 +363,8 @@ github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0= github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk= github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4= -github.com/pion/sctp v1.8.36 h1:owNudmnz1xmhfYje5L/FCav3V9wpPRePHle3Zi+P+M0= -github.com/pion/sctp v1.8.36/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= +github.com/pion/sctp v1.8.37 h1:ZDmGPtRPX9mKCiVXtMbTWybFw3z/hVKAZgU81wcOrqs= +github.com/pion/sctp v1.8.37/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA= github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M= @@ -391,8 +391,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/polydawn/refmt v0.89.0 h1:ADJTApkvkeBZsN0tBTx8QjpD9JkmxbKp0cxfr9qszm4= github.com/polydawn/refmt v0.89.0/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= -github.com/prometheus/client_golang v1.21.0 h1:DIsaGmiaBkSangBgMtWdNfxbMNdku5IK6iNhrEqWvdA= -github.com/prometheus/client_golang v1.21.0/go.mod h1:U9NM32ykUErtVBxdvD3zfi+EuFkkaBvMb09mIfe0Zgg= +github.com/prometheus/client_golang v1.21.1 h1:DOvXXTqVzvkIewV/CDPFdejpMCGeMcbGCQ8YOmu+Ibk= +github.com/prometheus/client_golang v1.21.1/go.mod h1:U9NM32ykUErtVBxdvD3zfi+EuFkkaBvMb09mIfe0Zgg= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= @@ -404,8 +404,8 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI= github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg= -github.com/quic-go/quic-go v0.50.0 h1:3H/ld1pa3CYhkcc20TPIyG1bNsdhn9qZBGN3b9/UyUo= -github.com/quic-go/quic-go v0.50.0/go.mod h1:Vim6OmUvlYdwBhXP9ZVrtGmCMWa3wEqhq3NgYrI8b4E= +github.com/quic-go/quic-go v0.50.1 h1:unsgjFIUqW8a2oopkY7YNONpV1gYND6Nt9hnt1PN94Q= +github.com/quic-go/quic-go v0.50.1/go.mod h1:Vim6OmUvlYdwBhXP9ZVrtGmCMWa3wEqhq3NgYrI8b4E= github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 h1:4WFk6u3sOT6pLa1kQ50ZVdm8BQFgJNA117cepZxtLIg= github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66/go.mod h1:Vp72IJajgeOL6ddqrAhmp7IM9zbTcgkQxD/YdxrVwMw= github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk= @@ -544,11 +544,11 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= -golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= -golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa h1:t2QcU6V556bFjYgu4L6C+6VrCPyJZ+eyRsABUPs1mz4= -golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa/go.mod h1:BHOTPb3L19zxehTsLoJXVaTktb06DFgmdW6Wb9s8jqk= +golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 h1:nDVHiLt8aIbd/VzvPWN6kSOPE7+F/fNFDSXLVYkE/Iw= +golang.org/x/exp v0.0.0-20250305212735-054e65f0b394/go.mod h1:sIifuuw/Yco/y6yb6+bDNfyeQ/MdPUy/hKEMYQV17cM= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -562,8 +562,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.23.0 h1:Zb7khfcRGKk+kqfxFaP5tZqCnDZMjC5VtUBs87Hr6QM= -golang.org/x/mod v0.23.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +golang.org/x/mod v0.24.0 h1:ZfthKaKaT4NrhGVZHO1/WDTwGES4De8KtWO0SIbNJMU= +golang.org/x/mod v0.24.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -587,8 +587,8 @@ golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= -golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA= -golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I= +golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= +golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -604,8 +604,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= +golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -636,8 +636,8 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -654,8 +654,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= -golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= @@ -680,8 +680,8 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY= -golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY= +golang.org/x/tools v0.31.0 h1:0EedkvKDbh+qistFTd0Bcwe/YLh4vHwWEkiI0toFIBU= +golang.org/x/tools v0.31.0/go.mod h1:naFTU+Cev749tSJRXJlna0T3WxKvb1kWEx15xA4SdmQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -706,8 +706,8 @@ google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9M google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= -google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= -google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/query.go b/query.go index 9d8740b32..5dbcfcb38 100644 --- a/query.go +++ b/query.go @@ -45,6 +45,12 @@ type query struct { // seedPeers is the set of peers that seed the query seedPeers []peer.ID + // If non-zero, define how many closer peers from the same IP block are + // allowed to be returned in a response. if response contains more than + // maxPeersPerIPGroup peers from the same IP block, all peers from that IP + // block are dropped + maxPeersPerIPGroup int + // peerTimes contains the duration of each successful query to a peer peerTimes map[peer.ID]time.Duration @@ -127,7 +133,7 @@ func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, qu // wait for all queries to complete before returning, aborting ongoing queries if we've been externally stopped followupsCompleted := 0 processFollowUp: - for i := 0; i < len(queryPeers); i++ { + for i := range queryPeers { select { case <-doneCh: followupsCompleted++ @@ -168,18 +174,27 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn }) return nil, nil, kb.ErrLookupFailure } + // if the DHT has a diversity filter, reuse the maxForTable value to drop + // responses from peers providing too many closer peers in the same IP block + var maxPeersPerIPGroup int + if dht.rtPeerDiversityFilter != nil { + if filter, ok := dht.rtPeerDiversityFilter.(*rtPeerIPGroupFilter); ok { + maxPeersPerIPGroup = filter.maxForTable + } + } q := &query{ - id: uuid.New(), - key: target, - ctx: ctx, - dht: dht, - queryPeers: qpeerset.NewQueryPeerset(target), - seedPeers: seedPeers, - peerTimes: make(map[peer.ID]time.Duration), - terminated: false, - queryFn: queryFn, - stopFn: stopFn, + id: uuid.New(), + key: target, + ctx: ctx, + dht: dht, + queryPeers: qpeerset.NewQueryPeerset(target), + maxPeersPerIPGroup: maxPeersPerIPGroup, + seedPeers: seedPeers, + peerTimes: make(map[peer.ID]time.Duration), + terminated: false, + queryFn: queryFn, + stopFn: stopFn, } // run the query @@ -425,11 +440,14 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID // query successful, try to add to RT q.dht.validPeerFound(p) + if q.maxPeersPerIPGroup != 0 { + newPeers = filterPeersByIPDiversity(newPeers, q.maxPeersPerIPGroup) + } + // process new peers saw := []peer.ID{} for _, next := range newPeers { if next.ID == q.dht.self { // don't add self. - logger.Debugf("PEERS CLOSER -- worker for: %v found self", p) continue } diff --git a/rt_diversity_filter.go b/rt_diversity_filter.go index 444eff4fd..e902e6853 100644 --- a/rt_diversity_filter.go +++ b/rt_diversity_filter.go @@ -10,6 +10,7 @@ import ( logging "github.com/ipfs/go-log" ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" ) var dfLog = logging.Logger("dht/RtDiversityFilter") @@ -106,3 +107,57 @@ func (r *rtPeerIPGroupFilter) PeerAddresses(p peer.ID) []ma.Multiaddr { } return addr } + +// filterPeersByIPDiversity filters out peers from the response that are overrepresented by IP group. +// If an IP group has more than `limit` peers, all peers with at least 1 address in that IP group +// are filtered out. +func filterPeersByIPDiversity(newPeers []*peer.AddrInfo, limit int) []*peer.AddrInfo { + // If no diversity limit is set, return all peers + if limit == 0 { + return newPeers + } + + // Count peers per IP group + ipGroupPeers := make(map[peerdiversity.PeerIPGroupKey]map[peer.ID]struct{}) + for _, p := range newPeers { + // Find all IP groups this peer belongs to + for _, addr := range p.Addrs { + ip, err := manet.ToIP(addr) + if err != nil { + continue + } + group := peerdiversity.IPGroupKey(ip) + if len(group) == 0 { + continue + } + if _, ok := ipGroupPeers[group]; !ok { + ipGroupPeers[group] = make(map[peer.ID]struct{}) + } + ipGroupPeers[group][p.ID] = struct{}{} + } + } + + // Identify overrepresented groups and tag peers for removal + peersToRemove := make(map[peer.ID]struct{}) + for _, peers := range ipGroupPeers { + if len(peers) > limit { + for p := range peers { + peersToRemove[p] = struct{}{} + } + } + } + if len(peersToRemove) == 0 { + // No groups are overrepresented, return all peers + return newPeers + } + + // Filter out peers from overrepresented groups + filteredPeers := make([]*peer.AddrInfo, 0, len(newPeers)) + for _, p := range newPeers { + if _, ok := peersToRemove[p.ID]; !ok { + filteredPeers = append(filteredPeers, p) + } + } + + return filteredPeers +} diff --git a/rt_diversity_filter_test.go b/rt_diversity_filter_test.go index 8bec676fe..d1f7d52eb 100644 --- a/rt_diversity_filter_test.go +++ b/rt_diversity_filter_test.go @@ -2,13 +2,17 @@ package dht import ( "context" + "crypto/rand" "testing" "time" kb "github.com/libp2p/go-libp2p-kbucket" "github.com/libp2p/go-libp2p-kbucket/peerdiversity" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" + ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" ) @@ -153,3 +157,133 @@ func TestRoutingTableEndToEndMaxPerTable(t *testing.T) { require.Len(t, d.routingTable.ListPeers(), 3) require.True(t, d.routingTable.Find(d5.self) == "") } + +func TestFilterPeersByIPDiversity(t *testing.T) { + maxIPsPerGroup := 2 + + type addr struct { + ipv6 bool + addr string + } + createPeer := func(ips ...addr) *peer.AddrInfo { + _, pubKey, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(t, err) + pid, err := peer.IDFromPublicKey(pubKey) + require.NoError(t, err) + p := &peer.AddrInfo{ID: pid, Addrs: make([]ma.Multiaddr, 0, len(ips))} + for _, ip := range ips { + var a ma.Multiaddr + var err error + if ip.ipv6 { + a, err = ma.NewMultiaddr("/ip6/" + ip.addr + "/tcp/4001") + } else { + a, err = ma.NewMultiaddr("/ip4/" + ip.addr + "/tcp/4001") + } + require.NoError(t, err) + p.Addrs = append(p.Addrs, a) + } + return p + } + + t.Run("Different IPv4 blocks", func(t *testing.T) { + peers := []*peer.AddrInfo{ + createPeer(addr{ipv6: false, addr: "1.1.1.1"}), + createPeer(addr{ipv6: false, addr: "2.2.2.2"}), + createPeer(addr{ipv6: false, addr: "3.3.3.3"}), + createPeer(addr{ipv6: false, addr: "4.4.4.4"}), + } + filtered := filterPeersByIPDiversity(peers, maxIPsPerGroup) + require.Equal(t, peers, filtered) + }) + + t.Run("Same IPv4 block, but acceptable diversity", func(t *testing.T) { + peers := []*peer.AddrInfo{ + createPeer(addr{ipv6: false, addr: "1.1.1.1"}), + createPeer(addr{ipv6: false, addr: "1.1.2.2"}), + createPeer(addr{ipv6: false, addr: "3.3.3.3"}), + createPeer(addr{ipv6: false, addr: "4.4.4.4"}), + } + filtered := filterPeersByIPDiversity(peers, maxIPsPerGroup) + require.Equal(t, peers, filtered) + }) + + t.Run("Overrepresented IPv4 block", func(t *testing.T) { + peers := []*peer.AddrInfo{ + createPeer(addr{ipv6: false, addr: "1.1.1.1"}), + createPeer(addr{ipv6: false, addr: "1.1.1.2"}), + createPeer(addr{ipv6: false, addr: "1.1.3.3"}), + createPeer(addr{ipv6: false, addr: "4.4.4.4"}), + } + filtered := filterPeersByIPDiversity(peers, maxIPsPerGroup) + require.Len(t, filtered, 1) + require.Equal(t, peers[3], filtered[0]) + }) + + t.Run("Different IPv6 ASNs", func(t *testing.T) { + peers := []*peer.AddrInfo{ + createPeer(addr{ipv6: true, addr: "2001:4860:4860::1"}), + createPeer(addr{ipv6: true, addr: "2606:4700:4700::2"}), + createPeer(addr{ipv6: true, addr: "2620:fe::3"}), + createPeer(addr{ipv6: true, addr: "2a02:6b8::4"}), + } + filtered := filterPeersByIPDiversity(peers, maxIPsPerGroup) + require.Equal(t, filtered, peers) + }) + + t.Run("Same IPv6 ASNs, but acceptable diversity", func(t *testing.T) { + peers := []*peer.AddrInfo{ + createPeer(addr{ipv6: true, addr: "2001:4860:4860::1"}), + createPeer(addr{ipv6: true, addr: "2001:4860:4860::2"}), + createPeer(addr{ipv6: true, addr: "2620:fe::3"}), + createPeer(addr{ipv6: true, addr: "2a02:6b8::4"}), + } + filtered := filterPeersByIPDiversity(peers, maxIPsPerGroup) + require.Equal(t, filtered, peers) + }) + + t.Run("Overrepresented IPv6 ASN", func(t *testing.T) { + peers := []*peer.AddrInfo{ + createPeer(addr{ipv6: true, addr: "2001:4860:4860::1"}), + createPeer(addr{ipv6: true, addr: "2001:4860:4860::2"}), + createPeer(addr{ipv6: true, addr: "2001:4860:4860::3"}), + createPeer(addr{ipv6: true, addr: "2a02:6b8::4"}), + } + filtered := filterPeersByIPDiversity(peers, maxIPsPerGroup) + require.Len(t, filtered, 1) + require.Equal(t, peers[3], filtered[0]) + }) + + t.Run("IPv4+IPv6 acceptable representation", func(t *testing.T) { + peers := []*peer.AddrInfo{ + createPeer(addr{ipv6: false, addr: "1.1.1.1"}, addr{ipv6: true, addr: "2001:4860:4860::1"}), + createPeer(addr{ipv6: false, addr: "2.2.2.2"}, addr{ipv6: true, addr: "2001:4860:4860::2"}), + createPeer(addr{ipv6: false, addr: "2.2.3.3"}, addr{ipv6: true, addr: "2620:fe::3"}), + createPeer(addr{ipv6: false, addr: "4.4.4.4"}, addr{ipv6: true, addr: "2a02:6b8::4"}), + } + filtered := filterPeersByIPDiversity(peers, maxIPsPerGroup) + require.Equal(t, filtered, peers) + }) + + t.Run("IPv4+IPv6 overrepresentation", func(t *testing.T) { + peers := []*peer.AddrInfo{ + createPeer(addr{ipv6: false, addr: "1.1.1.1"}, addr{ipv6: true, addr: "2001:4860:4860::1"}), + createPeer(addr{ipv6: false, addr: "1.1.1.2"}, addr{ipv6: true, addr: "2606:4700:4700::2"}), + createPeer(addr{ipv6: false, addr: "1.1.3.3"}, addr{ipv6: true, addr: "2620:fe::3"}), + createPeer(addr{ipv6: false, addr: "4.4.4.4"}, addr{ipv6: true, addr: "2620:fe::4"}), + } + filtered := filterPeersByIPDiversity(peers, maxIPsPerGroup) + require.Len(t, filtered, 1) + require.Equal(t, peers[3], filtered[0]) + }) + + t.Run("Disabled IP diversity filter", func(t *testing.T) { + peers := []*peer.AddrInfo{ + createPeer(addr{ipv6: false, addr: "1.1.1.1"}), + createPeer(addr{ipv6: false, addr: "1.1.1.2"}), + createPeer(addr{ipv6: false, addr: "1.1.3.3"}), + createPeer(addr{ipv6: false, addr: "1.1.4.4"}), + } + filtered := filterPeersByIPDiversity(peers, 0) + require.Equal(t, filtered, peers) + }) +}