Skip to content

Commit 85346bf

Browse files
committed
chore: move nats to new repo
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 82668b3 commit 85346bf

File tree

6 files changed

+744
-1
lines changed

6 files changed

+744
-1
lines changed

.github/workflows/go.yml

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
name: Run Tests
2+
3+
on:
4+
push:
5+
branches:
6+
- main
7+
pull_request:
8+
branches:
9+
- main
10+
11+
jobs:
12+
test:
13+
strategy:
14+
matrix:
15+
os: [ubuntu-latest]
16+
go: [1.13, 1.14, 1.15, 1.16]
17+
name: ${{ matrix.os }} @ Go ${{ matrix.go }}
18+
runs-on: ${{ matrix.os }}
19+
20+
# Service containers to run with `container-job`
21+
services:
22+
# Label used to access the service container
23+
redis:
24+
# Docker Hub image
25+
image: redis
26+
# Set health checks to wait until redis has started
27+
options: >-
28+
--health-cmd "redis-cli ping"
29+
--health-interval 10s
30+
--health-timeout 5s
31+
--health-retries 5
32+
ports:
33+
# Maps port 6379 on service container to the host
34+
- 6379:6379
35+
36+
nsq:
37+
image: nsqio/nsq
38+
ports:
39+
# Maps port 6379 on service container to the host
40+
- 4150:4150
41+
42+
env:
43+
GO111MODULE: on
44+
TESTTAGS: ${{ matrix.test-tags }}
45+
GOPROXY: https://proxy.golang.org
46+
steps:
47+
- name: Set up Go ${{ matrix.go }}
48+
uses: actions/setup-go@v2
49+
with:
50+
go-version: ${{ matrix.go }}
51+
52+
- name: Checkout Code
53+
uses: actions/checkout@v2
54+
with:
55+
ref: ${{ github.ref }}
56+
57+
- name: golangci-lint
58+
uses: golangci/golangci-lint-action@v2
59+
with:
60+
version: v1.41.1

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,6 @@
11
# nats
2-
NATS as backend for Queue Package
2+
3+
[![Run Tests](https://github.com/golang-queue/nsq/actions/workflows/go.yml/badge.svg?branch=master)](https://github.com/golang-queue/nsq/actions/workflows/go.yml)
4+
[![codecov](https://codecov.io/gh/golang-queue/nsq/branch/master/graph/badge.svg?token=V8A1WA0P5E)](https://codecov.io/gh/golang-queue/nsq)
5+
6+
NATS as backend with [Queue package](https://github.com/golang-queue/queue) (Connective Technology for Adaptive Edge & Distributed Systems)

go.mod

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
module github/golang-queue/nats
2+
3+
go 1.16
4+
5+
require (
6+
github.com/golang-queue/queue v0.0.6
7+
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30
8+
github.com/stretchr/testify v1.7.0
9+
)

go.sum

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
2+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/golang-queue/queue v0.0.6 h1:TLd0lSM7uNgXXj7SXSMfyaZUwRgbOu9tq6UfZXXELnA=
4+
github.com/golang-queue/queue v0.0.6/go.mod h1:IeIGBO1largDrFEaxDgIckoAFIUTn0eolTQris8bm08=
5+
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
6+
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
7+
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
8+
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
9+
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
10+
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
11+
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
12+
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
13+
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
14+
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
15+
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
16+
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
17+
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
18+
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
19+
github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk=
20+
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
21+
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
22+
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
23+
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
24+
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
25+
github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI=
26+
github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
27+
github.com/nats-io/nats-server/v2 v2.3.2 h1:SGJLWrjBHsl0DsdY8PeTR3YKEfiUEYVVq2STw9d8MSY=
28+
github.com/nats-io/nats-server/v2 v2.3.2/go.mod h1:dUf7Cm5z5LbciFVwWx54owyCKm8x4/hL6p7rrljhLFY=
29+
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30 h1:9GqilBhZaR3xYis0JgMlJjNw933WIobdjKhilXm+Vls=
30+
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
31+
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
32+
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
33+
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
34+
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
35+
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
36+
github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
37+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
38+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
39+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
40+
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
41+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
42+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
43+
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
44+
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
45+
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI=
46+
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
47+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
48+
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
49+
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
50+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
51+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
52+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
53+
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
54+
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
55+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
56+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
57+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
58+
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
59+
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
60+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
61+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
62+
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
63+
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
64+
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
65+
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
66+
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
67+
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
68+
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
69+
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
70+
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
71+
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
72+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
73+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
74+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
75+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

nats.go

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
package nats
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"sync"
7+
"sync/atomic"
8+
"time"
9+
10+
"github.com/golang-queue/queue"
11+
12+
"github.com/nats-io/nats.go"
13+
)
14+
15+
var _ queue.Worker = (*Worker)(nil)
16+
17+
// Option for queue system
18+
type Option func(*Worker)
19+
20+
// Worker for NSQ
21+
type Worker struct {
22+
addr string
23+
subj string
24+
queue string
25+
client *nats.Conn
26+
stop chan struct{}
27+
stopOnce sync.Once
28+
runFunc func(context.Context, queue.QueuedMessage) error
29+
logger queue.Logger
30+
stopFlag int32
31+
}
32+
33+
// WithAddr setup the addr of NATS
34+
func WithAddr(addr string) Option {
35+
return func(w *Worker) {
36+
w.addr = "nats://" + addr
37+
}
38+
}
39+
40+
// WithSubj setup the subject of NATS
41+
func WithSubj(subj string) Option {
42+
return func(w *Worker) {
43+
w.subj = subj
44+
}
45+
}
46+
47+
// WithQueue setup the queue of NATS
48+
func WithQueue(queue string) Option {
49+
return func(w *Worker) {
50+
w.queue = queue
51+
}
52+
}
53+
54+
// WithRunFunc setup the run func of queue
55+
func WithRunFunc(fn func(context.Context, queue.QueuedMessage) error) Option {
56+
return func(w *Worker) {
57+
w.runFunc = fn
58+
}
59+
}
60+
61+
// WithLogger set custom logger
62+
func WithLogger(l queue.Logger) Option {
63+
return func(w *Worker) {
64+
w.logger = l
65+
}
66+
}
67+
68+
// NewWorker for struc
69+
func NewWorker(opts ...Option) *Worker {
70+
var err error
71+
w := &Worker{
72+
addr: "127.0.0.1:4222",
73+
subj: "foobar",
74+
queue: "foobar",
75+
stop: make(chan struct{}),
76+
runFunc: func(context.Context, queue.QueuedMessage) error {
77+
return nil
78+
},
79+
}
80+
81+
// Loop through each option
82+
for _, opt := range opts {
83+
// Call the option giving the instantiated
84+
opt(w)
85+
}
86+
87+
w.client, err = nats.Connect(w.addr)
88+
if err != nil {
89+
panic(err)
90+
}
91+
92+
return w
93+
}
94+
95+
// BeforeRun run script before start worker
96+
func (s *Worker) BeforeRun() error {
97+
return nil
98+
}
99+
100+
// AfterRun run script after start worker
101+
func (s *Worker) AfterRun() error {
102+
return nil
103+
}
104+
105+
func (s *Worker) handle(job queue.Job) error {
106+
// create channel with buffer size 1 to avoid goroutine leak
107+
done := make(chan error, 1)
108+
panicChan := make(chan interface{}, 1)
109+
startTime := time.Now()
110+
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
111+
defer cancel()
112+
113+
// run the job
114+
go func() {
115+
// handle panic issue
116+
defer func() {
117+
if p := recover(); p != nil {
118+
panicChan <- p
119+
}
120+
}()
121+
122+
// run custom process function
123+
done <- s.runFunc(ctx, job)
124+
}()
125+
126+
select {
127+
case p := <-panicChan:
128+
panic(p)
129+
case <-ctx.Done(): // timeout reached
130+
return ctx.Err()
131+
case <-s.stop: // shutdown service
132+
// cancel job
133+
cancel()
134+
135+
leftTime := job.Timeout - time.Since(startTime)
136+
// wait job
137+
select {
138+
case <-time.After(leftTime):
139+
return context.DeadlineExceeded
140+
case err := <-done: // job finish
141+
return err
142+
case p := <-panicChan:
143+
panic(p)
144+
}
145+
case err := <-done: // job finish
146+
return err
147+
}
148+
}
149+
150+
// Run start the worker
151+
func (s *Worker) Run() error {
152+
wg := &sync.WaitGroup{}
153+
panicChan := make(chan interface{}, 1)
154+
_, err := s.client.QueueSubscribe(s.subj, s.queue, func(m *nats.Msg) {
155+
wg.Add(1)
156+
defer func() {
157+
wg.Done()
158+
if p := recover(); p != nil {
159+
panicChan <- p
160+
}
161+
}()
162+
163+
var data queue.Job
164+
_ = json.Unmarshal(m.Data, &data)
165+
166+
if err := s.handle(data); err != nil {
167+
s.logger.Error(err)
168+
}
169+
})
170+
if err != nil {
171+
return err
172+
}
173+
174+
// wait close signal
175+
select {
176+
case <-s.stop:
177+
case err := <-panicChan:
178+
s.logger.Error(err)
179+
}
180+
181+
// wait job completed
182+
wg.Wait()
183+
184+
return nil
185+
}
186+
187+
// Shutdown worker
188+
func (s *Worker) Shutdown() error {
189+
if !atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1) {
190+
return queue.ErrQueueShutdown
191+
}
192+
193+
s.stopOnce.Do(func() {
194+
s.client.Close()
195+
close(s.stop)
196+
})
197+
return nil
198+
}
199+
200+
// Capacity for channel
201+
func (s *Worker) Capacity() int {
202+
return 0
203+
}
204+
205+
// Usage for count of channel usage
206+
func (s *Worker) Usage() int {
207+
return 0
208+
}
209+
210+
// Queue send notification to queue
211+
func (s *Worker) Queue(job queue.QueuedMessage) error {
212+
if atomic.LoadInt32(&s.stopFlag) == 1 {
213+
return queue.ErrQueueShutdown
214+
}
215+
216+
err := s.client.Publish(s.subj, job.Bytes())
217+
if err != nil {
218+
return err
219+
}
220+
221+
return nil
222+
}

0 commit comments

Comments
 (0)