|
4 | 4 | "context" |
5 | 5 | "errors" |
6 | 6 | "fmt" |
7 | | - rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" |
8 | 7 | "time" |
| 8 | + |
| 9 | + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" |
9 | 10 | ) |
10 | 11 |
|
11 | 12 | func main() { |
@@ -44,7 +45,7 @@ func main() { |
44 | 45 | // this is valid for the connection lifecycle |
45 | 46 | amqpConnection.NotifyStatusChange(stateChanged) |
46 | 47 |
|
47 | | - rmq.Info("AMQP connection opened.\n") |
| 48 | + rmq.Info("AMQP connection opened") |
48 | 49 | // Create the management interface for the connection |
49 | 50 | // so we can declare exchanges, queues, and bindings |
50 | 51 | management := amqpConnection.Management() |
@@ -94,21 +95,21 @@ func main() { |
94 | 95 | deliveryContext, err := consumer.Receive(ctx) |
95 | 96 | if errors.Is(err, context.Canceled) { |
96 | 97 | // The consumer was closed correctly |
97 | | - rmq.Info("[Consumer]", "consumer closed. Context", err) |
| 98 | + rmq.Info("[Consumer] Consumer closed", "context", err) |
98 | 99 | return |
99 | 100 | } |
100 | 101 | if err != nil { |
101 | 102 | // An error occurred receiving the message |
102 | | - rmq.Error("[Consumer]", "Error receiving message", err) |
| 103 | + rmq.Error("[Consumer] Error receiving message", "error", err) |
103 | 104 | return |
104 | 105 | } |
105 | 106 |
|
106 | | - rmq.Info("[Consumer]", "Received message", |
| 107 | + rmq.Info("[Consumer] Received message", "message", |
107 | 108 | fmt.Sprintf("%s", deliveryContext.Message().Data)) |
108 | 109 |
|
109 | 110 | err = deliveryContext.Accept(context.Background()) |
110 | 111 | if err != nil { |
111 | | - rmq.Error("Error accepting message", err) |
| 112 | + rmq.Error("[Consumer] Error accepting message", "error", err) |
112 | 113 | return |
113 | 114 | } |
114 | 115 | } |
@@ -172,39 +173,39 @@ func main() { |
172 | 173 | err = management.Unbind(context.TODO(), bindingPath) |
173 | 174 |
|
174 | 175 | if err != nil { |
175 | | - rmq.Error("Error unbinding: %v\n", err) |
| 176 | + rmq.Error("Error unbinding", "error", err) |
176 | 177 | return |
177 | 178 | } |
178 | 179 |
|
179 | 180 | err = management.DeleteExchange(context.TODO(), exchangeInfo.Name()) |
180 | 181 | if err != nil { |
181 | | - rmq.Error("Error deleting exchange: %v\n", err) |
| 182 | + rmq.Error("Error deleting exchange", "error", err) |
182 | 183 | return |
183 | 184 | } |
184 | 185 |
|
185 | 186 | // Purge the queue |
186 | 187 | purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name()) |
187 | 188 | if err != nil { |
188 | | - rmq.Error("Error purging queue: %v\n", err) |
| 189 | + rmq.Error("Error purging queue", "error", err) |
189 | 190 | return |
190 | 191 | } |
191 | | - rmq.Info("Purged %d messages from the queue.\n", purged) |
| 192 | + rmq.Info("Purged messages from the queue", "count", purged) |
192 | 193 |
|
193 | 194 | err = management.DeleteQueue(context.TODO(), queueInfo.Name()) |
194 | 195 | if err != nil { |
195 | | - rmq.Error("Error deleting queue: %v\n", err) |
| 196 | + rmq.Error("Error deleting queue", "error", err) |
196 | 197 | return |
197 | 198 | } |
198 | 199 |
|
199 | 200 | // Close all the connections. but you can still use the environment |
200 | 201 | // to create new connections |
201 | 202 | err = env.CloseConnections(context.Background()) |
202 | 203 | if err != nil { |
203 | | - rmq.Error("Error closing connection: %v\n", err) |
| 204 | + rmq.Error("Error closing connection", "error", err) |
204 | 205 | return |
205 | 206 | } |
206 | 207 |
|
207 | | - rmq.Info("AMQP connection closed.\n") |
| 208 | + rmq.Info("AMQP connection closed") |
208 | 209 | // not necessary. It waits for the status change to be printed |
209 | 210 | time.Sleep(100 * time.Millisecond) |
210 | 211 | close(stateChanged) |
|
0 commit comments