Skip to content

Commit 6bebf46

Browse files
Merge pull request #643 from pastelnetwork/PSL-942_p2pFixes
fix sort based on xor, fix check node activity worker
2 parents 88022a2 + 9815f68 commit 6bebf46

File tree

14 files changed

+398
-232
lines changed

14 files changed

+398
-232
lines changed

p2p/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,10 @@ type Client interface {
2929
// LocalStore store data to the network, which will trigger the iterative store message
3030
// - the base58 encoded identifier will be returned
3131
LocalStore(ctx context.Context, key string, data []byte) (string, error)
32+
33+
// DisableKey adds key to disabled keys list - It takes in a B58 encoded SHA-256 hash
34+
DisableKey(ctx context.Context, b58EncodedHash string) error
35+
36+
// EnableKey removes key from disabled list - It takes in a B58 encoded SHA-256 hash
37+
EnableKey(ctx context.Context, b58EncodedHash string) error
3238
}

p2p/kademlia/banlist.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ const (
1313
banDuration = 3 * time.Hour
1414

1515
// threshold - threshold
16-
threshold = 5
16+
threshold = 2
1717
)
1818

1919
// BanNode is the over-the-wire representation of a node
@@ -166,7 +166,9 @@ func (s *BanList) ToNodeList() []*Node {
166166
ret := make([]*Node, 0)
167167

168168
for i := 0; i < len(s.Nodes); i++ {
169-
ret = append(ret, &s.Nodes[i].Node)
169+
if s.Nodes[i].count > threshold {
170+
ret = append(ret, &s.Nodes[i].Node)
171+
}
170172
}
171173

172174
return ret

p2p/kademlia/dht.go

Lines changed: 24 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/pastelnetwork/gonode/common/storage"
1919
"github.com/pastelnetwork/gonode/common/storage/memory"
2020
"github.com/pastelnetwork/gonode/common/utils"
21-
"github.com/pastelnetwork/gonode/p2p/kademlia/domain"
2221
"github.com/pastelnetwork/gonode/pastel"
2322
"golang.org/x/crypto/sha3"
2423
)
@@ -36,20 +35,19 @@ const maxIterations = 5
3635

3736
// DHT represents the state of the local node in the distributed hash table
3837
type DHT struct {
39-
ht *HashTable // the hashtable for routing
40-
options *Options // the options of DHT
41-
network *Network // the network of DHT
42-
store Store // the storage of DHT
43-
metaStore MetaStore // the meta storage of DHT
44-
done chan struct{} // distributed hash table is done
45-
cache storage.KeyValue // store bad bootstrap addresses
46-
pastelClient pastel.Client
47-
externalIP string
48-
mtx sync.Mutex
49-
authHelper *AuthHelper
50-
ignorelist *BanList
51-
nodeReplicationTimes map[string]domain.NodeReplicationInfo
52-
replicationMtx sync.RWMutex
38+
ht *HashTable // the hashtable for routing
39+
options *Options // the options of DHT
40+
network *Network // the network of DHT
41+
store Store // the storage of DHT
42+
metaStore MetaStore // the meta storage of DHT
43+
done chan struct{} // distributed hash table is done
44+
cache storage.KeyValue // store bad bootstrap addresses
45+
pastelClient pastel.Client
46+
externalIP string
47+
mtx sync.Mutex
48+
authHelper *AuthHelper
49+
ignorelist *BanList
50+
replicationMtx sync.RWMutex
5351
}
5452

