Skip to content

Commit 5ef766f

Browse files
Fix global contacts calculation (#868)
* fix global contacts calculation * fix * optimizations in download
1 parent ecc0f5d commit 5ef766f

File tree

8 files changed

+101
-159
lines changed

8 files changed

+101
-159
lines changed

p2p/kademlia/common.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func decompressKeysStr(data []byte) ([]string, error) {
7171
return keys, nil
7272
}
7373

74+
/*
7475
func compressSymbols(values [][]byte) ([]byte, error) {
7576
buf, err := msgpack.Marshal(values)
7677
if err != nil {
@@ -97,3 +98,4 @@ func decompressSymbols(data []byte) (values [][]byte, err error) {
9798
9899
return values, nil
99100
}
101+
*/

p2p/kademlia/common_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ func TestCompressAndDecompressKeysStr(t *testing.T) {
120120
}
121121
}
122122

123+
/*
123124
func TestCompressAndDecompressSymbols(t *testing.T) {
124125
tests := []struct {
125126
name string
@@ -156,4 +157,4 @@ func TestCompressAndDecompressSymbols(t *testing.T) {
156157
}
157158
})
158159
}
159-
}
160+
}*/

p2p/kademlia/dht.go

Lines changed: 76 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ var (
3535
delKeysCountThreshold = 10
3636
lowSpaceThreshold = 50 // GB
3737
batchStoreSize = 2500
38+
storeSameSymbolsBatchConcurrency = 1
39+
storeSymbolsBatchConcurrency = 2.0
3840
)
3941

4042
const maxIterations = 5
@@ -525,7 +527,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
525527
lenOfKeys = 2000
526528
}
527529

