Skip to content

Commit eca0b81

Browse files
authored
Merge pull request #1 from rsnullptr/publisher-binding-exchange
Publisher binding exchange
2 parents 0f20aff + bd05607 commit eca0b81

File tree

7 files changed

+332
-238
lines changed

7 files changed

+332
-238
lines changed

bindings_options.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package rabbitmq
2+
3+
// BindingExchangeOptions are used when binding to an exchange.
4+
// it will verify the exchange is created before binding to it.
5+
type BindingExchangeOptions struct {
6+
Name string
7+
Kind string
8+
Durable bool
9+
AutoDelete bool
10+
Internal bool
11+
NoWait bool
12+
BindingNoWait bool
13+
BindingArgs Table
14+
ExchangeArgs Table
15+
}
16+
17+
// QueueDeclareOptions arguments to declare a queue
18+
type QueueDeclareOptions struct {
19+
QueueName string
20+
QueueDurable bool
21+
QueueAutoDelete bool
22+
QueueExclusive bool
23+
QueueNoWait bool
24+
QueueArgs Table
25+
}
26+
27+
// WithQueueDeclareOptionsDurable sets the queue to durable, which means it won't
28+
// be destroyed when the server restarts. It must only be bound to durable exchanges
29+
func WithQueueDeclareOptionsDurable(options *QueueDeclareOptions) {
30+
options.QueueDurable = true
31+
}
32+
33+
// WithQueueDeclareOptionsAutoDelete sets the queue to auto delete, which means it will
34+
// be deleted when there are no more consumers on it
35+
func WithQueueDeclareOptionsAutoDelete(options *QueueDeclareOptions) {
36+
options.QueueAutoDelete = true
37+
}
38+
39+
// WithQueueDeclareOptionsExclusive sets the queue to exclusive, which means
40+
// it's are only accessible by the connection that declares it and
41+
// will be deleted when the connection closes. Channels on other connections
42+
// will receive an error when attempting to declare, bind, consume, purge or
43+
// delete a queue with the same name.
44+
func WithQueueDeclareOptionsExclusive(options *QueueDeclareOptions) {
45+
options.QueueExclusive = true
46+
}
47+
48+
// WithQueueDeclareOptionsNoWait sets the queue to nowait, which means
49+
// the queue will assume to be declared on the server. A
50+
// channel exception will arrive if the conditions are met for existing queues
51+
// or attempting to modify an existing queue from a different connection.
52+
func WithQueueDeclareOptionsNoWait(options *QueueDeclareOptions) {
53+
options.QueueNoWait = true
54+
}
55+
56+
// WithQueueDeclareOptionsQuorum sets the queue a quorum type, which means multiple nodes
57+
// in the cluster will have the messages distributed amongst them for higher reliability
58+
func WithQueueDeclareOptionsQuorum(options *QueueDeclareOptions) {
59+
if options.QueueArgs == nil {
60+
options.QueueArgs = Table{}
61+
}
62+
options.QueueArgs["x-queue-type"] = "quorum"
63+
}
64+
65+
// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values.
66+
func getBindingExchangeOptionsOrSetDefault(options *BindingExchangeOptions) *BindingExchangeOptions {
67+
if options == nil {
68+
options = &BindingExchangeOptions{
69+
Name: "",
70+
Kind: "direct",
71+
Durable: false,
72+
AutoDelete: false,
73+
Internal: false,
74+
NoWait: false,
75+
ExchangeArgs: nil,
76+
}
77+
}
78+
return options
79+
}
80+
81+
// WithBindingExchangeOptionsExchangeName returns a function that sets the exchange name the queue will be bound to
82+
func WithBindingExchangeOptionsExchangeName(name string, options *BindingExchangeOptions) {
83+
options.Name = name
84+
}
85+
86+
// WithBindingExchangeOptionsExchangeKind returns a function that sets the binding exchange kind/type
87+
func WithBindingExchangeOptionsExchangeKind(kind string, options *BindingExchangeOptions) {
88+
options.Kind = kind
89+
}
90+
91+
// WithBindingExchangeOptionsExchangeDurable returns a function that sets the binding exchange durable flag
92+
func WithBindingExchangeOptionsExchangeDurable(options *BindingExchangeOptions) {
93+
options.Durable = true
94+
}
95+
96+
// WithBindingExchangeOptionsExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag
97+
func WithBindingExchangeOptionsExchangeAutoDelete(options *BindingExchangeOptions) {
98+
options.AutoDelete = true
99+
}
100+
101+
// WithBindingExchangeOptionsExchangeInternal returns a function that sets the binding exchange internal flag
102+
func WithBindingExchangeOptionsExchangeInternal(options *BindingExchangeOptions) {
103+
options.Internal = true
104+
}
105+
106+
// WithBindingExchangeOptionsExchangeNoWait returns a function that sets the binding exchange noWait flag
107+
func WithBindingExchangeOptionsExchangeNoWait(options *BindingExchangeOptions) {
108+
options.NoWait = true
109+
}
110+
111+
// WithBindingExchangeOptionsExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange
112+
func WithBindingExchangeOptionsExchangeArgs(args Table, options *BindingExchangeOptions) {
113+
options.ExchangeArgs = args
114+
}
115+
116+
// WithBindingExchangeOptionsNoWait sets the bindings to nowait, which means if the queue can not be bound
117+
// the channel will not be closed with an error.
118+
func WithBindingExchangeOptionsNoWait(options *BindingExchangeOptions) {
119+
options.BindingNoWait = true
120+
}

