[BUG] Create connection failed when queue is empty #1107

@naughtyGitCat

Description

            await Task.Delay(new TimeSpan(0, 0, 1), stoppingToken);
            try
            {
                _logger.LogInformation($"now reading {_rabbitMQQueueName} rabbit msg");
                var factory = new ConnectionFactory()
                {
                    HostName = _rabbitConfig.Host,
                    Port = _rabbitConfig.Port,
                    UserName = _rabbitConfig.User,
                    Password = _rabbitConfig.Password,
                    VirtualHost = _rabbitConfig.VirtualHostName,
                    AutomaticRecoveryEnabled = true,
                    TopologyRecoveryEnabled = true,
                    DispatchConsumersAsync = true
                };
                using var connection = factory.CreateConnection();
                using var channel = connection.CreateModel();
                channel.ExchangeDeclare(exchange: _rabbitConfig.ExchangeName, type: ExchangeType.Direct, durable: true);
                channel.QueueDeclare(
                    _rabbitMQQueueName,
                    durable: true, exclusive: false,
                    autoDelete: false, arguments: new Dictionary<string, object> { { "x-message-ttl", 3600000 } });
                var consumer = new AsyncEventingBasicConsumer(channel);

                consumer.Received += async (model, ea) =>
                {
                    var msg = Encoding.UTF8.GetString(ea.Body.Span);
                    try
                    {
                        await HandleMessageAsync(msg);
                    }
                    catch (Exception ex)
                    {
                        await _alertService.SendExceptionAsync(ex, "HandleMessageAsync failed");
                        _errorBudget -= 1;
                        if (_errorBudget < 0) throw new Exception($"Consume {_rabbitMQQueueName} failed for {_errorBudget} times, terminate the task");
                    }
                };

                var consumeResult = channel.BasicConsume(_rabbitMQQueueName, autoAck: true, consumer: consumer);

                while (!stoppingToken.IsCancellationRequested)
                {
                    _logger.LogInformation("Blocking for waiting rabbit message...");
                    //https://stackoverflow.com/questions/44880870/rabbitmq-eventbasicconsumer-not-working
                    await Task.Delay(new TimeSpan(0, 0, 56), stoppingToken);
                }

            }
            catch (Exception ex)
            {
                await _alertService.SendExceptionAsync(ex, $"{this.GetType().Name}");
            }

version

RabbitMQ 3.8
RabbitMQ.Client 6.2.2

output

2021-11-15 20:18:51.346 +08:00 [WRN] =>[DBAInternalCommon.Services.DBAAlertService]=> DBAMessageDTO { Project = DBAMetaConsumerHelper, Exception = RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
 ---> RabbitMQ.Client.Exceptions.PossibleAuthenticationFailureException: Possibly caused by authentication failure
 ---> RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text='Unexpected Exception', classId=0, methodId=0, cause=System.IO.IOException: Unable to read data from the transport connection: Connection reset by peer.
 ---> System.Net.Sockets.SocketException (104): Connection reset by peer
   at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)
   --- End of inner exception stack trace ---
   at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)
   at System.IO.BufferedStream.ReadByteSlow()
   at System.IO.BufferedStream.ReadByte()
   at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.ConnectionStartOk(IDictionary`2 clientProperties, String mechanism, Byte[] response, String locale)
   at RabbitMQ.Client.Framing.Impl.Connection.StartAndTune()
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.Framing.Impl.Connection.StartAndTune()
   at RabbitMQ.Client.Framing.Impl.Connection.Open(Boolean insist)
   at RabbitMQ.Client.Framing.Impl.Connection..ctor(IConnectionFactory factory, Boolean insist, IFrameHandler frameHandler, String clientProvidedName)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IFrameHandler fh)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(String clientProvidedName)
   at DBAMetaConsumer.Workers.RabbitConsumerBase.ExecuteAsync(CancellationToken stoppingToken), Comment = HostCrontabConsumer , Receivers = System.String[], EmailSubject = XXXXYYYY, EmailSender = [DBAMessage], SendChannels = 20 }
