Skip to content

Commit f018835

Browse files
committed
pss: refactored outbox into own package, touched pss_tests and pss to accommodate changes
1 parent 8cf65b0 commit f018835

File tree

6 files changed

+304
-223
lines changed

6 files changed

+304
-223
lines changed

pss/outbox/message.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package outbox
2+
3+
import (
4+
"time"
5+
)
6+
7+
type outboxMsg struct {
8+
msg interface{}
9+
startedAt time.Time
10+
}
11+
12+
func NewOutboxMessage(msg interface{}) *outboxMsg {
13+
return &outboxMsg{
14+
msg: msg,
15+
startedAt: time.Now(),
16+
}
17+
}

pss/outbox/mock.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package outbox
2+
3+
import (
4+
"github.com/ethersphere/swarm/log"
5+
)
6+
7+
func NewOutboxMock(config *Config) (outbox *outbox) {
8+
if config.Forward == nil {
9+
config.Forward = func(msg interface{}) error {
10+
log.Debug("Forwarded message", "msg", msg)
11+
return nil
12+
}
13+
}
14+
15+
if &config.NumberSlots == nil {
16+
config.NumberSlots = defaultOutboxCapacity
17+
}
18+
if config.QuitC == nil {
19+
config.QuitC = make(chan struct{})
20+
}
21+
return NewOutbox(config)
22+
}

pss/outbox/outbox.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package outbox
2+
3+
import (
4+
"errors"
5+
"github.com/ethereum/go-ethereum/metrics"
6+
"github.com/ethersphere/swarm/log"
7+
)
8+
9+
const (
10+
defaultOutboxCapacity = 100000
11+
)
12+
13+
type Outbox interface {
14+
Enqueue(outboxmsg *outboxMsg) error
15+
ProcessOutbox()
16+
SetForwardFunction(forwardFunc func(msg interface{}) error) //Eventually this will not be needed, when pss has a newMock that creates a mock outbox, this is used just for a test outside of outbox package
17+
}
18+
19+
type Config struct {
20+
NumberSlots int
21+
QuitC chan struct{}
22+
Forward func(msg interface{}) error
23+
}
24+
25+
type outbox struct {
26+
Config
27+
queue []*outboxMsg
28+
slots chan int
29+
process chan int
30+
}
31+
32+
func NewOutbox(config *Config) *outbox {
33+
outbox := outbox{
34+
Config: *config,
35+
process: make(chan int),
36+
slots: make(chan int, config.NumberSlots),
37+
queue: make([]*outboxMsg, config.NumberSlots),
38+
}
39+
// fill up outbox slots
40+
for i := 0; i < cap(outbox.slots); i++ {
41+
outbox.slots <- i
42+
}
43+
return &outbox
44+
}
45+
46+
// enqueue a new element in the outbox if there is any slot available.
47+
// Then send it to process. This method is blocking in the process channel!
48+
func (o *outbox) Enqueue(outboxmsg *outboxMsg) error {
49+
// first we try to obtain a slot in the outbox
50+
select {
51+
case slot := <-o.slots:
52+
o.queue[slot] = outboxmsg
53+
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len()))
54+
// we send this message slot to process
55+
select {
56+
case o.process <- slot:
57+
case <-o.QuitC:
58+
}
59+
return nil
60+
default:
61+
metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
62+
return errors.New("outbox full")
63+
}
64+
}
65+
66+
func (o *outbox) ProcessOutbox() {
67+
for slot := range o.process {
68+
go func(slot int) {
69+
msg := o.msg(slot)
70+
metrics.GetOrRegisterResettingTimer("pss.handle.outbox", nil).UpdateSince(msg.startedAt)
71+
if err := o.Forward(msg.msg); err != nil {
72+
metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
73+
// if we failed to forward, re-insert message in the queue
74+
log.Debug(err.Error())
75+
// reenqueue the message for processing
76+
o.reenqueue(slot)
77+
log.Debug("Message re-enqued", "slot", slot)
78+
return
79+
}
80+
// free the outbox slot
81+
o.free(slot)
82+
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len()))
83+
}(slot)
84+
}
85+
}
86+
87+
func (o *outbox) SetForwardFunction(forwardFunc func(msg interface{}) error) {
88+
o.Forward = forwardFunc
89+
}
90+
91+
func (o *outbox) msg(slot int) *outboxMsg {
92+
return o.queue[slot]
93+
}
94+
95+
func (o *outbox) free(slot int) {
96+
o.slots <- slot
97+
}
98+
99+
func (o *outbox) reenqueue(slot int) {
100+
select {
101+
case o.process <- slot:
102+
case <-o.QuitC:
103+
}
104+
}
105+
106+
func (o *outbox) len() int {
107+
return cap(o.slots) - len(o.slots)
108+
}

