-
Notifications
You must be signed in to change notification settings - Fork 11
Description
Summary
The rabbitmq-stream-js-client library throws a RangeError when attempting to unsubscribe consumers after the internal consumerId counter exceeds 255. This is caused by using Buffer.writeUInt8() to serialize the subscription ID in UnsubscribeRequest, which only supports values 0-255, while the consumer ID counter can grow indefinitely.
Environment
- Library:
rabbitmq-stream-js-client - Version:
^1.0.0 - Node.js: v24.13.0
- Platform: Linux (Kubernetes)
- Use Case: Long-running service with dynamic consumer creation/deletion
Description
The Problem
When a RabbitMQ Streams client runs for extended periods and creates/destroys consumers dynamically (e.g., through subscription management), the internal consumerId counter increments without bounds. After 256 consumer operations, attempting to close/unsubscribe any consumer results in a RangeError crash.
Root Cause
File: dist/requests/unsubscribe_request.js (line 17)
writeContent(writer) {
writer.writeUInt8(this.subscriptionId); // ❌ UInt8 only supports 0-255
}File: dist/connection.js
getNextConsumerId() {
const consumerId = this.consumerId;
this.consumerId++; // ✅ Increments indefinitely (JavaScript number)
return consumerId;
}The consumer ID counter is a JavaScript number (theoretically up to 2^53) but is serialized as a UInt8 (max 255). After 256 consumers, the value overflows the buffer type.
Steps to Reproduce
- Create a long-running application using
rabbitmq-stream-js-client - Implement dynamic consumer management (create/destroy consumers based on events)
- Perform 256+ consumer creation operations (e.g., via Redis Pub/Sub events triggering consumer starts/stops)
- Attempt to close/unsubscribe a consumer (e.g., during graceful shutdown)
- Observe
RangeError: The value of "value" is out of range. It must be >= 0 and <= 255. Received 256
Minimal Reproduction (Pseudocode):
const client = await connect({ /* ... */ });
// Simulate 256+ consumer operations
for (let i = 0; i < 260; i++) {
const consumer = await client.declareConsumer({
stream: 'test-stream',
offset: Offset.next(),
}, (message) => { /* handler */ });
// Close immediately
await consumer.close(); // ❌ Will crash at i=256
}Error Stack Trace
RangeError [ERR_OUT_OF_RANGE]: The value of "value" is out of range. It must be >= 0 and <= 255. Received 269
at writeU_Int8 (node:internal/buffer:740:11)
at Buffer.writeUInt8 (node:internal/buffer:750:10)
at BufferDataWriter.writeUInt8 (/node_modules/rabbitmq-stream-js-client/dist/requests/abstract_request.js:50:36)
at UnsubscribeRequest.writeContent (/node_modules/rabbitmq-stream-js-client/dist/requests/unsubscribe_request.js:17:16)
at UnsubscribeRequest.toBuffer (/node_modules/rabbitmq-stream-js-client/dist/requests/abstract_request.js:117:14)
at /node_modules/rabbitmq-stream-js-client/dist/connection.js:298:30
at new Promise (<anonymous>)
at Connection.sendAndWait (/node_modules/rabbitmq-stream-js-client/dist/connection.js:295:16)
at Client.unsubscribe (/node_modules/rabbitmq-stream-js-client/dist/client.js:693:38)
at Client.closeConsumer (/node_modules/rabbitmq-stream-js-client/dist/client.js:289:24)
Impact
Severity: HIGH
- Process Crashes: The error is thrown as an unhandled rejection, causing the Node.js process to exit with code 1
- Kubernetes Restart Loops: In containerized environments, this triggers automatic restarts
- Zombie Consumers: Because consumers fail to close properly, they remain as "zombie" connections in RabbitMQ
- Production Impact: After ~256 consumer operations (which can happen in hours/days), the service becomes unstable
Real-World Scenario
In our production environment:
- Service runs continuously with dynamic subscription management
- After 16 hours of operation,
consumerIdreaches 269+ - Graceful shutdown triggered (SIGTERM from Kubernetes)
- Unsubscribe fails with RangeError
- Process crashes (Exit Code 1)
- Kubernetes restarts the pod
- Cycle repeats, creating more zombie consumers
Proposed Solution
Fix: Use Larger Integer Type
The subscription ID should be serialized using a larger integer type that matches the protocol specification.
Option 1: UInt16 (supports up to 65,535 consumers)
// dist/requests/unsubscribe_request.js
writeContent(writer) {
- writer.writeUInt8(this.subscriptionId);
+ writer.writeUInt16(this.subscriptionId);
}Option 2: UInt32 (supports up to 4,294,967,295 consumers - recommended)
// dist/requests/unsubscribe_request.js
writeContent(writer) {
- writer.writeUInt8(this.subscriptionId);
+ writer.writeUInt32(this.subscriptionId);
}Additional Considerations
- Protocol Compatibility: Verify what the RabbitMQ Streams protocol actually expects (check RABBITMQ_STREAM_PROTOCOL.md or protocol spec)
- Subscribe Request: Check if
SubscribeRequestuses the same serialization - it should be consistent - Wire Format: Ensure the change is backward compatible with RabbitMQ server versions
Alternative: Counter Reset
If the protocol truly only supports UInt8, consider resetting the counter:
getNextConsumerId() {
const consumerId = this.consumerId;
this.consumerId = (this.consumerId + 1) % 256; // Wrap at 255
return consumerId;
}Note: This requires careful handling to avoid ID collisions with active consumers.
Current Workaround
We've implemented a workaround by catching and suppressing the ERR_OUT_OF_RANGE error:
// In application code
process.on('unhandledRejection', (reason, promise) => {
const error = reason as { code?: string; message?: string };
// Suppress known library bug
if (
error.code === 'ERR_OUT_OF_RANGE' ||
(error.message?.includes('out of range') && error.message?.includes('UnsubscribeRequest'))
) {
logger.warn(`Suppressing RangeError from rabbitmq-stream-js-client bug`);
return; // Don't crash the process
}
// Handle other rejections normally
// ...
});This prevents the process crash but does not fix the underlying issue - consumers still fail to close properly.
Testing Checklist
- Test with consumer ID = 254, 255, 256, 257
- Verify protocol specification for subscription ID size
- Test backward compatibility with existing RabbitMQ server versions
- Add integration test for long-running scenarios (256+ consumers)
- Verify no similar issues in other request types (Subscribe, Credit, etc.)
Additional Context
- Similar issues might exist in other request classes that serialize consumer/subscription IDs
- The
consumerIdcounter is per-connection and never resets - With Single Active Consumer (SAC), multiple connections to the same queue can accumulate IDs quickly
References
- RabbitMQ Streams Protocol: [Link if available]
- Related Code:
dist/connection.js-getNextConsumerId()dist/requests/unsubscribe_request.js-writeContent()dist/requests/abstract_request.js-BufferDataWriter.writeUInt8()
Reporter: Kurt Bädker, AIRSYS GmbH
Date: March 26, 2026
Severity: High
Type: Bug
Component: Protocol Serialization
Thank you for this excellent library! This bug fix would prevent crashes in long-running production environments.