Skip to content

Commit a5cdd6b

Browse files
committed
pss/outbox: refactored outbox into own package, touched pss_tests and pss to accommodate changes
1 parent 01d9194 commit a5cdd6b

File tree

8 files changed

+303
-231
lines changed

8 files changed

+303
-231
lines changed

pss/outbox/message.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package outbox
2+
3+
import (
4+
"time"
5+
6+
"github.com/ethersphere/swarm/pss/message"
7+
)
8+
9+
type outboxMsg struct {
10+
msg *message.Message
11+
startedAt time.Time
12+
}
13+
14+
func NewOutboxMessage(msg *message.Message) *outboxMsg {
15+
return &outboxMsg{
16+
msg: msg,
17+
startedAt: time.Now(),
18+
}
19+
}

pss/outbox/mock.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package outbox
2+
3+
import (
4+
"github.com/ethersphere/swarm/log"
5+
"github.com/ethersphere/swarm/pss/message"
6+
)
7+
8+
const (
9+
defaultOutboxCapacity = 100000
10+
)
11+
12+
var mockForwardFunction = func(msg *message.Message) error {
13+
log.Debug("Forwarded message", "msg", msg)
14+
return nil
15+
}
16+
17+
func NewMock(config *Config) (outboxMock *Outbox) {
18+
if config == nil {
19+
config = &Config{
20+
NumberSlots: defaultOutboxCapacity,
21+
Forward: mockForwardFunction,
22+
}
23+
} else {
24+
if config.Forward == nil {
25+
config.Forward = mockForwardFunction
26+
}
27+
if config.NumberSlots == 0 {
28+
config.NumberSlots = defaultOutboxCapacity
29+
}
30+
}
31+
return NewOutbox(config)
32+
}

pss/outbox/outbox.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package outbox
2+
3+
import (
4+
"errors"
5+
6+
"github.com/ethereum/go-ethereum/metrics"
7+
"github.com/ethersphere/swarm/log"
8+
"github.com/ethersphere/swarm/pss/message"
9+
)
10+
11+
type Config struct {
12+
NumberSlots int
13+
Forward forwardFunction
14+
}
15+
16+
type Outbox struct {
17+
forwardFunc forwardFunction
18+
queue []*outboxMsg
19+
slots chan int
20+
process chan int
21+
stopC chan struct{}
22+
}
23+
24+
type forwardFunction func(msg *message.Message) error
25+
26+
var ErrOutboxFull = errors.New("outbox full")
27+
28+
func NewOutbox(config *Config) *Outbox {
29+
outbox := &Outbox{
30+
forwardFunc: config.Forward,
31+
queue: make([]*outboxMsg, config.NumberSlots),
32+
slots: make(chan int, config.NumberSlots),
33+
process: make(chan int),
34+
stopC: make(chan struct{}),
35+
}
36+
// fill up outbox slots
37+
for i := 0; i < cap(outbox.slots); i++ {
38+
outbox.slots <- i
39+
}
40+
return outbox
41+
}
42+
43+
func (o *Outbox) Start() {
44+
log.Info("Starting outbox")
45+
go o.processOutbox()
46+
}
47+
48+
func (o *Outbox) Stop() {
49+
log.Info("Stopping outbox")
50+
close(o.stopC)
51+
}
52+
53+
// Enqueue a new element in the outbox if there is any slot available.
54+
// Then send it to process. This method is blocking in the process channel!
55+
func (o *Outbox) Enqueue(outboxMsg *outboxMsg) error {
56+
// first we try to obtain a slot in the outbox
57+
select {
58+
case slot := <-o.slots:
59+
o.queue[slot] = outboxMsg
60+
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len()))
61+
// we send this message slot to process
62+
select {
63+
case <-o.stopC:
64+
case o.process <- slot:
65+
}
66+
return nil
67+
default:
68+
metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
69+
return ErrOutboxFull
70+
}
71+
}
72+
73+
//ProcessOutbox starts a routine that tries to forward messages present in the outbox queue
74+
func (o *Outbox) processOutbox() {
75+
for slot := range o.process {
76+
go func(slot int) {
77+
msg := o.queue[slot]
78+
metrics.GetOrRegisterResettingTimer("pss.handle.outbox", nil).UpdateSince(msg.startedAt)
79+
if err := o.forwardFunc(msg.msg); err != nil {
80+
metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
81+
log.Debug(err.Error())
82+
// requeue the message for processing
83+
o.requeue(slot)
84+
log.Debug("Message requeued", "slot", slot)
85+
return
86+
}
87+
//message processed, free the outbox slot
88+
o.free(slot)
89+
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len()))
90+
}(slot)
91+
}
92+
}
93+
94+
func (o *Outbox) free(slot int) {
95+
select {
96+
case <-o.stopC:
97+
case o.slots <- slot:
98+
}
99+
100+
}
101+
102+
func (o *Outbox) requeue(slot int) {
103+
select {
104+
case <-o.stopC:
105+
case o.process <- slot:
106+
}
107+
}
108+
func (o *Outbox) len() int {
109+
return cap(o.slots) - len(o.slots)
110+
}

