Skip to content

Commit 9c8262f

Browse files
kortatunolash
authored and committed
pss: Improve pressure backstop queue handling - no mutex (ethersphere#1695)
reimplement outbox so failed messages could be reenqueued and process messages in parallel
1 parent f13e647 commit 9c8262f

File tree

3 files changed

+222
-134
lines changed

3 files changed

+222
-134
lines changed

pss/forwarding_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,11 @@ func testForwardMsg(t *testing.T, ps *Pss, c *testCase) {
259259
}
260260

261261
msg := newTestMsg(recipientAddr)
262-
ps.forward(msg)
262+
err := ps.forward(msg)
263263

264+
if err != nil {
265+
t.Fatal(fmt.Sprintf("test [%s]\nmsg can't be forwarded. Expected no error but got %v", c.name, err.Error()))
266+
}
264267
// check test results
265268
var fail bool
266269
precision := len(recipientAddr)

pss/pss.go

Lines changed: 99 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,90 @@ func (params *Params) WithPrivateKey(privatekey *ecdsa.PrivateKey) *Params {
117117
return params
118118
}
119119

120+
type outbox struct {
121+
queue []*outboxMsg
122+
slots chan int
123+
process chan int
124+
quitC chan struct{}
125+
forward func(msg *PssMsg) error
126+
}
127+
128+
func newOutbox(capacity int, quitC chan struct{}, forward func(msg *PssMsg) error) outbox {
129+
outbox := outbox{
130+
queue: make([]*outboxMsg, capacity),
131+
slots: make(chan int, capacity),
132+
process: make(chan int),
133+
quitC: quitC,
134+
forward: forward,
135+
}
136+
// fill up outbox slots
137+
for i := 0; i < cap(outbox.slots); i++ {
138+
outbox.slots <- i
139+
}
140+
return outbox
141+
}
142+
143+
func (o outbox) len() int {
144+
return cap(o.slots) - len(o.slots)
145+
}
146+
147+
// enqueue a new element in the outbox if there is any slot available.
148+
// Then send it to process. This method is blocking in the process channel!
149+
func (o *outbox) enqueue(outboxmsg *outboxMsg) error {
150+
// first we try to obtain a slot in the outbox
151+
select {
152+
case slot := <-o.slots:
153+
o.queue[slot] = outboxmsg
154+
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len()))
155+
// we send this message slot to process
156+
select {
157+
case o.process <- slot:
158+
case <-o.quitC:
159+
}
160+
return nil
161+
default:
162+
metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
163+
return errors.New("outbox full")
164+
}
165+
}
166+
167+
func (o *outbox) processOutbox() {
168+
for slot := range o.process {
169+
go func(slot int) {
170+
msg := o.msg(slot)
171+
metrics.GetOrRegisterResettingTimer("pss.handle.outbox", nil).UpdateSince(msg.startedAt)
172+
if err := o.forward(msg.msg); err != nil {
173+
metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
174+
// if we failed to forward, re-insert message in the queue
175+
log.Debug(err.Error())
176+
// reenqueue the message for processing
177+
o.reenqueue(slot)
178+
log.Debug("Message re-enqued", "slot", slot)
179+
return
180+
}
181+
// free the outbox slot
182+
o.free(slot)
183+
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len()))
184+
}(slot)
185+
}
186+
}
187+
188+
func (o outbox) msg(slot int) *outboxMsg {
189+
return o.queue[slot]
190+
}
191+
192+
func (o outbox) free(slot int) {
193+
o.slots <- slot
194+
}
195+
196+
func (o outbox) reenqueue(slot int) {
197+
select {
198+
case o.process <- slot:
199+
case <-o.quitC:
200+
}
201+
202+
}
203+
120204
// Pss is the top-level struct, which takes care of message sending, receiving, decryption and encryption, message handler dispatchers
121205
// and message forwarding. Implements node.Service
122206
type Pss struct {
@@ -136,7 +220,7 @@ type Pss struct {
136220
msgTTL time.Duration
137221
paddingByteSize int
138222
capstring string
139-
outbox chan *outboxMsg
223+
outbox outbox
140224

141225
// message handling
142226
handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle()
@@ -178,7 +262,6 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) {
178262
msgTTL: params.MsgTTL,
179263
paddingByteSize: defaultPaddingByteSize,
180264
capstring: c.String(),
181-
outbox: make(chan *outboxMsg, defaultOutboxCapacity),
182265

183266
handlers: make(map[Topic]map[*handler]bool),
184267
topicHandlerCaps: make(map[Topic]*handlerCaps),
@@ -189,6 +272,7 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) {
189272
},
190273
},
191274
}
275+
ps.outbox = newOutbox(defaultOutboxCapacity, ps.quitC, ps.forward)
192276

