Skip to content

Commit 2af134a

Browse files
authored
docs: add NSQ example (#10)
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 5119786 commit 2af134a

File tree

5 files changed

+139
-10
lines changed

5 files changed

+139
-10
lines changed

README.md

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,38 @@ Second to create the new worker, use buffered channel as example:
3737
w := simple.NewWorker(
3838
simple.WithQueueNum(taskN),
3939
simple.WithRunFunc(func(m queue.QueuedMessage) error {
40-
j, ok := m.(*job)
40+
v, ok := m.(*job)
4141
if !ok {
42-
return errors.New("message is not job type")
42+
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
43+
return err
44+
}
4345
}
4446

45-
rets <- j.message
47+
rets <- v.Message
48+
return nil
49+
}),
50+
)
51+
```
52+
53+
or you can use the [NSQ](https://nsq.io/) as backend, see the worker example:
54+
55+
```go
56+
// define the worker
57+
w := nsq.NewWorker(
58+
nsq.WithAddr("127.0.0.1:4150"),
59+
nsq.WithTopic("example"),
60+
nsq.WithChannel("foobar"),
61+
// concurrent job number
62+
nsq.WithMaxInFlight(10),
63+
nsq.WithRunFunc(func(m queue.QueuedMessage) error {
64+
v, ok := m.(*job)
65+
if !ok {
66+
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
67+
return err
68+
}
69+
}
70+
71+
rets <- v.Message
4672
return nil
4773
}),
4874
)

_example/nsq/go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module example
2+
3+
go 1.16
4+
5+
require github.com/appleboy/queue v0.0.4-0.20210725052905-4ef008157945

_example/nsq/go.sum

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
github.com/appleboy/queue v0.0.3 h1:rntqVTm6ilh80VCVQjwA0vDMCl1cfveq6GS6X98fKwE=
2+
github.com/appleboy/queue v0.0.3/go.mod h1:6Mn0z4hURZW/26huvRXG0SJ4o7pBdo6hOryRiegy/4Q=
3+
github.com/appleboy/queue v0.0.4-0.20210725052905-4ef008157945 h1:Mec41QJ0hGDtbyfWgHfuXKqTu0a8hSIIBhL3sNbNjqQ=
4+
github.com/appleboy/queue v0.0.4-0.20210725052905-4ef008157945/go.mod h1:6Mn0z4hURZW/26huvRXG0SJ4o7pBdo6hOryRiegy/4Q=
5+
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
6+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
7+
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
8+
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
9+
github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk=
10+
github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
11+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
12+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
13+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
14+
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
15+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
16+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
17+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
18+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

_example/nsq/main.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
"github.com/appleboy/queue"
10+
"github.com/appleboy/queue/nsq"
11+
)
12+
13+
type job struct {
14+
Message string
15+
}
16+
17+
func (j *job) Bytes() []byte {
18+
b, err := json.Marshal(j)
19+
if err != nil {
20+
panic(err)
21+
}
22+
return b
23+
}
24+
25+
func main() {
26+
taskN := 100
27+
rets := make(chan string, taskN)
28+
29+
// define the worker
30+
w := nsq.NewWorker(
31+
nsq.WithAddr("127.0.0.1:4150"),
32+
nsq.WithTopic("example"),
33+
nsq.WithChannel("foobar"),
34+
// concurrent job number
35+
nsq.WithMaxInFlight(10),
36+
nsq.WithRunFunc(func(m queue.QueuedMessage) error {
37+
v, ok := m.(*job)
38+
if !ok {
39+
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
40+
return err
41+
}
42+
}
43+
44+
rets <- v.Message
45+
return nil
46+
}),
47+
)
48+
49+
// define the queue
50+
q, err := queue.NewQueue(
51+
queue.WithWorkerCount(10),
52+
queue.WithWorker(w),
53+
)
54+
if err != nil {
55+
log.Fatal(err)
56+
}
57+
58+
// start the five worker
59+
q.Start()
60+
61+
// assign tasks in queue
62+
for i := 0; i < taskN; i++ {
63+
go func(i int) {
64+
q.Queue(&job{
65+
Message: fmt.Sprintf("handle the job: %d", i+1),
66+
})
67+
}(i)
68+
}
69+
70+
// wait until all tasks done
71+
for i := 0; i < taskN; i++ {
72+
fmt.Println("message:", <-rets)
73+
time.Sleep(50 * time.Millisecond)
74+
}
75+
76+
q.Shutdown()
77+
q.Wait()
78+
}

_example/simple/main.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package main
22

33
import (
4-
"errors"
4+
"encoding/json"
55
"fmt"
66
"log"
77
"time"
@@ -11,11 +11,11 @@ import (
1111
)
1212

1313
type job struct {
14-
message string
14+
Message string
1515
}
1616

1717
func (j *job) Bytes() []byte {
18-
return []byte(j.message)
18+
return []byte(j.Message)
1919
}
2020

2121
func main() {
@@ -26,12 +26,14 @@ func main() {
2626
w := simple.NewWorker(
2727
simple.WithQueueNum(taskN),
2828
simple.WithRunFunc(func(m queue.QueuedMessage) error {
29-
j, ok := m.(*job)
29+
v, ok := m.(*job)
3030
if !ok {
31-
return errors.New("message is not job type")
31+
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
32+
return err
33+
}
3234
}
3335

34-
rets <- j.message
36+
rets <- v.Message
3537
return nil
3638
}),
3739
)
@@ -52,7 +54,7 @@ func main() {
5254
for i := 0; i < taskN; i++ {
5355
go func(i int) {
5456
q.Queue(&job{
55-
message: fmt.Sprintf("handle the job: %d", i+1),
57+
Message: fmt.Sprintf("handle the job: %d", i+1),
5658
})
5759
}(i)
5860
}

0 commit comments

Comments
 (0)