Skip to content

Commit d19f0c3

Browse files
committed
chore(queue): "add exchange name and type default is direct"
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 63feee0 commit d19f0c3

File tree

2 files changed

+48
-6
lines changed

2 files changed

+48
-6
lines changed

options.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,18 @@ import (
1010
// Option for queue system
1111
type Option func(*options)
1212

13+
// AMQP 0-9-1 Model Explained
14+
// ref: https://www.rabbitmq.com/tutorials/amqp-concepts.html
1315
type options struct {
1416
runFunc func(context.Context, core.QueuedMessage) error
1517
logger queue.Logger
1618
addr string
1719
subj string
1820
tag string
21+
// Durable AMQP exchange name
22+
exchangeName string
23+
// Exchange Types: Direct, Fanout, Topic and Headers
24+
exchangeType string
1925
}
2026

2127
// WithAddr setup the URI
@@ -25,6 +31,28 @@ func WithAddr(addr string) Option {
2531
}
2632
}
2733

34+
// WithExchangeName setup the Exchange name
35+
// Exchanges are AMQP 0-9-1 entities where messages are sent to.
36+
// Exchanges take a message and route it into zero or more queues.
37+
func WithExchangeName(val string) Option {
38+
return func(w *options) {
39+
w.exchangeName = val
40+
}
41+
}
42+
43+
// WithExchangeType setup the Exchange type
44+
// The routing algorithm used depends on the exchange type and rules called bindings.
45+
// AMQP 0-9-1 brokers provide four exchange types:
46+
// Direct exchange (Empty string) and amq.direct
47+
// Fanout exchange amq.fanout
48+
// Topic exchange amq.topic
49+
// Headers exchange amq.match (and amq.headers in RabbitMQ)
50+
func WithExchangeType(val string) Option {
51+
return func(w *options) {
52+
w.exchangeType = val
53+
}
54+
}
55+
2856
// WithAddr setup the tag
2957
func WithTag(tag string) Option {
3058
return func(w *options) {
@@ -55,10 +83,12 @@ func WithLogger(l queue.Logger) Option {
5583

5684
func newOptions(opts ...Option) options {
5785
defaultOpts := options{
58-
addr: "amqp://guest:guest@localhost:5672/",
59-
subj: "queue",
60-
tag: "golang-queue",
61-
logger: queue.NewLogger(),
86+
addr: "amqp://guest:guest@localhost:5672/",
87+
subj: "golang-queue",
88+
tag: "golang-queue",
89+
exchangeName: "test-exchange",
90+
exchangeType: "direct",
91+
logger: queue.NewLogger(),
6292
runFunc: func(context.Context, core.QueuedMessage) error {
6393
return nil
6494
},

rabbitmq.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,18 @@ func NewWorker(opts ...Option) *Worker {
4646
panic(err)
4747
}
4848

49+
if err := w.channel.ExchangeDeclare(
50+
w.opts.exchangeName, // name
51+
w.opts.exchangeType, // type
52+
true, // durable
53+
false, // auto-deleted
54+
false, // internal
55+
false, // noWait
56+
nil, // arguments
57+
); err != nil {
58+
panic(err)
59+
}
60+
4961
return w
5062
}
5163

@@ -54,7 +66,7 @@ func (w *Worker) startConsumer() (err error) {
5466
var err error
5567
q, err := w.channel.QueueDeclare(
5668
w.opts.subj, // name
57-
false, // durable
69+
true, // durable
5870
false, // delete when unused
5971
false, // exclusive
6072
false, // no-wait
@@ -167,7 +179,7 @@ func (w *Worker) Queue(job core.QueuedMessage) error {
167179

168180
q, err := w.channel.QueueDeclare(
169181
w.opts.subj, // name
170-
false, // durable
182+
true, // durable
171183
false, // delete when unused
172184
false, // exclusive
173185
false, // no-wait

0 commit comments

Comments
 (0)