Skip to content

Commit 197d609

Browse files
nolashnonsense
authored andcommitted
swarm/pss: Message handler refactor (#18169)
1 parent ca22856 commit 197d609

File tree

10 files changed

+644
-109
lines changed

10 files changed

+644
-109
lines changed

swarm/network/kademlia.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,15 @@ func NewKadParams() *KadParams {
8181
// Kademlia is a table of live peers and a db of known peers (node records)
8282
type Kademlia struct {
8383
lock sync.RWMutex
84-
*KadParams // Kademlia configuration parameters
85-
base []byte // immutable baseaddress of the table
86-
addrs *pot.Pot // pots container for known peer addresses
87-
conns *pot.Pot // pots container for live peer connections
88-
depth uint8 // stores the last current depth of saturation
89-
nDepth int // stores the last neighbourhood depth
90-
nDepthC chan int // returned by DepthC function to signal neighbourhood depth change
91-
addrCountC chan int // returned by AddrCountC function to signal peer count change
84+
*KadParams // Kademlia configuration parameters
85+
base []byte // immutable baseaddress of the table
86+
addrs *pot.Pot // pots container for known peer addresses
87+
conns *pot.Pot // pots container for live peer connections
88+
depth uint8 // stores the last current depth of saturation
89+
nDepth int // stores the last neighbourhood depth
90+
nDepthC chan int // returned by DepthC function to signal neighbourhood depth change
91+
addrCountC chan int // returned by AddrCountC function to signal peer count change
92+
Pof func(pot.Val, pot.Val, int) (int, bool) // function for calculating kademlia routing distance between two addresses
9293
}
9394

9495
// NewKademlia creates a Kademlia table for base address addr
@@ -103,6 +104,7 @@ func NewKademlia(addr []byte, params *KadParams) *Kademlia {
103104
KadParams: params,
104105
addrs: pot.NewPot(nil, 0),
105106
conns: pot.NewPot(nil, 0),
107+
Pof: pof,
106108
}
107109
}
108110

@@ -289,6 +291,7 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
289291
// neighbourhood depth on each change.
290292
// Not receiving from the returned channel will block On function
291293
// when the neighbourhood depth is changed.
294+
// TODO: Why is this exported, and if it should be; why can't we have more subscribers than one?
292295
func (k *Kademlia) NeighbourhoodDepthC() <-chan int {
293296
k.lock.Lock()
294297
defer k.lock.Unlock()
@@ -429,7 +432,12 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool
429432
// neighbourhoodDepth returns the proximity order that defines the distance of
430433
// the nearest neighbour set with cardinality >= MinProxBinSize
431434
// if there is altogether less than MinProxBinSize peers it returns 0
432-
// caller must hold the lock
435+
func (k *Kademlia) NeighbourhoodDepth() (depth int) {
436+
k.lock.RLock()
437+
defer k.lock.RUnlock()
438+
return k.neighbourhoodDepth()
439+
}
440+
433441
func (k *Kademlia) neighbourhoodDepth() (depth int) {
434442
if k.conns.Size() < k.MinProxBinSize {
435443
return 0

swarm/pss/api.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,15 @@ func NewAPI(ps *Pss) *API {
5151
//
5252
// All incoming messages to the node matching this topic will be encapsulated in the APIMsg
5353
// struct and sent to the subscriber
54-
func (pssapi *API) Receive(ctx context.Context, topic Topic) (*rpc.Subscription, error) {
54+
func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool, prox bool) (*rpc.Subscription, error) {
5555
notifier, supported := rpc.NotifierFromContext(ctx)
5656
if !supported {
5757
return nil, fmt.Errorf("Subscribe not supported")
5858
}
5959

6060
psssub := notifier.CreateSubscription()
6161

62-
handler := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
62+
hndlr := NewHandler(func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
6363
apimsg := &APIMsg{
6464
Msg: hexutil.Bytes(msg),
6565
Asymmetric: asymmetric,
@@ -69,9 +69,15 @@ func (pssapi *API) Receive(ctx context.Context, topic Topic) (*rpc.Subscription,
6969
log.Warn(fmt.Sprintf("notification on pss sub topic rpc (sub %v) msg %v failed!", psssub.ID, msg))
7070
}
7171
return nil
72+
})
73+
if raw {
74+
hndlr.caps.raw = true
75+
}
76+
if prox {
77+
hndlr.caps.prox = true
7278
}
7379

74-
deregf := pssapi.Register(&topic, handler)
80+
deregf := pssapi.Register(&topic, hndlr)
7581
go func() {
7682
defer deregf()
7783
select {

swarm/pss/client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func (c *Client) RunProtocol(ctx context.Context, proto *p2p.Protocol) error {
236236
topichex := topicobj.String()
237237
msgC := make(chan pss.APIMsg)
238238
c.peerPool[topicobj] = make(map[string]*pssRPCRW)
239-
sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex)
239+
sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex, false, false)
240240
if err != nil {
241241
return fmt.Errorf("pss event subscription failed: %v", err)
242242
}

swarm/pss/handshake.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ func (api *HandshakeAPI) Handshake(pubkeyid string, topic Topic, sync bool, flus
486486

487487
// Activate handshake functionality on a topic
488488
func (api *HandshakeAPI) AddHandshake(topic Topic) error {
489-
api.ctrl.deregisterFuncs[topic] = api.ctrl.pss.Register(&topic, api.ctrl.handler)
489+
api.ctrl.deregisterFuncs[topic] = api.ctrl.pss.Register(&topic, NewHandler(api.ctrl.handler))
490490
return nil
491491
}
492492

swarm/pss/notify/notify.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func NewController(ps *pss.Pss) *Controller {
113113
notifiers: make(map[string]*notifier),
114114
subscriptions: make(map[string]*subscription),
115115
}
116-
ctrl.pss.Register(&controlTopic, ctrl.Handler)
116+
ctrl.pss.Register(&controlTopic, pss.NewHandler(ctrl.Handler))
117117
return ctrl
118118
}
119119

@@ -336,7 +336,7 @@ func (c *Controller) handleNotifyWithKeyMsg(msg *Msg) error {
336336
// \TODO keep track of and add actual address
337337
updaterAddr := pss.PssAddress([]byte{})
338338
c.pss.SetSymmetricKey(symkey, topic, &updaterAddr, true)
339-
c.pss.Register(&topic, c.Handler)
339+
c.pss.Register(&topic, pss.NewHandler(c.Handler))
340340
return c.subscriptions[msg.namestring].handler(msg.namestring, msg.Payload[:len(msg.Payload)-symKeyLength])
341341
}
342342

swarm/pss/notify/notify_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func TestStart(t *testing.T) {
121121
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
122122
defer cancel()
123123
rmsgC := make(chan *pss.APIMsg)
124-
rightSub, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", controlTopic)
124+
rightSub, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", controlTopic, false, false)
125125
if err != nil {
126126
t.Fatal(err)
127127
}
@@ -174,7 +174,7 @@ func TestStart(t *testing.T) {
174174
t.Fatalf("expected payload length %d, have %d", len(updateMsg)+symKeyLength, len(dMsg.Payload))
175175
}
176176

177-
rightSubUpdate, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", rsrcTopic)
177+
rightSubUpdate, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", rsrcTopic, false, false)
178178
if err != nil {
179179
t.Fatal(err)
180180
}

swarm/pss/protocol_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,15 @@ func testProtocol(t *testing.T) {
9292
lmsgC := make(chan APIMsg)
9393
lctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
9494
defer cancel()
95-
lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic)
95+
lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false)
9696
if err != nil {
9797
t.Fatal(err)
9898
}
9999
defer lsub.Unsubscribe()
100100
rmsgC := make(chan APIMsg)
101101
rctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
102102
defer cancel()
103-
rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
103+
rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false)
104104
if err != nil {
105105
t.Fatal(err)
106106
}
@@ -130,6 +130,7 @@ func testProtocol(t *testing.T) {
130130
log.Debug("lnode ok")
131131
case cerr := <-lctx.Done():
132132
t.Fatalf("test message timed out: %v", cerr)
133+
return
133134
}
134135
select {
135136
case <-rmsgC:

0 commit comments

Comments
 (0)