Skip to content

delivery modified causing a rabbitmq server error #441

@l4mby

Description

@l4mby

Hello, while trying to modify a message with annotations from the delivery context, we stumbled on this error:

node:events:496
      throw er; // Unhandled 'error' event
      ^
SessionError: function_clause
[{rabbit_amqp_session,'-settle_op_from_outcome/1-fun-0-',
                      [{{utf8,<<"x-opt-annotation-key">>},
                        {utf8,<<"annotation-value">>}}],
                      [{file,"src/rabbit_amqp_session.erl"},{line,2007}]},
 {lists,map,2,[{file,"lists.erl"},{line,2077}]},
 {rabbit_amqp_session,settle_op_from_outcome,1,
                      [{file,"src/rabbit_amqp_session.erl"},{line,2005}]},
 {rabbit_amqp_session,handle_frame,2,
                      [{file,"src/rabbit_amqp_session.erl"},{line,1070}]},
 {rabbit_amqp_session,handle_cast,2,
                      [{file,"src/rabbit_amqp_session.erl"},{line,539}]},
 {gen_server,try_handle_cast,3,[{file,"gen_server.erl"},{line,2371}]},
 {gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,2433}]},
 {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,329}]}]
    at Session.on_end (/home/coders51/Desktop/Coders51/Personal/rhea-test/node_modules/rhea/lib/session.js:702:86)
    at Connection.<computed> [as on_end] (/home/coders51/Desktop/Coders51/Personal/rhea-test/node_modules/rhea/lib/connection.js:856:30)
    at c.dispatch (/home/coders51/Desktop/Coders51/Personal/rhea-test/node_modules/rhea/lib/types.js:948:33)
    at Transport.read (/home/coders51/Desktop/Coders51/Personal/rhea-test/node_modules/rhea/lib/transport.js:117:36)
    at SaslClient.read (/home/coders51/Desktop/Coders51/Personal/rhea-test/node_modules/rhea/lib/sasl.js:344:26)
    at Connection.input (/home/coders51/Desktop/Coders51/Personal/rhea-test/node_modules/rhea/lib/connection.js:568:37)
    at Socket.emit (node:events:518:28)
    at addChunk (node:internal/streams/readable:561:12)
    at readableAddChunkPushByteMode (node:internal/streams/readable:512:3)
    at Readable.push (node:internal/streams/readable:392:5)
Emitted 'error' event on Container instance at:
    at Container.dispatch (/home/coders51/Desktop/Coders51/Personal/rhea-test/node_modules/rhea/lib/container.js:41:33)
    at Connection.dispatch (/home/coders51/Desktop/Coders51/Personal/rhea-test/node_modules/rhea/lib/connection.js:262:40)
    at Connection.input (/home/coders51/Desktop/Coders51/Personal/rhea-test/node_modules/rhea/lib/connection.js:594:18)
    at Socket.emit (node:events:518:28)
    [... lines matching original stack trace ...]
    at Readable.push (node:internal/streams/readable:392:5)
    at TCP.onStreamRead (node:internal/stream_base_commons:189:23) {
  condition: 'amqp:internal-error',
  description: 'function_clause\n' +
    "[{rabbit_amqp_session,'-settle_op_from_outcome/1-fun-0-',\n" +
    '                      [{{utf8,<<"x-opt-annotation-key">>},\n' +
    '                        {utf8,<<"annotation-value">>}}],\n' +
    '                      [{file,"src/rabbit_amqp_session.erl"},{line,2007}]},\n' +
    ' {lists,map,2,[{file,"lists.erl"},{line,2077}]},\n' +
    ' {rabbit_amqp_session,settle_op_from_outcome,1,\n' +
    '                      [{file,"src/rabbit_amqp_session.erl"},{line,2005}]},\n' +
    ' {rabbit_amqp_session,handle_frame,2,\n' +
    '                      [{file,"src/rabbit_amqp_session.erl"},{line,1070}]},\n' +
    ' {rabbit_amqp_session,handle_cast,2,\n' +
    '                      [{file,"src/rabbit_amqp_session.erl"},{line,539}]},\n' +
    ' {gen_server,try_handle_cast,3,[{file,"gen_server.erl"},{line,2371}]},\n' +
    ' {gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,2433}]},\n' +
    ' {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,329}]}]'
}

Node.js v22.15.0

the code that we launched for the receiver end is:

/*
 * AMQP 1.0 receiver example for RabbitMQ
 */

// var container = require('rhea');
var container = require("rhea"); // local rhea version

// Configure connection options
var connection_options = {
  port: 5672,
  host: "localhost",
  username: "rabbit",
  password: "rabbit",
  reconnect: false,
  container_id: "rhea-rabbitmq-receiver",
};

var received = 0;
var expected = 20;

console.log("Connecting to RabbitMQ at %s:%s...", "rabbit", "rabbit");
container.connect(connection_options);

// When the connection is established
container.on("connection_open", function (context) {
  console.log("Connected to RabbitMQ");

  // Create a receiver from the specified queue
  var source = {
    address: "rhea-first-queue",
  };
  context.connection.open_receiver({
    source: source,
    credit_window: 10,
    autoaccept: true,
  });

  console.log("Created receiver from queue: %s", "rhea-first-queue");
});

// When the receiver link is opened
container.on("receiver_open", function () {
  console.log("Receiver link established");
  console.log("Waiting for messages...");
});

