Skip to content

Commit 98ce018

Browse files
authored
docs: add documentation (#8)
1 parent b3fad19 commit 98ce018

File tree

5 files changed

+276
-8
lines changed

5 files changed

+276
-8
lines changed

README.md

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,148 @@
11
# goqueue
22

3-
GoQueue - Golang Queue Wrapper for all queue platforms
3+
GoQueue - one library to rule them all. A golang wrapper that handles all the complexity of every Queue platforms. Extensible and easy to learn
4+
5+
## Index
6+
7+
- [Support](#support)
8+
- [Getting Started](#getting-started)
9+
- [Example](#example)
10+
- [Contribution](#contribution)
11+
12+
## Support
13+
14+
You can file an [Issue](https://github.com/bxcodec/goqueue/issues/new).
15+
See documentation in [Go.Dev](https://pkg.go.dev/github.com/bxcodec/goqueue?tab=doc)
16+
17+
## Getting Started
18+
19+
#### Install
20+
21+
```shell
22+
go get -u github.com/bxcodec/goqueue
23+
```
24+
25+
# Example
26+
27+
```go
28+
package main
29+
30+
import (
31+
"context"
32+
"encoding/json"
33+
"fmt"
34+
"time"
35+
36+
amqp "github.com/rabbitmq/amqp091-go"
37+
38+
"github.com/bxcodec/goqueue"
39+
"github.com/bxcodec/goqueue/consumer"
40+
rmqConsumer "github.com/bxcodec/goqueue/consumer/rabbitmq"
41+
"github.com/bxcodec/goqueue/middleware"
42+
"github.com/bxcodec/goqueue/publisher"
43+
rmqPublisher "github.com/bxcodec/goqueue/publisher/rabbitmq"
44+
)
45+
46+
func initExchange(ch *amqp.Channel, exchangeName string) error {
47+
return ch.ExchangeDeclare(
48+
exchangeName,
49+
"topic",
50+
true,
51+
false,
52+
false,
53+
false,
54+
nil,
55+
)
56+
}
57+
58+
func main() {
59+
rmqDSN := "amqp://rabbitmq:rabbitmq@localhost:5672/"
60+
rmqConn, err := amqp.Dial(rmqDSN)
61+
if err != nil {
62+
panic(err)
63+
}
64+
65+
rmqPub := rmqPublisher.NewPublisher(rmqConn,
66+
publisher.WithPublisherID("publisher_id"),
67+
publisher.WithMiddlewares(
68+
middleware.HelloWorldMiddlewareExecuteBeforePublisher(),
69+
middleware.HelloWorldMiddlewareExecuteAfterPublisher()),
70+
)
71+
72+
publisherChannel, err := rmqConn.Channel()
73+
if err != nil {
74+
panic(err)
75+
}
76+
77+
defer publisherChannel.Close()
78+
initExchange(publisherChannel, "goqueue")
79+
80+
consumerChannel, err := rmqConn.Channel()
81+
if err != nil {
82+
panic(err)
83+
}
84+
defer consumerChannel.Close()
85+
86+
rmqConsumer := rmqConsumer.NewConsumer(
87+
publisherChannel,
88+
consumerChannel,
89+
consumer.WithMiddlewares(
90+
middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(),
91+
middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler()),
92+
consumer.WithQueueName("consumer_queue"),
93+
consumer.WithConsumerID("consumer_id"),
94+
consumer.WithBatchMessageSize(1),
95+
consumer.WithMaxRetryFailedMessage(3),
96+
consumer.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"),
97+
consumer.WithTopicName("goqueue"),
98+
)
99+
100+
queueSvc := goqueue.NewQueueService(
101+
goqueue.WithPublisher(rmqPub),
102+
goqueue.WithConsumer(rmqConsumer),
103+
goqueue.WithMessageHandler(handler()),
104+
)
105+
106+
go func() {
107+
for i := 0; i < 10; i++ {
108+
data := map[string]interface{}{
109+
"message": fmt.Sprintf("Hello World %d", i),
110+
}
111+
jbyt, _ := json.Marshal(data)
112+
err := queueSvc.Publish(context.Background(), goqueue.Message{
113+
Data: data,
114+
Action: "goqueue.payments.create",
115+
Topic: "goqueue",
116+
})
117+
if err != nil {
118+
panic(err)
119+
}
120+
fmt.Println("Message Sent: ", string(jbyt))
121+
}
122+
}()
123+
124+
// change to context.Background() if you want to run it forever
125+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
126+
defer cancel()
127+
err = queueSvc.Start(ctx)
128+
if err != nil {
129+
panic(err)
130+
}
131+
}
132+
133+
func handler() goqueue.InboundMessageHandlerFunc {
134+
return func(ctx context.Context, m goqueue.InboundMessage) (err error) {
135+
data := m.Data
136+
jbyt, _ := json.Marshal(data)
137+
fmt.Println("Message Received: ", string(jbyt))
138+
return m.Ack(ctx)
139+
}
140+
}
141+
142+
```
143+
144+
## Contribution
145+
146+
---
147+
148+
To contrib to this project, you can open a PR or an issue.

consumer/option.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type Option struct {
1818
ActionsPatternSubscribed []string
1919
TopicName string
2020
MaxRetryFailedMessage int64
21+
ConsumerID string
2122
}
2223

2324
// OptionFunc is a function type that takes an `opt` parameter of type `*Option`.
@@ -73,3 +74,10 @@ func WithMaxRetryFailedMessage(n int64) OptionFunc {
7374
opt.MaxRetryFailedMessage = n
7475
}
7576
}
77+
78+
// WithConsumerID sets the consumer ID for the consumer option.
79+
func WithConsumerID(id string) OptionFunc {
80+
return func(opt *Option) {
81+
opt.ConsumerID = id
82+
}
83+
}

consumer/rabbitmq/blackbox_consumer_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func (s *rabbitMQTestSuite) seedPublish(contentType string, action string) {
162162
func (s *rabbitMQTestSuite) TestConsumerWithoutExchangePatternProvided() {
163163
s.initQueueForTesting(s.T(), "goqueue.action.#")
164164
s.seedPublish(string(headerVal.ContentTypeJSON), testAction)
165-
rmqSubs := rmq.NewConsumer(s.conn,
165+
rmqSubs := rmq.NewConsumer(
166166
s.consumerChannel,
167167
s.publishChannel,
168168
consumer.WithBatchMessageSize(1),
@@ -186,7 +186,7 @@ func (s *rabbitMQTestSuite) TestConsumerWithoutExchangePatternProvided() {
186186
func (s *rabbitMQTestSuite) TestConsumerWithExchangePatternProvided() {
187187
s.seedPublish(string(headerVal.ContentTypeJSON), testAction)
188188

189-
rmqSubs := rmq.NewConsumer(s.conn,
189+
rmqSubs := rmq.NewConsumer(
190190
s.consumerChannel,
191191
s.publishChannel,
192192
consumer.WithBatchMessageSize(1),
@@ -234,7 +234,7 @@ func handler(t *testing.T, expected goqueue.Message) goqueue.InboundMessageHandl
234234
func (s *rabbitMQTestSuite) TestRequeueWithouthExchangePatternProvided() {
235235
s.initQueueForTesting(s.T(), "goqueue.action.#")
236236
s.seedPublish(string(headerVal.ContentTypeJSON), testActionRequeue)
237-
rmqSubs := rmq.NewConsumer(s.conn,
237+
rmqSubs := rmq.NewConsumer(
238238
s.consumerChannel,
239239
s.publishChannel,
240240
consumer.WithBatchMessageSize(1),

consumer/rabbitmq/consumer.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020

2121
// rabbitMQ is the subscriber handler for rabbitmq
2222
type rabbitMQ struct {
23-
client *amqp.Connection
2423
consumerChannel *amqp.Channel
2524
requeueChannel *amqp.Channel //if want requeue support to another queue
2625
option *consumer.Option
@@ -38,21 +37,20 @@ var defaultOption = func() *consumer.Option {
3837

3938
// New will initialize the rabbitMQ subscriber
4039
func NewConsumer(
41-
client *amqp.Connection,
4240
consumerChannel *amqp.Channel,
4341
requeueChannel *amqp.Channel,
4442
opts ...consumer.OptionFunc) goqueue.Consumer {
4543
opt := defaultOption()
4644
for _, o := range opts {
4745
o(opt)
4846
}
47+
4948
rmqHandler := &rabbitMQ{
50-
client: client,
5149
consumerChannel: consumerChannel,
5250
requeueChannel: requeueChannel,
5351
option: opt,
5452
}
55-
if len(opt.ActionsPatternSubscribed) > 0 {
53+
if len(opt.ActionsPatternSubscribed) > 0 && opt.TopicName != "" {
5654
rmqHandler.initQueue()
5755
}
5856
rmqHandler.initConsumer()
@@ -99,6 +97,10 @@ func (r *rabbitMQ) initQueue() {
9997
// If an error occurs during the initialization, it logs the error and exits the program.
10098
func (r *rabbitMQ) initConsumer() {
10199
r.tagName = fmt.Sprintf("%s-%s", r.option.QueueName, uuid.New().String())
100+
if r.option.ConsumerID != "" {
101+
r.tagName = r.option.ConsumerID
102+
}
103+
102104
err := r.consumerChannel.Qos(r.option.BatchMessageSize, 0, false)
103105
if err != nil {
104106
logrus.Fatal("error when setting the prefetch count, ", err)

examples/rabbitmq/main.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"time"
8+
9+
amqp "github.com/rabbitmq/amqp091-go"
10+
11+
"github.com/bxcodec/goqueue"
12+
"github.com/bxcodec/goqueue/consumer"
13+
rmqConsumer "github.com/bxcodec/goqueue/consumer/rabbitmq"
14+
"github.com/bxcodec/goqueue/middleware"
15+
"github.com/bxcodec/goqueue/publisher"
16+
rmqPublisher "github.com/bxcodec/goqueue/publisher/rabbitmq"
17+
)
18+
19+
func initExchange(ch *amqp.Channel, exchangeName string) error {
20+
return ch.ExchangeDeclare(
21+
exchangeName,
22+
"topic",
23+
true,
24+
false,
25+
false,
26+
false,
27+
nil,
28+
)
29+
}
30+
31+
func main() {
32+
rmqDSN := "amqp://rabbitmq:rabbitmq@localhost:5672/"
33+
rmqConn, err := amqp.Dial(rmqDSN)
34+
if err != nil {
35+
panic(err)
36+
}
37+
38+
rmqPub := rmqPublisher.NewPublisher(rmqConn,
39+
publisher.WithPublisherID("publisher_id"),
40+
publisher.WithMiddlewares(
41+
middleware.HelloWorldMiddlewareExecuteBeforePublisher(),
42+
middleware.HelloWorldMiddlewareExecuteAfterPublisher()),
43+
)
44+
45+
publisherChannel, err := rmqConn.Channel()
46+
if err != nil {
47+
panic(err)
48+
}
49+
50+
defer publisherChannel.Close()
51+
initExchange(publisherChannel, "goqueue")
52+
53+
consumerChannel, err := rmqConn.Channel()
54+
if err != nil {
55+
panic(err)
56+
}
57+
defer consumerChannel.Close()
58+
59+
rmqConsumer := rmqConsumer.NewConsumer(
60+
publisherChannel,
61+
consumerChannel,
62+
consumer.WithMiddlewares(
63+
middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(),
64+
middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler()),
65+
consumer.WithQueueName("consumer_queue"),
66+
consumer.WithConsumerID("consumer_id"),
67+
consumer.WithBatchMessageSize(1),
68+
consumer.WithMaxRetryFailedMessage(3),
69+
consumer.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"),
70+
consumer.WithTopicName("goqueue"),
71+
)
72+
73+
queueSvc := goqueue.NewQueueService(
74+
goqueue.WithPublisher(rmqPub),
75+
goqueue.WithConsumer(rmqConsumer),
76+
goqueue.WithMessageHandler(handler()),
77+
)
78+
79+
go func() {
80+
for i := 0; i < 10; i++ {
81+
data := map[string]interface{}{
82+
"message": fmt.Sprintf("Hello World %d", i),
83+
}
84+
jbyt, _ := json.Marshal(data)
85+
err := queueSvc.Publish(context.Background(), goqueue.Message{
86+
Data: data,
87+
Action: "goqueue.payments.create",
88+
Topic: "goqueue",
89+
})
90+
if err != nil {
91+
panic(err)
92+
}
93+
fmt.Println("Message Sent: ", string(jbyt))
94+
}
95+
}()
96+
97+
// change to context.Background() if you want to run it forever
98+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
99+
defer cancel()
100+
err = queueSvc.Start(ctx)
101+
if err != nil {
102+
panic(err)
103+
}
104+
}
105+
106+
func handler() goqueue.InboundMessageHandlerFunc {
107+
return func(ctx context.Context, m goqueue.InboundMessage) (err error) {
108+
data := m.Data
109+
jbyt, _ := json.Marshal(data)
110+
fmt.Println("Message Received: ", string(jbyt))
111+
return m.Ack(ctx)
112+
}
113+
}

0 commit comments

Comments
 (0)