consume.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -173,17 +173,17 @@ func (consumer Consumer) startGoroutines(
173173

174174
_, err := consumer.chManager.channel.QueueDeclare(
175175
queue,
176-
consumeOptions.QueueDurable,
177-
consumeOptions.QueueAutoDelete,
178-
consumeOptions.QueueExclusive,
179-
consumeOptions.QueueNoWait,
180-
tableToAMQPTable(consumeOptions.QueueArgs),
176+
consumeOptions.QueueDeclare.QueueDurable,
177+
consumeOptions.QueueDeclare.QueueAutoDelete,
178+
consumeOptions.QueueDeclare.QueueExclusive,
179+
consumeOptions.QueueDeclare.QueueNoWait,
180+
tableToAMQPTable(consumeOptions.QueueDeclare.QueueArgs),
181181
)
182182
if err != nil {
183183
return err
184184
}
185185

186-
if consumeOptions.BindingExchange != nil {
186+
if consumeOptions.BindingExchange.Name != "" {
187187
exchange := consumeOptions.BindingExchange
188188
if exchange.Name == "" {
189189
return fmt.Errorf("binding to exchange but name not specified")
@@ -205,8 +205,8 @@ func (consumer Consumer) startGoroutines(
205205
queue,
206206
routingKey,
207207
exchange.Name,
208-
consumeOptions.BindingNoWait,
209-
tableToAMQPTable(consumeOptions.BindingArgs),
208+
consumeOptions.BindingExchange.BindingNoWait,
209+
tableToAMQPTable(consumeOptions.BindingExchange.BindingArgs),
210210
)
211211
if err != nil {
212212
return err

consume_options.go

Lines changed: 21 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,25 @@ package rabbitmq
33
// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided
44
func getDefaultConsumeOptions() ConsumeOptions {
55
return ConsumeOptions{
6-
QueueDurable: false,
7-
QueueAutoDelete: false,
8-
QueueExclusive: false,
9-
QueueNoWait: false,
10-
QueueArgs: nil,
11-
BindingExchange: nil,
12-
BindingNoWait: false,
13-
BindingArgs: nil,
6+
QueueDeclare: QueueDeclareOptions{
7+
QueueName: "",
8+
QueueDurable: false,
9+
QueueAutoDelete: false,
10+
QueueExclusive: false,
11+
QueueNoWait: false,
12+
QueueArgs: nil,
13+
},
14+
BindingExchange: BindingExchangeOptions{
15+
Name: "",
16+
Kind: "",
17+
Durable: false,
18+
AutoDelete: false,
19+
Internal: false,
20+
NoWait: false,
21+
BindingNoWait: false,
22+
BindingArgs: nil,
23+
ExchangeArgs: nil,
24+
},
1425
Concurrency: 1,
1526
QOSPrefetch: 0,
1627
QOSGlobal: false,
@@ -25,14 +36,8 @@ func getDefaultConsumeOptions() ConsumeOptions {
2536

2637
// ConsumeOptions are used to describe how a new consumer will be created.
2738
type ConsumeOptions struct {
28-
QueueDurable bool
29-
QueueAutoDelete bool
30-
QueueExclusive bool
31-
QueueNoWait bool
32-
QueueArgs Table
33-
BindingExchange *BindingExchangeOptions
34-
BindingNoWait bool
35-
BindingArgs Table
39+
QueueDeclare QueueDeclareOptions
40+
BindingExchange BindingExchangeOptions
3641
Concurrency int
3742
QOSPrefetch int
3843
QOSGlobal bool
@@ -44,119 +49,6 @@ type ConsumeOptions struct {
4449
ConsumerArgs Table
4550
}
4651

47-
// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values.
48-
func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions {
49-
if options.BindingExchange == nil {
50-
options.BindingExchange = &BindingExchangeOptions{
51-
Name: "",
52-
Kind: "direct",
53-
Durable: false,
54-
AutoDelete: false,
55-
Internal: false,
56-
NoWait: false,
57-
ExchangeArgs: nil,
58-
}
59-
}
60-
return options.BindingExchange
61-
}
62-
63-
// BindingExchangeOptions are used when binding to an exchange.
64-
// it will verify the exchange is created before binding to it.
65-
type BindingExchangeOptions struct {
66-
Name string
67-
Kind string
68-
Durable bool
69-
AutoDelete bool
70-
Internal bool
71-
NoWait bool
72-
ExchangeArgs Table
73-
}
74-
75-
// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't
76-
// be destroyed when the server restarts. It must only be bound to durable exchanges
77-
func WithConsumeOptionsQueueDurable(options *ConsumeOptions) {
78-
options.QueueDurable = true
79-
}
80-
81-
// WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will
82-
// be deleted when there are no more conusmers on it
83-
func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) {
84-
options.QueueAutoDelete = true
85-
}
86-
87-
// WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means
88-
// it's are only accessible by the connection that declares it and
89-
// will be deleted when the connection closes. Channels on other connections
90-
// will receive an error when attempting to declare, bind, consume, purge or
91-
// delete a queue with the same name.
92-
func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) {
93-
options.QueueExclusive = true
94-
}
95-
96-
// WithConsumeOptionsQueueNoWait sets the queue to nowait, which means
97-
// the queue will assume to be declared on the server. A
98-
// channel exception will arrive if the conditions are met for existing queues
99-
// or attempting to modify an existing queue from a different connection.
100-
func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) {
101-
options.QueueNoWait = true
102-
}
103-
104-
// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes
105-
// in the cluster will have the messages distributed amongst them for higher reliability
106-
func WithConsumeOptionsQuorum(options *ConsumeOptions) {
107-
if options.QueueArgs == nil {
108-
options.QueueArgs = Table{}
109-
}
110-
options.QueueArgs["x-queue-type"] = "quorum"
111-
}
112-
113-
// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to
114-
func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) {
115-
return func(options *ConsumeOptions) {
116-
getBindingExchangeOptionsOrSetDefault(options).Name = name
117-
}
118-
}
119-
120-
// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type
121-
func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) {
122-
return func(options *ConsumeOptions) {
123-
getBindingExchangeOptionsOrSetDefault(options).Kind = kind
124-
}
125-
}
126-
127-
// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag
128-
func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) {
129-
getBindingExchangeOptionsOrSetDefault(options).Durable = true
130-
}
131-
132-
// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag
133-
func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) {
134-
getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true
135-
}
136-
137-
// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag
138-
func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) {
139-
getBindingExchangeOptionsOrSetDefault(options).Internal = true
140-
}
141-
142-
// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag
143-
func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) {
144-
getBindingExchangeOptionsOrSetDefault(options).NoWait = true
145-
}
146-
147-
// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange
148-
func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) {
149-
return func(options *ConsumeOptions) {
150-
getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args
151-
}
152-
}
153-
154-
// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound
155-
// the channel will not be closed with an error.
156-
func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) {
157-
options.BindingNoWait = true
158-
}
159-
16052
// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that
16153
// many goroutines will be spawned to run the provided handler on messages
16254
func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) {

examples/consumer/main.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"log"
55

66
"github.com/streadway/amqp"
7-
rabbitmq "github.com/wagslane/go-rabbitmq"
7+
"github.com/wagslane/go-rabbitmq"
88
)
99

