Skip to content

Commit d0ec26d

Browse files
committed
pss: Made some changes due to rebase
1 parent f018835 commit d0ec26d

File tree

6 files changed

+89
-81
lines changed

6 files changed

+89
-81
lines changed

pss/outbox/message.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package outbox
22

33
import (
4+
"github.com/ethersphere/swarm/pss/message"
45
"time"
56
)
67

78
type outboxMsg struct {
8-
msg interface{}
9+
msg *message.Message
910
startedAt time.Time
1011
}
1112

12-
func NewOutboxMessage(msg interface{}) *outboxMsg {
13+
func NewOutboxMessage(msg *message.Message) *outboxMsg {
1314
return &outboxMsg{
1415
msg: msg,
1516
startedAt: time.Now(),

pss/outbox/mock.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,35 @@ package outbox
22

33
import (
44
"github.com/ethersphere/swarm/log"
5+
"github.com/ethersphere/swarm/pss/message"
56
)
67

8+
const (
9+
defaultOutboxCapacity = 100000
10+
)
11+
12+
var mockForwardFunction = func(msg *message.Message) error {
13+
log.Debug("Forwarded message", "msg", msg)
14+
return nil
15+
}
16+
717
func NewOutboxMock(config *Config) (outbox *outbox) {
8-
if config.Forward == nil {
9-
config.Forward = func(msg interface{}) error {
10-
log.Debug("Forwarded message", "msg", msg)
11-
return nil
18+
if config == nil {
19+
config = &Config{
20+
NumberSlots: defaultOutboxCapacity,
21+
QuitC: make(chan struct{}),
22+
Forward: mockForwardFunction,
23+
}
24+
} else {
25+
if config.Forward == nil {
26+
config.Forward = mockForwardFunction
27+
}
28+
if &config.NumberSlots == nil {
29+
config.NumberSlots = defaultOutboxCapacity
30+
}
31+
if config.QuitC == nil {
32+
config.QuitC = make(chan struct{})
1233
}
13-
}
14-
15-
if &config.NumberSlots == nil {
16-
config.NumberSlots = defaultOutboxCapacity
17-
}
18-
if config.QuitC == nil {
19-
config.QuitC = make(chan struct{})
2034
}
2135
return NewOutbox(config)
2236
}

pss/outbox/outbox.go

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,37 +4,38 @@ import (
44
"errors"
55
"github.com/ethereum/go-ethereum/metrics"
66
"github.com/ethersphere/swarm/log"
7-
)
8-
9-
const (
10-
defaultOutboxCapacity = 100000
7+
"github.com/ethersphere/swarm/pss/message"
118
)
129

1310
type Outbox interface {
14-
Enqueue(outboxmsg *outboxMsg) error
11+
Enqueue(outboxMsg *outboxMsg) error
1512
ProcessOutbox()
16-
SetForwardFunction(forwardFunc func(msg interface{}) error) //Eventually this will not be needed, when pss has a newMock that creates a mock outbox, this is used just for a test outside of outbox package
13+
SetForwardFunction(forwardFunc forwardFunction) // Needed right now to mock forwardFunc in one pss_tests test
1714
}
1815

1916
type Config struct {
2017
NumberSlots int
2118
QuitC chan struct{}
22-
Forward func(msg interface{}) error
19+
Forward forwardFunction
2320
}
2421

2522
type outbox struct {
26-
Config
27-
queue []*outboxMsg
28-
slots chan int
29-
process chan int
23+
forwardFunc forwardFunction
24+
queue []*outboxMsg
25+
slots chan int
26+
process chan int
27+
quitC chan struct{}
3028
}
3129

30+
type forwardFunction func(msg *message.Message) error
31+
3232
func NewOutbox(config *Config) *outbox {
3333
outbox := outbox{
34-
Config: *config,
35-
process: make(chan int),
36-
slots: make(chan int, config.NumberSlots),
37-
queue: make([]*outboxMsg, config.NumberSlots),
34+
forwardFunc: config.Forward,
35+
queue: make([]*outboxMsg, config.NumberSlots),
36+
slots: make(chan int, config.NumberSlots),
37+
process: make(chan int),
38+
quitC: config.QuitC,
3839
}
3940
// fill up outbox slots
4041
for i := 0; i < cap(outbox.slots); i++ {
@@ -43,18 +44,18 @@ func NewOutbox(config *Config) *outbox {
4344
return &outbox
4445
}
4546

46-
// enqueue a new element in the outbox if there is any slot available.
47+
// Enqueue a new element in the outbox if there is any slot available.
4748
// Then send it to process. This method is blocking in the process channel!
48-
func (o *outbox) Enqueue(outboxmsg *outboxMsg) error {
49+
func (o *outbox) Enqueue(outboxMsg *outboxMsg) error {
4950
// first we try to obtain a slot in the outbox
5051
select {
5152
case slot := <-o.slots:
52-
o.queue[slot] = outboxmsg
53+
o.queue[slot] = outboxMsg
5354
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len()))
5455
// we send this message slot to process
5556
select {
5657
case o.process <- slot:
57-
case <-o.QuitC:
58+
case <-o.quitC:
5859
}
5960
return nil
6061
default:
@@ -63,29 +64,29 @@ func (o *outbox) Enqueue(outboxmsg *outboxMsg) error {
6364
}
6465
}
6566

67+
//ProcessOutbox starts a routine that tries to forward messages present in the outbox queue
6668
func (o *outbox) ProcessOutbox() {
6769
for slot := range o.process {
6870
go func(slot int) {
6971
msg := o.msg(slot)
7072
metrics.GetOrRegisterResettingTimer("pss.handle.outbox", nil).UpdateSince(msg.startedAt)
71-
if err := o.Forward(msg.msg); err != nil {
73+
if err := o.forwardFunc(msg.msg); err != nil {
7274
metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
7375
// if we failed to forward, re-insert message in the queue
7476
log.Debug(err.Error())
75-
// reenqueue the message for processing
76-
o.reenqueue(slot)
77-
log.Debug("Message re-enqued", "slot", slot)
77+
// requeue the message for processing
78+
o.requeue(slot)
79+
log.Debug("Message requeued", "slot", slot)
7880
return
7981
}
80-
// free the outbox slot
82+
//message processed, free the outbox slot
8183
o.free(slot)
8284
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len()))
8385
}(slot)
8486
}
8587
}
86-
87-
func (o *outbox) SetForwardFunction(forwardFunc func(msg interface{}) error) {
88-
o.Forward = forwardFunc
88+
func (o *outbox) SetForwardFunction(forwardFunc forwardFunction) {
89+
o.forwardFunc = forwardFunc
8990
}
9091

9192
func (o *outbox) msg(slot int) *outboxMsg {
@@ -96,10 +97,10 @@ func (o *outbox) free(slot int) {
9697
o.slots <- slot
9798
}
9899

99-
func (o *outbox) reenqueue(slot int) {
100+
func (o *outbox) requeue(slot int) {
100101
select {
101102
case o.process <- slot:
102-
case <-o.QuitC:
103+
case <-o.quitC:
103104
}
104105
}
105106

pss/outbox/outbox_test.go

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,43 +2,37 @@ package outbox
22

33
import (
44
"errors"
5-
"github.com/ethersphere/swarm/log"
5+
"github.com/ethersphere/swarm/pss/message"
66
"testing"
77
"time"
88
)
99

10-
type testMsg struct {
11-
}
12-
1310
func TestMessageOutbox(t *testing.T) {
1411

1512
outboxCapacity := 2
1613
successC := make(chan struct{})
17-
forward := func(msg interface{}) error {
14+
mockForwardFunction := func(msg *message.Message) error {
1815
successC <- struct{}{}
1916
return nil
2017
}
2118

22-
outbox := NewOutboxMock(&Config{
19+
testOutbox := NewOutboxMock(&Config{
2320
NumberSlots: outboxCapacity,
2421
QuitC: nil,
25-
Forward: forward,
22+
Forward: mockForwardFunction,
2623
})
2724

28-
outboxMessage := NewOutboxMessage(testMsg{})
29-
30-
go outbox.ProcessOutbox()
25+
go testOutbox.ProcessOutbox()
3126

32-
err := outbox.Enqueue(outboxMessage)
27+
err := testOutbox.Enqueue(testOutboxMessage)
3328
if err != nil {
3429
t.Fatalf("expected no error, got %v", err)
3530
}
3631

37-
usedSlots := outbox.len()
32+
usedSlots := testOutbox.len()
3833
if usedSlots != 1 {
3934
t.Fatalf("incorrect outbox length. expected 1, got %v", usedSlots)
4035
}
41-
t.Log("Message enqueued", "Outbox len", outbox.len())
4236

4337
select {
4438
case <-successC:
@@ -49,16 +43,16 @@ func TestMessageOutbox(t *testing.T) {
4943
failed := make([]interface{}, 0)
5044
failedC := make(chan struct{})
5145
continueC := make(chan struct{})
52-
failedForward := func(msg interface{}) error {
46+
failedForward := func(msg *message.Message) error {
5347
failed = append(failed, msg)
5448
failedC <- struct{}{}
5549
<-continueC
5650
return errors.New("forced test error forwarding message")
5751
}
5852

59-
outbox.Forward = failedForward
53+
testOutbox.forwardFunc = failedForward
6054

61-
err = outbox.Enqueue(outboxMessage)
55+
err = testOutbox.Enqueue(testOutboxMessage)
6256
if err != nil {
6357
t.Fatalf("Expected no error enqueing, got %v", err.Error())
6458
}
@@ -73,7 +67,7 @@ func TestMessageOutbox(t *testing.T) {
7367
t.Fatal("Incorrect number of failed messages, expected 1 got 0")
7468
}
7569
// The message will be retried once we send to continueC, so first, we change the forward function
76-
outbox.Forward = forward
70+
testOutbox.forwardFunc = mockForwardFunction
7771
continueC <- struct{}{}
7872
select {
7973
case <-successC:
@@ -84,50 +78,48 @@ func TestMessageOutbox(t *testing.T) {
8478

8579
func TestOutboxFull(t *testing.T) {
8680

87-
addr := make([]byte, 32)
88-
addr[0] = 0x01
8981
outboxCapacity := 2
90-
9182
procChan := make(chan struct{})
92-
succesForward := func(msg interface{}) error {
83+
successForward := func(msg *message.Message) error {
9384
<-procChan
94-
log.Info("Message processed")
9585
return nil
9686
}
9787

98-
outbox := NewOutboxMock(&Config{
88+
testOutbox := NewOutboxMock(&Config{
9989
NumberSlots: outboxCapacity,
10090
QuitC: nil,
101-
Forward: succesForward,
91+
Forward: successForward,
10292
})
103-
go outbox.ProcessOutbox()
10493

105-
err := outbox.Enqueue(outboxTestMessage())
94+
go testOutbox.ProcessOutbox()
95+
96+
err := testOutbox.Enqueue(testOutboxMessage)
10697
if err != nil {
10798
t.Fatalf("expected no error enqueing first message, got %v", err)
10899
}
109-
err = outbox.Enqueue(outboxTestMessage())
100+
err = testOutbox.Enqueue(testOutboxMessage)
110101
if err != nil {
111102
t.Fatalf("expected no error enqueing second message, got %v", err)
112103
}
113-
//As we haven't signaled procChan, the messages are still in the outbox
114104

115-
err = outbox.Enqueue(outboxTestMessage())
105+
//As we haven't signaled procChan, the messages are still in the outbox
106+
err = testOutbox.Enqueue(testOutboxMessage)
116107
if err == nil {
117108
t.Fatalf("expected error enqueing third message, instead got nil")
118109
}
119110
procChan <- struct{}{}
120111
//There should be a slot again in the outbox
121112
select {
122-
case <-outbox.slots:
113+
case <-testOutbox.slots:
123114
case <-time.After(2 * time.Second):
124115
t.Fatalf("timeout waiting for a free slot")
125116
}
126117
}
127118

128-
func outboxTestMessage() *outboxMsg {
129-
return &outboxMsg{
130-
msg: nil,
131-
startedAt: time.Time{},
132-
}
133-
}
119+
var testOutboxMessage = NewOutboxMessage(&message.Message{
120+
To: nil,
121+
Flags: message.Flags{},
122+
Expire: 0,
123+
Topic: message.Topic{},
124+
Payload: nil,
125+
})

pss/pss.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -540,8 +540,8 @@ func (p *Pss) isSelfPossibleRecipient(msg *message.Message, prox bool) bool {
540540
func (p *Pss) enqueue(msg *message.Message) error {
541541
defer metrics.GetOrRegisterResettingTimer("pss.enqueue", nil).UpdateSince(time.Now())
542542

543-
outboxmsg := outbox.NewOutboxMessage(msg)
544-
return p.outbox.Enqueue(outboxmsg)
543+
outboxMsg := outbox.NewOutboxMessage(msg)
544+
return p.outbox.Enqueue(outboxMsg)
545545
}
546546

547547
// Send a raw message (any encryption is responsibility of calling client)

pss/pss_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func TestAddressMatchProx(t *testing.T) {
278278
}
279279
// enqueue method now is blocking, so we need always somebody processing the outbox
280280
mockForwardFunc :=
281-
func(msg interface{}) error {
281+
func(msg *message.Message) error {
282282
return nil
283283
}
284284

0 commit comments

Comments
 (0)