Skip to content

Commit 3ef28d7

Browse files
committed
pss: move Topic to message package
1 parent 5df59a8 commit 3ef28d7

24 files changed

+215
-180
lines changed

pss/api.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/ethereum/go-ethereum/p2p"
2626
"github.com/ethereum/go-ethereum/rpc"
2727
"github.com/ethersphere/swarm/log"
28+
"github.com/ethersphere/swarm/pss/message"
2829
)
2930

3031
// Wrapper for receiving pss messages when using the pss API
@@ -50,7 +51,7 @@ func NewAPI(ps *Pss) *API {
5051
//
5152
// All incoming messages to the node matching this topic will be encapsulated in the APIMsg
5253
// struct and sent to the subscriber
53-
func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool, prox bool) (*rpc.Subscription, error) {
54+
func (pssapi *API) Receive(ctx context.Context, topic message.Topic, raw bool, prox bool) (*rpc.Subscription, error) {
5455
notifier, supported := rpc.NotifierFromContext(ctx)
5556
if !supported {
5657
return nil, fmt.Errorf("Subscribe not supported")
@@ -90,7 +91,7 @@ func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool, prox bool
9091
return psssub, nil
9192
}
9293

93-
func (pssapi *API) GetAddress(topic Topic, asymmetric bool, key string) (PssAddress, error) {
94+
func (pssapi *API) GetAddress(topic message.Topic, asymmetric bool, key string) (PssAddress, error) {
9495
var addr PssAddress
9596
if asymmetric {
9697
peer, ok := pssapi.Pss.pubKeyPool[key][topic]
@@ -122,7 +123,7 @@ func (pssapi *API) GetPublicKey() (keybytes hexutil.Bytes) {
122123
}
123124

124125
// Set Public key to associate with a particular Pss peer
125-
func (pssapi *API) SetPeerPublicKey(pubkey hexutil.Bytes, topic Topic, addr PssAddress) error {
126+
func (pssapi *API) SetPeerPublicKey(pubkey hexutil.Bytes, topic message.Topic, addr PssAddress) error {
126127
pk, err := pssapi.Pss.Crypto.UnmarshalPublicKey(pubkey)
127128
if err != nil {
128129
return fmt.Errorf("Cannot unmarshal pubkey: %x", pubkey)
@@ -139,50 +140,50 @@ func (pssapi *API) GetSymmetricKey(symkeyid string) (hexutil.Bytes, error) {
139140
return hexutil.Bytes(symkey), err
140141
}
141142

142-
func (pssapi *API) GetSymmetricAddressHint(topic Topic, symkeyid string) (PssAddress, error) {
143+
func (pssapi *API) GetSymmetricAddressHint(topic message.Topic, symkeyid string) (PssAddress, error) {
143144
return pssapi.Pss.symKeyPool[symkeyid][topic].address, nil
144145
}
145146

146-
func (pssapi *API) GetAsymmetricAddressHint(topic Topic, pubkeyid string) (PssAddress, error) {
147+
func (pssapi *API) GetAsymmetricAddressHint(topic message.Topic, pubkeyid string) (PssAddress, error) {
147148
return pssapi.Pss.pubKeyPool[pubkeyid][topic].address, nil
148149
}
149150

150-
func (pssapi *API) StringToTopic(topicstring string) (Topic, error) {
151-
topicbytes := BytesToTopic([]byte(topicstring))
151+
func (pssapi *API) StringToTopic(topicstring string) (message.Topic, error) {
152+
topicbytes := message.NewTopic([]byte(topicstring))
152153
if topicbytes == rawTopic {
153154
return rawTopic, errors.New("Topic string hashes to 0x00000000 and cannot be used")
154155
}
155156
return topicbytes, nil
156157
}
157158

158-
func (pssapi *API) SendAsym(pubkeyhex string, topic Topic, msg hexutil.Bytes) error {
159+
func (pssapi *API) SendAsym(pubkeyhex string, topic message.Topic, msg hexutil.Bytes) error {
159160
if err := validateMsg(msg); err != nil {
160161
return err
161162
}
162163
return pssapi.Pss.SendAsym(pubkeyhex, topic, msg[:])
163164
}
164165

165-
func (pssapi *API) SendSym(symkeyhex string, topic Topic, msg hexutil.Bytes) error {
166+
func (pssapi *API) SendSym(symkeyhex string, topic message.Topic, msg hexutil.Bytes) error {
166167
if err := validateMsg(msg); err != nil {
167168
return err
168169
}
169170
return pssapi.Pss.SendSym(symkeyhex, topic, msg[:])
170171
}
171172

172-
func (pssapi *API) SendRaw(addr hexutil.Bytes, topic Topic, msg hexutil.Bytes) error {
173+
func (pssapi *API) SendRaw(addr hexutil.Bytes, topic message.Topic, msg hexutil.Bytes) error {
173174
if err := validateMsg(msg); err != nil {
174175
return err
175176
}
176177
return pssapi.Pss.SendRaw(PssAddress(addr), topic, msg[:])
177178
}
178179

179-
func (pssapi *API) GetPeerTopics(pubkeyhex string) ([]Topic, error) {
180+
func (pssapi *API) GetPeerTopics(pubkeyhex string) ([]message.Topic, error) {
180181
topics, _, err := pssapi.Pss.GetPublickeyPeers(pubkeyhex)
181182
return topics, err
182183

183184
}
184185

185-
func (pssapi *API) GetPeerAddress(pubkeyhex string, topic Topic) (PssAddress, error) {
186+
func (pssapi *API) GetPeerAddress(pubkeyhex string, topic message.Topic) (PssAddress, error) {
186187
return pssapi.Pss.getPeerAddress(pubkeyhex, topic)
187188
}
188189

pss/client/client.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/ethersphere/swarm/log"
3434
"github.com/ethersphere/swarm/p2p/protocols"
3535
"github.com/ethersphere/swarm/pss"
36+
"github.com/ethersphere/swarm/pss/message"
3637
)
3738

3839
const (
@@ -46,8 +47,8 @@ type Client struct {
4647
BaseAddrHex string
4748

4849
// peers
49-
peerPool map[pss.Topic]map[string]*pssRPCRW
50-
protos map[pss.Topic]*p2p.Protocol
50+
peerPool map[message.Topic]map[string]*pssRPCRW
51+
protos map[message.Topic]*p2p.Protocol
5152

5253
// rpc connections
5354
rpc *rpc.Client
@@ -71,7 +72,7 @@ type pssRPCRW struct {
7172
closed bool
7273
}
7374

74-
func (c *Client) newpssRPCRW(pubkeyid string, addr pss.PssAddress, topicobj pss.Topic) (*pssRPCRW, error) {
75+
func (c *Client) newpssRPCRW(pubkeyid string, addr pss.PssAddress, topicobj message.Topic) (*pssRPCRW, error) {
7576
topic := topicobj.String()
7677
err := c.rpc.Call(nil, "pss_setPeerPublicKey", pubkeyid, topic, hexutil.Encode(addr[:]))
7778
if err != nil {
@@ -218,8 +219,8 @@ func NewClientWithRPC(rpcclient *rpc.Client) (*Client, error) {
218219
func newClient() (client *Client) {
219220
client = &Client{
220221
quitC: make(chan struct{}),
221-
peerPool: make(map[pss.Topic]map[string]*pssRPCRW),
222-
protos: make(map[pss.Topic]*p2p.Protocol),
222+
peerPool: make(map[message.Topic]map[string]*pssRPCRW),
223+
protos: make(map[message.Topic]*p2p.Protocol),
223224
}
224225
return
225226
}
@@ -232,7 +233,7 @@ func newClient() (client *Client) {
232233
// when an incoming message is received from a peer that is not yet known to the client,
233234
// this peer object is instantiated, and the protocol is run on it.
234235
func (c *Client) RunProtocol(ctx context.Context, proto *p2p.Protocol) error {
235-
topicobj := pss.BytesToTopic([]byte(fmt.Sprintf("%s:%d", proto.Name, proto.Version)))
236+
topicobj := message.NewTopic([]byte(fmt.Sprintf("%s:%d", proto.Name, proto.Version)))
236237
topichex := topicobj.String()
237238
msgC := make(chan pss.APIMsg)
238239
c.peerPool[topicobj] = make(map[string]*pssRPCRW)

pss/forwarding_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/ethersphere/swarm/network"
1313
"github.com/ethersphere/swarm/p2p/protocols"
1414
"github.com/ethersphere/swarm/pot"
15-
"github.com/ethersphere/swarm/pss/internal/message"
15+
"github.com/ethersphere/swarm/pss/message"
1616
)
1717

1818
type testCase struct {

pss/handshake.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/ethereum/go-ethereum/rlp"
3232
"github.com/ethereum/go-ethereum/rpc"
3333
"github.com/ethersphere/swarm/log"
34+
"github.com/ethersphere/swarm/pss/message"
3435
)
3536

3637
const (
@@ -54,7 +55,7 @@ type handshakeMsg struct {
5455
Limit uint16
5556
Keys [][]byte
5657
Request uint8
57-
Topic Topic
58+
Topic message.Topic
5859
}
5960

6061
// internal representation of an individual symmetric key
@@ -112,8 +113,8 @@ type HandshakeController struct {
112113
symKeySendLimit uint16
113114
symKeyCapacity uint8
114115
symKeyIndex map[string]*handshakeKey
115-
handshakes map[string]map[Topic]*handshake
116-
deregisterFuncs map[Topic]func()
116+
handshakes map[string]map[message.Topic]*handshake
117+
deregisterFuncs map[message.Topic]func()
117118
}
118119

119120
// Attach HandshakeController to pss node
@@ -128,8 +129,8 @@ func SetHandshakeController(pss *Pss, params *HandshakeParams) error {
128129
symKeySendLimit: params.SymKeySendLimit,
129130
symKeyCapacity: params.SymKeyCapacity,
130131
symKeyIndex: make(map[string]*handshakeKey),
131-
handshakes: make(map[string]map[Topic]*handshake),
132-
deregisterFuncs: make(map[Topic]func()),
132+
handshakes: make(map[string]map[message.Topic]*handshake),
133+
deregisterFuncs: make(map[message.Topic]func()),
133134
}
134135
api := &HandshakeAPI{
135136
namespace: "pss",
@@ -147,7 +148,7 @@ func SetHandshakeController(pss *Pss, params *HandshakeParams) error {
147148

148149
// Return all unexpired symmetric keys from store by
149150
// peer (public key), topic and specified direction
150-
func (ctl *HandshakeController) validKeys(pubkeyid string, topic *Topic, in bool) (validkeys []*string) {
151+
func (ctl *HandshakeController) validKeys(pubkeyid string, topic *message.Topic, in bool) (validkeys []*string) {
151152
ctl.lock.Lock()
152153
defer ctl.lock.Unlock()
153154
now := time.Now()
@@ -177,11 +178,11 @@ func (ctl *HandshakeController) validKeys(pubkeyid string, topic *Topic, in bool
177178

178179
// Add all given symmetric keys with validity limits to store by
179180
// peer (public key), topic and specified direction
180-
func (ctl *HandshakeController) updateKeys(pubkeyid string, topic *Topic, in bool, symkeyids []string, limit uint16) {
181+
func (ctl *HandshakeController) updateKeys(pubkeyid string, topic *message.Topic, in bool, symkeyids []string, limit uint16) {
181182
ctl.lock.Lock()
182183
defer ctl.lock.Unlock()
183184
if _, ok := ctl.handshakes[pubkeyid]; !ok {
184-
ctl.handshakes[pubkeyid] = make(map[Topic]*handshake)
185+
ctl.handshakes[pubkeyid] = make(map[message.Topic]*handshake)
185186

186187
}
187188
if ctl.handshakes[pubkeyid][*topic] == nil {
@@ -214,14 +215,14 @@ func (ctl *HandshakeController) updateKeys(pubkeyid string, topic *Topic, in boo
214215
}
215216
}
216217

217-
func (ctl *HandshakeController) releaseKey(symkeyid string, topic *Topic) bool {
218+
func (ctl *HandshakeController) releaseKey(symkeyid string, topic *message.Topic) bool {
218219
ctl.lock.Lock()
219220
defer ctl.lock.Unlock()
220221
return ctl.releaseKeyNoLock(symkeyid, topic)
221222
}
222223

223224
// Expire a symmetric key, making it eligible for garbage collection
224-
func (ctl *HandshakeController) releaseKeyNoLock(symkeyid string, topic *Topic) bool {
225+
func (ctl *HandshakeController) releaseKeyNoLock(symkeyid string, topic *message.Topic) bool {
225226
if ctl.symKeyIndex[symkeyid] == nil {
226227
log.Debug("no symkey", "symkeyid", symkeyid)
227228
return false
@@ -236,7 +237,7 @@ func (ctl *HandshakeController) releaseKeyNoLock(symkeyid string, topic *Topic)
236237
// Expired means:
237238
// - expiry timestamp is set, and grace period is exceeded
238239
// - message validity limit is reached
239-
func (ctl *HandshakeController) cleanHandshake(pubkeyid string, topic *Topic, in bool, out bool) int {
240+
func (ctl *HandshakeController) cleanHandshake(pubkeyid string, topic *message.Topic, in bool, out bool) int {
240241
ctl.lock.Lock()
241242
defer ctl.lock.Unlock()
242243
var deletecount int
@@ -378,7 +379,7 @@ func (ctl *HandshakeController) handleKeys(pubkeyid string, keymsg *handshakeMsg
378379
// If number of valid outgoing keys is less than the ideal/max
379380
// amount, a request is sent for the amount of keys to make up
380381
// the difference
381-
func (ctl *HandshakeController) sendKey(pubkeyid string, topic *Topic, keycount uint8) ([]string, error) {
382+
func (ctl *HandshakeController) sendKey(pubkeyid string, topic *message.Topic, keycount uint8) ([]string, error) {
382383

383384
var requestcount uint8
384385
to := PssAddress{}
@@ -392,7 +393,7 @@ func (ctl *HandshakeController) sendKey(pubkeyid string, topic *Topic, keycount
392393
recvkeyids := make([]string, keycount)
393394
ctl.lock.Lock()
394395
if _, ok := ctl.handshakes[pubkeyid]; !ok {
395-
ctl.handshakes[pubkeyid] = make(map[Topic]*handshake)
396+
ctl.handshakes[pubkeyid] = make(map[message.Topic]*handshake)
396397
}
397398
ctl.lock.Unlock()
398399

@@ -479,7 +480,7 @@ type HandshakeAPI struct {
479480
//
480481
// Fails if the incoming symmetric key store is already full (and `flush` is false),
481482
// or if the underlying key dispatcher fails
482-
func (api *HandshakeAPI) Handshake(pubkeyid string, topic Topic, sync bool, flush bool) (keys []string, err error) {
483+
func (api *HandshakeAPI) Handshake(pubkeyid string, topic message.Topic, sync bool, flush bool) (keys []string, err error) {
483484
var hsc chan []string
484485
var keycount uint8
485486
if flush {
@@ -512,13 +513,13 @@ func (api *HandshakeAPI) Handshake(pubkeyid string, topic Topic, sync bool, flus
512513
}
513514

514515
// Activate handshake functionality on a topic
515-
func (api *HandshakeAPI) AddHandshake(topic Topic) error {
516+
func (api *HandshakeAPI) AddHandshake(topic message.Topic) error {
516517
api.ctrl.deregisterFuncs[topic] = api.ctrl.pss.Register(&topic, NewHandler(api.ctrl.handler))
517518
return nil
518519
}
519520

520521
// Deactivate handshake functionality on a topic
521-
func (api *HandshakeAPI) RemoveHandshake(topic *Topic) error {
522+
func (api *HandshakeAPI) RemoveHandshake(topic *message.Topic) error {
522523
if _, ok := api.ctrl.deregisterFuncs[*topic]; ok {
523524
api.ctrl.deregisterFuncs[*topic]()
524525
}
@@ -531,7 +532,7 @@ func (api *HandshakeAPI) RemoveHandshake(topic *Topic) error {
531532
// The `in` and `out` parameters indicate for which direction(s)
532533
// symmetric keys will be returned.
533534
// If both are false, no keys (and no error) will be returned.
534-
func (api *HandshakeAPI) GetHandshakeKeys(pubkeyid string, topic Topic, in bool, out bool) (keys []string, err error) {
535+
func (api *HandshakeAPI) GetHandshakeKeys(pubkeyid string, topic message.Topic, in bool, out bool) (keys []string, err error) {
535536
if in {
536537
for _, inkey := range api.ctrl.validKeys(pubkeyid, &topic, true) {
537538
keys = append(keys, *inkey)
@@ -570,7 +571,7 @@ func (api *HandshakeAPI) GetHandshakePublicKey(symkeyid string) (string, error)
570571
// If `flush` is set, garbage collection will be performed before returning.
571572
//
572573
// Returns true on successful removal, false otherwise
573-
func (api *HandshakeAPI) ReleaseHandshakeKey(pubkeyid string, topic Topic, symkeyid string, flush bool) (removed bool, err error) {
574+
func (api *HandshakeAPI) ReleaseHandshakeKey(pubkeyid string, topic message.Topic, symkeyid string, flush bool) (removed bool, err error) {
574575
removed = api.ctrl.releaseKey(symkeyid, &topic)
575576
if removed && flush {
576577
api.ctrl.cleanHandshake(pubkeyid, &topic, true, true)
@@ -582,7 +583,7 @@ func (api *HandshakeAPI) ReleaseHandshakeKey(pubkeyid string, topic Topic, symke
582583
//
583584
// Overloads the pss.SendSym() API call, adding symmetric key usage count
584585
// for message expiry control
585-
func (api *HandshakeAPI) SendSym(symkeyid string, topic Topic, msg hexutil.Bytes) (err error) {
586+
func (api *HandshakeAPI) SendSym(symkeyid string, topic message.Topic, msg hexutil.Bytes) (err error) {
586587
err = api.ctrl.pss.SendSym(symkeyid, topic, msg[:])
587588
if otherErr := api.ctrl.registerSymKeyUse(symkeyid); otherErr != nil {
588589
return otherErr

0 commit comments

Comments
 (0)