
Commit 6fff2a3

fullrt rework batching (#720)
* fullrt: do bulk sending explicitly on peers. Bulk sending now happens by dividing the keys into groups and then, for each group, collecting which peers need which records. This lets us deal with peers individually and protect their connections for the duration, while still handling the case where so much data is sent that the routing table is updated in the middle of the send.
* added additional logging, including histograms of successes and failures
1 parent 1e92b8a commit 6fff2a3
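
The batching flow described in the commit message, reduced to a minimal, self-contained Go sketch. It uses plain strings for keys and peers and a stubbed getClosestPeers helper (both hypothetical, not the real peer.ID / dht.GetClosestPeers API), and it omits dialing, connection protection, and per-key success tracking; it only shows the chunk, group-by-peer, worker-pool shape of the change.

// Sketch only: illustrates dividing keys into chunks, grouping each chunk's
// keys by destination peer, and feeding (peer, keys) work items to a fixed
// pool of workers. All names here are illustrative.
package main

import (
	"fmt"
	"sync"
)

type workItem struct {
	peer string
	keys []string
}

// getClosestPeers stands in for the DHT lookup that decides which peers
// should receive the record for a key.
func getClosestPeers(key string) []string {
	return []string{"peer-" + string(key[0])}
}

func main() {
	keys := []string{"alpha", "bravo", "apple", "banana"}
	chunkSize := 2
	parallelism := 3

	work := make(chan workItem)
	var wg sync.WaitGroup
	wg.Add(parallelism)
	for i := 0; i < parallelism; i++ {
		go func() {
			defer wg.Done()
			for item := range work {
				// In the real code this is where the connection to item.peer
				// is dialed and protected, and each key is sent via fn.
				fmt.Printf("send %d keys to %s: %v\n", len(item.keys), item.peer, item.keys)
			}
		}()
	}

	// Process the keys one chunk at a time so peer lookups stay current even
	// while the routing table changes during a long send.
	for start := 0; start < len(keys); start += chunkSize {
		end := start + chunkSize
		if end > len(keys) {
			end = len(keys)
		}
		// Group this chunk's keys by receiving peer, so each peer is dialed
		// once per chunk instead of once per key.
		keysPerPeer := make(map[string][]string)
		for _, k := range keys[start:end] {
			for _, p := range getClosestPeers(k) {
				keysPerPeer[p] = append(keysPerPeer[p], k)
			}
		}
		for p, ks := range keysPerPeer {
			work <- workItem{peer: p, keys: ks}
		}
	}
	close(work)
	wg.Wait()
}

Grouping a chunk's keys by peer means each peer is contacted once per chunk, while working chunk by chunk keeps the closest-peer lookups reasonably fresh over the course of a long send.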

File tree

2 files changed (+194, -106 lines)

fullrt/dht.go

Lines changed: 178 additions & 91 deletions
@@ -4,6 +4,7 @@ import (
     "bytes"
     "context"
     "fmt"
+    "math/rand"
     "sync"
     "sync/atomic"
     "time"
@@ -174,7 +175,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful

         crawlerInterval: time.Minute * 60,

-        bulkSendParallelism: 10,
+        bulkSendParallelism: 20,
     }

     rt.wg.Add(1)
@@ -918,21 +919,11 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
         return fmt.Errorf("no known addresses for self, cannot put provider")
     }

-    fn := func(ctx context.Context, k peer.ID) error {
-        peers, err := dht.GetClosestPeers(ctx, string(k))
-        if err != nil {
-            return err
-        }
-        successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
-            pmes := dht_pb.NewMessage(dht_pb.Message_ADD_PROVIDER, multihash.Multihash(k), 0)
-            pmes.ProviderPeers = pbPeers
-
-            return dht.messageSender.SendMessage(ctx, p, pmes)
-        }, peers, true)
-        if successes == 0 {
-            return fmt.Errorf("no successful provides")
-        }
-        return nil
+    fn := func(ctx context.Context, p, k peer.ID) error {
+        pmes := dht_pb.NewMessage(dht_pb.Message_ADD_PROVIDER, multihash.Multihash(k), 0)
+        pmes.ProviderPeers = pbPeers
+
+        return dht.messageSender.SendMessage(ctx, p, pmes)
     }

     keysAsPeerIDs := make([]peer.ID, 0, len(keys))
@@ -963,114 +954,210 @@ func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte)
         return fmt.Errorf("does not support duplicate keys")
     }

-    fn := func(ctx context.Context, k peer.ID) error {
-        peers, err := dht.GetClosestPeers(ctx, string(k))
-        if err != nil {
-            return err
-        }
-        successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
-            keyStr := string(k)
-            return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr]))
-        }, peers, true)
-        if successes == 0 {
-            return fmt.Errorf("no successful puts")
-        }
-        return nil
+    fn := func(ctx context.Context, p, k peer.ID) error {
+        keyStr := string(k)
+        return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr]))
     }

     return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn, false)
 }

-func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, k peer.ID) error, isProvRec bool) error {
+func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, target, k peer.ID) error, isProvRec bool) error {
     if len(keys) == 0 {
         return nil
     }

-    sortedKeys := kb.SortClosestPeers(keys, kb.ID(make([]byte, 32)))
+    type report struct {
+        successes   int
+        failures    int
+        lastSuccess time.Time
+        mx          sync.RWMutex
+    }
+
+    keySuccesses := make(map[peer.ID]*report, len(keys))
+    var numSkipped int64
+
+    for _, k := range keys {
+        keySuccesses[k] = &report{}
+    }
+
+    logger.Infof("bulk send: number of keys %d, unique %d", len(keys), len(keySuccesses))
+    numSuccessfulToWaitFor := int(float64(dht.bucketSize) * dht.waitFrac * 1.2)
+
+    sortedKeys := make([]peer.ID, 0, len(keySuccesses))
+    for k := range keySuccesses {
+        sortedKeys = append(sortedKeys, k)
+    }
+
+    sortedKeys = kb.SortClosestPeers(sortedKeys, kb.ID(make([]byte, 32)))
+
+    dht.kMapLk.RLock()
+    numPeers := len(dht.keyToPeerMap)
+    dht.kMapLk.RUnlock()
+
+    chunkSize := (len(sortedKeys) * dht.bucketSize * 2) / numPeers
+    if chunkSize == 0 {
+        chunkSize = 1
+    }
+
+    connmgrTag := fmt.Sprintf("dht-bulk-provide-tag-%d", rand.Int())

-    var numSends uint64 = 0
-    var numSendsSuccessful uint64 = 0
+    type workMessage struct {
+        p    peer.ID
+        keys []peer.ID
+    }

+    workCh := make(chan workMessage, 1)
     wg := sync.WaitGroup{}
-    onePctKeys := uint64(len(sortedKeys)) / 100
+    wg.Add(dht.bulkSendParallelism)
+    for i := 0; i < dht.bulkSendParallelism; i++ {
+        go func() {
+            defer wg.Done()
+            defer logger.Debugf("bulk send goroutine done")
+            for wmsg := range workCh {
+                p, workKeys := wmsg.p, wmsg.keys
+                dht.peerAddrsLk.RLock()
+                peerAddrs := dht.peerAddrs[p]
+                dht.peerAddrsLk.RUnlock()
+                dialCtx, dialCancel := context.WithTimeout(ctx, dht.timeoutPerOp)
+                if err := dht.h.Connect(dialCtx, peer.AddrInfo{ID: p, Addrs: peerAddrs}); err != nil {
+                    dialCancel()
+                    atomic.AddInt64(&numSkipped, 1)
+                    continue
+                }
+                dialCancel()
+                dht.h.ConnManager().Protect(p, connmgrTag)
+                for _, k := range workKeys {
+                    keyReport := keySuccesses[k]
+
+                    queryTimeout := dht.timeoutPerOp
+                    keyReport.mx.RLock()
+                    if keyReport.successes >= numSuccessfulToWaitFor {
+                        if time.Since(keyReport.lastSuccess) > time.Millisecond*500 {
+                            keyReport.mx.RUnlock()
+                            continue
+                        }
+                        queryTimeout = time.Millisecond * 500
+                    }
+                    keyReport.mx.RUnlock()
+
+                    fnCtx, fnCancel := context.WithTimeout(ctx, queryTimeout)
+                    if err := fn(fnCtx, p, k); err == nil {
+                        keyReport.mx.Lock()
+                        keyReport.successes++
+                        if keyReport.successes >= numSuccessfulToWaitFor {
+                            keyReport.lastSuccess = time.Now()
+                        }
+                        keyReport.mx.Unlock()
+                    } else {
+                        keyReport.mx.Lock()
+                        keyReport.failures++
+                        keyReport.mx.Unlock()
+                        if ctx.Err() != nil {
+                            fnCancel()
+                            break
+                        }
+                    }
+                    fnCancel()
+                }

-    bulkSendFn := func(chunk []peer.ID) {
-        defer wg.Done()
-        for _, key := range chunk {
-            if ctx.Err() != nil {
-                break
+                dht.h.ConnManager().Unprotect(p, connmgrTag)
             }
+        }()
+    }

-            sendsSoFar := atomic.AddUint64(&numSends, 1)
-            if onePctKeys > 0 && sendsSoFar%onePctKeys == 0 {
-                logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys))
-            }
-            if err := fn(ctx, key); err != nil {
-                var l interface{}
-                if isProvRec {
-                    l = internal.LoggableProviderRecordBytes(key)
-                } else {
-                    l = internal.LoggableRecordKeyString(key)
+    keyGroups := divideByChunkSize(sortedKeys, chunkSize)
+    sendsSoFar := 0
+    for _, g := range keyGroups {
+        if ctx.Err() != nil {
+            break
+        }
+
+        keysPerPeer := make(map[peer.ID][]peer.ID)
+        for _, k := range g {
+            peers, err := dht.GetClosestPeers(ctx, string(k))
+            if err == nil {
+                for _, p := range peers {
+                    keysPerPeer[p] = append(keysPerPeer[p], k)
                 }
-                logger.Infof("failed to complete bulk sending of key :%v. %v", l, err)
-            } else {
-                atomic.AddUint64(&numSendsSuccessful, 1)
             }
         }
-    }

-    // divide the keys into groups so that we can talk to more peers at a time, because the keys are sorted in
-    // XOR/Kadmelia space consecutive puts will be too the same, or nearly the same, set of peers. Working in parallel
-    // means less waiting on individual dials to complete and also continuing to make progress even if one segment of
-    // the network is being slow, or we are maxing out the connection, stream, etc. to those peers.
-    keyGroups := divideIntoGroups(sortedKeys, dht.bulkSendParallelism)
-    wg.Add(len(keyGroups))
-    for _, chunk := range keyGroups {
-        go bulkSendFn(chunk)
+        logger.Debugf("bulk send: %d peers for group size %d", len(keysPerPeer), len(g))
+
+    keyloop:
+        for p, workKeys := range keysPerPeer {
+            select {
+            case workCh <- workMessage{p: p, keys: workKeys}:
+            case <-ctx.Done():
+                break keyloop
+            }
+        }
+        sendsSoFar += len(g)
+        logger.Infof("bulk sending: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(keySuccesses)), sendsSoFar, len(keySuccesses))
     }

+    close(workCh)
+
+    logger.Debugf("bulk send complete, waiting on goroutines to close")
+
     wg.Wait()

+    numSendsSuccessful := 0
+    numFails := 0
+    // generate a histogram of how many successful sends occurred per key
+    successHist := make(map[int]int)
+    // generate a histogram of how many failed sends occurred per key
+    // this does not include sends to peers that were skipped and had no messages sent to them at all
+    failHist := make(map[int]int)
+    for _, v := range keySuccesses {
+        if v.successes > 0 {
+            numSendsSuccessful++
+        }
+        successHist[v.successes]++
+        failHist[v.failures]++
+        numFails += v.failures
+    }
+
     if numSendsSuccessful == 0 {
+        logger.Infof("bulk send failed")
         return fmt.Errorf("failed to complete bulk sending")
     }

-    logger.Infof("bulk send complete: %d of %d successful", numSendsSuccessful, len(keys))
+    logger.Infof("bulk send complete: %d keys, %d unique, %d successful, %d skipped peers, %d fails",
+        len(keys), len(keySuccesses), numSendsSuccessful, numSkipped, numFails)
+
+    logger.Infof("bulk send summary: successHist %v, failHist %v", successHist, failHist)

     return nil
 }

-// divideIntoGroups divides the set of keys into (at most) the number of groups
-func divideIntoGroups(keys []peer.ID, groups int) [][]peer.ID {
-    var keyGroups [][]peer.ID
-    if len(keys) < groups {
-        for i := 0; i < len(keys); i++ {
-            keyGroups = append(keyGroups, keys[i:i+1])
-        }
-        return keyGroups
-    }
-
-    chunkSize := len(keys) / groups
-    remainder := len(keys) % groups
-
-    start := 0
-    end := chunkSize
-    for i := 0; i < groups; i++ {
-        var chunk []peer.ID
-        // distribute the remainder as one extra entry per parallel thread
-        if remainder > 0 {
-            chunk = keys[start : end+1]
-            remainder--
-            start = end + 1
-            end = end + 1 + chunkSize
-        } else {
-            chunk = keys[start:end]
-            start = end
-            end = end + chunkSize
+// divideByChunkSize divides the set of keys into groups of (at most) chunkSize. Chunk size must be greater than 0.
+func divideByChunkSize(keys []peer.ID, chunkSize int) [][]peer.ID {
+    if len(keys) == 0 {
+        return nil
+    }
+
+    if chunkSize < 1 {
+        panic(fmt.Sprintf("fullrt: divide into groups: invalid chunk size %d", chunkSize))
+    }
+
+    var keyChunks [][]peer.ID
+    var nextChunk []peer.ID
+    chunkProgress := 0
+    for _, k := range keys {
+        nextChunk = append(nextChunk, k)
+        chunkProgress++
+        if chunkProgress == chunkSize {
+            keyChunks = append(keyChunks, nextChunk)
+            chunkProgress = 0
+            nextChunk = make([]peer.ID, 0, len(nextChunk))
         }
-        keyGroups = append(keyGroups, chunk)
     }
-    return keyGroups
+    if chunkProgress != 0 {
+        keyChunks = append(keyChunks, nextChunk)
+    }
+    return keyChunks
 }

 // FindProviders searches until the context expires.
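
To make the new sizing heuristics concrete, here is a small, runnable example using assumed numbers (100,000 keys, bucket size 20, 5,000 crawled peers, waitFrac 0.3; the real values come from dht.bucketSize, dht.waitFrac, and len(dht.keyToPeerMap)). The divideByChunkSize clone below works on strings and uses slicing for brevity, but produces the same chunk boundaries as the committed helper.

// Worked example of the chunk-size and early-stop heuristics from this
// commit. The input numbers are illustrative assumptions, not defaults.
package main

import "fmt"

// divideByChunkSize mirrors the committed helper's behavior over strings:
// every chunk holds at most chunkSize keys, in order.
func divideByChunkSize(keys []string, chunkSize int) [][]string {
	if len(keys) == 0 {
		return nil
	}
	if chunkSize < 1 {
		panic(fmt.Sprintf("invalid chunk size %d", chunkSize))
	}
	var chunks [][]string
	for start := 0; start < len(keys); start += chunkSize {
		end := start + chunkSize
		if end > len(keys) {
			end = len(keys)
		}
		chunks = append(chunks, keys[start:end])
	}
	return chunks
}

func main() {
	// Assumed numbers for illustration only.
	numKeys := 100000 // keys being provided (len(sortedKeys))
	bucketSize := 20  // dht.bucketSize
	numPeers := 5000  // len(dht.keyToPeerMap)
	waitFrac := 0.3   // dht.waitFrac

	// chunkSize := (len(sortedKeys) * dht.bucketSize * 2) / numPeers
	chunkSize := (numKeys * bucketSize * 2) / numPeers
	fmt.Println("chunkSize:", chunkSize) // 800

	// numSuccessfulToWaitFor := int(float64(dht.bucketSize) * dht.waitFrac * 1.2)
	fmt.Println("numSuccessfulToWaitFor:", int(float64(bucketSize)*waitFrac*1.2)) // 7

	// divideByChunkSize keeps every group at most chunkSize keys long.
	fmt.Println(divideByChunkSize([]string{"a", "b", "c", "d", "e"}, 2)) // [[a b] [c d] [e]]
}

With these assumed numbers each group holds 800 keys, and once a key has reached 7 successful sends the remaining attempts for it are capped at 500ms (and skipped entirely if more than 500ms have passed since its last success), matching the per-key report logic in bulkMessageSend above.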
