Skip to content

Commit 913886f

Browse files
committed
docs: Add example
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 92925c3 commit 913886f

File tree

2 files changed

+86
-8
lines changed

2 files changed

+86
-8
lines changed

README.md

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,81 @@ nsqadmin --lookupd-http-address localhost:4161
3131
```sh
3232
go test -v ./...
3333
```
34+
35+
## Example
36+
37+
```go
38+
package main
39+
40+
import (
41+
"context"
42+
"encoding/json"
43+
"fmt"
44+
"log"
45+
"time"
46+
47+
"github.com/golang-queue/nsq"
48+
"github.com/golang-queue/queue"
49+
)
50+
51+
type job struct {
52+
Message string
53+
}
54+
55+
func (j *job) Bytes() []byte {
56+
b, err := json.Marshal(j)
57+
if err != nil {
58+
panic(err)
59+
}
60+
return b
61+
}
62+
63+
func main() {
64+
taskN := 100
65+
rets := make(chan string, taskN)
66+
67+
// define the worker
68+
w := nsq.NewWorker(
69+
nsq.WithAddr("127.0.0.1:4150"),
70+
nsq.WithTopic("example"),
71+
nsq.WithChannel("foobar"),
72+
// concurrent job number
73+
nsq.WithMaxInFlight(10),
74+
nsq.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
75+
var v *job
76+
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
77+
return err
78+
}
79+
rets <- v.Message
80+
return nil
81+
}),
82+
)
83+
84+
// define the queue
85+
q := queue.NewPool(
86+
10,
87+
queue.WithWorker(w),
88+
)
89+
defer q.Release()
90+
91+
// start the five worker
92+
q.Start()
93+
94+
// assign tasks in queue
95+
for i := 0; i < taskN; i++ {
96+
go func(i int) {
97+
if err := q.Queue(&job{
98+
Message: fmt.Sprintf("handle the job: %d", i+1),
99+
}); err != nil {
100+
log.Fatal(err)
101+
}
102+
}(i)
103+
}
104+
105+
// wait until all tasks done
106+
for i := 0; i < taskN; i++ {
107+
fmt.Println("message:", <-rets)
108+
time.Sleep(50 * time.Millisecond)
109+
}
110+
}
111+
```

_example/main.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"log"
78
"time"
89

910
"github.com/golang-queue/nsq"
@@ -34,13 +35,10 @@ func main() {
3435
// concurrent job number
3536
nsq.WithMaxInFlight(10),
3637
nsq.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
37-
v, ok := m.(*job)
38-
if !ok {
39-
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
40-
return err
41-
}
38+
var v *job
39+
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
40+
return err
4241
}
43-
4442
rets <- v.Message
4543
return nil
4644
}),
@@ -59,9 +57,11 @@ func main() {
5957
// assign tasks in queue
6058
for i := 0; i < taskN; i++ {
6159
go func(i int) {
62-
q.Queue(&job{
60+
if err := q.Queue(&job{
6361
Message: fmt.Sprintf("handle the job: %d", i+1),
64-
})
62+
}); err != nil {
63+
log.Fatal(err)
64+
}
6565
}(i)
6666
}
6767

0 commit comments

Comments
 (0)