5553
// Options contains configuration options for the local node
@@ -84,26 +82,16 @@ func NewDHT(ctx context.Context, store Store, metaStore MetaStore, pc pastel.Cli
8482
if options.Port <= 0 {
8583
options.Port = defaultNetworkPort
8684
}
87-
info, err := store.GetAllReplicationInfo(ctx)
88-
if err != nil {
89-
log.P2P().WithContext(ctx).WithError(err).Errorf("get all replicationInfo failed")
90-
}
91-
92-
replicationMap := make(map[string]domain.NodeReplicationInfo)
93-
for _, v := range info {
94-
replicationMap[string(v.ID)] = v
95-
}
9685

9786
s := &DHT{
98-
metaStore: metaStore,
99-
store: store,
100-
options: options,
101-
pastelClient: pc,
102-
done: make(chan struct{}),
103-
cache: memory.NewKeyValue(),
104-
ignorelist: NewBanList(ctx),
105-
nodeReplicationTimes: replicationMap,
106-
replicationMtx: sync.RWMutex{},
87+
metaStore: metaStore,
88+
store: store,
89+
options: options,
90+
pastelClient: pc,
91+
done: make(chan struct{}),
92+
cache: memory.NewKeyValue(),
93+
ignorelist: NewBanList(ctx),
94+
replicationMtx: sync.RWMutex{},
10795
}
10896

10997
if options.ExternalIP != "" {
@@ -482,7 +470,7 @@ func (s *DHT) iterate(ctx context.Context, iterativeType int, target []byte, dat
482470

483471
igList := s.ignorelist.ToNodeList()
484472
// find the closest contacts for the target node from local route tables
485-
nl := s.ht.closestContacts(Alpha, target, igList)
473+
nl, _ := s.ht.closestContacts(Alpha, target, igList)
486474
if len(igList) > 0 {
487475
log.P2P().WithContext(ctx).WithField("nodes", nl.String()).WithField("ignored", s.ignorelist.String()).Info("closest contacts")
488476
}
@@ -633,7 +621,7 @@ func (s *DHT) iterateFindValue(ctx context.Context, iterativeType int, target []
633621
igList := s.ignorelist.ToNodeList()
634622

635623
// nl will have the closest nodes to the target value, it will ignore the nodes in igList
636-
nl := s.ht.closestContacts(Alpha, target, igList)
624+
nl, _ := s.ht.closestContacts(Alpha, target, igList)
637625
if len(igList) > 0 {
638626
log.P2P().WithContext(ctx).WithField("nodes", nl.String()).WithField("ignored", s.ignorelist.String()).Info("closest contacts")
639627
}
@@ -785,7 +773,7 @@ func (s *DHT) addNode(ctx context.Context, node *Node) *Node {
785773
func (s *DHT) NClosestNodes(_ context.Context, n int, key string, ignores ...*Node) []*Node {
786774
list := s.ignorelist.ToNodeList()
787775
ignores = append(ignores, list...)
788-
nodeList := s.ht.closestContacts(n, base58.Decode(key), ignores)
776+
nodeList, _ := s.ht.closestContacts(n, base58.Decode(key), ignores)
789777

790778
return nodeList.Nodes
791779
}

p2p/kademlia/fetch_and_store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (s *DHT) BatchFetchAndStoreFailedKeys(ctx context.Context) error {
118118
repKeys := make([]domain.ToRepKey, 0, len(keys))
119119
for i := 0; i < len(keys); i++ {
120120
igList := s.ignorelist.ToNodeList()
121-
nl := s.ht.closestContacts(failedKeysClosestContactsLookupCount, keys[i].Key, igList)
121+
nl, _ := s.ht.closestContacts(failedKeysClosestContactsLookupCount, keys[i].Key, igList)
122122
attempt := (keys[i].Attempts - maxBatchAttempts) + 1
123123

124124
if len(nl.Nodes) > attempt {

p2p/kademlia/hashtable.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (ht *HashTable) hasNode(id []byte) bool {
189189
}
190190

191191
// closestContacts returns the closest contacts of target
192-
func (ht *HashTable) closestContacts(num int, target []byte, ignoredNodes []*Node) *NodeList {
192+
func (ht *HashTable) closestContacts(num int, target []byte, ignoredNodes []*Node) (*NodeList, int) {
193193
ht.mutex.RLock()
194194
defer ht.mutex.RUnlock()
195195

@@ -203,9 +203,11 @@ func (ht *HashTable) closestContacts(num int, target []byte, ignoredNodes []*Nod
203203
Comparator: target,
204204
}
205205

206+
counter := 0
206207
// Flatten the routeTable and add nodes to nl if they're not in the ignoredMap
207208
for _, bucket := range ht.routeTable {
208209
for _, node := range bucket {
210+
counter++
209211
if !ignoredMap[string(node.ID)] {
210212
nl.AddNodes([]*Node{node})
211213
}
@@ -216,7 +218,7 @@ func (ht *HashTable) closestContacts(num int, target []byte, ignoredNodes []*Nod
216218
nl.Sort()
217219
nl.TopN(num)
218220

219-
return nl
221+
return nl, counter
220222
}
221223

222224
// bucketIndex return the bucket index from two node ids
@@ -298,9 +300,11 @@ func (ht *HashTable) closestContactsWithInlcudingNode(num int, target []byte, ig
298300
}
299301

300302
// Flatten the routeTable and add nodes to nl if they're not in the ignoredMap
303+
counter := 0
301304
for _, bucket := range ht.routeTable {
302305
for _, node := range bucket {
303306
if !ignoredMap[string(node.ID)] {
307+
counter++
304308
nl.AddNodes([]*Node{node})
305309
}
306310
}

p2p/kademlia/network.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func (s *Network) handleFindNode(ctx context.Context, message *Message) (res []b
153153

154154
// the closest contacts
155155
hashedTargetID, _ := utils.Sha3256hash(request.Target)
156-
closest := s.dht.ht.closestContacts(K, hashedTargetID, []*Node{message.Sender})
156+
closest, _ := s.dht.ht.closestContacts(K, hashedTargetID, []*Node{message.Sender})
157157

158158
response := &FindNodeResponse{
159159
Status: ResponseStatus{
@@ -227,7 +227,7 @@ func (s *Network) handleFindValue(ctx context.Context, message *Message) (res []
227227
},
228228
}
229229

230-
closest := s.dht.ht.closestContacts(K, request.Target, []*Node{message.Sender})
230+
closest, _ := s.dht.ht.closestContacts(K, request.Target, []*Node{message.Sender})
231231
response.Closest = closest.Nodes
232232

233233
// new a response message
@@ -246,7 +246,7 @@ func (s *Network) handleFindValue(ctx context.Context, message *Message) (res []
246246
response.Value = value
247247
} else {
248248
// return the closest contacts
249-
closest := s.dht.ht.closestContacts(K, request.Target, []*Node{message.Sender})
249+
closest, _ := s.dht.ht.closestContacts(K, request.Target, []*Node{message.Sender})
250250
response.Closest = closest.Nodes
251251
}
252252

p2p/kademlia/node.go

Lines changed: 8 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,6 @@ import (
77
"sort"
88
"strings"
99
"sync"
10-
11-
"github.com/btcsuite/btcutil/base58"
12-
"github.com/pastelnetwork/gonode/common/log"
13-
"github.com/pastelnetwork/gonode/common/utils"
1410
)
1511

1612
// Node is the over-the-wire representation of a node
@@ -26,7 +22,7 @@ type Node struct {
2622
}
2723

2824
func (s *Node) String() string {
29-
return fmt.Sprintf("%v-%v:%d", base58.Encode(s.ID), s.IP, s.Port)
25+
return fmt.Sprintf("%v-%v:%d", string(s.ID), s.IP, s.Port)
3026
}
3127

3228
// NodeList is used in order to sort a list of nodes
@@ -81,8 +77,8 @@ func (s *NodeList) Exists(node *Node) bool {
8177
}
8278

8379
func (s *NodeList) exists(node *Node) bool {
84-
for _, item := range s.Nodes {
85-
if bytes.Equal(item.ID, node.ID) {
80+
for i := 0; i < len(s.Nodes); i++ {
81+
if bytes.Equal(s.Nodes[i].ID, node.ID) {
8682
return true
8783
}
8884
}
@@ -106,13 +102,6 @@ func (s *NodeList) Len() int {
106102
return len(s.Nodes)
107103
}
108104

109-
func (s *NodeList) distance(id1, id2 []byte) *big.Int {
110-
o1 := new(big.Int).SetBytes(id1)
111-
o2 := new(big.Int).SetBytes(id2)
112-
113-
return new(big.Int).Xor(o1, o2)
114-
}
115-
116105
// AddFirst adds a node to the first position of the list.
117106
func (s *NodeList) AddFirst(node *Node) {
118107
s.Mux.Lock() // lock for writing
@@ -134,32 +123,20 @@ func (s *NodeList) TopN(n int) {
134123
}
135124
}
136125

137-
func (s *NodeList) computeDistances() []big.Int {
138-
distances := make([]big.Int, s.Len())
139-
for i, node := range s.Nodes {
140-
cID, _ := utils.Sha3256hash(node.ID)
141-
dist := s.distance(cID, s.Comparator)
142-
distances[i] = *dist
126+
func (s *NodeList) distance(id1, id2 []byte) *big.Int {
127+
o1 := new(big.Int).SetBytes(id1)
128+
o2 := new(big.Int).SetBytes(id2)
143129

144-
if s.debug {
145-
log.WithField("node", node.String()).WithField("dist", distances[i]).Info("computeDistances")
146-
}
147-
}
148-
return distances
130+
return new(big.Int).Xor(o1, o2)
149131
}
150132

151133
// Sort sorts nodes
152134
func (s *NodeList) Sort() {
153135
s.Mux.Lock()
154136
defer s.Mux.Unlock()
155137

156-
// Compute distances
157-
distances := s.computeDistances()
158-
159138
// Sort using the precomputed distances
160-
sort.Slice(s.Nodes, func(i, j int) bool {
161-
return distances[i].Cmp(&distances[j]) == -1
162-
})
139+
sort.Sort(s)
163140
}
164141

165142
// Swap swap two nodes

0 commit comments

Comments
 (0)