Skip to content

Commit 23b424e

Browse files
committed
pss/outbox: Removed interface
1 parent 416960e commit 23b424e

File tree

5 files changed

+29
-47
lines changed

5 files changed

+29
-47
lines changed

pss/outbox/mock.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ var mockForwardFunction = func(msg *message.Message) error {
1414
return nil
1515
}
1616

17-
func NewMock(config *Config) (outboxMock Outbox) {
17+
func NewMock(config *Config) (outboxMock *Outbox) {
1818
if config == nil {
1919
config = &Config{
2020
NumberSlots: defaultOutboxCapacity,
@@ -28,5 +28,5 @@ func NewMock(config *Config) (outboxMock Outbox) {
2828
config.NumberSlots = defaultOutboxCapacity
2929
}
3030
}
31-
return New(config)
31+
return NewOutbox(config)
3232
}

pss/outbox/outbox.go

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,12 @@ import (
77
"github.com/ethersphere/swarm/pss/message"
88
)
99

10-
type Outbox interface {
11-
Enqueue(outboxMsg *outboxMsg) error
12-
Start()
13-
Stop()
14-
SetForwardFunction(forwardFunc forwardFunction) // Needed right now to mock forwardFunc in one pss_tests.go test
15-
}
16-
1710
type Config struct {
1811
NumberSlots int
1912
Forward forwardFunction
2013
}
2114

22-
type outbox struct {
15+
type Outbox struct {
2316
forwardFunc forwardFunction
2417
queue []*outboxMsg
2518
slots chan int
@@ -30,12 +23,8 @@ type forwardFunction func(msg *message.Message) error
3023

3124
var ErrOutboxFull = errors.New("outbox full")
3225

33-
func New(config *Config) Outbox {
34-
return newOutbox(config)
35-
}
36-
37-
func newOutbox(config *Config) *outbox {
38-
outbox := outbox{
26+
func NewOutbox(config *Config) *Outbox {
27+
outbox := &Outbox{
3928
forwardFunc: config.Forward,
4029
queue: make([]*outboxMsg, config.NumberSlots),
4130
slots: make(chan int, config.NumberSlots),
@@ -45,22 +34,22 @@ func newOutbox(config *Config) *outbox {
4534
for i := 0; i < cap(outbox.slots); i++ {
4635
outbox.slots <- i
4736
}
48-
return &outbox
37+
return outbox
4938
}
5039

51-
func (o *outbox) Start() {
40+
func (o *Outbox) Start() {
5241
log.Info("Starting outbox")
5342
go o.processOutbox()
5443
}
5544

56-
func (o *outbox) Stop() {
57-
log.Warn("Stopping outbox")
45+
func (o *Outbox) Stop() {
46+
log.Info("Stopping outbox")
5847
close(o.process)
5948
}
6049

6150
// Enqueue a new element in the outbox if there is any slot available.
6251
// Then send it to process. This method is blocking in the process channel!
63-
func (o *outbox) Enqueue(outboxMsg *outboxMsg) error {
52+
func (o *Outbox) Enqueue(outboxMsg *outboxMsg) error {
6453
// first we try to obtain a slot in the outbox
6554
select {
6655
case slot := <-o.slots:
@@ -76,7 +65,7 @@ func (o *outbox) Enqueue(outboxMsg *outboxMsg) error {
7665
}
7766

7867
//ProcessOutbox starts a routine that tries to forward messages present in the outbox queue
79-
func (o *outbox) processOutbox() {
68+
func (o *Outbox) processOutbox() {
8069
for slot := range o.process {
8170
go func(slot int) {
8271
msg := o.msg(slot)
@@ -96,22 +85,22 @@ func (o *outbox) processOutbox() {
9685
}(slot)
9786
}
9887
}
99-
func (o *outbox) SetForwardFunction(forwardFunc forwardFunction) {
88+
func (o *Outbox) SetForwardFunction(forwardFunc forwardFunction) {
10089
o.forwardFunc = forwardFunc
10190
}
10291

103-
func (o *outbox) msg(slot int) *outboxMsg {
92+
func (o *Outbox) msg(slot int) *outboxMsg {
10493
return o.queue[slot]
10594
}
10695

107-
func (o *outbox) free(slot int) {
96+
func (o *Outbox) free(slot int) {
10897
o.slots <- slot
10998
}
11099

111-
func (o *outbox) requeue(slot int) {
100+
func (o *Outbox) requeue(slot int) {
112101
o.process <- slot
113102
}
114103

115-
func (o *outbox) len() int {
104+
func (o *Outbox) len() int {
116105
return cap(o.slots) - len(o.slots)
117106
}

pss/outbox/outbox_test.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,11 @@ func TestOutbox(tx *testing.T) {
4747
err := testOutbox.Enqueue(testOutboxMessage)
4848
t.Ok(err)
4949

50-
select {
51-
case <-successC:
52-
case <-time.After(waitTimeout):
53-
t.Fatal("timeout waiting for first success forward")
54-
}
50+
//We wait for the forward function to success
51+
<-successC
5552

5653
forwardFail = true
54+
5755
err = testOutbox.Enqueue(testOutboxMessage)
5856
t.Ok(err)
5957

@@ -68,11 +66,7 @@ func TestOutbox(tx *testing.T) {
6866
continueC <- struct{}{}
6967

7068
//We wait for the retry and success
71-
select {
72-
case <-successC:
73-
case <-time.After(waitTimeout):
74-
t.Fatal("timeout waiting for second success forward")
75-
}
69+
<-successC
7670
}
7771

7872
var testOutboxMessage = outbox.NewOutboxMessage(&message.Message{

pss/outbox/outbox_whitebox_test.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77
"time"
88
)
99

10-
func TestOutboxFull(tx *testing.T) {
10+
const waitTimeout = 2 * time.Second
11+
12+
func TestFullOutbox(tx *testing.T) {
1113
t := ut.BeginTest(tx, false)
1214
defer t.FinishTest()
1315

@@ -18,7 +20,7 @@ func TestOutboxFull(tx *testing.T) {
1820
return nil
1921
}
2022

21-
testOutbox := newOutbox(&Config{
23+
testOutbox := NewOutbox(&Config{
2224
NumberSlots: outboxCapacity,
2325
Forward: successForward,
2426
})
@@ -36,13 +38,9 @@ func TestOutboxFull(tx *testing.T) {
3638
t.MustFailWith(err, ErrOutboxFull)
3739

3840
processC <- struct{}{}
39-
//There should be a slot again in the outbox to enqueue
40-
select {
41-
case <-testOutbox.slots:
42-
case <-time.After(2 * time.Second):
43-
t.Fatalf("timeout waiting for a free slot")
44-
}
4541

42+
//There should be a slot again in the outbox to enqueue
43+
<-testOutbox.slots
4644
}
4745

4846
var testOutboxMessage = NewOutboxMessage(&message.Message{

pss/pss.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ type Pss struct {
132132
msgTTL time.Duration
133133
paddingByteSize int
134134
capstring string
135-
outbox outbox.Outbox
135+
outbox *outbox.Outbox
136136

137137
// message handling
138138
handlers map[message.Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle()
@@ -183,7 +183,7 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) {
183183
},
184184
},
185185
}
186-
ps.outbox = outbox.New(&outbox.Config{
186+
ps.outbox = outbox.NewOutbox(&outbox.Config{
187187
NumberSlots: defaultOutboxCapacity,
188188
Forward: ps.forward,
189189
})
@@ -220,6 +220,7 @@ func (p *Pss) Start(srv *p2p.Server) error {
220220

221221
// Forward outbox messages
222222
p.outbox.Start()
223+
223224
log.Info("Started Pss")
224225
log.Info("Loaded EC keys", "pubkey", hex.EncodeToString(p.Crypto.SerializePublicKey(p.PublicKey())), "secp256", hex.EncodeToString(p.Crypto.CompressPublicKey(p.PublicKey())))
225226
return nil

0 commit comments

Comments
 (0)