Skip to content

Commit 416960e

Browse files
committed
pss/outbox: Split tests in white and black box. Added Start and Stop to interface
1 parent 344cbde commit 416960e

File tree

6 files changed

+110
-93
lines changed

6 files changed

+110
-93
lines changed

pss/outbox/mock.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,19 @@ var mockForwardFunction = func(msg *message.Message) error {
1414
return nil
1515
}
1616

17-
func NewOutboxMock(config *Config) (outbox *outbox) {
17+
func NewMock(config *Config) (outboxMock Outbox) {
1818
if config == nil {
1919
config = &Config{
2020
NumberSlots: defaultOutboxCapacity,
21-
QuitC: make(chan struct{}),
2221
Forward: mockForwardFunction,
2322
}
2423
} else {
2524
if config.Forward == nil {
2625
config.Forward = mockForwardFunction
2726
}
28-
if &config.NumberSlots == nil {
27+
if config.NumberSlots == 0 {
2928
config.NumberSlots = defaultOutboxCapacity
3029
}
31-
if config.QuitC == nil {
32-
config.QuitC = make(chan struct{})
33-
}
3430
}
35-
return NewOutbox(config)
31+
return New(config)
3632
}

pss/outbox/outbox.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ import (
99

1010
type Outbox interface {
1111
Enqueue(outboxMsg *outboxMsg) error
12-
ProcessOutbox()
13-
SetForwardFunction(forwardFunc forwardFunction) // Needed right now to mock forwardFunc in one pss_tests test
12+
Start()
13+
Stop()
14+
SetForwardFunction(forwardFunc forwardFunction) // Needed right now to mock forwardFunc in one pss_tests.go test
1415
}
1516

1617
type Config struct {
1718
NumberSlots int
18-
QuitC chan struct{}
1919
Forward forwardFunction
2020
}
2121

@@ -24,20 +24,22 @@ type outbox struct {
2424
queue []*outboxMsg
2525
slots chan int
2626
process chan int
27-
quitC chan struct{}
2827
}
2928

3029
type forwardFunction func(msg *message.Message) error
3130

3231
var ErrOutboxFull = errors.New("outbox full")
3332

34-
func NewOutbox(config *Config) *outbox {
33+
func New(config *Config) Outbox {
34+
return newOutbox(config)
35+
}
36+
37+
func newOutbox(config *Config) *outbox {
3538
outbox := outbox{
3639
forwardFunc: config.Forward,
3740
queue: make([]*outboxMsg, config.NumberSlots),
3841
slots: make(chan int, config.NumberSlots),
3942
process: make(chan int),
40-
quitC: config.QuitC,
4143
}
4244
// fill up outbox slots
4345
for i := 0; i < cap(outbox.slots); i++ {
@@ -46,6 +48,16 @@ func NewOutbox(config *Config) *outbox {
4648
return &outbox
4749
}
4850

51+
func (o *outbox) Start() {
52+
log.Info("Starting outbox")
53+
go o.processOutbox()
54+
}
55+
56+
func (o *outbox) Stop() {
57+
log.Warn("Stopping outbox")
58+
close(o.process)
59+
}
60+
4961
// Enqueue a new element in the outbox if there is any slot available.
5062
// Then send it to process. This method is blocking in the process channel!
5163
func (o *outbox) Enqueue(outboxMsg *outboxMsg) error {
@@ -55,10 +67,7 @@ func (o *outbox) Enqueue(outboxMsg *outboxMsg) error {
5567
o.queue[slot] = outboxMsg
5668
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len()))
5769
// we send this message slot to process
58-
select {
59-
case o.process <- slot:
60-
case <-o.quitC:
61-
}
70+
o.process <- slot
6271
return nil
6372
default:
6473
metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
@@ -67,7 +76,7 @@ func (o *outbox) Enqueue(outboxMsg *outboxMsg) error {
6776
}
6877

6978
//ProcessOutbox starts a routine that tries to forward messages present in the outbox queue
70-
func (o *outbox) ProcessOutbox() {
79+
func (o *outbox) processOutbox() {
7180
for slot := range o.process {
7281
go func(slot int) {
7382
msg := o.msg(slot)
@@ -100,10 +109,7 @@ func (o *outbox) free(slot int) {
100109
}
101110

102111
func (o *outbox) requeue(slot int) {
103-
select {
104-
case o.process <- slot:
105-
case <-o.quitC:
106-
}
112+
o.process <- slot
107113
}
108114

109115
func (o *outbox) len() int {

pss/outbox/outbox_test.go

Lines changed: 28 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
package outbox
1+
package outbox_test
22

33
import (
44
"errors"
5+
"github.com/ethersphere/swarm/pss/outbox"
56
"testing"
67
"time"
78

@@ -16,104 +17,65 @@ func TestOutbox(tx *testing.T) {
1617
defer t.FinishTest()
1718

1819
outboxCapacity := 2
20+
failed := make([]interface{}, 0)
21+
failedC := make(chan struct{})
1922
successC := make(chan struct{})
23+
continueC := make(chan struct{})
24+
25+
forwardFail := false
26+
2027
mockForwardFunction := func(msg *message.Message) error {
21-
successC <- struct{}{}
22-
return nil
28+
if !forwardFail {
29+
successC <- struct{}{}
30+
return nil
31+
} else {
32+
failed = append(failed, msg)
33+
failedC <- struct{}{}
34+
<-continueC
35+
return errors.New("forced test error forwarding message")
36+
}
2337
}
2438

25-
testOutbox := NewOutboxMock(&Config{
39+
testOutbox := outbox.NewMock(&outbox.Config{
2640
NumberSlots: outboxCapacity,
27-
QuitC: nil,
2841
Forward: mockForwardFunction,
2942
})
30-
go testOutbox.ProcessOutbox()
43+
44+
testOutbox.Start()
45+
defer testOutbox.Stop()
3146

3247
err := testOutbox.Enqueue(testOutboxMessage)
3348
t.Ok(err)
3449

35-
usedSlots := testOutbox.len()
36-
t.Assert(usedSlots == 1, "incorrect outbox length. expected 1, got %v", usedSlots)
37-
3850
select {
3951
case <-successC:
4052
case <-time.After(waitTimeout):
41-
t.Fatal("timeout waiting for success forward")
53+
t.Fatal("timeout waiting for first success forward")
4254
}
4355

44-
failed := make([]interface{}, 0)
45-
failedC := make(chan struct{})
46-
continueC := make(chan struct{})
47-
failedForward := func(msg *message.Message) error {
48-
failed = append(failed, msg)
49-
failedC <- struct{}{}
50-
<-continueC
51-
return errors.New("forced test error forwarding message")
52-
}
53-
testOutbox.forwardFunc = failedForward
54-
56+
forwardFail = true
5557
err = testOutbox.Enqueue(testOutboxMessage)
5658
t.Ok(err)
5759

58-
select {
59-
case <-failedC:
60-
case <-time.After(waitTimeout):
61-
t.Fatal("timeout waiting for failing forward")
62-
}
60+
//We wait for the forward function to fail
61+
<-failedC
6362

6463
failedMessages := len(failed)
6564
t.Assert(failedMessages == 1, "incorrect number of failed messages, expected 1 got %v", failedMessages)
6665

6766
// The message will be retried once we send to continueC, so first, we change the forward function
68-
testOutbox.forwardFunc = mockForwardFunction
67+
forwardFail = false
6968
continueC <- struct{}{}
7069

70+
//We wait for the retry and success
7171
select {
7272
case <-successC:
7373
case <-time.After(waitTimeout):
7474
t.Fatal("timeout waiting for second success forward")
7575
}
7676
}
7777

78-
func TestOutboxFull(tx *testing.T) {
79-
t := ut.BeginTest(tx, false)
80-
defer t.FinishTest()
81-
82-
outboxCapacity := 2
83-
processC := make(chan struct{})
84-
successForward := func(msg *message.Message) error {
85-
<-processC
86-
return nil
87-
}
88-
89-
testOutbox := NewOutboxMock(&Config{
90-
NumberSlots: outboxCapacity,
91-
QuitC: nil,
92-
Forward: successForward,
93-
})
94-
go testOutbox.ProcessOutbox()
95-
96-
err := testOutbox.Enqueue(testOutboxMessage)
97-
t.Ok(err)
98-
99-
err = testOutbox.Enqueue(testOutboxMessage)
100-
t.Ok(err)
101-
102-
//As we haven't signaled processC, the messages are still in the outbox
103-
err = testOutbox.Enqueue(testOutboxMessage)
104-
t.MustFailWith(err, ErrOutboxFull)
105-
106-
processC <- struct{}{}
107-
108-
//There should be a slot again in the outbox
109-
select {
110-
case <-testOutbox.slots:
111-
case <-time.After(waitTimeout):
112-
t.Fatalf("timeout waiting for a free slot")
113-
}
114-
}
115-
116-
var testOutboxMessage = NewOutboxMessage(&message.Message{
78+
var testOutboxMessage = outbox.NewOutboxMessage(&message.Message{
11779
To: nil,
11880
Flags: message.Flags{},
11981
Expire: 0,

pss/outbox/outbox_whitebox_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package outbox
2+
3+
import (
4+
"github.com/epiclabs-io/ut"
5+
"github.com/ethersphere/swarm/pss/message"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestOutboxFull(tx *testing.T) {
11+
t := ut.BeginTest(tx, false)
12+
defer t.FinishTest()
13+
14+
outboxCapacity := 2
15+
processC := make(chan struct{})
16+
successForward := func(msg *message.Message) error {
17+
<-processC
18+
return nil
19+
}
20+
21+
testOutbox := newOutbox(&Config{
22+
NumberSlots: outboxCapacity,
23+
Forward: successForward,
24+
})
25+
testOutbox.Start()
26+
defer testOutbox.Stop()
27+
28+
err := testOutbox.Enqueue(testOutboxMessage)
29+
t.Ok(err)
30+
31+
err = testOutbox.Enqueue(testOutboxMessage)
32+
t.Ok(err)
33+
34+
//As we haven't signaled processC, the messages are still in the outbox
35+
err = testOutbox.Enqueue(testOutboxMessage)
36+
t.MustFailWith(err, ErrOutboxFull)
37+
38+
processC <- struct{}{}
39+
//There should be a slot again in the outbox to enqueue
40+
select {
41+
case <-testOutbox.slots:
42+
case <-time.After(2 * time.Second):
43+
t.Fatalf("timeout waiting for a free slot")
44+
}
45+
46+
}
47+
48+
var testOutboxMessage = NewOutboxMessage(&message.Message{
49+
To: nil,
50+
Flags: message.Flags{},
51+
Expire: 0,
52+
Topic: message.Topic{},
53+
Payload: nil,
54+
})

pss/pss.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,8 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) {
183183
},
184184
},
185185
}
186-
ps.outbox = outbox.NewOutbox(&outbox.Config{
186+
ps.outbox = outbox.New(&outbox.Config{
187187
NumberSlots: defaultOutboxCapacity,
188-
QuitC: ps.quitC,
189188
Forward: ps.forward,
190189
})
191190

@@ -220,8 +219,7 @@ func (p *Pss) Start(srv *p2p.Server) error {
220219
}()
221220

222221
// Forward outbox messages
223-
go p.outbox.ProcessOutbox()
224-
222+
p.outbox.Start()
225223
log.Info("Started Pss")
226224
log.Info("Loaded EC keys", "pubkey", hex.EncodeToString(p.Crypto.SerializePublicKey(p.PublicKey())), "secp256", hex.EncodeToString(p.Crypto.CompressPublicKey(p.PublicKey())))
227225
return nil
@@ -230,6 +228,7 @@ func (p *Pss) Start(srv *p2p.Server) error {
230228
func (p *Pss) Stop() error {
231229
log.Info("Pss shutting down")
232230
close(p.quitC)
231+
p.outbox.Stop()
233232
return nil
234233
}
235234

pss/pss_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ func TestAddressMatchProx(t *testing.T) {
269269
}
270270

271271
ps.outbox.SetForwardFunction(mockForwardFunc)
272-
go ps.outbox.ProcessOutbox()
272+
ps.outbox.Start()
273273

274274
// create kademlia peers, so we have peers both inside and outside minproxlimit
275275
var peers []*network.Peer

0 commit comments

Comments
 (0)