Skip to content

Commit 782c36a

Browse files
committed
fixed example. added qos to options.
1 parent eca0b81 commit 782c36a

File tree

7 files changed

+101
-44
lines changed

7 files changed

+101
-44
lines changed

bindings_options.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package rabbitmq
33
// BindingExchangeOptions are used when binding to an exchange.
44
// it will verify the exchange is created before binding to it.
55
type BindingExchangeOptions struct {
6+
DoBinding bool
67
Name string
78
Kind string
89
Durable bool
@@ -16,6 +17,7 @@ type BindingExchangeOptions struct {
1617

1718
// QueueDeclareOptions arguments to declare a queue
1819
type QueueDeclareOptions struct {
20+
DoDeclare bool
1921
QueueName string
2022
QueueDurable bool
2123
QueueAutoDelete bool
@@ -24,6 +26,18 @@ type QueueDeclareOptions struct {
2426
QueueArgs Table
2527
}
2628

29+
// QosOptions configuration
30+
type QosOptions struct {
31+
QOSPrefetchCount int
32+
QOSPrefetchSize int
33+
QOSGlobal bool
34+
}
35+
36+
// WithQueueDeclare define if queue will be declare or not
37+
func WithQueueDeclare(options *QueueDeclareOptions) {
38+
options.DoDeclare = true
39+
}
40+
2741
// WithQueueDeclareOptionsDurable sets the queue to durable, which means it won't
2842
// be destroyed when the server restarts. It must only be bound to durable exchanges
2943
func WithQueueDeclareOptionsDurable(options *QueueDeclareOptions) {
@@ -118,3 +132,8 @@ func WithBindingExchangeOptionsExchangeArgs(args Table, options *BindingExchange
118132
func WithBindingExchangeOptionsNoWait(options *BindingExchangeOptions) {
119133
options.BindingNoWait = true
120134
}
135+
136+
// WithBindingExchange define if the exchange is to be binded or not
137+
func WithBindingExchange(options *BindingExchangeOptions) {
138+
options.DoBinding = true
139+
}

consume.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -171,24 +171,26 @@ func (consumer Consumer) startGoroutines(
171171
consumer.chManager.channelMux.RLock()
172172
defer consumer.chManager.channelMux.RUnlock()
173173

174-
_, err := consumer.chManager.channel.QueueDeclare(
175-
queue,
176-
consumeOptions.QueueDeclare.QueueDurable,
177-
consumeOptions.QueueDeclare.QueueAutoDelete,
178-
consumeOptions.QueueDeclare.QueueExclusive,
179-
consumeOptions.QueueDeclare.QueueNoWait,
180-
tableToAMQPTable(consumeOptions.QueueDeclare.QueueArgs),
181-
)
182-
if err != nil {
183-
return err
174+
if consumeOptions.QueueDeclare.DoDeclare {
175+
_, err := consumer.chManager.channel.QueueDeclare(
176+
queue,
177+
consumeOptions.QueueDeclare.QueueDurable,
178+
consumeOptions.QueueDeclare.QueueAutoDelete,
179+
consumeOptions.QueueDeclare.QueueExclusive,
180+
consumeOptions.QueueDeclare.QueueNoWait,
181+
tableToAMQPTable(consumeOptions.QueueDeclare.QueueArgs),
182+
)
183+
if err != nil {
184+
return err
185+
}
184186
}
185187

186-
if consumeOptions.BindingExchange.Name != "" {
188+
if consumeOptions.BindingExchange.DoBinding {
187189
exchange := consumeOptions.BindingExchange
188190
if exchange.Name == "" {
189191
return fmt.Errorf("binding to exchange but name not specified")
190192
}
191-
err = consumer.chManager.channel.ExchangeDeclare(
193+
err := consumer.chManager.channel.ExchangeDeclare(
192194
exchange.Name,
193195
exchange.Kind,
194196
exchange.Durable,
@@ -214,10 +216,10 @@ func (consumer Consumer) startGoroutines(
214216
}
215217
}
216218

217-
err = consumer.chManager.channel.Qos(
218-
consumeOptions.QOSPrefetch,
219-
0,
220-
consumeOptions.QOSGlobal,
219+
err := consumer.chManager.channel.Qos(
220+
consumeOptions.Qos.QOSPrefetchCount,
221+
consumeOptions.Qos.QOSPrefetchSize,
222+
consumeOptions.Qos.QOSGlobal,
221223
)
222224
if err != nil {
223225
return err

consume_options.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package rabbitmq
22

3-
// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided
3+
// getDefaultConsumeOptions describes the options that will be used when a value isn't provided
44
func getDefaultConsumeOptions() ConsumeOptions {
55
return ConsumeOptions{
66
QueueDeclare: QueueDeclareOptions{
@@ -22,9 +22,12 @@ func getDefaultConsumeOptions() ConsumeOptions {
2222
BindingArgs: nil,
2323
ExchangeArgs: nil,
2424
},
25+
Qos: QosOptions{
26+
QOSPrefetchCount: 0,
27+
QOSPrefetchSize: 0,
28+
QOSGlobal: false,
29+
},
2530
Concurrency: 1,
26-
QOSPrefetch: 0,
27-
QOSGlobal: false,
2831
ConsumerName: "",
2932
ConsumerAutoAck: false,
3033
ConsumerExclusive: false,
@@ -38,9 +41,8 @@ func getDefaultConsumeOptions() ConsumeOptions {
3841
type ConsumeOptions struct {
3942
QueueDeclare QueueDeclareOptions
4043
BindingExchange BindingExchangeOptions
44+
Qos QosOptions
4145
Concurrency int
42-
QOSPrefetch int
43-
QOSGlobal bool
4446
ConsumerName string
4547
ConsumerAutoAck bool
4648
ConsumerExclusive bool
@@ -62,15 +64,15 @@ func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) {
6264
// This doesn't affect the handler, messages are still processed one at a time.
6365
func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) {
6466
return func(options *ConsumeOptions) {
65-
options.QOSPrefetch = prefetchCount
67+
options.Qos.QOSPrefetchCount = prefetchCount
6668
}
6769
}
6870

6971
// WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means
7072
// these QOS settings apply to ALL existing and future
7173
// consumers on all channels on the same connection
7274
func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) {
73-
options.QOSGlobal = true
75+
options.Qos.QOSGlobal = true
7476
}
7577

7678
// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer

examples/consumer/main.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
package main
22

33
import (
4-
"log"
5-
64
"github.com/streadway/amqp"
75
"github.com/wagslane/go-rabbitmq"
6+
"log"
87
)
98

109
func main() {
@@ -21,15 +20,17 @@ func main() {
2120
// true to ACK, false to NACK
2221
return true
2322
},
24-
"my_queue1",
25-
[]string{"routing_key1", "routing_key_2"},
23+
"my_queue5",
24+
[]string{"routing_key_7"},
2625
rabbitmq.WithConsumeOptionsConcurrency(10),
2726
func(options *rabbitmq.ConsumeOptions) {
27+
rabbitmq.WithQueueDeclare(&options.QueueDeclare)
2828
rabbitmq.WithQueueDeclareOptionsDurable(&options.QueueDeclare)
2929
rabbitmq.WithQueueDeclareOptionsQuorum(&options.QueueDeclare)
3030
rabbitmq.WithBindingExchangeOptionsExchangeName("events", &options.BindingExchange)
3131
rabbitmq.WithBindingExchangeOptionsExchangeKind("topic", &options.BindingExchange)
3232
rabbitmq.WithBindingExchangeOptionsExchangeDurable(&options.BindingExchange)
33+
options.Qos.QOSPrefetchCount = 1
3334
},
3435
)
3536
if err != nil {

examples/publisher/main.go

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,27 @@
11
package main
22

33
import (
4+
"fmt"
45
"log"
6+
"os"
7+
"time"
58

69
"github.com/streadway/amqp"
710
"github.com/wagslane/go-rabbitmq"
811
)
912

1013
func main() {
14+
do()
15+
}
16+
17+
func do() {
1118
publisher, returns, err := rabbitmq.NewPublisher(
1219
"amqp://guest:guest@localhost", amqp.Config{},
1320
rabbitmq.WithPublisherOptionsLogging,
1421
func(options *rabbitmq.PublisherOptions) {
1522
options.BindingExchange = rabbitmq.BindingExchangeOptions{
16-
Name: "beautifulExchange1",
23+
DoBinding: true,
24+
Name: "beautifulExchange5",
1725
Kind: "topic",
1826
Durable: true,
1927
AutoDelete: false,
@@ -24,11 +32,12 @@ func main() {
2432
}
2533

2634
options.RoutingKeys = []string{
27-
"routing_key1",
35+
"routing_key5",
2836
}
2937

3038
options.QueueDeclare = rabbitmq.QueueDeclareOptions{
31-
QueueName: "my_queue1",
39+
DoDeclare: true,
40+
QueueName: "my_queue5",
3241
QueueDurable: true,
3342
QueueAutoDelete: false,
3443
QueueExclusive: false,
@@ -37,26 +46,41 @@ func main() {
3746
}
3847

3948
options.QueueDeclare.QueueArgs["x-queue-type"] = "quorum"
49+
50+
options.Qos = rabbitmq.QosOptions{
51+
QOSPrefetchCount: 1,
52+
QOSPrefetchSize: 0,
53+
QOSGlobal: true,
54+
}
4055
},
4156
)
4257
if err != nil {
4358
log.Fatal(err)
44-
}
45-
err = publisher.Publish(
46-
[]byte("hello, world1"),
47-
[]string{"routing_key1"},
48-
rabbitmq.WithPublishOptionsContentType("application/json"),
49-
rabbitmq.WithPublishOptionsMandatory,
50-
rabbitmq.WithPublishOptionsPersistentDelivery,
51-
rabbitmq.WithPublishOptionsExchange("events"),
52-
)
53-
if err != nil {
54-
log.Fatal(err)
59+
return
5560
}
5661

5762
go func() {
5863
for r := range returns {
5964
log.Printf("message returned from server: %s", string(r.Body))
6065
}
6166
}()
67+
68+
idx := int64(0)
69+
for {
70+
err = publisher.Publish(
71+
[]byte(fmt.Sprintf("AAA %d", idx)),
72+
[]string{"routing_key5"},
73+
rabbitmq.WithPublishOptionsContentType("application/json"),
74+
rabbitmq.WithPublishOptionsMandatory,
75+
rabbitmq.WithPublishOptionsPersistentDelivery,
76+
rabbitmq.WithPublishOptionsExchange("beautifulExchange5"),
77+
)
78+
if err != nil {
79+
log.Fatal(err)
80+
}
81+
82+
fmt.Fprintf(os.Stderr, "%d \n", idx)
83+
idx++
84+
time.Sleep(time.Millisecond * 1000)
85+
}
6286
}

publish.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
8585
defer publisher.chManager.channelMux.RUnlock()
8686

8787
// valid queue name, declare it
88-
if options.QueueDeclare.QueueName != "" {
88+
if options.QueueDeclare.DoDeclare {
89+
8990
_, err = publisher.chManager.channel.QueueDeclare(
9091
options.QueueDeclare.QueueName,
9192
options.QueueDeclare.QueueDurable,
@@ -101,8 +102,9 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
101102
}
102103

103104
//valid name, bind it
104-
if options.BindingExchange.Name != "" {
105+
if options.BindingExchange.DoBinding {
105106
exchange := options.BindingExchange
107+
106108
err = publisher.chManager.channel.ExchangeDeclare(
107109
exchange.Name,
108110
exchange.Kind,
@@ -131,7 +133,13 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
131133
}
132134
}
133135

134-
return publisher, returnChan, nil
136+
err = publisher.chManager.channel.Qos(
137+
options.Qos.QOSPrefetchCount,
138+
options.Qos.QOSPrefetchSize,
139+
options.Qos.QOSGlobal,
140+
)
141+
142+
return publisher, returnChan, err
135143
}
136144

137145
// Publish publishes the provided data to the given routing keys over the connection

publish_options.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ type PublisherOptions struct {
88
RoutingKeys []string
99
BindingExchange BindingExchangeOptions
1010
QueueDeclare QueueDeclareOptions
11+
Qos QosOptions
1112
}
1213

1314
// WithPublisherOptionsLogging sets logging to true on the consumer options

0 commit comments

Comments
 (0)