Skip to content

Commit c174ac1

Browse files
committed
pss/outbox: Removed outbox.msg. Changed how outbox stops
1 parent 1a11e3b commit c174ac1

File tree

3 files changed

+19
-13
lines changed

3 files changed

+19
-13
lines changed

pss/outbox/outbox.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type Outbox struct {
1717
queue []*outboxMsg
1818
slots chan int
1919
process chan int
20+
stopC chan struct{}
2021
}
2122

2223
type forwardFunction func(msg *message.Message) error
@@ -29,6 +30,7 @@ func NewOutbox(config *Config) *Outbox {
2930
queue: make([]*outboxMsg, config.NumberSlots),
3031
slots: make(chan int, config.NumberSlots),
3132
process: make(chan int),
33+
stopC: make(chan struct{}),
3234
}
3335
// fill up outbox slots
3436
for i := 0; i < cap(outbox.slots); i++ {
@@ -44,7 +46,7 @@ func (o *Outbox) Start() {
4446

4547
func (o *Outbox) Stop() {
4648
log.Info("Stopping outbox")
47-
close(o.process)
49+
close(o.stopC)
4850
}
4951

5052
// Enqueue a new element in the outbox if there is any slot available.
@@ -56,7 +58,10 @@ func (o *Outbox) Enqueue(outboxMsg *outboxMsg) error {
5658
o.queue[slot] = outboxMsg
5759
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len()))
5860
// we send this message slot to process
59-
o.process <- slot
61+
select {
62+
case <-o.stopC:
63+
case o.process <- slot:
64+
}
6065
return nil
6166
default:
6267
metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
@@ -68,11 +73,10 @@ func (o *Outbox) Enqueue(outboxMsg *outboxMsg) error {
6873
func (o *Outbox) processOutbox() {
6974
for slot := range o.process {
7075
go func(slot int) {
71-
msg := o.msg(slot)
76+
msg := o.queue[slot]
7277
metrics.GetOrRegisterResettingTimer("pss.handle.outbox", nil).UpdateSince(msg.startedAt)
7378
if err := o.forwardFunc(msg.msg); err != nil {
7479
metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
75-
// if we failed to forward, re-insert message in the queue
7680
log.Debug(err.Error())
7781
// requeue the message for processing
7882
o.requeue(slot)
@@ -86,18 +90,20 @@ func (o *Outbox) processOutbox() {
8690
}
8791
}
8892

89-
func (o *Outbox) msg(slot int) *outboxMsg {
90-
return o.queue[slot]
91-
}
92-
9393
func (o *Outbox) free(slot int) {
94-
o.slots <- slot
94+
select {
95+
case <-o.stopC:
96+
case o.slots <- slot:
97+
}
98+
9599
}
96100

97101
func (o *Outbox) requeue(slot int) {
98-
o.process <- slot
102+
select {
103+
case <-o.stopC:
104+
case o.process <- slot:
105+
}
99106
}
100-
101107
func (o *Outbox) len() int {
102108
return cap(o.slots) - len(o.slots)
103109
}

pss/outbox/outbox_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func TestOutbox(tx *testing.T) {
5959
<-failedC
6060

6161
failedMessages := len(failed)
62-
t.Assert(failedMessages == 1, "incorrect number of failed messages, expected 1 got %v", failedMessages)
62+
t.Equals(failedMessages, 1)
6363

6464
// The message will be retried once we send to continueC, so first, we change the forward function
6565
forwardFail = false

pss/outbox/outbox_whitebox_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func TestFullOutbox(tx *testing.T) {
2020
return nil
2121
}
2222

23-
testOutbox := NewOutbox(&Config{
23+
testOutbox := NewMock(&Config{
2424
NumberSlots: outboxCapacity,
2525
Forward: successForward,
2626
})

0 commit comments

Comments
 (0)