Skip to content

Commit a85d3f1

Browse files
authored
chore: support NSQ worker. (#5)
1 parent a6f85d7 commit a85d3f1

File tree

7 files changed

+337
-3
lines changed

7 files changed

+337
-3
lines changed

.drone.yml

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
---
2+
kind: pipeline
3+
name: testing
4+
5+
platform:
6+
os: linux
7+
arch: amd64
8+
9+
steps:
10+
- name: test
11+
pull: always
12+
image: golang:1.16
13+
commands:
14+
- go test -v -covermode=atomic -coverprofile=coverage.out ./...
15+
volumes:
16+
- name: gopath
17+
path: /go
18+
19+
- name: codecov
20+
pull: always
21+
image: robertstettner/drone-codecov
22+
settings:
23+
token:
24+
from_secret: codecov_token
25+
26+
services:
27+
- name: redis
28+
image: redis
29+
30+
- name: nsq
31+
image: nsqio/nsq
32+
commands:
33+
- /nsqd
34+
35+
volumes:
36+
- name: gopath
37+
temp: {}

.github/workflows/go.yml

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,33 @@ jobs:
1212
test:
1313
strategy:
1414
matrix:
15-
os: [ubuntu-latest, macos-latest]
15+
os: [ubuntu-latest]
1616
go: [1.13, 1.14, 1.15, 1.16]
1717
name: ${{ matrix.os }} @ Go ${{ matrix.go }}
1818
runs-on: ${{ matrix.os }}
19+
20+
# Service containers to run with `container-job`
21+
services:
22+
# Label used to access the service container
23+
redis:
24+
# Docker Hub image
25+
image: redis
26+
# Set health checks to wait until redis has started
27+
options: >-
28+
--health-cmd "redis-cli ping"
29+
--health-interval 10s
30+
--health-timeout 5s
31+
--health-retries 5
32+
ports:
33+
# Maps port 6379 on service container to the host
34+
- 6379:6379
35+
36+
nsq:
37+
image: nsqio/nsq
38+
ports:
39+
# Maps port 6379 on service container to the host
40+
- 4150:4150
41+
1942
env:
2043
GO111MODULE: on
2144
TESTTAGS: ${{ matrix.test-tags }}
@@ -36,7 +59,8 @@ jobs:
3659

3760
- name: Run Tests
3861
run: |
39-
go test -v -covermode=atomic -coverprofile=coverage.out
62+
go test -v -covermode=atomic -coverprofile=coverage.out .
63+
go test -v -covermode=atomic -coverprofile=coverage.out ./simple/...
4064
4165
- name: Upload coverage to Codecov
4266
uses: codecov/codecov-action@v1

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,7 @@ module github.com/appleboy/queue
22

33
go 1.16
44

5-
require github.com/stretchr/testify v1.7.0
5+
require (
6+
github.com/nsqio/go-nsq v1.0.8
7+
github.com/stretchr/testify v1.7.0
8+
)

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
4+
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
5+
github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk=
6+
github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
37
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
48
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
59
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

nsq/README.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# NSQ
2+
3+
A realtime distributed messaging platform
4+
5+
## Setup
6+
7+
start the NSQ lookupd
8+
9+
```sh
10+
nsqlookupd
11+
```
12+
13+
start the NSQ server
14+
15+
```sh
16+
nsqd --lookupd-tcp-address=localhost:4160
17+
```
18+
19+
start the NSQ admin dashboard
20+
21+
```sh
22+
nsqadmin --lookupd-http-address localhost:4161
23+
```
24+
25+
## Testing
26+
27+
```sh
28+
go test -v ./...
29+
```

nsq/nsq.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package nsq
2+
3+
import (
4+
"runtime"
5+
"sync"
6+
"time"
7+
8+
"github.com/appleboy/queue"
9+
10+
"github.com/nsqio/go-nsq"
11+
)
12+
13+
var _ queue.Worker = (*Worker)(nil)
14+
15+
// Option for queue system
16+
type Option func(*Worker)
17+
18+
// Worker for NSQ
19+
type Worker struct {
20+
q *nsq.Consumer
21+
p *nsq.Producer
22+
startOnce sync.Once
23+
maxInFlight int
24+
addr string
25+
topic string
26+
channel string
27+
runFunc func(msg *nsq.Message) error
28+
}
29+
30+
// WithAddr setup the addr of NSQ
31+
func WithAddr(addr string) Option {
32+
return func(w *Worker) {
33+
w.addr = addr
34+
}
35+
}
36+
37+
// WithTopic setup the topic of NSQ
38+
func WithTopic(topic string) Option {
39+
return func(w *Worker) {
40+
w.topic = topic
41+
}
42+
}
43+
44+
// WithChannel setup the channel of NSQ
45+
func WithChannel(channel string) Option {
46+
return func(w *Worker) {
47+
w.channel = channel
48+
}
49+
}
50+
51+
// WithRunFunc setup the run func of queue
52+
func WithRunFunc(fn func(msg *nsq.Message) error) Option {
53+
return func(w *Worker) {
54+
w.runFunc = fn
55+
}
56+
}
57+
58+
// WithMaxInFlight Maximum number of messages to allow in flight (concurrency knob)
59+
func WithMaxInFlight(num int) Option {
60+
return func(w *Worker) {
61+
w.maxInFlight = num
62+
}
63+
}
64+
65+
// NewWorker for struc
66+
func NewWorker(opts ...Option) *Worker {
67+
w := &Worker{
68+
addr: "127.0.0.1:4150",
69+
topic: "gorush",
70+
channel: "ch",
71+
maxInFlight: runtime.NumCPU(),
72+
runFunc: func(msg *nsq.Message) error {
73+
if len(msg.Body) == 0 {
74+
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
75+
// In this case, a message with an empty body is simply ignored/discarded.
76+
return nil
77+
}
78+
79+
return nil
80+
},
81+
}
82+
83+
// Loop through each option
84+
for _, opt := range opts {
85+
// Call the option giving the instantiated
86+
opt(w)
87+
}
88+
89+
cfg := nsq.NewConfig()
90+
cfg.MaxInFlight = w.maxInFlight
91+
q, err := nsq.NewConsumer(w.topic, w.channel, cfg)
92+
if err != nil {
93+
panic(err)
94+
}
95+
w.q = q
96+
97+
p, err := nsq.NewProducer(w.addr, cfg)
98+
if err != nil {
99+
panic(err)
100+
}
101+
w.p = p
102+
103+
return w
104+
}
105+
106+
// BeforeRun run script before start worker
107+
func (s *Worker) BeforeRun() error {
108+
return nil
109+
}
110+
111+
// AfterRun run script after start worker
112+
func (s *Worker) AfterRun() error {
113+
s.startOnce.Do(func() {
114+
time.Sleep(100 * time.Millisecond)
115+
err := s.q.ConnectToNSQD(s.addr)
116+
if err != nil {
117+
panic("Could not connect nsq server: " + err.Error())
118+
}
119+
})
120+
121+
return nil
122+
}
123+
124+
// Run start the worker
125+
func (s *Worker) Run(quit chan struct{}) error {
126+
wg := &sync.WaitGroup{}
127+
s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
128+
wg.Add(1)
129+
defer wg.Done()
130+
// run custom func
131+
return s.runFunc(msg)
132+
}))
133+
134+
// wait close signal
135+
<-quit
136+
137+
// wait job completed
138+
wg.Wait()
139+
140+
return nil
141+
}
142+
143+
// Shutdown worker
144+
func (s *Worker) Shutdown() error {
145+
s.q.Stop()
146+
s.p.Stop()
147+
return nil
148+
}
149+
150+
// Capacity for channel
151+
func (s *Worker) Capacity() int {
152+
return 0
153+
}
154+
155+
// Usage for count of channel usage
156+
func (s *Worker) Usage() int {
157+
return 0
158+
}
159+
160+
// Queue send notification to queue
161+
func (s *Worker) Queue(job queue.QueuedMessage) error {
162+
err := s.p.Publish(s.topic, job.Bytes())
163+
if err != nil {
164+
return err
165+
}
166+
167+
return nil
168+
}

