Skip to content

Commit 1085eca

Browse files
authored
chore(nsq): pass the QueuedMessage interface as parameter (#6)
1 parent 712e617 commit 1085eca

File tree

3 files changed

+29
-23
lines changed

3 files changed

+29
-23
lines changed

nsq/nsq.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,16 @@ var _ queue.Worker = (*Worker)(nil)
1515
// Option for queue system
1616
type Option func(*Worker)
1717

18+
// Job with NSQ message
19+
type Job struct {
20+
Body []byte
21+
}
22+
23+
// Bytes get bytes format
24+
func (j *Job) Bytes() []byte {
25+
return j.Body
26+
}
27+
1828
// Worker for NSQ
1929
type Worker struct {
2030
q *nsq.Consumer
@@ -24,7 +34,7 @@ type Worker struct {
2434
addr string
2535
topic string
2636
channel string
27-
runFunc func(msg *nsq.Message) error
37+
runFunc func(queue.QueuedMessage) error
2838
}
2939

3040
// WithAddr setup the addr of NSQ
@@ -49,7 +59,7 @@ func WithChannel(channel string) Option {
4959
}
5060

5161
// WithRunFunc setup the run func of queue
52-
func WithRunFunc(fn func(msg *nsq.Message) error) Option {
62+
func WithRunFunc(fn func(queue.QueuedMessage) error) Option {
5363
return func(w *Worker) {
5464
w.runFunc = fn
5565
}
@@ -69,13 +79,7 @@ func NewWorker(opts ...Option) *Worker {
6979
topic: "gorush",
7080
channel: "ch",
7181
maxInFlight: runtime.NumCPU(),
72-
runFunc: func(msg *nsq.Message) error {
73-
if len(msg.Body) == 0 {
74-
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
75-
// In this case, a message with an empty body is simply ignored/discarded.
76-
return nil
77-
}
78-
82+
runFunc: func(queue.QueuedMessage) error {
7983
return nil
8084
},
8185
}
@@ -127,8 +131,18 @@ func (s *Worker) Run(quit chan struct{}) error {
127131
s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
128132
wg.Add(1)
129133
defer wg.Done()
130-
// run custom func
131-
return s.runFunc(msg)
134+
if len(msg.Body) == 0 {
135+
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
136+
// In this case, a message with an empty body is simply ignored/discarded.
137+
return nil
138+
}
139+
140+
job := &Job{
141+
Body: msg.Body,
142+
}
143+
144+
// run custom process function
145+
return s.runFunc(job)
132146
}))
133147

134148
// wait close signal

nsq/nsq_test.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,11 @@ import (
66

77
"github.com/appleboy/queue"
88

9-
"github.com/nsqio/go-nsq"
109
"github.com/stretchr/testify/assert"
1110
)
1211

1312
var host = "nsq"
1413

15-
type mockMessage struct {
16-
msg string
17-
}
18-
19-
func (m mockMessage) Bytes() []byte {
20-
return []byte(m.msg)
21-
}
22-
2314
func TestShutdown(t *testing.T) {
2415
w := NewWorker(
2516
WithAddr(host+":4150"),
@@ -39,14 +30,14 @@ func TestShutdown(t *testing.T) {
3930
}
4031

4132
func TestCustomFuncAndWait(t *testing.T) {
42-
m := mockMessage{
43-
msg: "foo",
33+
m := &Job{
34+
Body: []byte("foo"),
4435
}
4536
w := NewWorker(
4637
WithAddr(host+":4150"),
4738
WithTopic("test"),
4839
WithMaxInFlight(2),
49-
WithRunFunc(func(msg *nsq.Message) error {
40+
WithRunFunc(func(msg queue.QueuedMessage) error {
5041
time.Sleep(500 * time.Millisecond)
5142
return nil
5243
}),

simple/simple.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func (s *Worker) AfterRun() error {
3333
// Run start the worker
3434
func (s *Worker) Run(_ chan struct{}) error {
3535
for notification := range s.queueNotification {
36+
// run custom process function
3637
_ = s.runFunc(notification)
3738
}
3839
return nil

0 commit comments

Comments
 (0)