From ffb13d0c2facdacb3797222f5a7da70dd3a7eacd Mon Sep 17 00:00:00 2001 From: Kirill Date: Fri, 19 Sep 2025 15:43:44 +0400 Subject: [PATCH] Enhance .Stop() methods in queues --- client/rabbitmq_client.go | 12 +++++------- queuemngr/queue_manager.go | 12 +++++------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/client/rabbitmq_client.go b/client/rabbitmq_client.go index f91ea73..f90920f 100644 --- a/client/rabbitmq_client.go +++ b/client/rabbitmq_client.go @@ -8,6 +8,7 @@ import ( amqp "github.com/rabbitmq/amqp091-go" + "errors" "github.com/babylonlabs-io/staking-queue-client/config" ) @@ -262,16 +263,13 @@ func (c *RabbitMqClient) SendMessage(ctx context.Context, messageBody string) er // Stop stops the message receiving process. func (c *RabbitMqClient) Stop() error { - if err := c.channel.Close(); err != nil { - return err - } - if err := c.connection.Close(); err != nil { - return err - } + var channelErr, connectionErr error + channelErr = c.channel.Close() + connectionErr = c.connection.Close() close(c.stopCh) - return nil + return errors.Join(channelErr, connectionErr) } func (c *RabbitMqClient) GetQueueName() string { diff --git a/queuemngr/queue_manager.go b/queuemngr/queue_manager.go index 155f678..ace4ba2 100644 --- a/queuemngr/queue_manager.go +++ b/queuemngr/queue_manager.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" + "errors" "github.com/babylonlabs-io/staking-queue-client/client" "github.com/babylonlabs-io/staking-queue-client/config" ) @@ -94,15 +95,12 @@ func (qc *QueueManager) ReQueueMessage(ctx context.Context, message client.Queue } func (qc *QueueManager) Stop() error { - if err := qc.ActiveStakingQueue.Stop(); err != nil { - return err - } + var activeErr, unbondingErr error - if err := qc.UnbondingStakingQueue.Stop(); err != nil { - return err - } + activeErr = qc.ActiveStakingQueue.Stop() + unbondingErr = qc.UnbondingStakingQueue.Stop() - return nil + return errors.Join(activeErr, unbondingErr) } // Ping checks the health of the RabbitMQ infrastructure.