-
Notifications
You must be signed in to change notification settings - Fork 11
Open
Description
Problem
We are publishing messages to a stream queue using AMQP 0.9.1 (via amqplib). I am trying to consume those messages with rabbitmq-stream-js-client, but I get the following error.
Error
Error: FormatCode Invalid type 81
Error Stack
Error: FormatCode Invalid type 81
at decodeFormatCode (local/node_modules/rabbitmq-stream-js-client/dist/response_decoder.js:353:19)
at local/node_modules/rabbitmq-stream-js-client/dist/amqp10/applicationProperties.js:12:75
at Array.reduce (<anonymous>)
at ApplicationProperties.parse (local/node_modules/rabbitmq-stream-js-client/dist/amqp10/applicationProperties.js:9:46)
at decodeApplicationProperties (local/node_modules/rabbitmq-stream-js-client/dist/response_decoder.js:236:58)
at decodeMessage (local/node_modules/rabbitmq-stream-js-client/dist/response_decoder.js:206:41)
at decodeDeliverResponse (local/node_modules/rabbitmq-stream-js-client/dist/response_decoder.js:173:27)
at decodeResponse (local/node_modules/rabbitmq-stream-js-client/dist/response_decoder.js:67:64)
at decode (local/node_modules/rabbitmq-stream-js-client/dist/response_decoder.js:61:41)
at ResponseDecoder.add (local/node_modules/rabbitmq-stream-js-client/dist/response_decoder.js:519:45)
Code Snippets
Consumer
import rabbit from "rabbitmq-stream-js-client";
/**
 * Reproduction consumer for the stream decoding error.
 *
 * Connects to the broker on localhost:5552 as guest, creates the
 * "debug_work_queue.dlq" stream (max-age 14D), and declares a consumer that
 * starts at the first offset and logs each message's content and
 * messageProperties.
 *
 * @returns {Promise<void>} Resolves once the consumer has been declared.
 */
async function main() {
  console.log("hello world");

  const connectionParams = {
    hostname: "localhost",
    port: 5552,
    username: "guest",
    password: "guest",
    vhost: "/",
  };

  // Route every client log level to the console with a level prefix.
  const logger = {
    debug(message) {
      console.log(`RMQ DEBUG:`, message);
    },
    info(message) {
      console.log(`RMQ INFO:`, message);
    },
    warn(message) {
      console.log(`RMQ WARN:`, message);
    },
    error(message) {
      console.log(`RMQ ERROR:`, message);
    },
  };

  const client = await rabbit.connect(connectionParams, logger);

  const streamName = "debug_work_queue.dlq";
  await client.createStream({
    stream: streamName,
    arguments: { "max-age": "14D" },
  });

  const consumerOptions = {
    stream: streamName,
    consumerRef: "debug_consumer",
    offset: rabbit.Offset.first(),
  };

  // Print the payload and the message properties of each delivered message.
  const handleMessage = (delivered) => {
    console.log("consumed message:", delivered.content.toString());
    console.log(`Properties : ${JSON.stringify(delivered.messageProperties)}`);
    // console.log(`Headers : ${JSON.stringify(msg.messageProperties?.headers)}`);
  };

  await client.declareConsumer(consumerOptions, handleMessage);
}
// Entry point: start the consumer. The returned promise is neither awaited nor given a rejection handler here.
main();
amqp message
{
"properties": {
"headers": {
"attemptID": 1,
"chainIndex": 1,
"A": "hell.world",
"ID": 55,
"placedInDLQ": 1764156981485,
"recursiveDepth": 1,
"taskIdentifier": "107",
"timestamp": 1764156978730,
"x-death": [
{
"count": 1,
"reason": "expired",
"queue": "delay-level-02-queue",
"time": { "!": "timestamp", "value": 1764180998 },
"exchange": "delay-level-02-exchange",
"routing-keys": ["0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.1.0.0"]
}
],
"x-first-death-exchange": "delay-level-02-exchange",
"x-first-death-queue": "delay-level-02-queue",
"x-first-death-reason": "expired",
"x-last-death-exchange": "delay-level-02-exchange",
"x-last-death-queue": "delay-level-02-queue",
"x-last-death-reason": "expired",
"x-stream-filter-value": "55_1764156978",
"x-stream-offset": 30000
},
"deliveryMode": 2,
"priority": 8,
"messageId": "7ebdbb2c-5726-49ae-b24f-335158f04f1c",
"type": "message_type",
"x-stream-filter-value": "55_1764156978",
"x-stream-offset": 30771
},
"content": "gap0YXNrTnVtYmVyoTA="
}
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels