diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisher.ts index 5b8aabdd..6366d5ae 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisher.ts @@ -82,6 +82,7 @@ export abstract class AbstractAmqpPublisher< this.publish(message, options) }) .catch((err) => { + /* v8 ignore next */ this.handleError(err) }) return @@ -140,7 +141,7 @@ export abstract class AbstractAmqpPublisher< return this.messageSchemaContainer.resolveSchema(message) } - /* c8 ignore start */ + /* v8 ignore start */ protected resolveMessage(): Either< MessageInvalidFormatError | MessageValidationError, ResolvedMessage @@ -148,7 +149,6 @@ export abstract class AbstractAmqpPublisher< throw new Error('Not implemented for publisher') } - /* c8 ignore start */ protected override processPrehandlers(): Promise { throw new Error('Not implemented for publisher') } @@ -161,13 +161,13 @@ export abstract class AbstractAmqpPublisher< throw new Error('Not implemented for publisher') } + override processMessage(): Promise> { + throw new Error('Not implemented for publisher') + } + /* v8 ignore stop */ + override async close(): Promise { this.initPromise = undefined await super.close() } - - override processMessage(): Promise> { - throw new Error('Not implemented for publisher') - } - /* c8 ignore stop */ } diff --git a/packages/amqp/lib/AbstractAmqpService.ts b/packages/amqp/lib/AbstractAmqpService.ts index 8cb36f45..b4e57846 100644 --- a/packages/amqp/lib/AbstractAmqpService.ts +++ b/packages/amqp/lib/AbstractAmqpService.ts @@ -86,10 +86,12 @@ export abstract class AbstractAmqpService< try { this.channel = await this.connection.createChannel() } catch (err) { + /* v8 ignore start */ // @ts-expect-error this.logger.error(`Error creating channel: ${err.message}`) await this.connectionManager.reconnect() return + /* v8 ignore stop */ } if (oldChannel) { @@ -102,6 +104,7 @@ export abstract class AbstractAmqpService< this.isShuttingDown = false } + /* v8 ignore start */ this.channel.on('close', () => { if (!this.isShuttingDown) { this.logger.error('AMQP connection lost!') @@ -114,6 +117,7 @@ export abstract class AbstractAmqpService< this.channel.on('error', (err) => { this.handleError(err) }) + /* v8 ignore stop */ await this.createMissingEntities() } diff --git a/packages/amqp/lib/AmqpConnectionManager.ts b/packages/amqp/lib/AmqpConnectionManager.ts index 0a70a300..3df627fb 100644 --- a/packages/amqp/lib/AmqpConnectionManager.ts +++ b/packages/amqp/lib/AmqpConnectionManager.ts @@ -27,6 +27,7 @@ export class AmqpConnectionManager { private async createConnection(): Promise { const connection = await resolveAmqpConnection(this.config) + /* v8 ignore start */ connection.on('error', (err) => { this.logger.error(`AmqpConnectionManager: Connection error: ${err.message}`) this.connection = undefined @@ -42,6 +43,7 @@ export class AmqpConnectionManager { } } }) + /* v8 ignore stop */ const promises: Promise[] = [] diff --git a/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts b/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts index 4b85e3aa..8f359455 100644 --- a/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts +++ b/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts @@ -124,5 +124,26 @@ describe('AmqpQueuePublisherManager', () => { }), ).toThrow(/Error while publishing to AMQP Cannot read properties of undefined/) }) + + it('throws when deprecated publish method is called', () => { + const { queuePublisherManager } = diContainer.cradle + + expect(() => queuePublisherManager.publish()).toThrow( + 'Please use `publishSync` method for AMQP publisher managers', + ) + }) + + it('throws when publishing to unknown queue', () => { + const { queuePublisherManager } = diContainer.cradle + + expect(() => + queuePublisherManager.publishSync('unknown-queue' as any, { + type: 'entity.updated', + payload: { + updatedData: 'msg', + }, + }), + ).toThrow('No publisher for queue unknown-queue') + }) }) }) diff --git a/packages/amqp/lib/AmqpTopicPublisherManager.spec.ts b/packages/amqp/lib/AmqpTopicPublisherManager.spec.ts index e803b6d0..70db04f8 100644 --- a/packages/amqp/lib/AmqpTopicPublisherManager.spec.ts +++ b/packages/amqp/lib/AmqpTopicPublisherManager.spec.ts @@ -89,5 +89,26 @@ describe('AmqpTopicPublisherManager', () => { expect(fakeConsumer2.messageCounter).toEqual(1) expect(fakeConsumer3.messageCounter).toEqual(0) }) + + it('throws when deprecated publish method is called', () => { + const { topicPublisherManager } = diContainer.cradle + + expect(() => topicPublisherManager.publish()).toThrow( + 'Please use `publishSync` method for AMQP publisher managers', + ) + }) + + it('throws when publishing to unknown exchange', () => { + const { topicPublisherManager } = diContainer.cradle + + expect(() => + topicPublisherManager.publishSync('unknown-exchange' as any, { + type: 'entity.updated', + payload: { + updatedData: 'msg', + }, + }), + ).toThrow('No publisher for exchange unknown-exchange') + }) }) }) diff --git a/packages/amqp/lib/amqpConnectionResolver.ts b/packages/amqp/lib/amqpConnectionResolver.ts index 4d5c928d..9f856b94 100644 --- a/packages/amqp/lib/amqpConnectionResolver.ts +++ b/packages/amqp/lib/amqpConnectionResolver.ts @@ -26,17 +26,21 @@ export async function resolveAmqpConnection(config: AmqpConfig): Promise MAX_RETRY_ATTEMPTS) { throw new Error('Failed to resolve AMQP connection') } + /* v8 ignore stop */ } } diff --git a/packages/amqp/lib/errors/AmqpConsumerErrorResolver.spec.ts b/packages/amqp/lib/errors/AmqpConsumerErrorResolver.spec.ts new file mode 100644 index 00000000..98a6cdb4 --- /dev/null +++ b/packages/amqp/lib/errors/AmqpConsumerErrorResolver.spec.ts @@ -0,0 +1,86 @@ +import { InternalError } from '@lokalise/node-core' +import { MessageInvalidFormatError, MessageValidationError } from '@message-queue-toolkit/core' +import { describe, expect, it } from 'vitest' +import { ZodError } from 'zod/v4' +import { AmqpConsumerErrorResolver } from './AmqpConsumerErrorResolver.ts' + +describe('AmqpConsumerErrorResolver', () => { + const resolver = new AmqpConsumerErrorResolver() + + it('returns MessageInvalidFormatError for SyntaxError', () => { + const syntaxError = new SyntaxError('Unexpected token') + + const result = resolver.processError(syntaxError) + + expect(result).toBeInstanceOf(MessageInvalidFormatError) + expect(result.message).toBe('Unexpected token') + }) + + it('returns MessageValidationError for ZodError', () => { + const zodError = new ZodError([ + { + code: 'invalid_type', + expected: 'string', + message: 'Expected string, received number', + path: ['id'], + input: 123, + }, + ]) + + const result = resolver.processError(zodError) + + expect(result).toBeInstanceOf(MessageValidationError) + expect(result.details).toEqual({ + error: zodError.issues, + }) + }) + + it('returns InternalError for standardized errors', () => { + // Create an error that matches the StandardizedError interface + // StandardizedError requires: name, message, code, and Symbol.for('StandardizedErrorSymbol') = true + const standardizedError = Object.assign(new Error('Custom error message'), { + code: 'CUSTOM_ERROR', + [Symbol.for('StandardizedErrorSymbol')]: true, + }) + + const result = resolver.processError(standardizedError) + + expect(result).toBeInstanceOf(InternalError) + expect(result.message).toBe('Custom error message') + expect(result.errorCode).toBe('CUSTOM_ERROR') + }) + + it('returns InternalError with default message for unknown error types', () => { + const unknownError = { someProperty: 'someValue' } + + const result = resolver.processError(unknownError) + + expect(result).toBeInstanceOf(InternalError) + expect(result.message).toBe('Error processing message') + expect(result.errorCode).toBe('INTERNAL_ERROR') + }) + + it('returns InternalError with default message for null error', () => { + const result = resolver.processError(null) + + expect(result).toBeInstanceOf(InternalError) + expect(result.message).toBe('Error processing message') + expect(result.errorCode).toBe('INTERNAL_ERROR') + }) + + it('returns InternalError with default message for undefined error', () => { + const result = resolver.processError(undefined) + + expect(result).toBeInstanceOf(InternalError) + expect(result.message).toBe('Error processing message') + expect(result.errorCode).toBe('INTERNAL_ERROR') + }) + + it('returns InternalError with default message for string error', () => { + const result = resolver.processError('some string error') + + expect(result).toBeInstanceOf(InternalError) + expect(result.message).toBe('Error processing message') + expect(result.errorCode).toBe('INTERNAL_ERROR') + }) +}) diff --git a/packages/amqp/lib/utils/AmqpQueueUtils.spec.ts b/packages/amqp/lib/utils/AmqpQueueUtils.spec.ts new file mode 100644 index 00000000..557258cd --- /dev/null +++ b/packages/amqp/lib/utils/AmqpQueueUtils.spec.ts @@ -0,0 +1,209 @@ +import { reloadConfig } from '@message-queue-toolkit/core' +import type { AwilixContainer } from 'awilix' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { TEST_AMQP_CONFIG } from '../../test/utils/testAmqpConfig.ts' +import type { Dependencies } from '../../test/utils/testContext.ts' +import { registerDependencies } from '../../test/utils/testContext.ts' +import { + checkExchangeExists, + deleteAmqpQueue, + ensureAmqpQueue, + ensureAmqpTopicSubscription, + ensureExchange, +} from './amqpQueueUtils.ts' + +describe('AmqpQueueUtils', () => { + let diContainer: AwilixContainer + + beforeEach(async () => { + diContainer = await registerDependencies(TEST_AMQP_CONFIG, undefined, false) + }) + + afterEach(async () => { + const { awilixManager } = diContainer.cradle + await awilixManager.executeDispose() + await diContainer.dispose() + }) + + describe('ensureExchange', () => { + it('creates exchange when creationConfig is provided', async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + const channel = await connection.createChannel() + + await ensureExchange(connection, channel, { exchange: 'test-exchange' }) + + // Verify exchange exists + await expect(channel.checkExchange('test-exchange')).resolves.not.toThrow() + + await channel.deleteExchange('test-exchange') + await channel.close() + }) + + it('throws when locatorConfig exchange does not exist', async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + const channel = await connection.createChannel() + + await expect( + ensureExchange(connection, channel, undefined, { exchange: 'non-existent-exchange' }), + ).rejects.toThrow('Exchange non-existent-exchange does not exist.') + + await channel.close() + }) + + it('throws when neither creationConfig nor locatorConfig is provided', async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + const channel = await connection.createChannel() + + await expect(ensureExchange(connection, channel)).rejects.toThrow( + 'locatorConfig is mandatory when creationConfig is not set', + ) + + await channel.close() + }) + + it('validates exchange exists when locatorConfig is provided', async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + const channel = await connection.createChannel() + + // First create the exchange + await channel.assertExchange('existing-exchange', 'topic') + + // Then verify it with locatorConfig + await expect( + ensureExchange(connection, channel, undefined, { exchange: 'existing-exchange' }), + ).resolves.not.toThrow() + + await channel.deleteExchange('existing-exchange') + await channel.close() + }) + }) + + describe('checkExchangeExists', () => { + it('throws when exchange does not exist', async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + + await expect(checkExchangeExists(connection, { exchange: 'non-existent' })).rejects.toThrow( + 'Exchange non-existent does not exist.', + ) + }) + + it('does not throw when exchange exists', async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + const channel = await connection.createChannel() + + await channel.assertExchange('temp-exchange', 'topic') + + await expect( + checkExchangeExists(connection, { exchange: 'temp-exchange' }), + ).resolves.not.toThrow() + + await channel.deleteExchange('temp-exchange') + await channel.close() + }) + }) + + describe('ensureAmqpQueue', () => { + it('throws when neither creationConfig nor locatorConfig is provided', async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + const channel = await connection.createChannel() + + await expect(ensureAmqpQueue(connection, channel)).rejects.toThrow( + 'locatorConfig is mandatory when creationConfig is not set', + ) + + await channel.close() + }) + }) + + describe('ensureAmqpTopicSubscription', () => { + it('throws when neither creationConfig nor locatorConfig is provided', async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + const channel = await connection.createChannel() + + await expect(ensureAmqpTopicSubscription(connection, channel)).rejects.toThrow( + 'locatorConfig is mandatory when creationConfig is not set', + ) + + await channel.close() + }) + }) + + describe('deleteAmqpQueue', () => { + it('does nothing when deleteIfExists is false', async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + const channel = await connection.createChannel() + + await channel.assertQueue('queue-to-keep') + + await deleteAmqpQueue( + channel, + { deleteIfExists: false }, + { queueName: 'queue-to-keep', queueOptions: {} }, + ) + + // Queue should still exist + await expect(channel.checkQueue('queue-to-keep')).resolves.not.toThrow() + + await channel.deleteQueue('queue-to-keep') + await channel.close() + }) + + it('throws in production without forceDeleteInProduction', async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + const channel = await connection.createChannel() + + // Mock production environment + vi.stubEnv('NODE_ENV', 'production') + reloadConfig() + + try { + await expect( + deleteAmqpQueue( + channel, + { deleteIfExists: true }, + { queueName: 'test-queue', queueOptions: {} }, + ), + ).rejects.toThrow('You are running autodeletion in production') + } finally { + vi.unstubAllEnvs() + reloadConfig() + await channel.close() + } + }) + + it('throws when queueName is not set', async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + const channel = await connection.createChannel() + + await expect( + deleteAmqpQueue( + channel, + { deleteIfExists: true, forceDeleteInProduction: true }, + { queueName: '', queueOptions: {} }, + ), + ).rejects.toThrow('QueueName must be set for automatic deletion') + + await channel.close() + }) + + it('deletes queue when all conditions are met', async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + const channel = await connection.createChannel() + + await channel.assertQueue('queue-to-delete') + + await deleteAmqpQueue( + channel, + { deleteIfExists: true, forceDeleteInProduction: true }, + { queueName: 'queue-to-delete', queueOptions: {} }, + ) + + // Queue should no longer exist + const checkChannel = await connection.createChannel() + checkChannel.on('error', () => {}) + await expect(checkChannel.checkQueue('queue-to-delete')).rejects.toThrow() + + await channel.close() + }) + }) +}) diff --git a/packages/amqp/package.json b/packages/amqp/package.json index 311f6589..99a19653 100644 --- a/packages/amqp/package.json +++ b/packages/amqp/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/amqp", - "version": "22.2.0", + "version": "23.0.0", "private": false, "license": "MIT", "description": "AMQP adapter for message-queue-toolkit", @@ -28,29 +28,29 @@ "prepublishOnly": "npm run lint && npm run build" }, "dependencies": { - "@lokalise/node-core": "^14.2.0" + "@lokalise/node-core": "^14.6.1" }, "peerDependencies": { - "@message-queue-toolkit/core": ">=22.0.0", + "@message-queue-toolkit/core": ">=25.0.0", "@message-queue-toolkit/schemas": ">=7.0.0", "amqplib": "^0.10.8", "zod": ">=3.25.76 <5.0.0" }, "devDependencies": { - "@biomejs/biome": "^2.3.2", + "@biomejs/biome": "^2.3.8", "@lokalise/biome-config": "^3.1.0", "@lokalise/tsconfig": "^3.0.0", "@message-queue-toolkit/core": "*", "@types/amqplib": "0.10.8", - "@types/node": "^24.0.3", - "@vitest/coverage-v8": "^3.2.4", + "@types/node": "^24.10.4", + "@vitest/coverage-v8": "^4.0.15", "amqplib": "^0.10.8", "awilix": "^12.0.5", "awilix-manager": "^6.1.0", "rimraf": "^6.0.1", - "typescript": "^5.9.2", - "vitest": "^3.2.4", - "zod": "^4.0.17" + "typescript": "^5.9.3", + "vitest": "^4.0.15", + "zod": "^4.1.13" }, "homepage": "https://github.com/kibertoad/message-queue-toolkit", "repository": { diff --git a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts index 542323c4..a15a2056 100644 --- a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts @@ -419,6 +419,35 @@ describe('AmqpPermissionConsumer', () => { expect((consumerErrorResolver.errors[0] as Error).message).toContain('Unexpected token') }) + it('handles message with empty body', async () => { + const initialAddCounter = consumer.addCounter + + // Send an empty buffer which should be rejected since it cannot be parsed + channel.sendToQueue(AmqpPermissionConsumer.QUEUE_NAME, Buffer.from('')) + + // Wait a bit for the consumer to process the message + await new Promise((resolve) => setTimeout(resolve, 200)) + + // The consumer should have processed nothing since message is invalid + expect(consumer.addCounter).toBe(initialAddCounter) + }) + + it('handles message without id field', async () => { + // Send a valid JSON message without the id field to test tryToExtractId returning abort + channel.sendToQueue( + AmqpPermissionConsumer.QUEUE_NAME, + objectToBuffer({ + messageType: 'add', + // No id field + }), + ) + + await waitAndRetry(() => consumerErrorResolver.errors.length > 0) + + // The message should be rejected due to schema validation (id is required) + expect(consumerErrorResolver.errors.length).toBeGreaterThan(0) + }) + it('Processes messages', async () => { publisher.publish({ id: '10', diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index 7b2d1e62..a7db562f 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -98,6 +98,36 @@ describe('PermissionPublisher', () => { await expect(() => newPublisher.init()).rejects.toThrow(/does not exist/) }) + it('throws an error when neither locator nor creation config provides queueName', () => { + expect( + () => + new AmqpPermissionPublisher(diContainer.cradle, { + // @ts-expect-error intentionally passing invalid config + locatorConfig: {}, + // @ts-expect-error intentionally passing invalid config + creationConfig: {}, + }), + ).toThrow('Either locatorConfig or creationConfig must provide queueName') + }) + + it('abstract methods not implemented for publisher throw errors', async () => { + const newPublisher = new AmqpPermissionPublisher(diContainer.cradle) + await newPublisher.init() + + // Test that abstract methods throw "Not implemented for publisher" errors + // @ts-expect-error accessing protected method for coverage + expect(() => newPublisher.resolveMessage()).toThrow('Not implemented for publisher') + // @ts-expect-error accessing protected method for coverage + expect(() => newPublisher.processPrehandlers()).toThrow('Not implemented for publisher') + // @ts-expect-error accessing protected method for coverage + expect(() => newPublisher.preHandlerBarrier()).toThrow('Not implemented for publisher') + // @ts-expect-error accessing protected method for coverage + expect(() => newPublisher.resolveNextFunction()).toThrow('Not implemented for publisher') + expect(() => newPublisher.processMessage()).toThrow('Not implemented for publisher') + + await newPublisher.close() + }) + it('does not create a new queue when queue locator is passed', async () => { await channel.assertQueue(AmqpPermissionPublisher.QUEUE_NAME) diff --git a/packages/amqp/vitest.config.ts b/packages/amqp/vitest.config.ts index c09d85e6..a9b63c29 100644 --- a/packages/amqp/vitest.config.ts +++ b/packages/amqp/vitest.config.ts @@ -7,18 +7,16 @@ export default defineConfig({ watch: false, restoreMocks: true, pool: 'threads', - poolOptions: { - threads: { singleThread: true }, - }, + maxWorkers: 1, coverage: { provider: 'v8', include: ['lib/**/*.ts'], exclude: ['vitest.config.ts', 'lib/**/index.ts'], thresholds: { - lines: 89, + lines: 90, functions: 95, - branches: 79, - statements: 89, + branches: 80, + statements: 90, }, }, },