Skip to content

Commit 4137999

Browse files
authored
network: Add Capabilities to Kademlia database (ethersphere#1713)
network: Add capabilities to kademlia database
1 parent 01d9194 commit 4137999

34 files changed

+854
-267
lines changed

cmd/swarm-snapshot/create.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func createSnapshot(filename string, nodes int, services []string) (err error) {
6060

6161
sim := simulation.NewInProc(map[string]simulation.ServiceFunc{
6262
"bzz": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
63-
addr := network.NewAddr(ctx.Config.Node())
63+
addr := network.NewBzzAddrFromEnode(ctx.Config.Node())
6464
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
6565
hp := network.NewHiveParams()
6666
hp.KeepAliveInterval = time.Duration(200) * time.Millisecond

cmd/swarm/run_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,10 @@ func newTestCluster(t *testing.T, size int) *testCluster {
131131
}
132132

133133
// connect the nodes together
134-
for _, node := range cluster.Nodes {
134+
for i, node := range cluster.Nodes {
135+
if i == 0 {
136+
continue
137+
}
135138
if err := node.Client.Call(nil, "admin_addPeer", cluster.Nodes[0].Enode); err != nil {
136139
t.Fatal(err)
137140
}

network/capability/capability.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,3 +185,33 @@ func (c *Capabilities) DecodeRLP(s *rlp.Stream) error {
185185

186186
return nil
187187
}
188+
189+
// Match returns true if all bits set in the argument is also set in the receiver
190+
func (c *Capability) Match(capCompare *Capability) bool {
191+
if capCompare == nil || len(c.Cap) != len(capCompare.Cap) {
192+
return false
193+
}
194+
// on the first occurrence of false where query has true we can fail
195+
for i, flag := range capCompare.Cap {
196+
if flag && !c.Cap[i] {
197+
return false
198+
}
199+
}
200+
return true
201+
}
202+
203+
// Match returns true if all bits set in all capability arguments are also set in the receiver's capabilities
204+
func (c *Capabilities) Match(capsCompare *Capabilities) bool {
205+
for _, capCompare := range capsCompare.Caps {
206+
207+
// if queryied id doesn't exist in object we can nay right away
208+
cap := c.Get(capCompare.Id)
209+
if cap == nil {
210+
return false
211+
}
212+
if !cap.Match(capCompare) {
213+
return false
214+
}
215+
}
216+
return true
217+
}

network/capability/capability_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,59 @@ func TestCapabilitiesRLP(t *testing.T) {
172172
t.Fatalf("cap 1 caps not correct, expected %v, got %v", cap2.Cap, cap2Restored.Cap)
173173
}
174174
}
175+
176+
// TestCapabilitiesQuery tests methods for quering capability states
177+
func TestCapabilitiesQuery(t *testing.T) {
178+
179+
// Initialize capability
180+
caps := NewCapabilities()
181+
182+
// Register module. Should succeed
183+
c1 := NewCapability(1, 3)
184+
c1.Set(1)
185+
err := caps.Add(c1)
186+
if err != nil {
187+
t.Fatalf("RegisterCapabilityModule fail: %v", err)
188+
}
189+
190+
c2 := NewCapability(42, 9)
191+
c2.Set(2)
192+
c2.Set(8)
193+
err = caps.Add(c2)
194+
if err != nil {
195+
t.Fatalf("RegisterCapabilityModule fail: %v", err)
196+
}
197+
198+
capsCompare := NewCapabilities()
199+
capCompare := NewCapability(42, 10)
200+
capCompare.Set(2)
201+
capCompare.Set(8)
202+
capsCompare.Add(capCompare)
203+
if caps.Match(capsCompare) {
204+
t.Fatalf("Expected cCompare with mismatch length to fail; %s != %s", capsCompare, caps)
205+
}
206+
capsCompare = NewCapabilities()
207+
capCompare = NewCapability(42, 9)
208+
capCompare.Set(2)
209+
capsCompare.Add(capCompare)
210+
if !caps.Match(capsCompare) {
211+
t.Fatalf("Expected %s to match %s", capsCompare, caps)
212+
}
213+
214+
capCompare = NewCapability(1, 3)
215+
capsCompare.Add(capCompare)
216+
if !caps.Match(capsCompare) {
217+
t.Fatalf("Expected %s to match %s", capsCompare, caps)
218+
}
219+
220+
capCompare.Set(1)
221+
if !caps.Match(capsCompare) {
222+
t.Fatalf("Expected %s to match %s", capsCompare, caps)
223+
}
224+
225+
capCompare.Set(2)
226+
if caps.Match(capsCompare) {
227+
t.Fatalf("Expected %s not to match %s", capsCompare, caps)
228+
}
229+
230+
}

network/discovery.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"fmt"
2222
"sync"
2323

24+
"github.com/ethereum/go-ethereum/rlp"
25+
"github.com/ethersphere/swarm/log"
2426
"github.com/ethersphere/swarm/pot"
2527
)
2628

@@ -95,6 +97,7 @@ func (d *Peer) NotifyPeer(a *BzzAddr, po uint8) {
9597
resp := &peersMsg{
9698
Peers: []*BzzAddr{a},
9799
}
100+
log.Warn("notifypeer", "notify", resp)
98101
go d.Send(context.TODO(), resp)
99102
}
100103

@@ -127,6 +130,27 @@ type peersMsg struct {
127130
Peers []*BzzAddr
128131
}
129132

133+
// DecodeRLP implements rlp.Decoder interface
134+
func (p *peersMsg) DecodeRLP(s *rlp.Stream) error {
135+
_, err := s.List()
136+
if err != nil {
137+
return err
138+
}
139+
_, err = s.List()
140+
if err != nil {
141+
return err
142+
}
143+
for {
144+
var addr BzzAddr
145+
err = s.Decode(&addr)
146+
if err != nil {
147+
break
148+
}
149+
p.Peers = append(p.Peers, &addr)
150+
}
151+
return nil
152+
}
153+
130154
// String pretty prints a peersMsg
131155
func (msg peersMsg) String() string {
132156
return fmt.Sprintf("%T: %v", msg, msg.Peers)
@@ -140,7 +164,6 @@ func (d *Peer) handlePeersMsg(msg *peersMsg) error {
140164
if len(msg.Peers) == 0 {
141165
return nil
142166
}
143-
144167
for _, a := range msg.Peers {
145168
d.seen(a)
146169
NotifyPeer(a, d.kad)

network/discovery_test.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package network
1818

1919
import (
20+
"bytes"
2021
"crypto/ecdsa"
21-
crand "crypto/rand"
2222
"encoding/binary"
2323
"fmt"
2424
"math/rand"
@@ -48,7 +48,7 @@ func TestSubPeersMsg(t *testing.T) {
4848
}
4949

5050
node := s.Nodes[0]
51-
raddr := NewAddr(node)
51+
raddr := NewBzzAddrFromEnode(node)
5252
pp.Register(raddr)
5353

5454
// start the hive and wait for the connection
@@ -114,18 +114,15 @@ func testInitialPeersMsg(t *testing.T, peerPO, peerDepth int) {
114114
connect := func(a pot.Address, po int) (addrs []*BzzAddr) {
115115
n := rand.Intn(maxPeersPerPO)
116116
for i := 0; i < n; i++ {
117-
peer, err := newDiscPeer(pot.RandomAddressAt(a, po))
118-
if err != nil {
119-
t.Fatal(err)
120-
}
117+
peer := newDiscPeer(pot.RandomAddressAt(a, po))
121118
hive.On(peer)
122119
addrs = append(addrs, peer.BzzAddr)
123120
}
124121
return addrs
125122
}
126123
register := func(a pot.Address, po int) {
127-
addr := pot.RandomAddressAt(a, po)
128-
hive.Register(&BzzAddr{OAddr: addr[:]})
124+
discPeer := newDiscPeer(a)
125+
hive.Register(discPeer.BzzAddr)
129126
}
130127

131128
// generate connected and just registered peers
@@ -238,20 +235,21 @@ func testSortPeers(peers []*BzzAddr) []*BzzAddr {
238235
// as we are not creating a real node via the protocol,
239236
// we need to create the discovery peer objects for the additional kademlia
240237
// nodes manually
241-
func newDiscPeer(addr pot.Address) (*Peer, error) {
242-
pKey, err := ecdsa.GenerateKey(crypto.S256(), crand.Reader)
243-
if err != nil {
244-
return nil, err
245-
}
238+
func newDiscPeer(addr pot.Address) *Peer {
239+
240+
// deterministically create enode id
241+
// Input to the non-random input buffer is 2xaddress since it munches 256 bits
242+
addrSeed := append(addr.Bytes(), addr.Bytes()...)
243+
pKey, _ := ecdsa.GenerateKey(crypto.S256(), bytes.NewBuffer(addrSeed))
246244
pubKey := pKey.PublicKey
247245
nod := enode.NewV4(&pubKey, net.IPv4(127, 0, 0, 1), 0, 0)
248-
bzzAddr := &BzzAddr{OAddr: addr[:], UAddr: []byte(nod.String())}
246+
bzzAddr := NewBzzAddr(addr[:], []byte(nod.String()))
249247
id := nod.ID()
250248
p2pPeer := p2p.NewPeer(id, id.String(), nil)
251249
return NewPeer(&BzzPeer{
252250
Peer: protocols.NewPeer(p2pPeer, &dummyMsgRW{}, DiscoverySpec),
253251
BzzAddr: bzzAddr,
254-
}, nil), nil
252+
}, nil)
255253
}
256254

257255
type dummyMsgRW struct{}

network/enr.go

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,35 +48,26 @@ func (b *ENRAddrEntry) DecodeRLP(s *rlp.Stream) error {
4848
return nil
4949
}
5050

51-
type ENRLightNodeEntry bool
52-
53-
func (b ENRLightNodeEntry) ENRKey() string {
54-
return "bzzlightnode"
55-
}
56-
5751
type ENRBootNodeEntry bool
5852

5953
func (b ENRBootNodeEntry) ENRKey() string {
6054
return "bzzbootnode"
6155
}
6256

6357
func getENRBzzPeer(p *p2p.Peer, rw p2p.MsgReadWriter, spec *protocols.Spec) *BzzPeer {
64-
var lightnode ENRLightNodeEntry
6558
var bootnode ENRBootNodeEntry
6659

6760
// retrieve the ENR Record data
6861
record := p.Node().Record()
69-
record.Load(&lightnode)
7062
record.Load(&bootnode)
7163

72-
// get the address; separate function as long as we need swarm/network:NewAddr() to call it
64+
// get the address; separate function as long as we need swarm/network:NewBzzAddrFromEnode() to call it
7365
addr := getENRBzzAddr(p.Node())
7466

7567
// build the peer using the retrieved data
7668
return &BzzPeer{
77-
Peer: protocols.NewPeer(p, rw, spec),
78-
LightNode: bool(lightnode),
79-
BzzAddr: addr,
69+
Peer: protocols.NewPeer(p, rw, spec),
70+
BzzAddr: addr,
8071
}
8172
}
8273

@@ -86,8 +77,5 @@ func getENRBzzAddr(nod *enode.Node) *BzzAddr {
8677
record := nod.Record()
8778
record.Load(&addr)
8879

89-
return &BzzAddr{
90-
OAddr: addr.data,
91-
UAddr: []byte(nod.String()),
92-
}
80+
return NewBzzAddr(addr.data, []byte(nod.String()))
9381
}

network/hive.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,8 @@ func (h *Hive) PeerInfo(id enode.ID) interface{} {
209209
if p == nil {
210210
return nil
211211
}
212-
addr := NewAddr(p.Node())
212+
// TODO this is bogus, the overlay address will not be correct
213+
addr := NewBzzAddrFromEnode(p.Node())
213214
return struct {
214215
OAddr hexutil.Bytes
215216
UAddr hexutil.Bytes

network/hive_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func TestRegisterAndConnect(t *testing.T) {
5555
}
5656

5757
node := s.Nodes[0]
58-
raddr := NewAddr(node)
58+
raddr := NewBzzAddrFromEnode(node)
5959
pp.Register(raddr)
6060

6161
// start the hive
@@ -100,15 +100,15 @@ func TestRegisterAndConnect(t *testing.T) {
100100
}
101101
}
102102

103-
// TestHiveStatePersistance creates a protocol simulation with n peers for a node
103+
// TestHiveStatePersistence creates a protocol simulation with n peers for a node
104104
// After protocols complete, the node is shut down and the state is stored.
105105
// Another simulation is created, where 0 nodes are created, but where the stored state is passed
106106
// The test succeeds if all the peers from the stored state are known after the protocols of the
107107
// second simulation have completed
108108
//
109109
// Actual connectivity is not in scope for this test, as the peers loaded from state are not known to
110110
// the simulation; the test only verifies that the peers are known to the node
111-
func TestHiveStatePersistance(t *testing.T) {
111+
func TestHiveStatePersistence(t *testing.T) {
112112
dir, err := ioutil.TempDir("", "hive_test_store")
113113
if err != nil {
114114
t.Fatal(err)
@@ -152,7 +152,7 @@ func TestHiveStatePersistance(t *testing.T) {
152152
h1, cleanup1 := startHive(t, dir)
153153
peers := make(map[string]bool)
154154
for i := 0; i < peersCount; i++ {
155-
raddr := RandomAddr()
155+
raddr := RandomBzzAddr()
156156
h1.Register(raddr)
157157
peers[raddr.String()] = true
158158
}

0 commit comments

Comments
 (0)