528-
responses := s.batchFindNode(ctx, hashes[:lenOfKeys], knownNodes)
530+
responses, _ := s.batchFindNode(ctx, hashes[:lenOfKeys], knownNodes, make(map[string]bool))
529531
for response := range responses {
530532
if response.Error != nil {
531533
log.WithContext(ctx).WithError(response.Error).Error("batch find node failed on a node")
@@ -558,9 +560,6 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
558560
// remove self from the map
559561
delete(knownNodes, string(self.ID))
560562

561-
// Check if we have enough values locally
562-
log.WithContext(ctx).Info("skipping local fetch")
563-
564563
foundLocalCount, err = s.fetchAndAddLocalKeys(ctx, hexKeys, &resMap, required)
565564
if err != nil {
566565
return nil, fmt.Errorf("fetch and add local keys: %v", err)
@@ -572,10 +571,10 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
572571
}
573572

574573
// We don't have enough values locally, so we need to fetch from the network
575-
batchSize := 2500
574+
batchSize := batchStoreSize
576575
var networkFound int32
577576
totalBatches := int(math.Ceil(float64(required) / float64(batchSize)))
578-
parallelBatches := int(math.Min(float64(totalBatches), 2.0))
577+
parallelBatches := int(math.Min(float64(totalBatches), storeSymbolsBatchConcurrency))
579578

580579
semaphore := make(chan struct{}, parallelBatches)
581580
var wg sync.WaitGroup
@@ -667,7 +666,7 @@ func (s *DHT) doBatchGetValuesCall(ctx context.Context, node *Node, requestKeys
667666

668667
func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, keys []string, hexKeys []string, fetchMap map[string][]int,
669668
resMap *sync.Map, req, alreadyFound int32) (int, map[string]*NodeList, error) {
670-
semaphore := make(chan struct{}, 1) // Limit concurrency to 2
669+
semaphore := make(chan struct{}, storeSameSymbolsBatchConcurrency) // Limit concurrency to 1
671670
closestContacts := make(map[string]*NodeList)
672671
var wg sync.WaitGroup
673672
contactsMap := make(map[string]map[string][]*Node)
@@ -1230,6 +1229,7 @@ func (s *DHT) addKnownNodes(nodes []*Node, knownNodes map[string]*Node) {
12301229
func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int) error {
12311230
globalClosestContacts := make(map[string]*NodeList) // This will store the global top 6 nodes for each symbol's hash
12321231
knownNodes := make(map[string]*Node) // This will store the nodes we've already contacted
1232+
contacted := make(map[string]bool)
12331233
hashes := make([][]byte, len(values))
12341234

12351235
for i := 0; i < len(values); i++ {
@@ -1241,47 +1241,73 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int) e
12411241
s.addKnownNodes(top6.Nodes, knownNodes)
12421242
}
12431243

1244-
localClosestNodes := make(map[string]*NodeList)
1245-
responses := s.batchFindNode(ctx, hashes, knownNodes)
1246-
for response := range responses {
1247-
if response.Error != nil {
1248-
log.WithContext(ctx).WithError(response.Error).Error("batch find node failed on a node")
1249-
}
1244+
var changed bool
1245+
var i int
1246+
for {
1247+
i++
1248+
changed = false
1249+
localClosestNodes := make(map[string]*NodeList)
1250+
responses, atleastOneContacted := s.batchFindNode(ctx, hashes, knownNodes, contacted)
12501251

1251-
if response.Message == nil {
1252-
continue
1252+
if !atleastOneContacted {
1253+
break
12531254
}
12541255

1255-
v, ok := response.Message.Data.(*BatchFindNodeResponse)
1256-
if ok && v.Status.Result == ResultOk {
1257-
for key, nodesList := range v.ClosestNodes {
1258-
if nodesList != nil {
1259-
nl, exists := localClosestNodes[key]
1260-
if exists {
1261-
nl.AddNodes(nodesList)
1262-
localClosestNodes[key] = nl
1263-
} else {
1264-
localClosestNodes[key] = &NodeList{Nodes: nodesList, Comparator: base58.Decode(key)}
1265-
}
1256+
for response := range responses {
1257+
if response.Error != nil {
1258+
log.WithContext(ctx).WithError(response.Error).Error("batch find node failed on a node")
1259+
}
12661260

1267-
s.addKnownNodes(nodesList, knownNodes)
1268-
}
1261+
if response.Message == nil {
1262+
continue
12691263
}
1270-
}
1271-
}
12721264

1273-
for key, nodesList := range localClosestNodes {
1274-
if nodesList == nil {
1275-
continue
1276-
}
1265+
v, ok := response.Message.Data.(*BatchFindNodeResponse)
1266+
if ok && v.Status.Result == ResultOk {
1267+
for key, nodesList := range v.ClosestNodes {
1268+
if nodesList != nil {
1269+
nl, exists := localClosestNodes[key]
1270+
if exists {
1271+
nl.AddNodes(nodesList)
1272+
localClosestNodes[key] = nl
1273+
} else {
1274+
localClosestNodes[key] = &NodeList{Nodes: nodesList, Comparator: base58.Decode(key)}
1275+
}
12771276

1278-
nodesList.Comparator = base58.Decode(key)
1279-
nodesList.Sort()
1280-
nodesList.TopN(Alpha)
1277+
s.addKnownNodes(nodesList, knownNodes)
1278+
}
1279+
}
1280+
}
1281+
}
12811282

12821283
// we now need to check if the nodes in the globalClosestContacts Map are still in the top 6
12831284
// if yes, we can store the data to them
12841285
// if not, we need to send calls to the newly found nodes to inquire about the top 6 nodes
1286+
for key, nodesList := range localClosestNodes {
1287+
if nodesList == nil {
1288+
continue
1289+
}
1290+
1291+
nodesList.Comparator = base58.Decode(key)
1292+
nodesList.Sort()
1293+
nodesList.TopN(Alpha)
1294+
s.addKnownNodes(nodesList.Nodes, knownNodes)
1295+
1296+
if !haveAllNodes(nodesList.Nodes, globalClosestContacts[key].Nodes) {
1297+
log.WithContext(ctx).WithField("key", key).WithField("have", nodesList.String()).WithField("got", globalClosestContacts[key].String()).Info("global closest contacts list changed!")
1298+
changed = true
1299+
}
1300+
1301+
nodesList.AddNodes(globalClosestContacts[key].Nodes)
1302+
nodesList.Sort()
1303+
nodesList.TopN(Alpha)
1304+
globalClosestContacts[key] = nodesList
1305+
}
1306+
1307+
if !changed {
1308+
log.WithContext(ctx).WithField("iter", i).Info("global closest contacts list did not change, we can now store the data")
1309+
break
1310+
}
12851311
}
12861312

12871313
// assume at this point, we have True\Golabl top 6 nodes for each symbol's hash stored in globalClosestContacts Map
@@ -1356,20 +1382,15 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[
13561382
default:
13571383
keysToStore := storageMap[key]
13581384
toStore := make([][]byte, len(keysToStore))
1385+
totalBytes := 0
13591386
for i, idx := range keysToStore {
13601387
toStore[i] = values[idx]
1388+
totalBytes += len(values[idx])
13611389
}
13621390

1363-
payload, err := compressSymbols(toStore)
1364-
if err != nil {
1365-
log.P2P().WithContext(ctx).WithError(err).Error("compress symbols failed")
1366-
responses <- &MessageWithError{Error: err}
1367-
return
1368-
}
1369-
1370-
log.WithContext(ctx).WithField("keys", len(toStore)).WithField("data-size", utils.BytesIntToMB(len(payload))).Info("batch store to node payload size")
1391+
log.WithContext(ctx).WithField("keys", len(toStore)).WithField("size-before-compress", utils.BytesIntToMB(totalBytes)).Info("batch store to node")
13711392

1372-
data := &BatchStoreDataRequest{Data: payload, Type: typ}
1393+
data := &BatchStoreDataRequest{Data: toStore, Type: typ}
13731394
request := s.newMessage(BatchStoreData, receiver, data)
13741395
response, err := s.network.Call(ctx, request, true)
13751396
if err != nil {
@@ -1389,12 +1410,18 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[
13891410
return responses
13901411
}
13911412

1392-
func (s *DHT) batchFindNode(ctx context.Context, payload [][]byte, nodes map[string]*Node) chan *MessageWithError {
1413+
func (s *DHT) batchFindNode(ctx context.Context, payload [][]byte, nodes map[string]*Node, contacted map[string]bool) (chan *MessageWithError, bool) {
13931414
responses := make(chan *MessageWithError, len(nodes))
1394-
1415+
atleastOneContacted := false
13951416
var wg sync.WaitGroup
13961417

13971418
for _, node := range nodes {
1419+
if _, ok := contacted[string(node.ID)]; ok {
1420+
continue
1421+
}
1422+
1423+
contacted[string(node.ID)] = true
1424+
atleastOneContacted = true
13981425
wg.Add(1)
13991426
go func(receiver *Node) {
14001427
defer wg.Done()
@@ -1420,5 +1447,5 @@ func (s *DHT) batchFindNode(ctx context.Context, payload [][]byte, nodes map[str
14201447
wg.Wait()
14211448
close(responses)
14221449

1423-
return responses
1450+
return responses, atleastOneContacted
14241451
}

p2p/kademlia/message.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func decode(conn io.Reader) (*Message, error) {
219219

220220
// BatchStoreDataRequest defines the request data for store data
221221
type BatchStoreDataRequest struct {
222-
Data []byte
222+
Data [][]byte
223223
Type int
224224
}
225225

p2p/kademlia/network.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -843,13 +843,7 @@ func (s *Network) handleBatchStoreData(ctx context.Context, message *Message) (r
843843
// add the sender to queries hash table
844844
s.dht.addNode(ctx, message.Sender)
845845

846-
values, err := decompressSymbols(request.Data)
847-
if err != nil {
848-
err = errors.Errorf("batchStore: decompress symbols: %w", err)
849-
return s.generateResponseMessage(BatchStoreData, message.Sender, ResultFailed, err.Error())
850-
}
851-
852-
if err := s.dht.store.StoreBatch(ctx, values, 1, false); err != nil {
846+
if err := s.dht.store.StoreBatch(ctx, request.Data, 1, false); err != nil {
853847
err = errors.Errorf("batch store the data: %w", err)
854848
return s.generateResponseMessage(BatchStoreData, message.Sender, ResultFailed, err.Error())
855849
}

p2p/kademlia/node.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,23 @@ func (s *NodeList) DelNode(node *Node) {
7878
}
7979
}
8080

81+
func haveAllNodes(nodesA []*Node, nodesB []*Node) bool {
82+
for _, nodeA := range nodesA {
83+
found := false
84+
for _, nodeB := range nodesB {
85+
if bytes.Equal(nodeA.ID, nodeB.ID) {
86+
found = true
87+
break
88+
}
89+
}
90+
if !found {
91+
return false
92+
}
93+
}
94+
95+
return true
96+
}
97+
8198
// Exists return true if the node is already there
8299
func (s *NodeList) Exists(node *Node) bool {
83100
s.Mux.RLock()

supernode/services/common/storage_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
)
2222

2323
const (
24-
loadSymbolsBatchSize = 3000
24+
loadSymbolsBatchSize = 2500
2525
storeSymbolsPercent = 10
2626
)
2727

0 commit comments

Comments
 (0)