@@ -25,7 +25,8 @@ type Config struct {
25
25
}
26
26
27
27
type outbox struct {
28
- Config
28
+ forward forwardFunc
29
+ quitC chan struct {}
29
30
queue []* outboxMsg
30
31
slots chan int
31
32
process chan int
@@ -34,7 +35,8 @@ type forwardFunc func(msg interface{}) error
34
35
35
36
func NewOutbox (config * Config ) * outbox {
36
37
outbox := outbox {
37
- Config : * config ,
38
+ forward : config .Forward ,
39
+ quitC : config .QuitC ,
38
40
process : make (chan int ),
39
41
slots : make (chan int , config .NumberSlots ),
40
42
queue : make ([]* outboxMsg , config .NumberSlots ),
@@ -57,7 +59,7 @@ func (o *outbox) Enqueue(outboxmsg *outboxMsg) error {
57
59
// we send this message slot to process
58
60
select {
59
61
case o .process <- slot :
60
- case <- o .QuitC :
62
+ case <- o .quitC :
61
63
}
62
64
return nil
63
65
default :
@@ -71,7 +73,7 @@ func (o *outbox) ProcessOutbox() {
71
73
go func (slot int ) {
72
74
msg := o .msg (slot )
73
75
metrics .GetOrRegisterResettingTimer ("pss.handle.outbox" , nil ).UpdateSince (msg .startedAt )
74
- if err := o .Forward (msg .msg ); err != nil {
76
+ if err := o .forward (msg .msg ); err != nil {
75
77
metrics .GetOrRegisterCounter ("pss.forward.err" , nil ).Inc (1 )
76
78
// if we failed to forward, re-insert message in the queue
77
79
log .Debug (err .Error ())
@@ -88,7 +90,7 @@ func (o *outbox) ProcessOutbox() {
88
90
}
89
91
90
92
func (o * outbox ) SetForwardFunction (f forwardFunc ) {
91
- o .Forward = f
93
+ o .forward = f
92
94
}
93
95
94
96
func (o * outbox ) msg (slot int ) * outboxMsg {
@@ -102,7 +104,7 @@ func (o *outbox) free(slot int) {
102
104
func (o * outbox ) reenqueue (slot int ) {
103
105
select {
104
106
case o .process <- slot :
105
- case <- o .QuitC :
107
+ case <- o .quitC :
106
108
}
107
109
}
108
110
0 commit comments