1010
func main() {
@@ -21,14 +21,16 @@ func main() {
2121
// true to ACK, false to NACK
2222
return true
2323
},
24-
"my_queue",
25-
[]string{"routing_key", "routing_key_2"},
24+
"my_queue1",
25+
[]string{"routing_key1", "routing_key_2"},
2626
rabbitmq.WithConsumeOptionsConcurrency(10),
27-
rabbitmq.WithConsumeOptionsQueueDurable,
28-
rabbitmq.WithConsumeOptionsQuorum,
29-
rabbitmq.WithConsumeOptionsBindingExchangeName("events"),
30-
rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"),
31-
rabbitmq.WithConsumeOptionsBindingExchangeDurable,
27+
func(options *rabbitmq.ConsumeOptions) {
28+
rabbitmq.WithQueueDeclareOptionsDurable(&options.QueueDeclare)
29+
rabbitmq.WithQueueDeclareOptionsQuorum(&options.QueueDeclare)
30+
rabbitmq.WithBindingExchangeOptionsExchangeName("events", &options.BindingExchange)
31+
rabbitmq.WithBindingExchangeOptionsExchangeKind("topic", &options.BindingExchange)
32+
rabbitmq.WithBindingExchangeOptionsExchangeDurable(&options.BindingExchange)
33+
},
3234
)
3335
if err != nil {
3436
log.Fatal(err)

0 commit comments

Comments
 (0)