// When a message is received
container.on("message", function (context) {
  // Extract message information
  var msg = context.message;
  var delivery = context.delivery;

  console.log("\n--- Message %d received ---", received);
  console.log("Message ID: %s", msg.message_id);
  console.log("User ID: %s", msg.user_id);
  console.log("Created: %s", msg.creation_time ? new Date(msg.creation_time).toISOString() : "N/A");
  console.log("Subject: %s", msg.subject);
  console.log("Content type: %s", msg.content_type);

  // Display application properties if present
  if (msg.application_properties) {
    console.log("Application properties:");
    for (var key in msg.application_properties) {
      console.log("  %s: %s", key, msg.application_properties[key]);
    }
  }
  if (msg.message_annotations) {
    console.log("Message Annotations:");
    for (var key in msg.message_annotations) {
      console.log("  %s: %s", key, msg.message_annotations[key]);
    }
  }

  // Display message body
  console.log("Body: %j", msg.body);

  // Manual acknowledgment if auto-ack is disabled
  //   if (!args.auto_ack) {
  //     delivery.accept();
  //   }

  delivery.modified({
    delivery_failed: true,
    undeliverable_here: true,
    message_annotations: { "x-opt-annotation-key": "annotation-value" },
  });
});

// When the connection is lost
container.on("disconnected", function (context) {
  if (context.error) {
    console.error("Disconnected due to error: %s", context.error);
  } else {
    console.log("Disconnected");
  }

  if (context.reconnecting) {
    console.log("Attempting to reconnect...");
  } else if (expected > 0 && received < expected) {
    console.log("Exiting after receiving %d of %d messages", received, expected);
    throw new Error("Incomplete message reception");
  } else {
    console.log("Exiting");
  }
});

and the code that we use on the sender is

/*
 * AMQP 1.0 sender example for RabbitMQ
 */
var container = require("rhea"); // local rhea version

// Configure connection options
var connection_options = {
  port: 5672,
  host: "localhost",
  username: "rabbit",
  password: "rabbit",
  reconnect: false,
  container_id: "rhea-rabbitmq-sender",
};

console.log("Connecting to RabbitMQ at %s:%s...", "rabbit", "rabbit");
// Connect but don't store the returned object since we're not using it directly
container.connect(connection_options);

var sent = 0;
var confirmed = 0;
var total = 20;

// When the connection is established
container.on("connection_open", function (context) {
  console.log("Connected to RabbitMQ");

  // Create a sender for the specified address
  var target = {};
  const address = "rhea-first-queue";
  if (address) {
    // If sending to an exchange, set the address and use 'address' as routing key
    target = {
      address: address,
      capabilities: ["fanout"],
    };
    context.connection.open_sender({
      target: target,
      properties: {
        "routing-key": address,
        subject: address,
      },
    });
  } else {
    // For direct access to the queue
    target = {
      address: address,
    };
    context.connection.open_sender({
      target: target,
    });
  }

  console.log("Created sender to %s (address: %s)", address, address);
});

// When the sender link is opened
container.on("sender_open", function (context) {
  console.log("Sender link established");

  // Set a timer to send messages periodically
  var message_interval = setInterval(function () {
    if (sent < total) {
      if (context.sender.sendable()) {
        send_message(context.sender);
      }
    } else {
      clearInterval(message_interval);
      // Wait for pending confirmations
      if (confirmed >= total) {
        context.sender.close();
        context.connection.close();
      }
    }
  }, 3000);
});

// When we can send messages (credit available)
container.on("sendable", function (context) {
  if (sent < total) {
    send_message(context.sender);
  } else if (confirmed >= total) {
    context.sender.close();
    context.connection.close();
  }
});

// When the message is confirmed by RabbitMQ
container.on("accepted", function (context) {
  confirmed++;
  console.log("Message %d confirmed, total confirmed: %d", context.delivery.id, confirmed);

  if (confirmed >= total && sent >= total) {
    context.sender.close();
    context.connection.close();
  }
});

// When the connection is lost
container.on("disconnected", function (context) {
  if (context.error) {
    console.error("Disconnected due to error: %s", context.error);
  } else {
    console.log("Disconnected");
  }

  if (context.reconnecting) {
    console.log("Attempting to reconnect...");
  } else {
    console.log("Exiting");
    // Instead of process.exit(0), throw an error that can be caught
    throw new Error("Disconnected and not reconnecting");
  }
});

// Function to send a message
function send_message(sender) {
  sent++;
  var message_id = sent;
  var message_body = {
    sequence: message_id,
    text: "Message " + message_id,
    timestamp: new Date().toISOString(),
  };

  console.log("Sending message %d: %j", message_id, message_body);

  // Send the message with properties
  sender.send({
    message_id: message_id,
    user_id: "rabbit",
    creation_time: new Date(),
    subject: "rhea-first-queue",
    content_type: "application/json",
    application_properties: {
      source: "rhea-rabbitmq-sender",
      message_type: "test",
      routing_key: "rhea-first-queue",
    },
    body: message_body,
    message_annotations: { "x-opt-annotation": "annotation-value" },
  });
}

// Main error handling
process.on("uncaughtException", function (err) {
  // Handle the disconnection error gracefully
  if (err.message === "Disconnected and not reconnecting") {
    // eslint-disable-next-line no-process-exit
    process.exit(0);
  } else {
    console.error("Uncaught exception:", err);
    // eslint-disable-next-line no-process-exit
    process.exit(1);
  }
});

we are using Nodejs v22.15.0 and the docker compose is the following:

docker_compose.zip

What are we doing in the wrong way on your opinion? Thanks a lot in advance

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions