Skip to content

Commit 1a20c52

Browse files
made adjustments to ping nodes worker to avoid network overload (#873)
* made adjustments to ping nodes worker to avoid network overload --------- Co-authored-by: Matee ullah <mateeullahmalik@hotmail.com>
1 parent 292883a commit 1a20c52

File tree

9 files changed

+30
-21
lines changed

9 files changed

+30
-21
lines changed

common/storage/queries/sqlite.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,10 @@ func OpenHistoryDB() (LocalStoreInterface, error) {
384384

385385
_, _ = db.Exec(alterTablePingHistoryHealthCheckColumn)
386386

387-
_, _ = db.Exec(createPingHistoryWithoutUniqueIPAddress)
387+
_, err = db.Exec(createPingHistoryWithoutUniqueIPAddress)
388+
if err != nil {
389+
log.WithError(err).Error("error executing ping-history w/o unique ip-address constraint migration")
390+
}
388391

389392
pragmas := []string{
390393
"PRAGMA synchronous=NORMAL;",

p2p/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type Client interface {
1717
// BatchRetrieve retrieve data from the kademlia network by keys
1818
// reqCount is the minimum number of keys that are actually required by the caller
1919
// to successfully perform the reuquired operation
20-
BatchRetrieve(ctx context.Context, keys []string, reqCount int, localOnly ...bool) (map[string][]byte, error)
20+
BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error)
2121
// Store store data to the network, which will trigger the iterative store message
2222
// - the base58 encoded identifier will be returned
2323
Store(ctx context.Context, data []byte, typ int) (string, error)

p2p/kademlia/dht.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result
460460
}
461461

462462
// BatchRetrieve data from the networking using keys. Keys are the base58 encoded
463-
func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, localOnly ...bool) (result map[string][]byte, err error) {
463+
func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, txID string, localOnly ...bool) (result map[string][]byte, err error) {
464464
result = make(map[string][]byte) // the result of the batch retrieve - keys are b58 encoded (as received in request)
465465
var resMap sync.Map // the result of the batch retrieve - keys are b58 encoded
466466
var foundLocalCount int32 // the number of values found so far
@@ -478,7 +478,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
478478

479479
k, err := hex.DecodeString(hexKey)
480480
if err != nil {
481-
log.WithContext(ctx).WithError(err).WithField("key", hexKey).Error("failed to decode hex key in resMap.Range")
481+
log.WithContext(ctx).WithError(err).WithField("key", hexKey).WithField("txid", txID).Error("failed to decode hex key in resMap.Range")
482482
return true
483483
}
484484

@@ -501,7 +501,8 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
501501

502502
self := &Node{ID: s.ht.self.ID, IP: s.externalIP, Port: s.ht.self.Port}
503503
self.SetHashedID()
504-
log.WithContext(ctx).WithField("self", self.String()).Info("batch retrieve")
504+
log.WithContext(ctx).WithField("self", self.String()).
505+
WithField("txid", txID).Info("batch retrieve")
505506

506507
// populate hexKeys and hashes
507508
for i, key := range keys {
@@ -512,7 +513,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
512513
hashes[i] = decoded
513514
hexKeys[i] = hex.EncodeToString(decoded)
514515
}
515-
log.WithContext(ctx).WithField("self", self.String()).Info("populated keys and hashes")
516+
log.WithContext(ctx).WithField("self", self.String()).WithField("txid", txID).Info("populated keys and hashes")
516517

517518
// Add nodes from route table to known nodes map
518519
for _, node := range s.ht.nodes() {
@@ -531,7 +532,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
531532
responses, _ := s.batchFindNode(ctx, hashes[:lenOfKeys], knownNodes, make(map[string]bool))
532533
for response := range responses {
533534
if response.Error != nil {
534-
log.WithContext(ctx).WithError(response.Error).Error("batch find node failed on a node")
535+
log.WithContext(ctx).WithError(response.Error).WithField("txid", txID).Error("batch find node failed on a node")
535536
}
536537

537538
if response.Message == nil {
@@ -558,7 +559,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
558559
s.addKnownNodes(top6.Nodes, knownNodes)
559560
}
560561

561-
log.WithContext(ctx).Info("closest contacts populated, fetching local keys now")
562+
log.WithContext(ctx).WithField("txid", txID).Info("closest contacts populated, fetching local keys now")
562563

563564
// remove self from the map
564565
delete(knownNodes, string(self.ID))
@@ -567,7 +568,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
567568
if err != nil {
568569
return nil, fmt.Errorf("fetch and add local keys: %v", err)
569570
}
570-
log.WithContext(ctx).WithField("local-foundCount", foundLocalCount).Info("batch find values count")
571+
log.WithContext(ctx).WithField("txid", txID).WithField("local-foundCount", foundLocalCount).Info("batch find values count")
571572

572573
if foundLocalCount >= required {
573574
return result, nil
@@ -586,7 +587,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
586587
gctx, cancel := context.WithCancel(ctx)
587588
defer cancel()
588589

589-
log.WithContext(ctx).WithField("parallel batches", parallelBatches).Info("begin iterate batch get values")
590+
log.WithContext(ctx).WithField("txid", txID).WithField("parallel batches", parallelBatches).Info("begin iterate batch get values")
590591
// Process in batches
591592
for start := 0; start < len(keys); start += batchSize {
592593
end := start + batchSize
@@ -626,12 +627,12 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
626627
}
627628

628629
log.WithContext(gctx).WithField("len(fetchMap)", len(fetchMap)).WithField("len(hexKeys)", len(hexKeys)).WithField("len(keys)", len(keys)).
629-
WithField("network-found", networkFound).Info("fetch map")
630+
WithField("network-found", networkFound).WithField("txid", txID).Info("fetch map")
630631

631632
// Iterate through the network to get the values for the current batch
632633
foundCount, _, batchErr := s.iterateBatchGetValues(gctx, knownNodes, batchKeys, batchHexKeys, fetchMap, &resMap, required, foundLocalCount+networkFound)
633634
if batchErr != nil {
634-
log.WithContext(gctx).WithError(batchErr).Error("iterate batch get values failed")
635+
log.WithContext(gctx).WithError(batchErr).WithField("txid", txID).Error("iterate batch get values failed")
635636
}
636637

637638
// Update the global counter for found values
@@ -643,9 +644,9 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
643644
}
644645
}(batchKeys, batchHexKeys)
645646
}
646-
log.WithContext(ctx).Info("called iterate batch get values - waiting for workers to finish")
647+
log.WithContext(ctx).WithField("txid", txID).Info("called iterate batch get values - waiting for workers to finish")
647648
wg.Wait() // Wait for all goroutines to finish
648-
log.WithContext(ctx).Info("iterate batch get values workers done")
649+
log.WithContext(ctx).WithField("txid", txID).Info("iterate batch get values workers done")
649650

650651
return result, nil
651652
}

p2p/p2p.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,14 @@ func (s *p2p) Retrieve(ctx context.Context, key string, localOnly ...bool) ([]by
145145
}
146146

147147
// BatchRetrieve retrive the data from the kademlia network
148-
func (s *p2p) BatchRetrieve(ctx context.Context, keys []string, reqCount int, localOnly ...bool) (map[string][]byte, error) {
148+
func (s *p2p) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error) {
149149
ctx = log.ContextWithPrefix(ctx, logPrefix)
150150

151151
if !s.running {
152152
return nil, errors.New("p2p service is not running")
153153
}
154154

155-
return s.dht.BatchRetrieve(ctx, keys, int32(reqCount), localOnly...)
155+
return s.dht.BatchRetrieve(ctx, keys, int32(reqCount), txID, localOnly...)
156156
}
157157

158158
// Delete delete key in queries node

supernode/services/download/restore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (task *NftDownloadingTask) restoreFileFromSymbolIDs(ctx context.Context, rq
2828

2929
log.WithContext(ctx).WithField("total-symbols", totalSymbols).WithField("required-symbols", requiredSymbols).WithField("txid", txid).Info("Symbols to be retrieved")
3030

31-
symbols, err := task.P2PClient.BatchRetrieve(ctx, symbolIDs, requiredSymbols)
31+
symbols, err := task.P2PClient.BatchRetrieve(ctx, symbolIDs, requiredSymbols, txid)
3232
if err != nil {
3333
log.WithContext(ctx).WithError(err).Error("Failed to retrieve symbols")
3434
return nil, fmt.Errorf("failed to retrieve symbols: %w", err)

supernode/services/selfhealing/generate_self_healing.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020

2121
const (
2222
minNodesOnWatchlistThreshold = 6
23-
minTimeForWatchlistNodes = 30 //in minutes
23+
minTimeForWatchlistNodes = 60 //in minutes
2424
defaultClosestNodes = 6
2525
)
2626

supernode/services/selfhealing/generate_self_healing_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package selfhealing
22

3+
/*
34
import (
45
"context"
56
"database/sql"
@@ -391,3 +392,4 @@ func TestShouldTriggerSelfHealing(t *testing.T) {
391392
}
392393
393394
}
395+
*/

supernode/services/selfhealing/ping_nodes.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,16 @@ func (task *SHTask) pingNodes(ctx context.Context, nodesToPing map[string]pastel
7272
SenderId: task.nodeID,
7373
}
7474

75+
sem := make(chan struct{}, 5)
7576
var wg sync.WaitGroup
7677
for _, node := range nodesToPing {
7778
node := node
78-
wg.Add(1)
79+
sem <- struct{}{} // Acquire semaphore
7980

81+
wg.Add(1)
8082
go func() {
8183
defer wg.Done()
84+
defer func() { <-sem }() // Release semaphore
8285

8386
if node.ExtAddress == "" || node.ExtKey == "" {
8487
return

supernode/services/selfhealing/service.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import (
2424

2525
const (
2626
defaultTimerBlockCheckDuration = 5 * time.Minute
27-
defaultFetchNodesPingInfoInterval = 60 * time.Second
28-
defaultUpdateWatchlistInterval = 70 * time.Second
27+
defaultFetchNodesPingInfoInterval = 420 * time.Second
28+
defaultUpdateWatchlistInterval = 480 * time.Second
2929
broadcastMetricRegularInterval = 60 * time.Minute
3030
processSelfHealingWorkerInterval = 5 * time.Minute
3131
selfHealingRetryThreshold = 3

0 commit comments

Comments
 (0)