Skip to content

Commit 6ae3581

Browse files
authored
chore: support nats platform (#14)
1 parent b4680e5 commit 6ae3581

File tree

6 files changed

+318
-7
lines changed

6 files changed

+318
-7
lines changed

.drone.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ services:
3232
commands:
3333
- /nsqd
3434

35+
- name: nats
36+
image: nats
37+
3538
volumes:
3639
- name: gopath
3740
temp: {}

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ module github.com/appleboy/queue
33
go 1.16
44

55
require (
6+
github.com/golang/protobuf v1.5.2 // indirect
7+
github.com/nats-io/nats-server/v2 v2.3.2 // indirect
8+
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30
69
github.com/nsqio/go-nsq v1.0.8
710
github.com/stretchr/testify v1.7.0
11+
google.golang.org/protobuf v1.27.1 // indirect
812
)

go.sum

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,74 @@
11
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
4+
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
5+
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
6+
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
7+
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
8+
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
9+
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
10+
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
11+
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
312
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
413
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
14+
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
15+
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
16+
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
17+
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
18+
github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk=
19+
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
20+
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
21+
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
22+
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
23+
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
24+
github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI=
25+
github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
26+
github.com/nats-io/nats-server/v2 v2.3.2 h1:SGJLWrjBHsl0DsdY8PeTR3YKEfiUEYVVq2STw9d8MSY=
27+
github.com/nats-io/nats-server/v2 v2.3.2/go.mod h1:dUf7Cm5z5LbciFVwWx54owyCKm8x4/hL6p7rrljhLFY=
28+
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30 h1:9GqilBhZaR3xYis0JgMlJjNw933WIobdjKhilXm+Vls=
29+
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
30+
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
31+
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
32+
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
33+
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
34+
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
535
github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk=
636
github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
737
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
838
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
939
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
1040
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
1141
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
42+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
43+
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
44+
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
45+
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI=
46+
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
47+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
48+
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
49+
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
50+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
51+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
52+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
53+
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
54+
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
55+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
56+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
57+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
58+
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
59+
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
60+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
61+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
62+
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
63+
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
64+
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
65+
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
66+
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
67+
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
68+
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
69+
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
70+
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
71+
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
1272
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
1373
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
1474
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=

nats/nats.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package nats
2+
3+
import (
4+
"sync"
5+
6+
"github.com/appleboy/queue"
7+
8+
"github.com/nats-io/nats.go"
9+
)
10+
11+
var _ queue.Worker = (*Worker)(nil)
12+
13+
// Option for queue system
14+
type Option func(*Worker)
15+
16+
// Job with NSQ message
17+
type Job struct {
18+
Body []byte
19+
}
20+
21+
// Bytes get bytes format
22+
func (j *Job) Bytes() []byte {
23+
return j.Body
24+
}
25+
26+
// Worker for NSQ
27+
type Worker struct {
28+
addr string
29+
subj string
30+
queue string
31+
client *nats.Conn
32+
stop chan struct{}
33+
stopOnce sync.Once
34+
runFunc func(queue.QueuedMessage, <-chan struct{}) error
35+
}
36+
37+
// WithAddr setup the addr of NATS
38+
func WithAddr(addr string) Option {
39+
return func(w *Worker) {
40+
w.addr = "nats://" + addr
41+
}
42+
}
43+
44+
// WithSubj setup the subject of NATS
45+
func WithSubj(subj string) Option {
46+
return func(w *Worker) {
47+
w.subj = subj
48+
}
49+
}
50+
51+
// WithQueue setup the queue of NATS
52+
func WithQueue(queue string) Option {
53+
return func(w *Worker) {
54+
w.queue = queue
55+
}
56+
}
57+
58+
// WithRunFunc setup the run func of queue
59+
func WithRunFunc(fn func(queue.QueuedMessage, <-chan struct{}) error) Option {
60+
return func(w *Worker) {
61+
w.runFunc = fn
62+
}
63+
}
64+
65+
// NewWorker for struc
66+
func NewWorker(opts ...Option) *Worker {
67+
var err error
68+
w := &Worker{
69+
addr: "nats://127.0.0.1:4222",
70+
subj: "foobar",
71+
queue: "foobar",
72+
stop: make(chan struct{}),
73+
runFunc: func(queue.QueuedMessage, <-chan struct{}) error {
74+
return nil
75+
},
76+
}
77+
78+
// Loop through each option
79+
for _, opt := range opts {
80+
// Call the option giving the instantiated
81+
opt(w)
82+
}
83+
84+
w.client, err = nats.Connect(w.addr)
85+
if err != nil {
86+
panic(err)
87+
}
88+
89+
return w
90+
}
91+
92+
// BeforeRun run script before start worker
93+
func (s *Worker) BeforeRun() error {
94+
return nil
95+
}
96+
97+
// AfterRun run script after start worker
98+
func (s *Worker) AfterRun() error {
99+
return nil
100+
}
101+
102+
// Run start the worker
103+
func (s *Worker) Run() error {
104+
wg := &sync.WaitGroup{}
105+
106+
_, err := s.client.QueueSubscribe(s.subj, s.queue, func(m *nats.Msg) {
107+
wg.Add(1)
108+
defer wg.Done()
109+
job := &Job{
110+
Body: m.Data,
111+
}
112+
113+
// run custom process function
114+
_ = s.runFunc(job, s.stop)
115+
})
116+
if err != nil {
117+
return err
118+
}
119+
120+
// wait close signal
121+
<-s.stop
122+
123+
// wait job completed
124+
wg.Wait()
125+
126+
return nil
127+
}
128+
129+
// Shutdown worker
130+
func (s *Worker) Shutdown() error {
131+
s.stopOnce.Do(func() {
132+
close(s.stop)
133+
s.client.Close()
134+
})
135+
return nil
136+
}
137+
138+
// Capacity for channel
139+
func (s *Worker) Capacity() int {
140+
return 0
141+
}
142+
143+
// Usage for count of channel usage
144+
func (s *Worker) Usage() int {
145+
return 0
146+
}
147+
148+
// Queue send notification to queue
149+
func (s *Worker) Queue(job queue.QueuedMessage) error {
150+
err := s.client.Publish(s.subj, job.Bytes())
151+
if err != nil {
152+
return err
153+
}
154+
155+
return nil
156+
}

nats/nats_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package nats
2+
3+
import (
4+
"log"
5+
"testing"
6+
"time"
7+
8+
"github.com/appleboy/queue"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
var host = "nats"
14+
15+
func TestNATSDefaultFlow(t *testing.T) {
16+
m := &Job{
17+
Body: []byte("foo"),
18+
}
19+
w := NewWorker(
20+
WithAddr(host+":4222"),
21+
WithSubj("test"),
22+
WithQueue("test"),
23+
)
24+
q, err := queue.NewQueue(
25+
queue.WithWorker(w),
26+
queue.WithWorkerCount(2),
27+
)
28+
assert.NoError(t, err)
29+
q.Start()
30+
time.Sleep(100 * time.Millisecond)
31+
assert.NoError(t, q.Queue(m))
32+
m.Body = []byte("new message")
33+
assert.NoError(t, q.Queue(m))
34+
q.Shutdown()
35+
q.Wait()
36+
}
37+
38+
func TestNATSShutdown(t *testing.T) {
39+
w := NewWorker(
40+
WithAddr(host+":4222"),
41+
WithSubj("test"),
42+
WithQueue("test"),
43+
)
44+
q, err := queue.NewQueue(
45+
queue.WithWorker(w),
46+
queue.WithWorkerCount(2),
47+
)
48+
assert.NoError(t, err)
49+
q.Start()
50+
time.Sleep(1 * time.Second)
51+
q.Shutdown()
52+
// check shutdown once
53+
q.Shutdown()
54+
q.Wait()
55+
}
56+
57+
func TestNATSCustomFuncAndWait(t *testing.T) {
58+
m := &Job{
59+
Body: []byte("foo"),
60+
}
61+
w := NewWorker(
62+
WithAddr(host+":4222"),
63+
WithSubj("test"),
64+
WithQueue("test"),
65+
WithRunFunc(func(msg queue.QueuedMessage, s <-chan struct{}) error {
66+
log.Println("show message: " + string(msg.Bytes()))
67+
time.Sleep(500 * time.Millisecond)
68+
return nil
69+
}),
70+
)
71+
q, err := queue.NewQueue(
72+
queue.WithWorker(w),
73+
queue.WithWorkerCount(2),
74+
)
75+
assert.NoError(t, err)
76+
q.Start()
77+
time.Sleep(100 * time.Millisecond)
78+
assert.NoError(t, q.Queue(m))
79+
assert.NoError(t, q.Queue(m))
80+
assert.NoError(t, q.Queue(m))
81+
assert.NoError(t, q.Queue(m))
82+
time.Sleep(600 * time.Millisecond)
83+
q.Shutdown()
84+
q.Wait()
85+
// you will see the execute time > 1000ms
86+
}

nsq/nsq_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package nsq
22

33
import (
4+
"log"
45
"testing"
56
"time"
67

@@ -11,14 +12,14 @@ import (
1112

1213
var host = "nsq"
1314

14-
func TestDefaultFlow(t *testing.T) {
15+
func TestNSQDefaultFlow(t *testing.T) {
1516
m := &Job{
1617
Body: []byte("foo"),
1718
}
1819
w := NewWorker(
1920
WithAddr(host+":4150"),
20-
WithTopic("test"),
21-
WithChannel("test"),
21+
WithTopic("test1"),
22+
WithChannel("test1"),
2223
)
2324
q, err := queue.NewQueue(
2425
queue.WithWorker(w),
@@ -34,10 +35,10 @@ func TestDefaultFlow(t *testing.T) {
3435
q.Wait()
3536
}
3637

37-
func TestShutdown(t *testing.T) {
38+
func TestNSQShutdown(t *testing.T) {
3839
w := NewWorker(
3940
WithAddr(host+":4150"),
40-
WithTopic("test"),
41+
WithTopic("test2"),
4142
)
4243
q, err := queue.NewQueue(
4344
queue.WithWorker(w),
@@ -52,15 +53,16 @@ func TestShutdown(t *testing.T) {
5253
q.Wait()
5354
}
5455

55-
func TestCustomFuncAndWait(t *testing.T) {
56+
func TestNSQCustomFuncAndWait(t *testing.T) {
5657
m := &Job{
5758
Body: []byte("foo"),
5859
}
5960
w := NewWorker(
6061
WithAddr(host+":4150"),
61-
WithTopic("test"),
62+
WithTopic("test3"),
6263
WithMaxInFlight(2),
6364
WithRunFunc(func(msg queue.QueuedMessage, s <-chan struct{}) error {
65+
log.Println("show message: " + string(msg.Bytes()))
6466
time.Sleep(500 * time.Millisecond)
6567
return nil
6668
}),

0 commit comments

Comments
 (0)