193277
for i := 0; i < hasherCount; i++ {
194278
hashfunc := storage.MakeHashFunc(storage.DefaultHash)()
@@ -219,23 +303,10 @@ func (p *Pss) Start(srv *p2p.Server) error {
219303
}
220304
}
221305
}()
222-
go func() {
223-
for {
224-
select {
225-
case msg := <-p.outbox:
226-
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(len(p.outbox)))
227-
228-
err := p.forward(msg.msg)
229-
if err != nil {
230-
log.Error(err.Error())
231-
metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
232-
}
233-
metrics.GetOrRegisterResettingTimer("pss.handle.outbox", nil).UpdateSince(msg.startedAt)
234-
case <-p.quitC:
235-
return
236-
}
237-
}
238-
}()
306+
307+
// Forward outbox messages
308+
go p.outbox.processOutbox()
309+
239310
log.Info("Started Pss")
240311
log.Info("Loaded EC keys", "pubkey", common.ToHex(crypto.FromECDSAPub(p.PublicKey())), "secp256", common.ToHex(crypto.CompressPubkey(p.PublicKey())))
241312
return nil
@@ -557,16 +628,8 @@ func (p *Pss) enqueue(msg *PssMsg) error {
557628
defer metrics.GetOrRegisterResettingTimer("pss.enqueue", nil).UpdateSince(time.Now())
558629

559630
outboxmsg := newOutboxMsg(msg)
560-
select {
561-
case p.outbox <- outboxmsg:
562-
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(len(p.outbox)))
563631

564-
return nil
565-
default:
566-
}
567-
568-
metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
569-
return errors.New("outbox full")
632+
return p.outbox.enqueue(outboxmsg)
570633
}
571634

572635
// Send a raw message (any encryption is responsibility of calling client)
@@ -730,8 +793,8 @@ func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool {
730793
// node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of
731794
// partial address, it should be forwarded to all the peers matching the partial address, if there
732795
// are any; otherwise only to one peer, closest to the recipient address. In any case, if the message
733-
// forwarding fails, the node should try to forward it to the next best peer, until the message is
734-
// successfully forwarded to at least one peer.
796+
// forwarding fails, the node should try to forward it to the next best peer, until the message is
797+
// successfully forwarded to at least one peer.
735798
func (p *Pss) forward(msg *PssMsg) error {
736799
metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)
737800
sent := 0 // number of successful sends
@@ -779,19 +842,14 @@ func (p *Pss) forward(msg *PssMsg) error {
779842
return true
780843
})
781844

782-
// if we failed to send to anyone, re-insert message in the send-queue
783-
if sent == 0 {
784-
log.Debug("unable to forward to any peers")
785-
if err := p.enqueue(msg); err != nil {
786-
metrics.GetOrRegisterCounter("pss.forward.enqueue.error", nil).Inc(1)
787-
log.Error(err.Error())
788-
return err
789-
}
790-
}
791-
792845
// cache the message
793846
p.addFwdCache(msg)
794-
return nil
847+
848+
if sent == 0 {
849+
return errors.New("unable to forward to any peers")
850+
} else {
851+
return nil
852+
}
795853
}
796854

797855
/////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)