pss/outbox/outbox_test.go

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package outbox
2+
3+
import (
4+
"errors"
5+
"github.com/ethersphere/swarm/log"
6+
"testing"
7+
"time"
8+
)
9+
10+
type testMsg struct {
11+
}
12+
13+
func TestMessageOutbox(t *testing.T) {
14+
15+
outboxCapacity := 2
16+
successC := make(chan struct{})
17+
forward := func(msg interface{}) error {
18+
successC <- struct{}{}
19+
return nil
20+
}
21+
22+
outbox := NewOutboxMock(&Config{
23+
NumberSlots: outboxCapacity,
24+
QuitC: nil,
25+
Forward: forward,
26+
})
27+
28+
outboxMessage := NewOutboxMessage(testMsg{})
29+
30+
go outbox.ProcessOutbox()
31+
32+
err := outbox.Enqueue(outboxMessage)
33+
if err != nil {
34+
t.Fatalf("expected no error, got %v", err)
35+
}
36+
37+
usedSlots := outbox.len()
38+
if usedSlots != 1 {
39+
t.Fatalf("incorrect outbox length. expected 1, got %v", usedSlots)
40+
}
41+
t.Log("Message enqueued", "Outbox len", outbox.len())
42+
43+
select {
44+
case <-successC:
45+
case <-time.After(2 * time.Second):
46+
t.Fatal("timeout waiting for success forward")
47+
}
48+
49+
failed := make([]interface{}, 0)
50+
failedC := make(chan struct{})
51+
continueC := make(chan struct{})
52+
failedForward := func(msg interface{}) error {
53+
failed = append(failed, msg)
54+
failedC <- struct{}{}
55+
<-continueC
56+
return errors.New("forced test error forwarding message")
57+
}
58+
59+
outbox.Forward = failedForward
60+
61+
err = outbox.Enqueue(outboxMessage)
62+
if err != nil {
63+
t.Fatalf("Expected no error enqueing, got %v", err.Error())
64+
}
65+
66+
select {
67+
case <-failedC:
68+
case <-time.After(2 * time.Second):
69+
t.Fatal("timeout waiting for failing forward")
70+
}
71+
72+
if len(failed) == 0 {
73+
t.Fatal("Incorrect number of failed messages, expected 1 got 0")
74+
}
75+
// The message will be retried once we send to continueC, so first, we change the forward function
76+
outbox.Forward = forward
77+
continueC <- struct{}{}
78+
select {
79+
case <-successC:
80+
case <-time.After(2 * time.Second):
81+
t.Fatal("Timeout waiting for second success forward")
82+
}
83+
}
84+
85+
func TestOutboxFull(t *testing.T) {
86+
87+
addr := make([]byte, 32)
88+
addr[0] = 0x01
89+
outboxCapacity := 2
90+
91+
procChan := make(chan struct{})
92+
succesForward := func(msg interface{}) error {
93+
<-procChan
94+
log.Info("Message processed")
95+
return nil
96+
}
97+
98+
outbox := NewOutboxMock(&Config{
99+
NumberSlots: outboxCapacity,
100+
QuitC: nil,
101+
Forward: succesForward,
102+
})
103+
go outbox.ProcessOutbox()
104+
105+
err := outbox.Enqueue(outboxTestMessage())
106+
if err != nil {
107+
t.Fatalf("expected no error enqueing first message, got %v", err)
108+
}
109+
err = outbox.Enqueue(outboxTestMessage())
110+
if err != nil {
111+
t.Fatalf("expected no error enqueing second message, got %v", err)
112+
}
113+
//As we haven't signaled procChan, the messages are still in the outbox
114+
115+
err = outbox.Enqueue(outboxTestMessage())
116+
if err == nil {
117+
t.Fatalf("expected error enqueing third message, instead got nil")
118+
}
119+
procChan <- struct{}{}
120+
//There should be a slot again in the outbox
121+
select {
122+
case <-outbox.slots:
123+
case <-time.After(2 * time.Second):
124+
t.Fatalf("timeout waiting for a free slot")
125+
}
126+
}
127+
128+
func outboxTestMessage() *outboxMsg {
129+
return &outboxMsg{
130+
msg: nil,
131+
startedAt: time.Time{},
132+
}
133+
}

0 commit comments

Comments
 (0)