Skip to content

Commit c13027a

Browse files
committed
chore: move NSQ to new repo
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 26b4fa6 commit c13027a

File tree

5 files changed

+725
-2
lines changed

5 files changed

+725
-2
lines changed

README.md

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,29 @@
1-
# nsq
2-
NSQ as backend for Queue Package
1+
# NSQ
2+
3+
NSQ as backend with [Queue package](https://github.com/golang-queue/queue) (A realtime distributed messaging platform)
4+
5+
## Setup
6+
7+
start the NSQ lookupd
8+
9+
```sh
10+
nsqlookupd
11+
```
12+
13+
start the NSQ server
14+
15+
```sh
16+
nsqd --lookupd-tcp-address=localhost:4160
17+
```
18+
19+
start the NSQ admin dashboard
20+
21+
```sh
22+
nsqadmin --lookupd-http-address localhost:4161
23+
```
24+
25+
## Testing
26+
27+
```sh
28+
go test -v ./...
29+
```

go.mod

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
module github/golang-queue/nsq
2+
3+
go 1.16
4+
5+
require (
6+
github.com/golang-queue/queue v0.0.6
7+
github.com/nsqio/go-nsq v1.0.8
8+
github.com/stretchr/testify v1.7.0
9+
)

go.sum

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
2+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/golang-queue/queue v0.0.6 h1:TLd0lSM7uNgXXj7SXSMfyaZUwRgbOu9tq6UfZXXELnA=
4+
github.com/golang-queue/queue v0.0.6/go.mod h1:IeIGBO1largDrFEaxDgIckoAFIUTn0eolTQris8bm08=
5+
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
6+
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
7+
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
8+
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
9+
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
10+
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
11+
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
12+
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
13+
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
14+
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
15+
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
16+
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
17+
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
18+
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
19+
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
20+
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
21+
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
22+
github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
23+
github.com/nats-io/nats-server/v2 v2.3.2/go.mod h1:dUf7Cm5z5LbciFVwWx54owyCKm8x4/hL6p7rrljhLFY=
24+
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
25+
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
26+
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
27+
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
28+
github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk=
29+
github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
30+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
31+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
32+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
33+
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
34+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
35+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
36+
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
37+
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
38+
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
39+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
40+
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
41+
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
42+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
43+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
44+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
45+
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
46+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
47+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
48+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
49+
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
50+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
51+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
52+
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
53+
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
54+
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
55+
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
56+
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
57+
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
58+
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
59+
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
60+
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
61+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
62+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
63+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
64+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

nsq.go

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
package nsq
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"runtime"
7+
"sync"
8+
"sync/atomic"
9+
"time"
10+
11+
"github.com/golang-queue/queue"
12+
13+
"github.com/nsqio/go-nsq"
14+
)
15+
16+
var _ queue.Worker = (*Worker)(nil)
17+
18+
// Option for queue system
19+
type Option func(*Worker)
20+
21+
// Worker for NSQ
22+
type Worker struct {
23+
q *nsq.Consumer
24+
p *nsq.Producer
25+
startOnce sync.Once
26+
stopOnce sync.Once
27+
stop chan struct{}
28+
maxInFlight int
29+
addr string
30+
topic string
31+
channel string
32+
runFunc func(context.Context, queue.QueuedMessage) error
33+
logger queue.Logger
34+
stopFlag int32
35+
startFlag int32
36+
}
37+
38+
// WithAddr setup the addr of NSQ
39+
func WithAddr(addr string) Option {
40+
return func(w *Worker) {
41+
w.addr = addr
42+
}
43+
}
44+
45+
// WithTopic setup the topic of NSQ
46+
func WithTopic(topic string) Option {
47+
return func(w *Worker) {
48+
w.topic = topic
49+
}
50+
}
51+
52+
// WithChannel setup the channel of NSQ
53+
func WithChannel(channel string) Option {
54+
return func(w *Worker) {
55+
w.channel = channel
56+
}
57+
}
58+
59+
// WithRunFunc setup the run func of queue
60+
func WithRunFunc(fn func(context.Context, queue.QueuedMessage) error) Option {
61+
return func(w *Worker) {
62+
w.runFunc = fn
63+
}
64+
}
65+
66+
// WithMaxInFlight Maximum number of messages to allow in flight (concurrency knob)
67+
func WithMaxInFlight(num int) Option {
68+
return func(w *Worker) {
69+
w.maxInFlight = num
70+
}
71+
}
72+
73+
// WithLogger set custom logger
74+
func WithLogger(l queue.Logger) Option {
75+
return func(w *Worker) {
76+
w.logger = l
77+
}
78+
}
79+
80+
// NewWorker for struc
81+
func NewWorker(opts ...Option) *Worker {
82+
var err error
83+
w := &Worker{
84+
addr: "127.0.0.1:4150",
85+
topic: "gorush",
86+
channel: "ch",
87+
maxInFlight: runtime.NumCPU(),
88+
stop: make(chan struct{}),
89+
logger: queue.NewLogger(),
90+
runFunc: func(context.Context, queue.QueuedMessage) error {
91+
return nil
92+
},
93+
}
94+
95+
// Loop through each option
96+
for _, opt := range opts {
97+
// Call the option giving the instantiated
98+
opt(w)
99+
}
100+
101+
cfg := nsq.NewConfig()
102+
cfg.MaxInFlight = w.maxInFlight
103+
w.q, err = nsq.NewConsumer(w.topic, w.channel, cfg)
104+
if err != nil {
105+
panic(err)
106+
}
107+
108+
w.p, err = nsq.NewProducer(w.addr, cfg)
109+
if err != nil {
110+
panic(err)
111+
}
112+
113+
return w
114+
}
115+
116+
// BeforeRun run script before start worker
117+
func (s *Worker) BeforeRun() error {
118+
return nil
119+
}
120+
121+
// AfterRun run script after start worker
122+
func (s *Worker) AfterRun() error {
123+
s.startOnce.Do(func() {
124+
time.Sleep(100 * time.Millisecond)
125+
err := s.q.ConnectToNSQD(s.addr)
126+
if err != nil {
127+
panic("Could not connect nsq server: " + err.Error())
128+
}
129+
130+
atomic.CompareAndSwapInt32(&s.startFlag, 0, 1)
131+
})
132+
133+
return nil
134+
}
135+
136+
func (s *Worker) handle(job queue.Job) error {
137+
// create channel with buffer size 1 to avoid goroutine leak
138+
done := make(chan error, 1)
139+
panicChan := make(chan interface{}, 1)
140+
startTime := time.Now()
141+
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
142+
defer cancel()
143+
144+
// run the job
145+
go func() {
146+
// handle panic issue
147+
defer func() {
148+
if p := recover(); p != nil {
149+
panicChan <- p
150+
}
151+
}()
152+
153+
// run custom process function
154+
done <- s.runFunc(ctx, job)
155+
}()
156+
157+
select {
158+
case p := <-panicChan:
159+
panic(p)
160+
case <-ctx.Done(): // timeout reached
161+
return ctx.Err()
162+
case <-s.stop: // shutdown service
163+
// cancel job
164+
cancel()
165+
166+
leftTime := job.Timeout - time.Since(startTime)
167+
// wait job
168+
select {
169+
case <-time.After(leftTime):
170+
return context.DeadlineExceeded
171+
case err := <-done: // job finish
172+
return err
173+
case p := <-panicChan:
174+
panic(p)
175+
}
176+
case err := <-done: // job finish
177+
return err
178+
}
179+
}
180+
181+
// Run start the worker
182+
func (s *Worker) Run() error {
183+
wg := &sync.WaitGroup{}
184+
panicChan := make(chan interface{}, 1)
185+
s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
186+
wg.Add(1)
187+
defer func() {
188+
wg.Done()
189+
if p := recover(); p != nil {
190+
panicChan <- p
191+
}
192+
}()
193+
if len(msg.Body) == 0 {
194+
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
195+
// In this case, a message with an empty body is simply ignored/discarded.
196+
return nil
197+
}
198+
199+
var data queue.Job
200+
_ = json.Unmarshal(msg.Body, &data)
201+
return s.handle(data)
202+
}))
203+
204+
// wait close signal
205+
select {
206+
case <-s.stop:
207+
case err := <-panicChan:
208+
s.logger.Error(err)
209+
}
210+
211+
// wait job completed
212+
wg.Wait()
213+
214+
return nil
215+
}
216+
217+
// Shutdown worker
218+
func (s *Worker) Shutdown() error {
219+
if !atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1) {
220+
return queue.ErrQueueShutdown
221+
}
222+
223+
s.stopOnce.Do(func() {
224+
if atomic.LoadInt32(&s.startFlag) == 1 {
225+
s.q.Stop()
226+
s.p.Stop()
227+
}
228+
229+
close(s.stop)
230+
})
231+
return nil
232+
}
233+
234+
// Capacity for channel
235+
func (s *Worker) Capacity() int {
236+
return 0
237+
}
238+
239+
// Usage for count of channel usage
240+
func (s *Worker) Usage() int {
241+
return 0
242+
}
243+
244+
// Queue send notification to queue
245+
func (s *Worker) Queue(job queue.QueuedMessage) error {
246+
if atomic.LoadInt32(&s.stopFlag) == 1 {
247+
return queue.ErrQueueShutdown
248+
}
249+
250+
err := s.p.Publish(s.topic, job.Bytes())
251+
if err != nil {
252+
return err
253+
}
254+
255+
return nil
256+
}

0 commit comments

Comments
 (0)