Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export abstract class AbstractAmqpPublisher<
this.publish(message, options)
})
.catch((err) => {
/* v8 ignore next */
this.handleError(err)
})
return
Expand Down Expand Up @@ -140,15 +141,14 @@ export abstract class AbstractAmqpPublisher<
return this.messageSchemaContainer.resolveSchema(message)
}

/* c8 ignore start */
/* v8 ignore start */
protected resolveMessage(): Either<
MessageInvalidFormatError | MessageValidationError,
ResolvedMessage
> {
throw new Error('Not implemented for publisher')
}

/* c8 ignore start */
protected override processPrehandlers(): Promise<unknown> {
throw new Error('Not implemented for publisher')
}
Expand All @@ -161,13 +161,13 @@ export abstract class AbstractAmqpPublisher<
throw new Error('Not implemented for publisher')
}

override processMessage(): Promise<Either<'retryLater', 'success'>> {
throw new Error('Not implemented for publisher')
}
/* v8 ignore stop */

override async close(): Promise<void> {
this.initPromise = undefined
await super.close()
}

override processMessage(): Promise<Either<'retryLater', 'success'>> {
throw new Error('Not implemented for publisher')
}
/* c8 ignore stop */
}
4 changes: 4 additions & 0 deletions packages/amqp/lib/AbstractAmqpService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ export abstract class AbstractAmqpService<
try {
this.channel = await this.connection.createChannel()
} catch (err) {
/* v8 ignore start */
// @ts-expect-error
this.logger.error(`Error creating channel: ${err.message}`)
await this.connectionManager.reconnect()
return
/* v8 ignore stop */
}

if (oldChannel) {
Expand All @@ -102,6 +104,7 @@ export abstract class AbstractAmqpService<
this.isShuttingDown = false
}

/* v8 ignore start */
this.channel.on('close', () => {
if (!this.isShuttingDown) {
this.logger.error('AMQP connection lost!')
Expand All @@ -114,6 +117,7 @@ export abstract class AbstractAmqpService<
this.channel.on('error', (err) => {
this.handleError(err)
})
/* v8 ignore stop */

await this.createMissingEntities()
}
Expand Down
2 changes: 2 additions & 0 deletions packages/amqp/lib/AmqpConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export class AmqpConnectionManager {

private async createConnection(): Promise<ChannelModel> {
const connection = await resolveAmqpConnection(this.config)
/* v8 ignore start */
connection.on('error', (err) => {
this.logger.error(`AmqpConnectionManager: Connection error: ${err.message}`)
this.connection = undefined
Expand All @@ -42,6 +43,7 @@ export class AmqpConnectionManager {
}
}
})
/* v8 ignore stop */

const promises: Promise<unknown>[] = []

Expand Down
21 changes: 21 additions & 0 deletions packages/amqp/lib/AmqpQueuePublisherManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,26 @@ describe('AmqpQueuePublisherManager', () => {
}),
).toThrow(/Error while publishing to AMQP Cannot read properties of undefined/)
})

it('throws when deprecated publish method is called', () => {
const { queuePublisherManager } = diContainer.cradle

expect(() => queuePublisherManager.publish()).toThrow(
'Please use `publishSync` method for AMQP publisher managers',
)
})

it('throws when publishing to unknown queue', () => {
const { queuePublisherManager } = diContainer.cradle

expect(() =>
queuePublisherManager.publishSync('unknown-queue' as any, {
type: 'entity.updated',
payload: {
updatedData: 'msg',
},
}),
).toThrow('No publisher for queue unknown-queue')
})
})
})
21 changes: 21 additions & 0 deletions packages/amqp/lib/AmqpTopicPublisherManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,5 +89,26 @@ describe('AmqpTopicPublisherManager', () => {
expect(fakeConsumer2.messageCounter).toEqual(1)
expect(fakeConsumer3.messageCounter).toEqual(0)
})

it('throws when deprecated publish method is called', () => {
const { topicPublisherManager } = diContainer.cradle

expect(() => topicPublisherManager.publish()).toThrow(
'Please use `publishSync` method for AMQP publisher managers',
)
})

