Commit 582e3c4

provider: swarm exploration (#1120)
* provider: adding provide and reprovide queue
* provider: network operations
* add some tests
* schedule prefix len computations
* provider schedule
* provider: handleProvide
* addressed review
* use go-test/random
* satisfy linter
* log errors during initial prefix len measurement
* address review
* satisfy linter
* address review
* provider: explore swarm
* address review
* decrease minimal region size from replicationFactor+1 to replicationFactor
* simplify unscheduleSubsumedPrefixesNoClock
* address review
* refactor and test groupAndScheduleKeysByPrefix
* moved maxPrefixSize const to top
1 parent ea7a839 commit 582e3c4

3 files changed, +132 -9 lines changed


provider/internal/keyspace/trie.go

Lines changed: 6 additions & 6 deletions
@@ -305,9 +305,9 @@ type Region struct {
 	Keys *trie.Trie[bit256.Key, mh.Multihash]
 }
 
-// RegionsFromPeers returns the keyspace regions of size `regionSize` from the
-// given `peers` sorted according to `order` along with the Common Prefix
-// shared by all peers.
+// RegionsFromPeers returns the keyspace regions of size at least `regionSize`
+// from the given `peers` sorted according to `order` along with the Common
+// Prefix shared by all peers.
 func RegionsFromPeers(peers []peer.ID, regionSize int, order bit256.Key) ([]Region, bitstr.Key) {
 	if len(peers) == 0 {
 		return []Region{}, ""
@@ -326,14 +326,14 @@ func RegionsFromPeers(peers []peer.ID, regionSize int, order bit256.Key) ([]Regi
 }
 
 // extractMinimalRegions returns the list of all non-overlapping subtries of
-// `t` having strictly more than `size` elements, sorted according to `order`.
-// Every element is included in exactly one region.
+// `t` having at least `size` elements, sorted according to `order`. Every
+// element is included in exactly one region.
 func extractMinimalRegions(t *trie.Trie[bit256.Key, peer.ID], path bitstr.Key, size int, order bit256.Key) []Region {
 	if t.IsEmptyLeaf() {
 		return nil
 	}
 	branch0, branch1 := t.Branch(0), t.Branch(1)
-	if branch0 != nil && branch1 != nil && branch0.Size() > size && branch1.Size() > size {
+	if branch0 != nil && branch1 != nil && branch0.Size() >= size && branch1.Size() >= size {
 		b := int(order.Bit(len(path)))
 		return append(extractMinimalRegions(t.Branch(b), path+bitstr.Key(byte('0'+b)), size, order),
 			extractMinimalRegions(t.Branch(1-b), path+bitstr.Key(byte('1'-b)), size, order)...)

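As a rough illustration (not part of the commit), the standalone sketch below mirrors the new ">= size" split rule: a subtrie is only divided further when both of its branches hold at least `size` keys, so a region can now contain exactly replicationFactor peers rather than strictly more. The `node` type and `minimalRegions` helper are simplified stand-ins for the keyspace package types, and the `order` argument that sorts regions is omitted.

package main

import "fmt"

// node is a simplified stand-in for trie.Trie[bit256.Key, peer.ID], used only
// to illustrate the split rule; it is not the keyspace package type.
type node struct {
	zero, one *node
	keys      int // number of keys stored under this subtrie
}

func (n *node) size() int {
	if n == nil {
		return 0
	}
	return n.keys
}

// minimalRegions splits the trie into non-overlapping regions of at least
// `size` keys: a subtrie is only recursed into when BOTH branches hold at
// least `size` keys, otherwise the whole subtrie becomes a single region.
func minimalRegions(n *node, path string, size int) []string {
	if n == nil || n.keys == 0 {
		return nil
	}
	if n.zero.size() >= size && n.one.size() >= size { // was "> size" before this change
		return append(minimalRegions(n.zero, path+"0", size),
			minimalRegions(n.one, path+"1", size)...)
	}
	return []string{path}
}

func main() {
	// 4 keys under prefix "0" and 3 keys under prefix "1": with size=3 both
	// branches now qualify, so two regions ("0" and "1") are returned instead
	// of a single region covering the whole trie.
	t := &node{keys: 7, zero: &node{keys: 4}, one: &node{keys: 3}}
	fmt.Println(minimalRegions(t, "", 3)) // [0 1]
}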
provider/internal/keyspace/trie_test.go

Lines changed: 2 additions & 2 deletions
@@ -580,15 +580,15 @@ func TestRegionsFromPeers(t *testing.T) {
 
 	// Two peers
 	p1 := genRandPeerID(t)
-	regions, commonPrefix = RegionsFromPeers([]peer.ID{p0, p1}, 1, bit256.ZeroKey())
+	regions, commonPrefix = RegionsFromPeers([]peer.ID{p0, p1}, 2, bit256.ZeroKey())
 	require.Len(t, regions, 1)
 	cpl := key.CommonPrefixLength(bstrPid0, PeerIDToBit256(p1))
 	common := bstrPid0[:cpl]
 	require.Equal(t, common, commonPrefix)
 
 	// Three peers
 	p2 := genRandPeerID(t)
-	regions, commonPrefix = RegionsFromPeers([]peer.ID{p0, p1, p2}, 1, bit256.ZeroKey())
+	regions, commonPrefix = RegionsFromPeers([]peer.ID{p0, p1, p2}, 2, bit256.ZeroKey())
 	require.Len(t, regions, 1)
 	cpl = key.CommonPrefixLength(common, PeerIDToBit256(p2))
 	common = common[:cpl]

provider/provider.go

Lines changed: 124 additions & 1 deletion
@@ -2,6 +2,7 @@ package provider
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"strconv"
 	"sync"
@@ -98,6 +99,8 @@ type SweepingProvider struct {
 
 	keyStore *datastore.KeyStore
 
+	replicationFactor int
+
 	provideQueue *queue.ProvideQueue
 	provideRunning sync.Mutex
 
@@ -129,7 +132,7 @@ type SweepingProvider struct {
 // FIXME: remove me
 func (s *SweepingProvider) SatisfyLinter() {
 	s.vanillaProvide([]byte{})
-	s.closestPeersToKey("")
+	s.exploreSwarm("")
 	s.measureInitialPrefixLen()
 }
 
@@ -396,6 +399,126 @@ func (s *SweepingProvider) vanillaProvide(k mh.Multihash) (bitstr.Key, error) {
 	return coveredPrefix, s.sendProviderRecords(keysAllocations, addrInfo)
 }
 
+// exploreSwarm finds all peers whose kademlia identifier matches `prefix` in
+// the DHT swarm, and organizes them in keyspace regions.
+//
+// A region is identified by a keyspace prefix, and contains all the peers
+// matching this prefix. A region always has at least s.replicationFactor
+// peers. Regions are non-overlapping.
+//
+// If less than s.replicationFactor peers match `prefix`, explore shorter
+// prefixes until at least s.replicationFactor peers are included in the
+// region.
+//
+// The returned `coveredPrefix` represents the keyspace prefix covered by all
+// returned regions combined. It is different from the supplied `prefix` if
+// there aren't enough peers matching `prefix`.
+func (s *SweepingProvider) exploreSwarm(prefix bitstr.Key) (regions []keyspace.Region, coveredPrefix bitstr.Key, err error) {
+	peers, err := s.closestPeersToPrefix(prefix)
+	if err != nil {
+		return nil, "", fmt.Errorf("exploreSwarm '%s': %w", prefix, err)
+	}
+	if len(peers) == 0 {
+		return nil, "", fmt.Errorf("no peers found when exploring prefix %s", prefix)
+	}
+	regions, coveredPrefix = keyspace.RegionsFromPeers(peers, s.replicationFactor, s.order)
+	return regions, coveredPrefix, nil
+}
+
+// maxPrefixSearches is the maximum number of GetClosestPeers operations that
+// are allowed to explore a prefix, preventing an infinite loop, since the exit
+// condition depends on the network topology.
+//
+// A lower bound estimate on the number of fresh peers returned by GCP is
+// replicationFactor/2. Hence, 64 GCP are expected to return at least
+// 32*replicationFactor peers, which should be more than enough, even if the
+// supplied prefix is too short.
+const maxPrefixSearches = 64
+
+// closestPeersToPrefix returns at least s.replicationFactor peers
+// corresponding to the branch of the network peers trie matching the provided
+// prefix. If there aren't enough peers matching the provided prefix, it will
+// find and return the closest peers to the prefix, even if they don't exactly
+// match it.
+func (s *SweepingProvider) closestPeersToPrefix(prefix bitstr.Key) ([]peer.ID, error) {
+	allClosestPeers := make(map[peer.ID]struct{})
+
+	nextPrefix := prefix
+	startTime := time.Now()
+	coveredPrefixesStack := []bitstr.Key{}
+
+	i := 0
+	// Go down the trie to fully cover prefix.
+exploration:
+	for {
+		if i == maxPrefixSearches {
+			return nil, errors.New("closestPeersToPrefix needed more than maxPrefixSearches iterations")
+		}
+		if !s.connectivity.IsOnline() {
+			return nil, errors.New("provider: node is offline")
+		}
+		i++
+		fullKey := keyspace.FirstFullKeyWithPrefix(nextPrefix, s.order)
+		closestPeers, err := s.closestPeersToKey(fullKey)
+		if err != nil {
+			// We only get an err if something really bad happened, e.g. no peers in
+			// routing table, invalid key, etc.
+			return nil, err
+		}
+		if len(closestPeers) == 0 {
+			return nil, errors.New("dht lookup did not return any peers")
+		}
+		coveredPrefix, coveredPeers := keyspace.ShortestCoveredPrefix(fullKey, closestPeers)
+		for _, p := range coveredPeers {
+			allClosestPeers[p] = struct{}{}
+		}
+
+		coveredPrefixLen := len(coveredPrefix)
+		if i == 1 {
+			if coveredPrefixLen <= len(prefix) && coveredPrefix == prefix[:coveredPrefixLen] && len(allClosestPeers) >= s.replicationFactor {
+				// Exit early if the prefix is fully covered at the first request and
+				// we have enough (at least replicationFactor) peers.
+				break exploration
+			}
+		} else {
+			latestPrefix := coveredPrefixesStack[len(coveredPrefixesStack)-1]
+			for coveredPrefixLen <= len(latestPrefix) && coveredPrefix[:coveredPrefixLen-1] == latestPrefix[:coveredPrefixLen-1] {
+				// Pop latest prefix from stack, because current prefix is
+				// complementary.
+				// e.g. latestPrefix=0010, currentPrefix=0011. latestPrefix is
+				// replaced by 001, unless 000 was also in the stack, etc.
+				coveredPrefixesStack = coveredPrefixesStack[:len(coveredPrefixesStack)-1]
+				coveredPrefix = coveredPrefix[:len(coveredPrefix)-1]
+				coveredPrefixLen = len(coveredPrefix)
+
+				if len(coveredPrefixesStack) == 0 {
+					if coveredPrefixLen <= len(prefix) && len(allClosestPeers) >= s.replicationFactor {
+						break exploration
+					}
+					// Not enough peers -> add coveredPrefix to stack and continue.
+					break
+				}
+				if coveredPrefixLen == 0 {
+					logger.Error("coveredPrefixLen==0, coveredPrefixStack ", coveredPrefixesStack)
+					break exploration
+				}
+				latestPrefix = coveredPrefixesStack[len(coveredPrefixesStack)-1]
+			}
+		}
+		// Push coveredPrefix to stack
+		coveredPrefixesStack = append(coveredPrefixesStack, coveredPrefix)
+		// Flip last bit of last covered prefix
+		nextPrefix = keyspace.FlipLastBit(coveredPrefixesStack[len(coveredPrefixesStack)-1])
+	}
+
+	peers := make([]peer.ID, 0, len(allClosestPeers))
+	for p := range allClosestPeers {
+		peers = append(peers, p)
+	}
+	logger.Debugf("Region %s exploration required %d requests to discover %d peers in %s", prefix, i, len(allClosestPeers), time.Since(startTime))
+	return peers, nil
+}
+
 // closestPeersToKey returns a valid peer ID sharing a long common prefix with
 // the provided key. Note that the returned peer IDs aren't random, they are
 // taken from a static list of preimages.

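The trickiest step in closestPeersToPrefix is the stack of covered prefixes: when a lookup covers the complement of the prefix on top of the stack (e.g. 0011 right after 0010), the two merge into the one-bit-shorter prefix, and the merge repeats as long as complements keep meeting. The snippet below is a standalone sketch of just that merge step, using plain strings instead of bitstr.Key; the mergeCoveredPrefix helper is hypothetical and leaves out the early-exit and error handling of the committed code.

package main

import (
	"fmt"
	"strings"
)

// mergeCoveredPrefix pops prefixes from the stack while the newly covered
// prefix is complementary to the prefix on top (same bits except the last),
// shortening the covered prefix by one bit at each merge.
func mergeCoveredPrefix(stack []string, covered string) ([]string, string) {
	for len(stack) > 0 && len(covered) > 0 {
		top := stack[len(stack)-1]
		if len(covered) > len(top) || !strings.HasPrefix(top, covered[:len(covered)-1]) {
			break
		}
		// covered and top only differ in their last bit, so together they
		// cover the one-bit-shorter prefix: pop top and shorten covered.
		stack = stack[:len(stack)-1]
		covered = covered[:len(covered)-1]
	}
	return append(stack, covered), covered
}

func main() {
	// "0011" merges with "0010" into "001", which then merges with "000"
	// into "00": the stack collapses as complementary prefixes get covered.
	stack, covered := mergeCoveredPrefix([]string{"000", "0010"}, "0011")
	fmt.Println(stack, covered) // [00] 00
}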