If the Received handler awaits before reading e.Body.ToArray(), the message body is incorrect #1654

@quanzhenying

Describe the bug

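When two consumers on different queues share the same Received handler, and the handler awaits before reading msg.Body, the body read after the await can belong to a message from the other queue.
Affected versions: all 6.x versions.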

Received event handler:

```csharp
await Task.Delay(rd.Next(0, 2));
var project = Encoding.UTF8.GetString((byte[])msg.BasicProperties.Headers["project"]);
var body = msg.Body.ToArray();
string message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Received project/body: {project}/{message}");
```

Reproduction steps

1. Consume from two queues simultaneously, using the same "Received" handler for both.
2. Within the Received handler, add an await before reading the Body.
3. If the line await Task.Delay(rd.Next(0, 2)); is removed, the issue does not reproduce.

All code:
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory
{
    HostName = "localhost",
    Port = 5672,
    UserName = "admin",
    Password = "123456"
};

var rd = new Random();
using IConnection connection = factory.CreateConnection();
var exchange = "iot.exchange";

// Declare a headers exchange and two quorum queues, each bound on its own "project" header.
using var producer = connection.CreateModel();
producer.ExchangeDeclare(exchange, "headers", true, false);
var arguments = new Dictionary<string, object>
{
    { "x-queue-type", "quorum" }
};
producer.QueueDeclare(queue: "1", durable: true, exclusive: false, autoDelete: false, arguments);
producer.QueueBind("1", exchange, "", new Dictionary<string, object>
{
    { "project", "1" },
    { "x-match", "all" }
});
producer.QueueDeclare(queue: "2", durable: true, exclusive: false, autoDelete: false, arguments);
producer.QueueBind("2", exchange, "", new Dictionary<string, object>
{
    { "project", "2" },
    { "x-match", "all" }
});

// Publish 200 messages per queue; each body equals its "project" header value.
Enumerable.Range(0, 200).ToList().ForEach(i =>
{
    var props = producer.CreateBasicProperties();
    props.ContentType = "application/octet-stream";
    props.DeliveryMode = 2;
    props.Headers = new Dictionary<string, object>
    {
        { "project", "1" }
    };
    producer.BasicPublish(exchange, "", props, Encoding.UTF8.GetBytes("1"));

    props = producer.CreateBasicProperties();
    props.ContentType = "application/octet-stream";
    props.DeliveryMode = 2;
    props.Headers = new Dictionary<string, object>
    {
        { "project", "2" }
    };
    producer.BasicPublish(exchange, "", props, Encoding.UTF8.GetBytes("2"));
});

// Consume from both queues, one channel per queue, with identical handler code.
string[] queueNames = { "1", "2" };
foreach (var queuename in queueNames)
{
    var channel = connection.CreateModel();
    // Prefetch one message at a time per consumer.
    channel.BasicQos(0, 1, false);

    var consumer = new EventingBasicConsumer(channel);
    channel.ConfirmSelect();
    consumer.Received += async (model, msg) =>
    {
        // Stands in for asynchronous initialization work (e.g. creating a DbContext).
        await Task.Delay(rd.Next(0, 2));

        var project = Encoding.UTF8.GetString((byte[])msg.BasicProperties.Headers["project"]);
        var body = msg.Body.ToArray();
        string message = Encoding.UTF8.GetString(body);
        Console.WriteLine($"Received project/body: {project}/{message}");

        // Header and body were published with the same value, so a mismatch means a wrong body.
        if (project != message)
            Console.WriteLine("error: The body does not belong to this message");

        channel.BasicAck(msg.DeliveryTag, false);
    };
    channel.BasicConsume(queuename, false, consumer);
}

Console.WriteLine("Waiting for messages. To exit press CTRL+C");
Console.ReadLine();
```

Expected behavior

Awaiting "Task.Delay(rd.Next(0, 2))" before reading the Body should not cause the Body to contain the content of another message.

Additional context

1. Version 5.1.2 does not have this issue.
2. Blocking with "Task.Delay(rd.Next(0, 2)).GetAwaiter().GetResult()" instead of awaiting also does not trigger the issue.
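
One possible explanation, which this report does not confirm: if 6.x exposes the body as a view over a buffer that the client reuses once the handler returns, then an async lambda attached to EventingBasicConsumer.Received returns to the client at its first incomplete await, and anything read from msg.Body afterwards may see bytes from a later delivery. That would also fit both observations above, since a blocking wait keeps the handler from returning early. The sketch below simulates that assumption without RabbitMQ.Client at all; `PooledBodyDemo`, `Deliver`, `ReadAfterAwait` and `CopyBeforeAwait` are hypothetical names made up for illustration.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;

// Hypothetical stand-in for a client that hands out message bodies as views
// over a buffer it reuses. None of these names come from RabbitMQ.Client.
public static class PooledBodyDemo
{
    // Handler shape from the report: await first, read the body afterwards.
    public static async Task<string> ReadAfterAwait(ReadOnlyMemory<byte> body, Task gate)
    {
        await gate; // control returns to the caller here
        return Encoding.UTF8.GetString(body.ToArray());
    }

    // Variation: take a private copy of the body before the first await.
    public static async Task<string> CopyBeforeAwait(ReadOnlyMemory<byte> body, Task gate)
    {
        byte[] copy = body.ToArray(); // copied synchronously, before yielding
        await gate;
        return Encoding.UTF8.GetString(copy);
    }

    // Simulates one delivery: the buffer holds "1" when the handler is invoked
    // and is overwritten with "2" as soon as the handler yields.
    public static async Task<string> Deliver(
        Func<ReadOnlyMemory<byte>, Task, Task<string>> handler)
    {
        var gate = new TaskCompletionSource<bool>();
        byte[] buffer = { (byte)'1' };

        // Runs the handler synchronously up to its first incomplete await.
        Task<string> pending = handler(new ReadOnlyMemory<byte>(buffer), gate.Task);

        buffer[0] = (byte)'2';   // the simulated client reuses the buffer
        gate.SetResult(true);    // let the handler continue
        return await pending;
    }

    public static async Task Main()
    {
        Console.WriteLine(await Deliver(ReadAfterAwait));  // prints "2"
        Console.WriteLine(await Deliver(CopyBeforeAwait)); // prints "1"
    }
}
```

If that assumption holds for 6.x, the equivalent change in the handler from this report would be to move the msg.Body.ToArray() call above the await Task.Delay(...) line, so that the copy is taken before the handler first yields.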
