File tree Expand file tree Collapse file tree 5 files changed +171
-0
lines changed Expand file tree Collapse file tree 5 files changed +171
-0
lines changed Original file line number Diff line number Diff line change
1
+ # Example with server and client
2
+
3
+ Please refer the following steps to build server and client.
4
+
5
+ ## Build server
6
+
7
+ ``` sh
8
+ go build -o app server/main.go
9
+ ```
10
+
11
+ ## Build client
12
+
13
+ ``` sh
14
+ go build -o agent client/main.go
15
+ ```
16
+
17
+ ## Usage
18
+
19
+ Run the multiple agent. (open two console in the same terminal)
20
+
21
+ ``` sh
22
+ ./agent
23
+ ```
24
+
25
+ Publish the message.
26
+
27
+ ``` sh
28
+ ./app
29
+ ```
Original file line number Diff line number Diff line change
1
+ package main
2
+
3
+ import (
4
+ "context"
5
+ "encoding/json"
6
+ "fmt"
7
+ "time"
8
+
9
+ "github.com/golang-queue/nsq"
10
+ "github.com/golang-queue/queue"
11
+ )
12
+
13
+ type job struct {
14
+ Message string
15
+ }
16
+
17
+ func (j * job ) Bytes () []byte {
18
+ b , err := json .Marshal (j )
19
+ if err != nil {
20
+ panic (err )
21
+ }
22
+ return b
23
+ }
24
+
25
+ func main () {
26
+ taskN := 10000
27
+ rets := make (chan string , taskN )
28
+
29
+ // define the worker
30
+ w := nsq .NewWorker (
31
+ nsq .WithAddr ("127.0.0.1:4150" ),
32
+ nsq .WithTopic ("example" ),
33
+ nsq .WithChannel ("foobar" ),
34
+ // concurrent job number
35
+ nsq .WithMaxInFlight (10 ),
36
+ nsq .WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
37
+ var v * job
38
+ if err := json .Unmarshal (m .Bytes (), & v ); err != nil {
39
+ return err
40
+ }
41
+ rets <- v .Message
42
+ return nil
43
+ }),
44
+ )
45
+
46
+ // define the queue
47
+ q := queue .NewPool (
48
+ 5 ,
49
+ queue .WithWorker (w ),
50
+ )
51
+
52
+ // wait until all tasks done
53
+ for i := 0 ; i < taskN ; i ++ {
54
+ fmt .Println ("message:" , <- rets )
55
+ time .Sleep (50 * time .Millisecond )
56
+ }
57
+
58
+ // shutdown the service and notify all the worker
59
+ q .Release ()
60
+ }
Original file line number Diff line number Diff line change
1
+ module example
2
+
3
+ go 1.16
4
+
5
+ require (
6
+ github.com/golang-queue/nsq v0.0.3-0.20210907001930-28fb526f914f
7
+ github.com/golang-queue/queue v0.0.8-0.20210905095503-cc99dff8fdc3
8
+ )
Original file line number Diff line number Diff line change
1
+ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 =
2
+ github.com/davecgh/go-spew v1.1.0 /go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38 =
3
+ github.com/golang-queue/nsq v0.0.3-0.20210907001930-28fb526f914f h1:h8Ni0qyAdBWOeOerAq/M/HUDdcqtLuxHABWTdNXKeRY =
4
+ github.com/golang-queue/nsq v0.0.3-0.20210907001930-28fb526f914f /go.mod h1:I2JzRTV5lLsgu/R6c4SXjPt8ZhyX/lsV8bhjTZP9yu0 =
5
+ github.com/golang-queue/queue v0.0.8-0.20210905085819-3cd1dfe014e2 /go.mod h1:JS5tYJacahCjafcplU5idNLX2vkYioqh6wEDX5o9Nms =
6
+ github.com/golang-queue/queue v0.0.8-0.20210905095503-cc99dff8fdc3 h1:ka4/BRgVndDi92gaeOpjnnfuUXrWr1y87x/Go9M3x3Y =
7
+ github.com/golang-queue/queue v0.0.8-0.20210905095503-cc99dff8fdc3 /go.mod h1:JS5tYJacahCjafcplU5idNLX2vkYioqh6wEDX5o9Nms =
8
+ github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4 =
9
+ github.com/golang/snappy v0.0.1 /go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q =
10
+ github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk =
11
+ github.com/nsqio/go-nsq v1.0.8 /go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY =
12
+ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM =
13
+ github.com/pmezard/go-difflib v1.0.0 /go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4 =
14
+ github.com/stretchr/objx v0.1.0 /go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME =
15
+ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY =
16
+ github.com/stretchr/testify v1.7.0 /go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg =
17
+ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 /go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0 =
18
+ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo =
19
+ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c /go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM =
Original file line number Diff line number Diff line change
1
+ package main
2
+
3
+ import (
4
+ "encoding/json"
5
+ "fmt"
6
+ "log"
7
+ "time"
8
+
9
+ "github.com/golang-queue/nsq"
10
+ "github.com/golang-queue/queue"
11
+ )
12
+
13
+ type job struct {
14
+ Message string
15
+ }
16
+
17
+ func (j * job ) Bytes () []byte {
18
+ b , err := json .Marshal (j )
19
+ if err != nil {
20
+ panic (err )
21
+ }
22
+ return b
23
+ }
24
+
25
+ func main () {
26
+ taskN := 100
27
+
28
+ // define the worker
29
+ w := nsq .NewWorker (
30
+ nsq .WithAddr ("127.0.0.1:4150" ),
31
+ nsq .WithTopic ("example" ),
32
+ nsq .WithChannel ("foobar" ),
33
+ )
34
+
35
+ // define the queue
36
+ q := queue .NewPool (
37
+ 0 ,
38
+ queue .WithWorker (w ),
39
+ )
40
+
41
+ // assign tasks in queue
42
+ for i := 0 ; i < taskN ; i ++ {
43
+ go func (i int ) {
44
+ if err := q .Queue (& job {
45
+ Message : fmt .Sprintf ("handle the job: %d" , i + 1 ),
46
+ }); err != nil {
47
+ log .Fatal (err )
48
+ }
49
+ }(i )
50
+ }
51
+
52
+ time .Sleep (1 * time .Second )
53
+ // shutdown the service and notify all the worker
54
+ q .Release ()
55
+ }
You can’t perform that action at this time.
0 commit comments