Skip to content

Commit 90b8180

Browse files
jpeletierzelig
authored andcommitted
pss: Refactor. Step 1. Refactor PssMsg (ethersphere#1734)
* Add ut testing lib * pss: refactor msgParams to message.Flags and RLP serialization * pss: move Topic to message package * pss: refactor PssMsg to message package * pss/message: Flags.EncodeRLP cosmetic change * pss: remove ut and rewrite tests
1 parent 3694854 commit 90b8180

21 files changed

+524
-356
lines changed

pss/ARCHITECTURE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
Pss provides devp2p functionality for swarm nodes without the need for a direct tcp connection between them.
44

5-
Messages are encapsulated in a devp2p message structure `PssMsg`. These capsules are forwarded from node to node using ordinary tcp devp2p until they reach their destination: The node or nodes who can successfully decrypt the message.
5+
Messages are encapsulated in a devp2p message structure `message.Message`. These capsules are forwarded from node to node using ordinary tcp devp2p until they reach their destination: The node or nodes who can successfully decrypt the message.
66

77
| Layer | Contents |
88
|-----------|-----------------|

pss/api.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/ethereum/go-ethereum/p2p"
2626
"github.com/ethereum/go-ethereum/rpc"
2727
"github.com/ethersphere/swarm/log"
28+
"github.com/ethersphere/swarm/pss/message"
2829
)
2930

