44 "context"
55 "errors"
66 "fmt"
7- "github.com/Azure/go-amqp"
8- "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp"
7+ rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
98 "time"
109)
1110
@@ -14,62 +13,62 @@ func main() {
1413 queueName := "getting-started-go-queue"
1514 routingKey := "routing-key"
1615
17- rabbitmq_amqp .Info ("Getting started with AMQP Go AMQP 1.0 Client" )
16+ rmq .Info ("Getting started with AMQP Go AMQP 1.0 Client" )
1817
1918 /// Create a channel to receive state change notifications
20- stateChanged := make (chan * rabbitmq_amqp .StateChanged , 1 )
21- go func (ch chan * rabbitmq_amqp .StateChanged ) {
19+ stateChanged := make (chan * rmq .StateChanged , 1 )
20+ go func (ch chan * rmq .StateChanged ) {
2221 for statusChanged := range ch {
23- rabbitmq_amqp .Info ("[connection]" , "Status changed" , statusChanged )
22+ rmq .Info ("[connection]" , "Status changed" , statusChanged )
2423 }
2524 }(stateChanged )
2625
27- // rabbitmq_amqp .NewEnvironment setups the environment.
26+ // rmq .NewEnvironment setups the environment.
2827 // The environment is used to create connections
2928 // given the same parameters
30- env := rabbitmq_amqp .NewEnvironment ([]string {"amqp://" }, nil )
29+ env := rmq .NewEnvironment ([]string {"amqp://" }, nil )
3130
3231 // Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0)
3332 amqpConnection , err := env .NewConnection (context .Background ())
3433 if err != nil {
35- rabbitmq_amqp .Error ("Error opening connection" , err )
34+ rmq .Error ("Error opening connection" , err )
3635 return
3736 }
3837 // Register the channel to receive status change notifications
3938 // this is valid for the connection lifecycle
4039 amqpConnection .NotifyStatusChange (stateChanged )
4140
42- rabbitmq_amqp .Info ("AMQP connection opened.\n " )
41+ rmq .Info ("AMQP connection opened.\n " )
4342 // Create the management interface for the connection
4443 // so we can declare exchanges, queues, and bindings
4544 management := amqpConnection .Management ()
46- exchangeInfo , err := management .DeclareExchange (context .TODO (), & rabbitmq_amqp .TopicExchangeSpecification {
45+ exchangeInfo , err := management .DeclareExchange (context .TODO (), & rmq .TopicExchangeSpecification {
4746 Name : exchangeName ,
4847 })
4948 if err != nil {
50- rabbitmq_amqp .Error ("Error declaring exchange" , err )
49+ rmq .Error ("Error declaring exchange" , err )
5150 return
5251 }
5352
5453 // Declare a Quorum queue
55- queueInfo , err := management .DeclareQueue (context .TODO (), & rabbitmq_amqp .QuorumQueueSpecification {
54+ queueInfo , err := management .DeclareQueue (context .TODO (), & rmq .QuorumQueueSpecification {
5655 Name : queueName ,
5756 })
5857
5958 if err != nil {
60- rabbitmq_amqp .Error ("Error declaring queue" , err )
59+ rmq .Error ("Error declaring queue" , err )
6160 return
6261 }
6362
6463 // Bind the queue to the exchange
65- bindingPath , err := management .Bind (context .TODO (), & rabbitmq_amqp .ExchangeToQueueBindingSpecification {
64+ bindingPath , err := management .Bind (context .TODO (), & rmq .ExchangeToQueueBindingSpecification {
6665 SourceExchange : exchangeName ,
6766 DestinationQueue : queueName ,
6867 BindingKey : routingKey ,
6968 })
7069
7170 if err != nil {
72- rabbitmq_amqp .Error ("Error binding" , err )
71+ rmq .Error ("Error binding" , err )
7372 return
7473 }
7574
@@ -78,7 +77,7 @@ func main() {
7877
7978 consumer , err := amqpConnection .NewConsumer (context .Background (), queueName , nil )
8079 if err != nil {
81- rabbitmq_amqp .Error ("Error creating consumer" , err )
80+ rmq .Error ("Error creating consumer" , err )
8281 return
8382 }
8483
@@ -90,61 +89,61 @@ func main() {
9089 deliveryContext , err := consumer .Receive (ctx )
9190 if errors .Is (err , context .Canceled ) {
9291 // The consumer was closed correctly
93- rabbitmq_amqp .Info ("[NewConsumer]" , "consumer closed. Context" , err )
92+ rmq .Info ("[NewConsumer]" , "consumer closed. Context" , err )
9493 return
9594 }
9695 if err != nil {
9796 // An error occurred receiving the message
98- rabbitmq_amqp .Error ("[NewConsumer]" , "Error receiving message" , err )
97+ rmq .Error ("[NewConsumer]" , "Error receiving message" , err )
9998 return
10099 }
101100
102- rabbitmq_amqp .Info ("[NewConsumer]" , "Received message" ,
101+ rmq .Info ("[NewConsumer]" , "Received message" ,
103102 fmt .Sprintf ("%s" , deliveryContext .Message ().Data ))
104103
105104 err = deliveryContext .Accept (context .Background ())
106105 if err != nil {
107- rabbitmq_amqp .Error ("Error accepting message" , err )
106+ rmq .Error ("Error accepting message" , err )
108107 return
109108 }
110109 }
111110 }(consumerContext )
112111
113- publisher , err := amqpConnection .NewPublisher (context .Background (), & rabbitmq_amqp .ExchangeAddress {
112+ publisher , err := amqpConnection .NewPublisher (context .Background (), & rmq .ExchangeAddress {
114113 Exchange : exchangeName ,
115114 Key : routingKey ,
116115 }, "getting-started-publisher" )
117116 if err != nil {
118- rabbitmq_amqp .Error ("Error creating publisher" , err )
117+ rmq .Error ("Error creating publisher" , err )
119118 return
120119 }
121120
122121 for i := 0 ; i < 100 ; i ++ {
123122 // Publish a message to the exchange
124- publishResult , err := publisher .Publish (context .Background (), amqp .NewMessage ([]byte ("Hello, World!" + fmt .Sprintf ("%d" , i ))))
123+ publishResult , err := publisher .Publish (context .Background (), rmq .NewMessage ([]byte ("Hello, World!" + fmt .Sprintf ("%d" , i ))))
125124 if err != nil {
126- rabbitmq_amqp .Error ("Error publishing message" , "error" , err )
125+ rmq .Error ("Error publishing message" , "error" , err )
127126 time .Sleep (1 * time .Second )
128127 continue
129128 }
130129 switch publishResult .Outcome .(type ) {
131- case * amqp .StateAccepted :
132- rabbitmq_amqp .Info ("[NewPublisher]" , "Message accepted" , publishResult .Message .Data [0 ])
130+ case * rmq .StateAccepted :
131+ rmq .Info ("[NewPublisher]" , "Message accepted" , publishResult .Message .Data [0 ])
133132 break
134- case * amqp .StateReleased :
135- rabbitmq_amqp .Warn ("[NewPublisher]" , "Message was not routed" , publishResult .Message .Data [0 ])
133+ case * rmq .StateReleased :
134+ rmq .Warn ("[NewPublisher]" , "Message was not routed" , publishResult .Message .Data [0 ])
136135 break
137- case * amqp .StateRejected :
138- rabbitmq_amqp .Warn ("[NewPublisher]" , "Message rejected" , publishResult .Message .Data [0 ])
139- stateType := publishResult .Outcome .(* amqp .StateRejected )
136+ case * rmq .StateRejected :
137+ rmq .Warn ("[NewPublisher]" , "Message rejected" , publishResult .Message .Data [0 ])
138+ stateType := publishResult .Outcome .(* rmq .StateRejected )
140139 if stateType .Error != nil {
141- rabbitmq_amqp .Warn ("[NewPublisher]" , "Message rejected with error: %v" , stateType .Error )
140+ rmq .Warn ("[NewPublisher]" , "Message rejected with error: %v" , stateType .Error )
142141 }
143142 break
144143 default :
145144 // these status are not supported. Leave it for AMQP 1.0 compatibility
146145 // see: https://www.rabbitmq.com/docs/next/amqp#outcomes
147- rabbitmq_amqp .Warn ("Message state: %v" , publishResult .Outcome )
146+ rmq .Warn ("Message state: %v" , publishResult .Outcome )
148147 }
149148 }
150149
@@ -157,53 +156,53 @@ func main() {
157156 //Close the consumer
158157 err = consumer .Close (context .Background ())
159158 if err != nil {
160- rabbitmq_amqp .Error ("[NewConsumer]" , err )
159+ rmq .Error ("[NewConsumer]" , err )
161160 return
162161 }
163162 // Close the publisher
164163 err = publisher .Close (context .Background ())
165164 if err != nil {
166- rabbitmq_amqp .Error ("[NewPublisher]" , err )
165+ rmq .Error ("[NewPublisher]" , err )
167166 return
168167 }
169168
170169 // Unbind the queue from the exchange
171170 err = management .Unbind (context .TODO (), bindingPath )
172171
173172 if err != nil {
174- rabbitmq_amqp .Error ("Error unbinding: %v\n " , err )
173+ rmq .Error ("Error unbinding: %v\n " , err )
175174 return
176175 }
177176
178177 err = management .DeleteExchange (context .TODO (), exchangeInfo .Name ())
179178 if err != nil {
180- rabbitmq_amqp .Error ("Error deleting exchange: %v\n " , err )
179+ rmq .Error ("Error deleting exchange: %v\n " , err )
181180 return
182181 }
183182
184183 // Purge the queue
185184 purged , err := management .PurgeQueue (context .TODO (), queueInfo .Name ())
186185 if err != nil {
187- rabbitmq_amqp .Error ("Error purging queue: %v\n " , err )
186+ rmq .Error ("Error purging queue: %v\n " , err )
188187 return
189188 }
190- rabbitmq_amqp .Info ("Purged %d messages from the queue.\n " , purged )
189+ rmq .Info ("Purged %d messages from the queue.\n " , purged )
191190
192191 err = management .DeleteQueue (context .TODO (), queueInfo .Name ())
193192 if err != nil {
194- rabbitmq_amqp .Error ("Error deleting queue: %v\n " , err )
193+ rmq .Error ("Error deleting queue: %v\n " , err )
195194 return
196195 }
197196
198197 // Close all the connections. but you can still use the environment
199198 // to create new connections
200199 err = env .CloseConnections (context .Background ())
201200 if err != nil {
202- rabbitmq_amqp .Error ("Error closing connection: %v\n " , err )
201+ rmq .Error ("Error closing connection: %v\n " , err )
203202 return
204203 }
205204
206- rabbitmq_amqp .Info ("AMQP connection closed.\n " )
205+ rmq .Info ("AMQP connection closed.\n " )
207206 // not necessary. It waits for the status change to be printed
208207 time .Sleep (100 * time .Millisecond )
209208 close (stateChanged )
0 commit comments