Skip to content

Commit 627a8c8

Browse files
committed
docs: Add producer and consumer example
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 535aad4 commit 627a8c8

File tree

5 files changed

+188
-0
lines changed

5 files changed

+188
-0
lines changed

_example/producer-consumer/README.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Example with server and client
2+
3+
Please refer the following steps to build server and client.
4+
5+
## Build server
6+
7+
```sh
8+
go build -o app server/main.go
9+
```
10+
11+
## Build client
12+
13+
```sh
14+
go build -o agent client/main.go
15+
```
16+
17+
## Usage
18+
19+
Run the multiple agent. (open two console in the same terminal)
20+
21+
```sh
22+
./agent
23+
```
24+
25+
Publish the message.
26+
27+
```sh
28+
./app
29+
```
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"time"
8+
9+
"github.com/appleboy/graceful"
10+
"github.com/golang-queue/queue"
11+
"github.com/golang-queue/queue/core"
12+
"github.com/golang-queue/rabbitmq"
13+
)
14+
15+
type job struct {
16+
Message string
17+
}
18+
19+
func (j *job) Bytes() []byte {
20+
b, err := json.Marshal(j)
21+
if err != nil {
22+
panic(err)
23+
}
24+
return b
25+
}
26+
27+
func main() {
28+
taskN := 10000
29+
rets := make(chan string, taskN)
30+
31+
m := graceful.NewManager()
32+
33+
// define the worker
34+
w := rabbitmq.NewWorker(
35+
rabbitmq.WithSubj("sample_2"),
36+
rabbitmq.WithExchangeType("fanout"),
37+
rabbitmq.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
38+
var v *job
39+
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
40+
return err
41+
}
42+
rets <- v.Message
43+
time.Sleep(500 * time.Millisecond)
44+
return nil
45+
}),
46+
)
47+
// define the queue
48+
q := queue.NewPool(
49+
2,
50+
queue.WithWorker(w),
51+
)
52+
53+
m.AddRunningJob(func(ctx context.Context) error {
54+
for {
55+
select {
56+
case <-ctx.Done():
57+
select {
58+
case m := <-rets:
59+
fmt.Println("message:", m)
60+
default:
61+
}
62+
return nil
63+
case m := <-rets:
64+
fmt.Println("message:", m)
65+
time.Sleep(50 * time.Millisecond)
66+
}
67+
}
68+
})
69+
70+
m.AddShutdownJob(func() error {
71+
// shutdown the service and notify all the worker
72+
q.Release()
73+
return nil
74+
})
75+
76+
<-m.Done()
77+
}

_example/producer-consumer/go.mod

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
module example
2+
3+
go 1.18
4+
5+
require (
6+
github.com/appleboy/graceful v0.0.4
7+
github.com/golang-queue/queue v0.1.3
8+
github.com/golang-queue/rabbitmq v0.0.3-0.20210907015837-3e2e4b448b3d
9+
)
10+
11+
require (
12+
github.com/goccy/go-json v0.9.7 // indirect
13+
github.com/rabbitmq/amqp091-go v1.3.4 // indirect
14+
)
15+
16+
replace github.com/golang-queue/rabbitmq => ../../

_example/producer-consumer/go.sum

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
github.com/appleboy/graceful v0.0.4 h1:Q4LCeq4DFy59qiACLtuH+mSqDERtUzwkQbCWpRaWwvQ=
2+
github.com/appleboy/graceful v0.0.4/go.mod h1:Q2mVx0t+N0lCDZc5MJudbcpTm6cgGM/J2gZCZIqD9dc=
3+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
4+
github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM=
5+
github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
6+
github.com/golang-queue/queue v0.1.3 h1:FGIrn8e0fN8EmL3glP0rFEcYVtWUGMEeqX4h4nnzh40=
7+
github.com/golang-queue/queue v0.1.3/go.mod h1:h/PhaoMwT5Jc4sQNus7APgWBUItm6QC9k6JtmwrsRos=
8+
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
9+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
10+
github.com/rabbitmq/amqp091-go v1.3.4 h1:tXuIslN1nhDqs2t6Jrz3BAoqvt4qIZzxvdbdcxWtHYU=
11+
github.com/rabbitmq/amqp091-go v1.3.4/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
12+
github.com/stretchr/testify v1.7.3 h1:dAm0YRdRQlWojc3CrCRgPBzG5f941d0zvAKu7qY4e+I=
13+
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
14+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
"github.com/golang-queue/queue"
10+
"github.com/golang-queue/rabbitmq"
11+
)
12+
13+
type job struct {
14+
Message string
15+
}
16+
17+
func (j *job) Bytes() []byte {
18+
b, err := json.Marshal(j)
19+
if err != nil {
20+
panic(err)
21+
}
22+
return b
23+
}
24+
25+
func main() {
26+
taskN := 100
27+
28+
// define the worker
29+
w := rabbitmq.NewWorker(
30+
rabbitmq.WithSubj("sample_2"),
31+
rabbitmq.WithExchangeType("fanout"),
32+
)
33+
34+
// define the queue
35+
q := queue.NewPool(
36+
0,
37+
queue.WithWorker(w),
38+
)
39+
40+
// assign tasks in queue
41+
for i := 0; i < taskN; i++ {
42+
if err := q.Queue(&job{
43+
Message: fmt.Sprintf("handle the job: %d", i+1),
44+
}); err != nil {
45+
log.Fatal(err)
46+
}
47+
}
48+
49+
time.Sleep(1 * time.Second)
50+
// shutdown the service and notify all the worker
51+
q.Release()
52+
}

0 commit comments

Comments
 (0)