3031
// Wrapper for receiving pss messages when using the pss API
@@ -50,7 +51,7 @@ func NewAPI(ps *Pss) *API {
5051
//
5152
// All incoming messages to the node matching this topic will be encapsulated in the APIMsg
5253
// struct and sent to the subscriber
53-
func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool, prox bool) (*rpc.Subscription, error) {
54+
func (pssapi *API) Receive(ctx context.Context, topic message.Topic, raw bool, prox bool) (*rpc.Subscription, error) {
5455
notifier, supported := rpc.NotifierFromContext(ctx)
5556
if !supported {
5657
return nil, fmt.Errorf("Subscribe not supported")
@@ -90,7 +91,7 @@ func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool, prox bool
9091
return psssub, nil
9192
}
9293

93-
func (pssapi *API) GetAddress(topic Topic, asymmetric bool, key string) (PssAddress, error) {
94+
func (pssapi *API) GetAddress(topic message.Topic, asymmetric bool, key string) (PssAddress, error) {
9495
var addr PssAddress
9596
if asymmetric {
9697
peer, ok := pssapi.Pss.pubKeyPool[key][topic]
@@ -122,7 +123,7 @@ func (pssapi *API) GetPublicKey() (keybytes hexutil.Bytes) {
122123
}
123124

124125
// Set Public key to associate with a particular Pss peer
125-
func (pssapi *API) SetPeerPublicKey(pubkey hexutil.Bytes, topic Topic, addr PssAddress) error {
126+
func (pssapi *API) SetPeerPublicKey(pubkey hexutil.Bytes, topic message.Topic, addr PssAddress) error {
126127
pk, err := pssapi.Pss.Crypto.UnmarshalPublicKey(pubkey)
127128
if err != nil {
128129
return fmt.Errorf("Cannot unmarshal pubkey: %x", pubkey)
@@ -139,50 +140,50 @@ func (pssapi *API) GetSymmetricKey(symkeyid string) (hexutil.Bytes, error) {
139140
return hexutil.Bytes(symkey), err
140141
}
141142

142-
func (pssapi *API) GetSymmetricAddressHint(topic Topic, symkeyid string) (PssAddress, error) {
143+
func (pssapi *API) GetSymmetricAddressHint(topic message.Topic, symkeyid string) (PssAddress, error) {
143144
return pssapi.Pss.symKeyPool[symkeyid][topic].address, nil
144145
}
145146

146-
func (pssapi *API) GetAsymmetricAddressHint(topic Topic, pubkeyid string) (PssAddress, error) {
147+
func (pssapi *API) GetAsymmetricAddressHint(topic message.Topic, pubkeyid string) (PssAddress, error) {
147148
return pssapi.Pss.pubKeyPool[pubkeyid][topic].address, nil
148149
}
149150

150-
func (pssapi *API) StringToTopic(topicstring string) (Topic, error) {
151-
topicbytes := BytesToTopic([]byte(topicstring))
151+
func (pssapi *API) StringToTopic(topicstring string) (message.Topic, error) {
152+
topicbytes := message.NewTopic([]byte(topicstring))
152153
if topicbytes == rawTopic {
153154
return rawTopic, errors.New("Topic string hashes to 0x00000000 and cannot be used")
154155
}
155156
return topicbytes, nil
156157
}
157158

158-
func (pssapi *API) SendAsym(pubkeyhex string, topic Topic, msg hexutil.Bytes) error {
159+
func (pssapi *API) SendAsym(pubkeyhex string, topic message.Topic, msg hexutil.Bytes) error {
159160
if err := validateMsg(msg); err != nil {
160161
return err
161162
}
162163
return pssapi.Pss.SendAsym(pubkeyhex, topic, msg[:])
163164
}
164165

165-
func (pssapi *API) SendSym(symkeyhex string, topic Topic, msg hexutil.Bytes) error {
166+
func (pssapi *API) SendSym(symkeyhex string, topic message.Topic, msg hexutil.Bytes) error {
166167
if err := validateMsg(msg); err != nil {
167168
return err
168169
}
169170
return pssapi.Pss.SendSym(symkeyhex, topic, msg[:])
170171
}
171172

172-
func (pssapi *API) SendRaw(addr hexutil.Bytes, topic Topic, msg hexutil.Bytes) error {
173+
func (pssapi *API) SendRaw(addr hexutil.Bytes, topic message.Topic, msg hexutil.Bytes) error {
173174
if err := validateMsg(msg); err != nil {
174175
return err
175176
}
176177
return pssapi.Pss.SendRaw(PssAddress(addr), topic, msg[:])
177178
}
178179

179-
func (pssapi *API) GetPeerTopics(pubkeyhex string) ([]Topic, error) {
180+
func (pssapi *API) GetPeerTopics(pubkeyhex string) ([]message.Topic, error) {
180181
topics, _, err := pssapi.Pss.GetPublickeyPeers(pubkeyhex)
181182
return topics, err
182183

183184
}
184185

185-
func (pssapi *API) GetPeerAddress(pubkeyhex string, topic Topic) (PssAddress, error) {
186+
func (pssapi *API) GetPeerAddress(pubkeyhex string, topic message.Topic) (PssAddress, error) {
186187
return pssapi.Pss.getPeerAddress(pubkeyhex, topic)
187188
}
188189

pss/client/client.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/ethersphere/swarm/log"
3434
"github.com/ethersphere/swarm/p2p/protocols"
3535
"github.com/ethersphere/swarm/pss"
36+
"github.com/ethersphere/swarm/pss/message"
3637
)
3738

3839
const (
@@ -46,8 +47,8 @@ type Client struct {
4647
BaseAddrHex string
4748

4849
// peers
49-
peerPool map[pss.Topic]map[string]*pssRPCRW
50-
protos map[pss.Topic]*p2p.Protocol
50+
peerPool map[message.Topic]map[string]*pssRPCRW
51+
protos map[message.Topic]*p2p.Protocol
5152

5253
// rpc connections
5354
rpc *rpc.Client
@@ -71,7 +72,7 @@ type pssRPCRW struct {
7172
closed bool
7273
}
7374

74-
func (c *Client) newpssRPCRW(pubkeyid string, addr pss.PssAddress, topicobj pss.Topic) (*pssRPCRW, error) {
75+
func (c *Client) newpssRPCRW(pubkeyid string, addr pss.PssAddress, topicobj message.Topic) (*pssRPCRW, error) {
7576
topic := topicobj.String()
7677
err := c.rpc.Call(nil, "pss_setPeerPublicKey", pubkeyid, topic, hexutil.Encode(addr[:]))
7778
if err != nil {
@@ -218,8 +219,8 @@ func NewClientWithRPC(rpcclient *rpc.Client) (*Client, error) {
218219
func newClient() (client *Client) {
219220
client = &Client{
220221
quitC: make(chan struct{}),
221-
peerPool: make(map[pss.Topic]map[string]*pssRPCRW),
222-
protos: make(map[pss.Topic]*p2p.Protocol),
222+
peerPool: make(map[message.Topic]map[string]*pssRPCRW),
223+
protos: make(map[message.Topic]*p2p.Protocol),
223224
}
224225
return
225226
}
@@ -232,7 +233,7 @@ func newClient() (client *Client) {
232233
// when an incoming message is received from a peer that is not yet known to the client,
233234
// this peer object is instantiated, and the protocol is run on it.
234235
func (c *Client) RunProtocol(ctx context.Context, proto *p2p.Protocol) error {
235-
topicobj := pss.BytesToTopic([]byte(fmt.Sprintf("%s:%d", proto.Name, proto.Version)))
236+
topicobj := message.NewTopic([]byte(fmt.Sprintf("%s:%d", proto.Name, proto.Version)))
236237
topichex := topicobj.String()
237238
msgC := make(chan pss.APIMsg)
238239
c.peerPool[topicobj] = make(map[string]*pssRPCRW)

pss/doc.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
// Pss provides devp2p functionality for swarm nodes without the need for a direct tcp connection between them.
1818
//
19-
// Messages are encapsulated in a devp2p message structure `PssMsg`. These capsules are forwarded from node to node using ordinary tcp devp2p until it reaches its destination: The node or nodes who can successfully decrypt the message.
19+
// Messages are encapsulated in a devp2p message structure `message.Message`. These capsules are forwarded from node to node using ordinary tcp devp2p until it reaches its destination: The node or nodes who can successfully decrypt the message.
2020
//
2121
// Routing of messages is done using swarm's own kademlia routing. Optionally routing can be turned off, forcing the message to be sent to all peers, similar to the behavior of the whisper protocol.
2222
//
@@ -50,11 +50,11 @@
5050
//
5151
// Under the hood, pss implements its own MsgReadWriter, which bridges MsgReadWriter.WriteMsg with Pss.SendRaw, and deftly adds an InjectMsg method which pipes incoming messages to appear on the MsgReadWriter.ReadMsg channel.
5252
//
53-
// An incoming connection is nothing more than an actual PssMsg appearing with a certain Topic. If a Handler har been registered to that Topic, the message will be passed to it. This constitutes a "new" connection if:
53+
// An incoming connection is nothing more than an actual message.Message appearing with a certain Topic. If a Handler har been registered to that Topic, the message will be passed to it. This constitutes a "new" connection if:
5454
//
5555
// - The pss node never called AddPeer with this combination of remote peer address and topic, and
5656
//
57-
// - The pss node never received a PssMsg from this remote peer with this specific Topic before.
57+
// - The pss node never received a message.Message from this remote peer with this specific Topic before.
5858
//
5959
// If it is a "new" connection, the protocol will be "run" on the remote peer, in the same manner as if it was pre-emptively added.
6060
//

pss/forwarding_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/ethersphere/swarm/network"
1313
"github.com/ethersphere/swarm/p2p/protocols"
1414
"github.com/ethersphere/swarm/pot"
15+
"github.com/ethersphere/swarm/pss/message"
1516
)
1617

1718
type testCase struct {
@@ -247,7 +248,7 @@ func testForwardMsg(t *testing.T, ps *Pss, c *testCase) {
247248
resultMap := make(map[pot.Address]int)
248249

249250
defer func() { sendFunc = sendMsg }()
250-
sendFunc = func(_ *Pss, sp *network.Peer, _ *PssMsg) bool {
251+
sendFunc = func(_ *Pss, sp *network.Peer, _ *message.Message) bool {
251252
if tries < nFails {
252253
tries++
253254
return false
@@ -346,8 +347,8 @@ func newTestDiscoveryPeer(addr pot.Address, kad *network.Kademlia) *network.Peer
346347
return network.NewPeer(bp, kad)
347348
}
348349

349-
func newTestMsg(addr []byte) *PssMsg {
350-
msg := newPssMsg(&msgParams{})
350+
func newTestMsg(addr []byte) *message.Message {
351+
msg := message.New(message.Flags{})
351352
msg.To = addr[:]
352353
msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix())
353354
msg.Topic = [4]byte{}

pss/handshake.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/ethereum/go-ethereum/rlp"
3232
"github.com/ethereum/go-ethereum/rpc"
3333
"github.com/ethersphere/swarm/log"
34+
"github.com/ethersphere/swarm/pss/message"
3435
)
3536

3637
const (
@@ -54,7 +55,7 @@ type handshakeMsg struct {
5455
Limit uint16
5556
Keys [][]byte
5657
Request uint8
57-
Topic Topic
58+
Topic message.Topic
5859
}
5960

6061
// internal representation of an individual symmetric key
@@ -112,8 +113,8 @@ type HandshakeController struct {
112113
symKeySendLimit uint16
113114
symKeyCapacity uint8
114115
symKeyIndex map[string]*handshakeKey
115-
handshakes map[string]map[Topic]*handshake
116-
deregisterFuncs map[Topic]func()
116+
handshakes map[string]map[message.Topic]*handshake
117+
deregisterFuncs map[message.Topic]func()
117118
}
118119

119120
// Attach HandshakeController to pss node
@@ -128,8 +129,8 @@ func SetHandshakeController(pss *Pss, params *HandshakeParams) error {
128129
symKeySendLimit: params.SymKeySendLimit,
129130
symKeyCapacity: params.SymKeyCapacity,
130131
symKeyIndex: make(map[string]*handshakeKey),
131-
handshakes: make(map[string]map[Topic]*handshake),
132-
deregisterFuncs: make(map[Topic]func()),
132+
handshakes: make(map[string]map[message.Topic]*handshake),
133+
deregisterFuncs: make(map[message.Topic]func()),
133134
}
134135
api := &HandshakeAPI{
135136
namespace: "pss",
@@ -147,7 +148,7 @@ func SetHandshakeController(pss *Pss, params *HandshakeParams) error {
147148

148149
// Return all unexpired symmetric keys from store by
149150
// peer (public key), topic and specified direction
150-
func (ctl *HandshakeController) validKeys(pubkeyid string, topic *Topic, in bool) (validkeys []*string) {
151+
func (ctl *HandshakeController) validKeys(pubkeyid string, topic *message.Topic, in bool) (validkeys []*string) {
151152
ctl.lock.Lock()
152153
defer ctl.lock.Unlock()
153154
now := time.Now()
@@ -177,11 +178,11 @@ func (ctl *HandshakeController) validKeys(pubkeyid string, topic *Topic, in bool
177178

178179
// Add all given symmetric keys with validity limits to store by
179180
// peer (public key), topic and specified direction
180-
func (ctl *HandshakeController) updateKeys(pubkeyid string, topic *Topic, in bool, symkeyids []string, limit uint16) {
181+
func (ctl *HandshakeController) updateKeys(pubkeyid string, topic *message.Topic, in bool, symkeyids []string, limit uint16) {
181182
ctl.lock.Lock()
182183
defer ctl.lock.Unlock()
183184
if _, ok := ctl.handshakes[pubkeyid]; !ok {
184-
ctl.handshakes[pubkeyid] = make(map[Topic]*handshake)
185+
ctl.handshakes[pubkeyid] = make(map[message.Topic]*handshake)
185186

186187
}
187188
if ctl.handshakes[pubkeyid][*topic] == nil {
@@ -214,14 +215,14 @@ func (ctl *HandshakeController) updateKeys(pubkeyid string, topic *Topic, in boo
214215
}
215216
}
216217

217-
func (ctl *HandshakeController) releaseKey(symkeyid string, topic *Topic) bool {
218+
func (ctl *HandshakeController) releaseKey(symkeyid string, topic *message.Topic) bool {
218219
ctl.lock.Lock()
219220
defer ctl.lock.Unlock()
220221
return ctl.releaseKeyNoLock(symkeyid, topic)
221222
}
222223

223224
// Expire a symmetric key, making it eligible for garbage collection
224-
func (ctl *HandshakeController) releaseKeyNoLock(symkeyid string, topic *Topic) bool {
225+
func (ctl *HandshakeController) releaseKeyNoLock(symkeyid string, topic *message.Topic) bool {
225226
if ctl.symKeyIndex[symkeyid] == nil {
226227
log.Debug("no symkey", "symkeyid", symkeyid)
227228
return false
@@ -236,7 +237,7 @@ func (ctl *HandshakeController) releaseKeyNoLock(symkeyid string, topic *Topic)
236237
// Expired means:
237238
// - expiry timestamp is set, and grace period is exceeded
238239
// - message validity limit is reached
239-
func (ctl *HandshakeController) cleanHandshake(pubkeyid string, topic *Topic, in bool, out bool) int {
240+
func (ctl *HandshakeController) cleanHandshake(pubkeyid string, topic *message.Topic, in bool, out bool) int {
240241
ctl.lock.Lock()
241242
defer ctl.lock.Unlock()
242243
var deletecount int
@@ -290,7 +291,7 @@ func (ctl *HandshakeController) getSymKey(symkeyid string) *handshakeKey {
290291
return ctl.symKeyIndex[symkeyid]
291292
}
292293

293-
// Passed as a PssMsg handler for the topic handshake is activated on
294+
// Passed as a message.Message handler for the topic handshake is activated on
294295
// Handles incoming key exchange messages and
295296
// counts message usage by symmetric key (expiry limit control)
296297
// Only returns error if key handler fails
@@ -378,7 +379,7 @@ func (ctl *HandshakeController) handleKeys(pubkeyid string, keymsg *handshakeMsg
378379
// If number of valid outgoing keys is less than the ideal/max
379380
// amount, a request is sent for the amount of keys to make up
380381
// the difference
381-
func (ctl *HandshakeController) sendKey(pubkeyid string, topic *Topic, keycount uint8) ([]string, error) {
382+
func (ctl *HandshakeController) sendKey(pubkeyid string, topic *message.Topic, keycount uint8) ([]string, error) {
382383

383384
var requestcount uint8
384385
to := PssAddress{}
@@ -392,7 +393,7 @@ func (ctl *HandshakeController) sendKey(pubkeyid string, topic *Topic, keycount
392393
recvkeyids := make([]string, keycount)
393394
ctl.lock.Lock()
394395
if _, ok := ctl.handshakes[pubkeyid]; !ok {
395-
ctl.handshakes[pubkeyid] = make(map[Topic]*handshake)
396+
ctl.handshakes[pubkeyid] = make(map[message.Topic]*handshake)
396397
}
397398
ctl.lock.Unlock()
398399

@@ -479,7 +480,7 @@ type HandshakeAPI struct {
479480
//
480481
// Fails if the incoming symmetric key store is already full (and `flush` is false),
481482
// or if the underlying key dispatcher fails
482-
func (api *HandshakeAPI) Handshake(pubkeyid string, topic Topic, sync bool, flush bool) (keys []string, err error) {
483+
func (api *HandshakeAPI) Handshake(pubkeyid string, topic message.Topic, sync bool, flush bool) (keys []string, err error) {
483484
var hsc chan []string
484485
var keycount uint8
485486
if flush {
@@ -512,13 +513,13 @@ func (api *HandshakeAPI) Handshake(pubkeyid string, topic Topic, sync bool, flus
512513
}
513514

514515
// Activate handshake functionality on a topic
515-
func (api *HandshakeAPI) AddHandshake(topic Topic) error {
516+
func (api *HandshakeAPI) AddHandshake(topic message.Topic) error {
516517
api.ctrl.deregisterFuncs[topic] = api.ctrl.pss.Register(&topic, NewHandler(api.ctrl.handler))
517518
return nil
518519
}
519520

520521
// Deactivate handshake functionality on a topic
521-
func (api *HandshakeAPI) RemoveHandshake(topic *Topic) error {
522+
func (api *HandshakeAPI) RemoveHandshake(topic *message.Topic) error {
522523
if _, ok := api.ctrl.deregisterFuncs[*topic]; ok {
523524
api.ctrl.deregisterFuncs[*topic]()
524525
}
@@ -531,7 +532,7 @@ func (api *HandshakeAPI) RemoveHandshake(topic *Topic) error {
531532
// The `in` and `out` parameters indicate for which direction(s)
532533
// symmetric keys will be returned.
533534
// If both are false, no keys (and no error) will be returned.
534-
func (api *HandshakeAPI) GetHandshakeKeys(pubkeyid string, topic Topic, in bool, out bool) (keys []string, err error) {
535+
func (api *HandshakeAPI) GetHandshakeKeys(pubkeyid string, topic message.Topic, in bool, out bool) (keys []string, err error) {
535536
if in {
536537
for _, inkey := range api.ctrl.validKeys(pubkeyid, &topic, true) {
537538
keys = append(keys, *inkey)
@@ -570,7 +571,7 @@ func (api *HandshakeAPI) GetHandshakePublicKey(symkeyid string) (string, error)
570571
// If `flush` is set, garbage collection will be performed before returning.
571572
//
572573
// Returns true on successful removal, false otherwise
573-
func (api *HandshakeAPI) ReleaseHandshakeKey(pubkeyid string, topic Topic, symkeyid string, flush bool) (removed bool, err error) {
574+
func (api *HandshakeAPI) ReleaseHandshakeKey(pubkeyid string, topic message.Topic, symkeyid string, flush bool) (removed bool, err error) {
574575
removed = api.ctrl.releaseKey(symkeyid, &topic)
575576
if removed && flush {
576577
api.ctrl.cleanHandshake(pubkeyid, &topic, true, true)
@@ -582,7 +583,7 @@ func (api *HandshakeAPI) ReleaseHandshakeKey(pubkeyid string, topic Topic, symke
582583
//
583584
// Overloads the pss.SendSym() API call, adding symmetric key usage count
584585
// for message expiry control
585-
func (api *HandshakeAPI) SendSym(symkeyid string, topic Topic, msg hexutil.Bytes) (err error) {
586+
func (api *HandshakeAPI) SendSym(symkeyid string, topic message.Topic, msg hexutil.Bytes) (err error) {
586587
err = api.ctrl.pss.SendSym(symkeyid, topic, msg[:])
587588
if otherErr := api.ctrl.registerSymKeyUse(symkeyid); otherErr != nil {
588589
return otherErr

0 commit comments

Comments
 (0)