it('throws when publishing to unknown exchange', () => {
const { topicPublisherManager } = diContainer.cradle

expect(() =>
topicPublisherManager.publishSync('unknown-exchange' as any, {
type: 'entity.updated',
payload: {
updatedData: 'msg',
},
}),
).toThrow('No publisher for exchange unknown-exchange')
})
})
})
4 changes: 4 additions & 0 deletions packages/amqp/lib/amqpConnectionResolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,21 @@ export async function resolveAmqpConnection(config: AmqpConfig): Promise<Channel
const connection = await connect(url)
return connection
} catch (_e) {
/* v8 ignore start */
globalLogger.error(
`Failed to connect to AMQP broker at ${config.hostname}:${config.port}. Retrying in ${
retryTime / 1000
} seconds...`,
)
/* v8 ignore stop */
}
/* v8 ignore start */
await setTimeout(retryTime)
counter++

if (counter > MAX_RETRY_ATTEMPTS) {
throw new Error('Failed to resolve AMQP connection')
}
/* v8 ignore stop */
}
}
86 changes: 86 additions & 0 deletions packages/amqp/lib/errors/AmqpConsumerErrorResolver.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { InternalError } from '@lokalise/node-core'
import { MessageInvalidFormatError, MessageValidationError } from '@message-queue-toolkit/core'
import { describe, expect, it } from 'vitest'
import { ZodError } from 'zod/v4'
import { AmqpConsumerErrorResolver } from './AmqpConsumerErrorResolver.ts'

describe('AmqpConsumerErrorResolver', () => {
const resolver = new AmqpConsumerErrorResolver()

it('returns MessageInvalidFormatError for SyntaxError', () => {
const syntaxError = new SyntaxError('Unexpected token')

const result = resolver.processError(syntaxError)

expect(result).toBeInstanceOf(MessageInvalidFormatError)
expect(result.message).toBe('Unexpected token')
})

it('returns MessageValidationError for ZodError', () => {
const zodError = new ZodError([
{
code: 'invalid_type',
expected: 'string',
message: 'Expected string, received number',
path: ['id'],
input: 123,
},
])

const result = resolver.processError(zodError)

expect(result).toBeInstanceOf(MessageValidationError)
expect(result.details).toEqual({
error: zodError.issues,
})
})

it('returns InternalError for standardized errors', () => {
// Create an error that matches the StandardizedError interface
// StandardizedError requires: name, message, code, and Symbol.for('StandardizedErrorSymbol') = true
const standardizedError = Object.assign(new Error('Custom error message'), {
code: 'CUSTOM_ERROR',
[Symbol.for('StandardizedErrorSymbol')]: true,
})

const result = resolver.processError(standardizedError)

expect(result).toBeInstanceOf(InternalError)
expect(result.message).toBe('Custom error message')
expect(result.errorCode).toBe('CUSTOM_ERROR')
})

it('returns InternalError with default message for unknown error types', () => {
const unknownError = { someProperty: 'someValue' }

const result = resolver.processError(unknownError)

expect(result).toBeInstanceOf(InternalError)
expect(result.message).toBe('Error processing message')
expect(result.errorCode).toBe('INTERNAL_ERROR')
})

it('returns InternalError with default message for null error', () => {
const result = resolver.processError(null)

expect(result).toBeInstanceOf(InternalError)
expect(result.message).toBe('Error processing message')
expect(result.errorCode).toBe('INTERNAL_ERROR')
})

it('returns InternalError with default message for undefined error', () => {
const result = resolver.processError(undefined)

expect(result).toBeInstanceOf(InternalError)
expect(result.message).toBe('Error processing message')
expect(result.errorCode).toBe('INTERNAL_ERROR')
})

it('returns InternalError with default message for string error', () => {
const result = resolver.processError('some string error')

expect(result).toBeInstanceOf(InternalError)
expect(result.message).toBe('Error processing message')
expect(result.errorCode).toBe('INTERNAL_ERROR')
})
})
Loading
Loading