@@ -3,14 +3,15 @@ package rabbitmq
33import (
44 "context"
55 "encoding/json"
6+ "errors"
67 "fmt"
78 "time"
89
910 "github.com/google/uuid"
1011 amqp "github.com/rabbitmq/amqp091-go"
1112 "github.com/rs/zerolog/log"
1213
13- "github.com/bxcodec/goqueue/errors"
14+ goqueueErrors "github.com/bxcodec/goqueue/errors"
1415 headerKey "github.com/bxcodec/goqueue/headers/key"
1516 headerVal "github.com/bxcodec/goqueue/headers/value"
1617 "github.com/bxcodec/goqueue/interfaces"
@@ -19,6 +20,11 @@ import (
1920 consumerOpts "github.com/bxcodec/goqueue/options/consumer"
2021)
2122
23+ const (
24+ // millisecondsMultiplier converts seconds to milliseconds for RabbitMQ expiration
25+ millisecondsMultiplier = 10_000
26+ )
27+
2228// rabbitMQ is the subscriber handler for rabbitmq
2329type rabbitMQ struct {
2430 consumerChannel * amqp.Channel
@@ -213,7 +219,8 @@ func (r *rabbitMQ) initRetryModule() {
213219// It takes a context, an inbound message handler, and a map of metadata as input parameters.
214220// The function continuously listens for messages from the queue and processes them until the context is canceled.
215221// If the context is canceled, the function stops consuming messages and returns.
216- // For each received message, the function builds an inbound message, extracts the retry count, and checks if the maximum retry count has been reached.
222+ // For each received message, the function builds an inbound message, extracts the retry count,
223+ // and checks if the maximum retry count has been reached.
217224// If the maximum retry count has been reached, the message is moved to the dead letter queue.
218225// Otherwise, the message is passed to the message handler for processing.
219226// The message handler is responsible for handling the message and returning an error if any.
@@ -225,7 +232,7 @@ func (r *rabbitMQ) initRetryModule() {
225232// The function returns an error if any occurred during message handling or if the context was canceled.
226233func (r * rabbitMQ ) Consume (ctx context.Context ,
227234 h interfaces.InboundMessageHandler ,
228- meta map [string ]interface {} ) (err error ) {
235+ meta map [string ]any ) (err error ) {
229236 log .Info ().
230237 Str ("queue_name" , r .option .QueueName ).
231238 Interface ("consumer_meta" , meta ).
@@ -238,19 +245,19 @@ func (r *rabbitMQ) Consume(ctx context.Context,
238245 Str ("queue_name" , r .option .QueueName ).
239246 Interface ("consumer_meta" , meta ).
240247 Msg ("stopping the worker" )
241- return
248+ return err
242249 case receivedMsg , ok := <- r .msgReceiver :
243250 if ! ok {
244251 // deliveries channel closed (e.g., due to Stop/Cancel or connection closure)
245252 log .Info ().
246253 Str ("queue_name" , r .option .QueueName ).
247254 Interface ("consumer_meta" , meta ).
248255 Msg ("message receiver closed, stopping the worker" )
249- return
256+ return err
250257 }
251258 msg , err := buildMessage (meta , receivedMsg )
252259 if err != nil {
253- if err == errors .ErrInvalidMessageFormat {
260+ if errors . Is ( err , goqueueErrors .ErrInvalidMessageFormat ) {
254261 nackErr := receivedMsg .Nack (false , false ) // nack with requeue false
255262 if nackErr != nil {
256263 log .Error ().
@@ -283,7 +290,7 @@ func (r *rabbitMQ) Consume(ctx context.Context,
283290 m := interfaces.InboundMessage {
284291 Message : msg ,
285292 RetryCount : retryCount ,
286- Metadata : map [string ]interface {} {
293+ Metadata : map [string ]any {
287294 "app-id" : receivedMsg .AppId ,
288295 "consumer-tag" : receivedMsg .ConsumerTag ,
289296 "content-encoding" : receivedMsg .ContentEncoding ,
@@ -299,17 +306,17 @@ func (r *rabbitMQ) Consume(ctx context.Context,
299306 "type" : receivedMsg .Type ,
300307 "user-id" : receivedMsg .UserId ,
301308 },
302- Ack : func (ctx context.Context ) (err error ) {
309+ Ack : func (_ context.Context ) (err error ) {
303310 err = receivedMsg .Ack (false )
304311 return
305312 },
306- Nack : func (ctx context.Context ) (err error ) {
313+ Nack : func (_ context.Context ) (err error ) {
307314 // receivedMsg.Nack(false, true) => will redelivered again instantly (same with receivedMsg.reject)
308315 // receivedMsg.Nack(false, false) => will put the message to dead letter queue (same with receivedMsg.reject)
309316 err = receivedMsg .Nack (false , true )
310317 return
311318 },
312- MoveToDeadLetterQueue : func (ctx context.Context ) (err error ) {
319+ MoveToDeadLetterQueue : func (_ context.Context ) (err error ) {
313320 // receivedMsg.Nack(false, true) => will redelivered again instantly (same with receivedMsg.reject)
314321 // receivedMsg.Nack(false, false) => will put the message to dead letter queue (same with receivedMsg.reject)
315322 err = receivedMsg .Nack (false , false )
@@ -342,13 +349,13 @@ func (r *rabbitMQ) Consume(ctx context.Context,
342349 }
343350}
344351
345- func buildMessage (consumerMeta map [string ]interface {} , receivedMsg amqp.Delivery ) (msg interfaces.Message , err error ) {
352+ func buildMessage (consumerMeta map [string ]any , receivedMsg amqp.Delivery ) (msg interfaces.Message , err error ) {
346353 if len (receivedMsg .Body ) == 0 {
347354 log .Error ().
348355 Interface ("consumer_meta" , consumerMeta ).
349356 Str ("msg" , string (receivedMsg .Body )).
350357 Msg ("message body is empty, removing the message due to wrong message format" )
351- return msg , errors .ErrInvalidMessageFormat
358+ return msg , goqueueErrors .ErrInvalidMessageFormat
352359 }
353360
354361 err = json .Unmarshal (receivedMsg .Body , & msg )
@@ -358,7 +365,7 @@ func buildMessage(consumerMeta map[string]interface{}, receivedMsg amqp.Delivery
358365 Str ("msg" , string (receivedMsg .Body )).
359366 Err (err ).
360367 Msg ("failed to unmarshal the message, removing the message due to wrong message format" )
361- return msg , errors .ErrInvalidMessageFormat
368+ return msg , goqueueErrors .ErrInvalidMessageFormat
362369 }
363370
364371 if msg .ID == "" {
@@ -384,14 +391,14 @@ func buildMessage(consumerMeta map[string]interface{}, receivedMsg amqp.Delivery
384391 Interface ("consumer_meta" , consumerMeta ).
385392 Interface ("msg" , msg ).
386393 Msg ("message data is empty, removing the message due to wrong message format" )
387- return msg , errors .ErrInvalidMessageFormat
394+ return msg , goqueueErrors .ErrInvalidMessageFormat
388395 }
389396
390397 msg .SetSchemaVersion (extractHeaderString (receivedMsg .Headers , headerKey .SchemaVer ))
391398 return msg , nil
392399}
393400
394- func (r * rabbitMQ ) requeueMessageWithDLQ (consumerMeta map [string ]interface {} , msg interfaces.Message ,
401+ func (r * rabbitMQ ) requeueMessageWithDLQ (consumerMeta map [string ]any , msg interfaces.Message ,
395402 receivedMsg amqp.Delivery ) func (ctx context.Context , delayFn interfaces.DelayFn ) (err error ) {
396403 return func (ctx context.Context , delayFn interfaces.DelayFn ) (err error ) {
397404 if delayFn == nil {
@@ -417,7 +424,7 @@ func (r *rabbitMQ) requeueMessageWithDLQ(consumerMeta map[string]interface{}, ms
417424 Body : receivedMsg .Body ,
418425 Timestamp : time .Now (),
419426 AppId : r .tagName ,
420- Expiration : fmt .Sprintf ("%d" , delayInSeconds * 10000 ),
427+ Expiration : fmt .Sprintf ("%d" , delayInSeconds * millisecondsMultiplier ),
421428 },
422429 )
423430
0 commit comments