Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pss/outbox/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package outbox

import (
"time"

"github.com/ethersphere/swarm/pss/message"
)

type outboxMsg struct {
msg *message.Message
startedAt time.Time
}

func NewOutboxMessage(msg *message.Message) *outboxMsg {
return &outboxMsg{
msg: msg,
startedAt: time.Now(),
}
}
32 changes: 32 additions & 0 deletions pss/outbox/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package outbox

import (
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/pss/message"
)

const (
defaultOutboxCapacity = 100000
)

var mockForwardFunction = func(msg *message.Message) error {
log.Debug("Forwarded message", "msg", msg)
return nil
}

func NewMock(config *Config) (outboxMock *Outbox) {
if config == nil {
config = &Config{
NumberSlots: defaultOutboxCapacity,
Forward: mockForwardFunction,
}
} else {
if config.Forward == nil {
config.Forward = mockForwardFunction
}
if config.NumberSlots == 0 {
config.NumberSlots = defaultOutboxCapacity
}
}
return NewOutbox(config)
}
115 changes: 115 additions & 0 deletions pss/outbox/outbox.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package outbox

import (
"errors"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/pss/message"
)

type Config struct {
NumberSlots int
Forward forwardFunction
}

type Outbox struct {
forwardFunc forwardFunction
queue []*outboxMsg
slots chan int
process chan int
stopC chan struct{}
}

type forwardFunction func(msg *message.Message) error

var ErrOutboxFull = errors.New("outbox full")

func NewOutbox(config *Config) *Outbox {
outbox := &Outbox{
forwardFunc: config.Forward,
queue: make([]*outboxMsg, config.NumberSlots),
slots: make(chan int, config.NumberSlots),
process: make(chan int),
stopC: make(chan struct{}),
}
// fill up outbox slots
for i := 0; i < cap(outbox.slots); i++ {
outbox.slots <- i
}
return outbox
}

func (o *Outbox) Start() {
log.Info("Starting outbox")
go o.processOutbox()
}

func (o *Outbox) Stop() {
log.Info("Stopping outbox")
close(o.stopC)
}

// Enqueue a new element in the outbox if there is any slot available.
// Then send it to process. This method is blocking in the process channel!
func (o *Outbox) Enqueue(outboxMsg *outboxMsg) error {
// first we try to obtain a slot in the outbox
select {
case slot := <-o.slots:
o.queue[slot] = outboxMsg
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len()))
// we send this message slot to process
select {
case <-o.stopC:
case o.process <- slot:
}
return nil
default:
metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
return ErrOutboxFull
}
}

//ProcessOutbox starts a routine that tries to forward messages present in the outbox queue
func (o *Outbox) processOutbox() {
for {
select {
case <-o.stopC:
return
case slot := <-o.process:
go func(slot int) {
msg := o.queue[slot]
metrics.GetOrRegisterResettingTimer("pss.handle.outbox", nil).UpdateSince(msg.startedAt)
if err := o.forwardFunc(msg.msg); err != nil {
metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
log.Debug(err.Error())
// requeue the message for processing
o.requeue(slot)
log.Debug("Message requeued", "slot", slot)
return
}
//message processed, free the outbox slot
o.free(slot)
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len()))
}(slot)
}
}
}

func (o *Outbox) free(slot int) {
select {
case <-o.stopC:
case o.slots <- slot:
}

}

func (o *Outbox) requeue(slot int) {
select {
case <-o.stopC:
case o.process <- slot:
}
}
func (o *Outbox) len() int {
return cap(o.slots) - len(o.slots)
}
83 changes: 83 additions & 0 deletions pss/outbox/outbox_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package outbox_test

import (
"errors"
"testing"
"time"

"github.com/ethersphere/swarm/pss/message"
"github.com/ethersphere/swarm/pss/outbox"
)

const timeout = 2 * time.Second

//Tests successful and failed forwarding. Failure to forward should requeue the failed message
func TestOutbox(t *testing.T) {

outboxCapacity := 2
failedC := make(chan struct{})
successC := make(chan struct{})
continueC := make(chan struct{})

forwardFail := false

mockForwardFunction := func(msg *message.Message) error {
if !forwardFail {
successC <- struct{}{}
return nil
} else {
failedC <- struct{}{}
<-continueC
return errors.New("forced test error forwarding message")
}
}

testOutbox := outbox.NewMock(&outbox.Config{
NumberSlots: outboxCapacity,
Forward: mockForwardFunction,
})

testOutbox.Start()
defer testOutbox.Stop()

err := testOutbox.Enqueue(testOutboxMessage)
if err != nil {
t.Fatalf("unexpected error enqueueing, %v", err)
}

//We wait for the forward function to success
<-successC

forwardFail = true

err = testOutbox.Enqueue(testOutboxMessage)
if err != nil {
t.Fatalf("unexpected error enqueueing, %v", err)
}
//We wait for the forward function to fail
select {
case <-failedC:
case <-time.After(timeout):
t.Fatalf("timeout waiting for failedC")

}

// The message will be retried once we send to continueC, so first, we change the forward function
forwardFail = false
continueC <- struct{}{}

//We wait for the retry and success
select {
case <-successC:
case <-time.After(timeout):
t.Fatalf("timeout waiting for successC")
}
}

var testOutboxMessage = outbox.NewOutboxMessage(&message.Message{
To: nil,
Flags: message.Flags{},
Expire: 0,
Topic: message.Topic{},
Payload: nil,
})
59 changes: 59 additions & 0 deletions pss/outbox/outbox_whitebox_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package outbox

import (
"testing"
"time"

"github.com/ethersphere/swarm/pss/message"
)

const timeout = 2 * time.Second

//Tests that a slot in the outbox is not freed until a message is successfully forwarded
func TestFullOutbox(t *testing.T) {

outboxCapacity := 2
processC := make(chan struct{})
successForward := func(msg *message.Message) error {
<-processC
return nil
}

testOutbox := NewMock(&Config{
NumberSlots: outboxCapacity,
Forward: successForward,
})
testOutbox.Start()
defer testOutbox.Stop()

err := testOutbox.Enqueue(testOutboxMessage)
if err != nil {
t.Fatalf("unexpected error enqueueing, %v", err)
}

err = testOutbox.Enqueue(testOutboxMessage)
if err != nil {
t.Fatalf("unexpected error enqueueing, %v", err)
}
//As we haven't signaled processC, the messages are still in the outbox
err = testOutbox.Enqueue(testOutboxMessage)
if err != ErrOutboxFull {
t.Fatalf("unexpected error type, got %v, wanted %v", err, ErrOutboxFull)
}
processC <- struct{}{}

//There should be a slot in the outbox to enqueue
select {
case <-testOutbox.slots:
case <-time.After(timeout):
t.Fatalf("timeout waiting for a free slot")
}
}

var testOutboxMessage = NewOutboxMessage(&message.Message{
To: nil,
Flags: message.Flags{},
Expire: 0,
Topic: message.Topic{},
Payload: nil,
})
Loading