diff --git a/pss/outbox/message.go b/pss/outbox/message.go new file mode 100644 index 0000000000..d790011159 --- /dev/null +++ b/pss/outbox/message.go @@ -0,0 +1,19 @@ +package outbox + +import ( + "time" + + "github.com/ethersphere/swarm/pss/message" +) + +type outboxMsg struct { + msg *message.Message + startedAt time.Time +} + +func NewOutboxMessage(msg *message.Message) *outboxMsg { + return &outboxMsg{ + msg: msg, + startedAt: time.Now(), + } +} diff --git a/pss/outbox/mock.go b/pss/outbox/mock.go new file mode 100644 index 0000000000..dbcca1e23b --- /dev/null +++ b/pss/outbox/mock.go @@ -0,0 +1,32 @@ +package outbox + +import ( + "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/pss/message" +) + +const ( + defaultOutboxCapacity = 100000 +) + +var mockForwardFunction = func(msg *message.Message) error { + log.Debug("Forwarded message", "msg", msg) + return nil +} + +func NewMock(config *Config) (outboxMock *Outbox) { + if config == nil { + config = &Config{ + NumberSlots: defaultOutboxCapacity, + Forward: mockForwardFunction, + } + } else { + if config.Forward == nil { + config.Forward = mockForwardFunction + } + if config.NumberSlots == 0 { + config.NumberSlots = defaultOutboxCapacity + } + } + return NewOutbox(config) +} diff --git a/pss/outbox/outbox.go b/pss/outbox/outbox.go new file mode 100644 index 0000000000..133301d3fb --- /dev/null +++ b/pss/outbox/outbox.go @@ -0,0 +1,115 @@ +package outbox + +import ( + "errors" + + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/pss/message" +) + +type Config struct { + NumberSlots int + Forward forwardFunction +} + +type Outbox struct { + forwardFunc forwardFunction + queue []*outboxMsg + slots chan int + process chan int + stopC chan struct{} +} + +type forwardFunction func(msg *message.Message) error + +var ErrOutboxFull = errors.New("outbox full") + +func NewOutbox(config *Config) *Outbox { + outbox := &Outbox{ + forwardFunc: config.Forward, + queue: make([]*outboxMsg, config.NumberSlots), + slots: make(chan int, config.NumberSlots), + process: make(chan int), + stopC: make(chan struct{}), + } + // fill up outbox slots + for i := 0; i < cap(outbox.slots); i++ { + outbox.slots <- i + } + return outbox +} + +func (o *Outbox) Start() { + log.Info("Starting outbox") + go o.processOutbox() +} + +func (o *Outbox) Stop() { + log.Info("Stopping outbox") + close(o.stopC) +} + +// Enqueue a new element in the outbox if there is any slot available. +// Then send it to process. This method is blocking in the process channel! +func (o *Outbox) Enqueue(outboxMsg *outboxMsg) error { + // first we try to obtain a slot in the outbox + select { + case slot := <-o.slots: + o.queue[slot] = outboxMsg + metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len())) + // we send this message slot to process + select { + case <-o.stopC: + case o.process <- slot: + } + return nil + default: + metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1) + return ErrOutboxFull + } +} + +//ProcessOutbox starts a routine that tries to forward messages present in the outbox queue +func (o *Outbox) processOutbox() { + for { + select { + case <-o.stopC: + return + case slot := <-o.process: + go func(slot int) { + msg := o.queue[slot] + metrics.GetOrRegisterResettingTimer("pss.handle.outbox", nil).UpdateSince(msg.startedAt) + if err := o.forwardFunc(msg.msg); err != nil { + metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1) + log.Debug(err.Error()) + // requeue the message for processing + o.requeue(slot) + log.Debug("Message requeued", "slot", slot) + return + } + //message processed, free the outbox slot + o.free(slot) + metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len())) + }(slot) + } + } +} + +func (o *Outbox) free(slot int) { + select { + case <-o.stopC: + case o.slots <- slot: + } + +} + +func (o *Outbox) requeue(slot int) { + select { + case <-o.stopC: + case o.process <- slot: + } +} +func (o *Outbox) len() int { + return cap(o.slots) - len(o.slots) +} diff --git a/pss/outbox/outbox_test.go b/pss/outbox/outbox_test.go new file mode 100644 index 0000000000..e45757c200 --- /dev/null +++ b/pss/outbox/outbox_test.go @@ -0,0 +1,83 @@ +package outbox_test + +import ( + "errors" + "testing" + "time" + + "github.com/ethersphere/swarm/pss/message" + "github.com/ethersphere/swarm/pss/outbox" +) + +const timeout = 2 * time.Second + +//Tests successful and failed forwarding. Failure to forward should requeue the failed message +func TestOutbox(t *testing.T) { + + outboxCapacity := 2 + failedC := make(chan struct{}) + successC := make(chan struct{}) + continueC := make(chan struct{}) + + forwardFail := false + + mockForwardFunction := func(msg *message.Message) error { + if !forwardFail { + successC <- struct{}{} + return nil + } else { + failedC <- struct{}{} + <-continueC + return errors.New("forced test error forwarding message") + } + } + + testOutbox := outbox.NewMock(&outbox.Config{ + NumberSlots: outboxCapacity, + Forward: mockForwardFunction, + }) + + testOutbox.Start() + defer testOutbox.Stop() + + err := testOutbox.Enqueue(testOutboxMessage) + if err != nil { + t.Fatalf("unexpected error enqueueing, %v", err) + } + + //We wait for the forward function to success + <-successC + + forwardFail = true + + err = testOutbox.Enqueue(testOutboxMessage) + if err != nil { + t.Fatalf("unexpected error enqueueing, %v", err) + } + //We wait for the forward function to fail + select { + case <-failedC: + case <-time.After(timeout): + t.Fatalf("timeout waiting for failedC") + + } + + // The message will be retried once we send to continueC, so first, we change the forward function + forwardFail = false + continueC <- struct{}{} + + //We wait for the retry and success + select { + case <-successC: + case <-time.After(timeout): + t.Fatalf("timeout waiting for successC") + } +} + +var testOutboxMessage = outbox.NewOutboxMessage(&message.Message{ + To: nil, + Flags: message.Flags{}, + Expire: 0, + Topic: message.Topic{}, + Payload: nil, +}) diff --git a/pss/outbox/outbox_whitebox_test.go b/pss/outbox/outbox_whitebox_test.go new file mode 100644 index 0000000000..0783a56d44 --- /dev/null +++ b/pss/outbox/outbox_whitebox_test.go @@ -0,0 +1,59 @@ +package outbox + +import ( + "testing" + "time" + + "github.com/ethersphere/swarm/pss/message" +) + +const timeout = 2 * time.Second + +//Tests that a slot in the outbox is not freed until a message is successfully forwarded +func TestFullOutbox(t *testing.T) { + + outboxCapacity := 2 + processC := make(chan struct{}) + successForward := func(msg *message.Message) error { + <-processC + return nil + } + + testOutbox := NewMock(&Config{ + NumberSlots: outboxCapacity, + Forward: successForward, + }) + testOutbox.Start() + defer testOutbox.Stop() + + err := testOutbox.Enqueue(testOutboxMessage) + if err != nil { + t.Fatalf("unexpected error enqueueing, %v", err) + } + + err = testOutbox.Enqueue(testOutboxMessage) + if err != nil { + t.Fatalf("unexpected error enqueueing, %v", err) + } + //As we haven't signaled processC, the messages are still in the outbox + err = testOutbox.Enqueue(testOutboxMessage) + if err != ErrOutboxFull { + t.Fatalf("unexpected error type, got %v, wanted %v", err, ErrOutboxFull) + } + processC <- struct{}{} + + //There should be a slot in the outbox to enqueue + select { + case <-testOutbox.slots: + case <-time.After(timeout): + t.Fatalf("timeout waiting for a free slot") + } +} + +var testOutboxMessage = NewOutboxMessage(&message.Message{ + To: nil, + Flags: message.Flags{}, + Expire: 0, + Topic: message.Topic{}, + Payload: nil, +}) diff --git a/pss/pss.go b/pss/pss.go index 8bbe1b0de6..c8d728b5df 100644 --- a/pss/pss.go +++ b/pss/pss.go @@ -40,6 +40,7 @@ import ( "github.com/ethersphere/swarm/pss/internal/ticker" "github.com/ethersphere/swarm/pss/internal/ttlset" "github.com/ethersphere/swarm/pss/message" + "github.com/ethersphere/swarm/pss/outbox" "github.com/tilinna/clock" ) @@ -113,97 +114,6 @@ func (params *Params) WithPrivateKey(privatekey *ecdsa.PrivateKey) *Params { return params } -type outbox struct { - queue []*outboxMsg - slots chan int - process chan int - quitC chan struct{} - forward func(msg *message.Message) error -} - -func newOutbox(capacity int, quitC chan struct{}, forward func(msg *message.Message) error) outbox { - outbox := outbox{ - queue: make([]*outboxMsg, capacity), - slots: make(chan int, capacity), - process: make(chan int), - quitC: quitC, - forward: forward, - } - // fill up outbox slots - for i := 0; i < cap(outbox.slots); i++ { - outbox.slots <- i - } - return outbox -} - -func (o outbox) len() int { - return cap(o.slots) - len(o.slots) -} - -// enqueue a new element in the outbox if there is any slot available. -// Then send it to process. This method is blocking in the process channel! -func (o *outbox) enqueue(outboxmsg *outboxMsg) error { - // first we try to obtain a slot in the outbox - select { - case slot := <-o.slots: - o.queue[slot] = outboxmsg - metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len())) - // we send this message slot to process - select { - case o.process <- slot: - case <-o.quitC: - } - return nil - default: - metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1) - return errors.New("outbox full") - } -} - -func (o *outbox) processOutbox() { - for { - select { - case slot := <-o.process: - go func(slot int) { - msg := o.msg(slot) - metrics.GetOrRegisterResettingTimer("pss.handle.outbox", nil).UpdateSince(msg.startedAt) - if err := o.forward(msg.msg); err != nil { - metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1) - // if we failed to forward, re-insert message in the queue - log.Warn(err.Error()) - // reenqueue the message for processing - o.reenqueue(slot) - return - } - // free the outbox slot - o.free(slot) - metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len())) - }(slot) - case <-o.quitC: - return - } - } -} - -func (o outbox) msg(slot int) *outboxMsg { - return o.queue[slot] -} - -func (o outbox) free(slot int) { - select { - case o.slots <- slot: - case <-o.quitC: - } -} - -func (o outbox) reenqueue(slot int) { - select { - case o.process <- slot: - case <-o.quitC: - } - -} - // Pss is the top-level struct, which takes care of message sending, receiving, decryption and encryption, message handler dispatchers // and message forwarding. Implements node.Service type Pss struct { @@ -221,7 +131,7 @@ type Pss struct { msgTTL time.Duration capstring string - outbox outbox + outbox *outbox.Outbox // message handling handlers map[message.Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle() @@ -278,7 +188,10 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) { metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1) }, }) - ps.outbox = newOutbox(defaultOutboxCapacity, ps.quitC, ps.forward) + ps.outbox = outbox.NewOutbox(&outbox.Config{ + NumberSlots: defaultOutboxCapacity, + Forward: ps.forward, + }) cp := capability.NewCapability(CapabilityID, 8) cp.Set(capabilitiesSend) @@ -312,7 +225,7 @@ func (p *Pss) Start(srv *p2p.Server) error { }() // Forward outbox messages - go p.outbox.processOutbox() + p.outbox.Start() log.Info("Started Pss") log.Info("Loaded EC keys", "pubkey", hex.EncodeToString(p.Crypto.SerializePublicKey(p.PublicKey())), "secp256", hex.EncodeToString(p.Crypto.CompressPublicKey(p.PublicKey()))) @@ -325,6 +238,7 @@ func (p *Pss) Stop() error { return err } close(p.quitC) + p.outbox.Stop() return nil } @@ -650,9 +564,8 @@ func (p *Pss) isSelfPossibleRecipient(msg *message.Message, prox bool) bool { func (p *Pss) enqueue(msg *message.Message) error { defer metrics.GetOrRegisterResettingTimer("pss.enqueue", nil).UpdateSince(time.Now()) - outboxmsg := newOutboxMsg(msg) - - return p.outbox.enqueue(outboxmsg) + outboxMsg := outbox.NewOutboxMessage(msg) + return p.outbox.Enqueue(outboxMsg) } // Send a raw message (any encryption is responsibility of calling client) diff --git a/pss/pss_test.go b/pss/pss_test.go index 0e1ed892a0..31a3de808b 100644 --- a/pss/pss_test.go +++ b/pss/pss_test.go @@ -21,7 +21,6 @@ import ( "context" "crypto/ecdsa" "encoding/binary" - "errors" "fmt" "math/rand" "strconv" @@ -172,16 +171,14 @@ func TestAddressMatchProx(t *testing.T) { privKey, err := ethCrypto.GenerateKey() pssp := NewParams().WithPrivateKey(privKey) ps, err := New(kad, pssp) - // enqueue method now is blocking, so we need always somebody processing the outbox - go func() { - for slot := range ps.outbox.process { - ps.outbox.free(slot) - } - }() if err != nil { t.Fatal(err.Error()) } + //Forwarding will fail. Since in this test we are not relying on forwarding, but just handling, the test is valid + ps.outbox.Start() + defer ps.outbox.Stop() + // create kademlia peers, so we have peers both inside and outside minproxlimit var peers []*network.Peer for i := 0; i < peerCount; i++ { @@ -355,127 +352,6 @@ func TestAddressMatchProx(t *testing.T) { } } -func TestMessageOutbox(t *testing.T) { - // setup - privkey, err := ethCrypto.GenerateKey() - if err != nil { - t.Fatal(err.Error()) - } - - addr := make([]byte, 32) - addr[0] = 0x01 - ps := newTestPssStart(privkey, network.NewKademlia(addr, network.NewKadParams()), NewParams(), false) - outboxCapacity := 2 - - successC := make(chan struct{}) - forward := func(msg *message.Message) error { - successC <- struct{}{} - return nil - } - ps.outbox = newOutbox(outboxCapacity, ps.quitC, forward) - - ps.Start(nil) - defer ps.Stop() - - err = ps.enqueue(testRandomMessage()) - if err != nil { - t.Fatalf("expected no error, got %v", err) - } - usedSlots := ps.outbox.len() - if usedSlots != 1 { - t.Fatalf("incorrect outbox length. expected 1, got %v", usedSlots) - } - t.Log("Message enqueued", "Outbox len", ps.outbox.len()) - - select { - case <-successC: - case <-time.After(2 * time.Second): - t.Fatal("timeout waiting for success forward") - } - - failed := make([]*message.Message, 0) - failedC := make(chan struct{}) - continueC := make(chan struct{}) - failedForward := func(msg *message.Message) error { - failed = append(failed, msg) - failedC <- struct{}{} - <-continueC - return errors.New("Forced test error forwarding message") - } - - ps.outbox.forward = failedForward - - err = ps.enqueue(testRandomMessage()) - if err != nil { - t.Fatalf("Expected no error enqueing, got %v", err.Error()) - } - - select { - case <-failedC: - case <-time.After(2 * time.Second): - t.Fatal("timeout waiting for failing forward") - } - - if len(failed) == 0 { - t.Fatal("Incorrect number of failed messages, expected 1 got 0") - } - // The message will be retried once we send to continueC, so first, we change the forward function - ps.outbox.forward = forward - continueC <- struct{}{} - select { - case <-successC: - case <-time.After(2 * time.Second): - t.Fatal("timeout waiting for second success forward") - } - -} - -func TestOutboxFull(t *testing.T) { - // setup - privkey, err := ethCrypto.GenerateKey() - if err != nil { - t.Fatal(err.Error()) - } - - addr := make([]byte, 32) - addr[0] = 0x01 - ps := newTestPssStart(privkey, network.NewKademlia(addr, network.NewKadParams()), NewParams(), false) - defer ps.Stop() - outboxCapacity := 2 - - procChan := make(chan struct{}) - succesForward := func(msg *message.Message) error { - <-procChan - log.Info("Message processed") - return nil - } - ps.outbox = newOutbox(outboxCapacity, ps.quitC, succesForward) - - ps.Start(nil) - - err = ps.enqueue(testRandomMessage()) - if err != nil { - t.Fatalf("expected no error enqueing first message, got %v", err) - } - err = ps.enqueue(testRandomMessage()) - if err != nil { - t.Fatalf("expected no error enqueing second message, got %v", err) - } - //As we haven't signaled procChan, the messages are still in the outbox - - err = ps.enqueue(testRandomMessage()) - if err == nil { - t.Fatalf("expected error enqueing third message, instead got nil") - } - procChan <- struct{}{} - //There should be a slot again in the outbox - select { - case <-ps.outbox.slots: - case <-time.After(2 * time.Second): - t.Fatalf("timeout waiting for a free slot") - } -} - // set and generate pubkeys and symkeys func TestKeys(t *testing.T) { // make our key and init pss with it diff --git a/pss/types.go b/pss/types.go index 964b227ac6..b81d6a1bea 100644 --- a/pss/types.go +++ b/pss/types.go @@ -18,7 +18,6 @@ package pss import ( "encoding/json" - "time" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/p2p" @@ -49,18 +48,6 @@ func (a *PssAddress) UnmarshalJSON(input []byte) error { return nil } -type outboxMsg struct { - msg *message.Message - startedAt time.Time -} - -func newOutboxMsg(msg *message.Message) *outboxMsg { - return &outboxMsg{ - msg: msg, - startedAt: time.Now(), - } -} - // Signature for a message handler function for a Message // Implementations of this type are passed to Pss.Register together with a topic, type HandlerFunc func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error