pss/outbox/outbox_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package outbox_test
2+
3+
import (
4+
"errors"
5+
"testing"
6+
7+
"github.com/ethersphere/swarm/pss/message"
8+
"github.com/ethersphere/swarm/pss/outbox"
9+
)
10+
11+
func TestOutbox(t *testing.T) {
12+
13+
outboxCapacity := 2
14+
failed := make([]interface{}, 0)
15+
failedC := make(chan struct{})
16+
successC := make(chan struct{})
17+
continueC := make(chan struct{})
18+
19+
forwardFail := false
20+
21+
mockForwardFunction := func(msg *message.Message) error {
22+
if !forwardFail {
23+
successC <- struct{}{}
24+
return nil
25+
} else {
26+
failed = append(failed, msg)
27+
failedC <- struct{}{}
28+
<-continueC
29+
return errors.New("forced test error forwarding message")
30+
}
31+
}
32+
33+
testOutbox := outbox.NewMock(&outbox.Config{
34+
NumberSlots: outboxCapacity,
35+
Forward: mockForwardFunction,
36+
})
37+
38+
testOutbox.Start()
39+
defer testOutbox.Stop()
40+
41+
err := testOutbox.Enqueue(testOutboxMessage)
42+
if err != nil {
43+
t.Fatalf("unexpected error enqueueing, %v", err)
44+
}
45+
46+
//We wait for the forward function to success
47+
<-successC
48+
49+
forwardFail = true
50+
51+
err = testOutbox.Enqueue(testOutboxMessage)
52+
if err != nil {
53+
t.Fatalf("unexpected error enqueueing, %v", err)
54+
}
55+
//We wait for the forward function to fail
56+
<-failedC
57+
58+
failedMessages := len(failed)
59+
if failedMessages != 1 {
60+
t.Fatalf("unexpected number of failed messages, got %v, wanted 1", failedMessages)
61+
}
62+
63+
// The message will be retried once we send to continueC, so first, we change the forward function
64+
forwardFail = false
65+
continueC <- struct{}{}
66+
67+
//We wait for the retry and success
68+
<-successC
69+
}
70+
71+
var testOutboxMessage = outbox.NewOutboxMessage(&message.Message{
72+
To: nil,
73+
Flags: message.Flags{},
74+
Expire: 0,
75+
Topic: message.Topic{},
76+
Payload: nil,
77+
})

pss/outbox/outbox_whitebox_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package outbox
2+
3+
import (
4+
"testing"
5+
6+
"github.com/ethersphere/swarm/pss/message"
7+
)
8+
9+
func TestFullOutbox(t *testing.T) {
10+
11+
outboxCapacity := 2
12+
processC := make(chan struct{})
13+
successForward := func(msg *message.Message) error {
14+
<-processC
15+
return nil
16+
}
17+
18+
testOutbox := NewMock(&Config{
19+
NumberSlots: outboxCapacity,
20+
Forward: successForward,
21+
})
22+
testOutbox.Start()
23+
defer testOutbox.Stop()
24+
25+
err := testOutbox.Enqueue(testOutboxMessage)
26+
if err != nil {
27+
t.Fatalf("unexpected error enqueueing, %v", err)
28+
}
29+
30+
err = testOutbox.Enqueue(testOutboxMessage)
31+
if err != nil {
32+
t.Fatalf("unexpected error enqueueing, %v", err)
33+
}
34+
//As we haven't signaled processC, the messages are still in the outbox
35+
err = testOutbox.Enqueue(testOutboxMessage)
36+
if err != ErrOutboxFull {
37+
t.Fatalf("unexpected error, got %v, wanted %v", err, ErrOutboxFull)
38+
}
39+
processC <- struct{}{}
40+
41+
//There should be a slot again in the outbox to enqueue
42+
<-testOutbox.slots
43+
}
44+
45+
var testOutboxMessage = NewOutboxMessage(&message.Message{
46+
To: nil,
47+
Flags: message.Flags{},
48+
Expire: 0,
49+
Topic: message.Topic{},
50+
Payload: nil,
51+
})

0 commit comments

Comments
 (0)