Skip to content

Commit e3761f9

Browse files
provider: batch provide (#1121)
* provider: adding provide and reprovide queue * provider: network operations * add some tests * schedule prefix len computations * provider schedule * provider: handleProvide * addressed review * use go-test/random * satisfy linter * log errors during initial prefix len measurement * address review * satisfy linter * address review * provider: explore swarm * provider: batch provide * address review * decrease minimal region size from replicationFactor+1 to replicationFactor * simplify unscheduleSubsumedPrefixesNoClock * address review * refactor and test groupAndScheduleKeysByPrefix * moved maxPrefixSize const to top * address review
1 parent 582e3c4 commit e3761f9

File tree

1 file changed

+105
-6
lines changed

1 file changed

+105
-6
lines changed

provider/provider.go

Lines changed: 105 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/libp2p/go-libp2p/core/peer"
1818
ma "github.com/multiformats/go-multiaddr"
1919
mh "github.com/multiformats/go-multihash"
20+
"go.opentelemetry.io/otel/metric"
2021

2122
"github.com/probe-lab/go-libdht/kad/key"
2223
"github.com/probe-lab/go-libdht/kad/key/bit256"
@@ -67,9 +68,16 @@ type DHTProvider interface {
6768

6869
var _ DHTProvider = &SweepingProvider{}
6970

70-
// maxPrefixSize is the maximum size of a prefix used to define a keyspace
71-
// region.
72-
const maxPrefixSize = 24
71+
const (
72+
// maxPrefixSize is the maximum size of a prefix used to define a keyspace
73+
// region.
74+
maxPrefixSize = 24
75+
// individualProvideThreshold is the largest number of keys for which a
76+
// region is NOT explored. If the number of keys to provide for a region is
77+
// less than or equal to this threshold, the keys are provided individually;
78+
// above it, the region is explored and the keys are provided as a batch.
79+
individualProvideThreshold = 2
80+
)
7381

7482
const loggerName = "dht/SweepingProvider"
7583

@@ -127,12 +135,13 @@ type SweepingProvider struct {
127135
msgSender pb.MessageSender
128136
getSelfAddrs func() []ma.Multiaddr
129137
addLocalRecord func(mh.Multihash) error
138+
139+
provideCounter metric.Int64Counter
130140
}
131141

132142
// FIXME: remove me
//
// NOTE(review): judging by its name, this method presumably exists only to
// reference methods that have no other caller yet, so that the linter does not
// report them as unused — confirm before removing.
133143
func (s *SweepingProvider) SatisfyLinter() {
134144
s.vanillaProvide([]byte{})
135-
s.exploreSwarm("")
136145
s.measureInitialPrefixLen()
137146
}
138147

@@ -729,18 +738,108 @@ func (s *SweepingProvider) provideLoop() {
729738
if ok {
730739
go func(prefix bitstr.Key, keys []mh.Multihash) {
731740
defer s.workerPool.Release(burstWorker)
732-
s.provideForPrefix(prefix, keys)
741+
s.batchProvide(prefix, keys)
733742
}(prefix, keys)
734743
} else {
735744
s.workerPool.Release(burstWorker)
736745
}
737746
}
738747
}
739748

740-
func (s *SweepingProvider) provideForPrefix(prefix bitstr.Key, keys []mh.Multihash) {
749+
func (s *SweepingProvider) batchProvide(prefix bitstr.Key, keys []mh.Multihash) {
750+
if len(keys) == 0 {
751+
return
752+
}
753+
addrInfo, ok := s.selfAddrInfo()
754+
if !ok {
755+
// Don't provide if the node doesn't have a valid address to include in the
756+
// provider record.
757+
return
758+
}
759+
if len(keys) <= individualProvideThreshold {
760+
// Don't fully explore the region, execute simple DHT provides for these
761+
// keys. It isn't worth it to fully explore a region for just a few keys.
762+
s.individualProvide(prefix, keys, false, false)
763+
return
764+
}
765+
766+
regions, coveredPrefix, err := s.exploreSwarm(prefix)
767+
if err != nil {
768+
s.failedProvide(prefix, keys, fmt.Errorf("reprovide '%s': %w", prefix, err))
769+
return
770+
}
771+
logger.Debugf("provide: requested prefix '%s' (len %d), prefix covered '%s' (len %d)", prefix, len(prefix), coveredPrefix, len(coveredPrefix))
772+
773+
// Add any key matching the covered prefix from the provide queue to the
774+
// current provide batch.
775+
extraKeys := s.provideQueue.DequeueMatching(coveredPrefix)
776+
keys = append(keys, extraKeys...)
777+
regions = keyspace.AssignKeysToRegions(regions, keys)
778+
779+
if !s.provideRegions(regions, addrInfo) {
780+
logger.Errorf("failed to reprovide any region for prefix %s", prefix)
781+
}
782+
}
783+
784+
// failedProvide handles a provide attempt that failed with err: it logs the
// error, puts keys back in the provide queue under prefix, and triggers a
// connectivity check.
func (s *SweepingProvider) failedProvide(prefix bitstr.Key, keys []mh.Multihash, err error) {
785+
logger.Error(err)
786+
// Put keys back to the provide queue.
787+
s.provideQueue.Enqueue(prefix, keys...)
788+
789+
// NOTE(review): presumably a failed provide may mean the node went offline,
// hence the connectivity check — confirm.
s.connectivity.TriggerCheck()
790+
}
791+
792+
// selfAddrInfo returns the current peer.AddrInfo to be used in the provider
793+
// records sent to remote peers.
794+
//
795+
// If the node currently has no valid multiaddress, return an empty AddrInfo
796+
// and false.
797+
func (s *SweepingProvider) selfAddrInfo() (peer.AddrInfo, bool) {
798+
addrs := s.getSelfAddrs()
799+
if len(addrs) == 0 {
800+
logger.Warn("provider: no self addresses available for providing keys")
801+
return peer.AddrInfo{}, false
802+
}
803+
return peer.AddrInfo{ID: s.peerid, Addrs: addrs}, true
804+
}
805+
806+
// individualProvide provides the keys sharing the same prefix to the network
807+
// without exploring the associated keyspace regions. It is meant to perform
808+
// "normal" DHT provides for the supplied keys, handle failures and schedule
809+
// the next reprovide if necessary.
810+
func (s *SweepingProvider) individualProvide(prefix bitstr.Key, keys []mh.Multihash, reprovide bool, periodicReprovide bool) {
741811
// TODO: implement me
// NOTE: this is currently a stub; none of the parameters are used yet.
742812
}
743813

814+
// provideRegions contains common logic to batchProvide() and batchReprovide().
815+
// It iterate over supplied regions, and allocates the regions provider records
816+
// to the appropriate DHT servers.
817+
func (s *SweepingProvider) provideRegions(regions []keyspace.Region, addrInfo peer.AddrInfo) bool {
818+
errCount := 0
819+
for _, r := range regions {
820+
nKeys := r.Keys.Size()
821+
if nKeys == 0 {
822+
continue
823+
}
824+
// Add keys to local provider store
825+
for _, h := range keyspace.AllValues(r.Keys, s.order) {
826+
s.addLocalRecord(h)
827+
}
828+
keysAllocations := keyspace.AllocateToKClosest(r.Keys, r.Peers, s.replicationFactor)
829+
err := s.sendProviderRecords(keysAllocations, addrInfo)
830+
if err != nil {
831+
errCount++
832+
err = fmt.Errorf("cannot send provider records for region %s: %s", r.Prefix, err)
833+
s.failedProvide(r.Prefix, keyspace.AllValues(r.Keys, s.order), err)
834+
continue
835+
}
836+
s.provideCounter.Add(context.Background(), int64(nKeys))
837+
838+
}
839+
// If at least 1 regions was provided, we don't consider it a failure.
840+
return errCount < len(regions)
841+
}
842+
744843
// ProvideOnce only sends provider records for the given keys out to the DHT
745844
// swarm. It does NOT take the responsibility to reprovide these keys.
746845
func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) {

0 commit comments

Comments
 (0)