nsq/nsq_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package nsq
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/appleboy/queue"
8+
9+
"github.com/nsqio/go-nsq"
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
var host = "nsq"
14+
15+
type mockMessage struct {
16+
msg string
17+
}
18+
19+
func (m mockMessage) Bytes() []byte {
20+
return []byte(m.msg)
21+
}
22+
23+
func TestShutdown(t *testing.T) {
24+
w := NewWorker(
25+
WithAddr(host+":4150"),
26+
WithTopic("test"),
27+
)
28+
q, err := queue.NewQueue(
29+
queue.WithWorker(w),
30+
queue.WithWorkerCount(2),
31+
)
32+
assert.NoError(t, err)
33+
q.Start()
34+
time.Sleep(1 * time.Second)
35+
q.Shutdown()
36+
// check shutdown once
37+
q.Shutdown()
38+
q.Wait()
39+
}
40+
41+
func TestCustomFuncAndWait(t *testing.T) {
42+
m := mockMessage{
43+
msg: "foo",
44+
}
45+
w := NewWorker(
46+
WithAddr(host+":4150"),
47+
WithTopic("test"),
48+
WithMaxInFlight(2),
49+
WithRunFunc(func(msg *nsq.Message) error {
50+
time.Sleep(500 * time.Millisecond)
51+
return nil
52+
}),
53+
)
54+
q, err := queue.NewQueue(
55+
queue.WithWorker(w),
56+
queue.WithWorkerCount(2),
57+
)
58+
assert.NoError(t, err)
59+
q.Start()
60+
time.Sleep(100 * time.Millisecond)
61+
assert.NoError(t, q.Queue(m))
62+
assert.NoError(t, q.Queue(m))
63+
assert.NoError(t, q.Queue(m))
64+
assert.NoError(t, q.Queue(m))
65+
time.Sleep(600 * time.Millisecond)
66+
q.Shutdown()
67+
q.Wait()
68+
// you will see the execute time > 1000ms
69+
}

0 commit comments

Comments
 (0)