Skip to content

Commit f2e4861

Browse files
authored
Improve coverage and prepare AMQP 23.0.0 release (#375)
1 parent 7eb1dc1 commit f2e4861

12 files changed

+426
-22
lines changed

packages/amqp/lib/AbstractAmqpPublisher.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ export abstract class AbstractAmqpPublisher<
8282
this.publish(message, options)
8383
})
8484
.catch((err) => {
85+
/* v8 ignore next */
8586
this.handleError(err)
8687
})
8788
return
@@ -140,15 +141,14 @@ export abstract class AbstractAmqpPublisher<
140141
return this.messageSchemaContainer.resolveSchema(message)
141142
}
142143

143-
/* c8 ignore start */
144+
/* v8 ignore start */
144145
protected resolveMessage(): Either<
145146
MessageInvalidFormatError | MessageValidationError,
146147
ResolvedMessage
147148
> {
148149
throw new Error('Not implemented for publisher')
149150
}
150151

151-
/* c8 ignore start */
152152
protected override processPrehandlers(): Promise<unknown> {
153153
throw new Error('Not implemented for publisher')
154154
}
@@ -161,13 +161,13 @@ export abstract class AbstractAmqpPublisher<
161161
throw new Error('Not implemented for publisher')
162162
}
163163

164+
override processMessage(): Promise<Either<'retryLater', 'success'>> {
165+
throw new Error('Not implemented for publisher')
166+
}
167+
/* v8 ignore stop */
168+
164169
override async close(): Promise<void> {
165170
this.initPromise = undefined
166171
await super.close()
167172
}
168-
169-
override processMessage(): Promise<Either<'retryLater', 'success'>> {
170-
throw new Error('Not implemented for publisher')
171-
}
172-
/* c8 ignore stop */
173173
}

packages/amqp/lib/AbstractAmqpService.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,12 @@ export abstract class AbstractAmqpService<
8686
try {
8787
this.channel = await this.connection.createChannel()
8888
} catch (err) {
89+
/* v8 ignore start */
8990
// @ts-expect-error
9091
this.logger.error(`Error creating channel: ${err.message}`)
9192
await this.connectionManager.reconnect()
9293
return
94+
/* v8 ignore stop */
9395
}
9496

9597
if (oldChannel) {
@@ -102,6 +104,7 @@ export abstract class AbstractAmqpService<
102104
this.isShuttingDown = false
103105
}
104106

107+
/* v8 ignore start */
105108
this.channel.on('close', () => {
106109
if (!this.isShuttingDown) {
107110
this.logger.error('AMQP connection lost!')
@@ -114,6 +117,7 @@ export abstract class AbstractAmqpService<
114117
this.channel.on('error', (err) => {
115118
this.handleError(err)
116119
})
120+
/* v8 ignore stop */
117121

118122
await this.createMissingEntities()
119123
}

packages/amqp/lib/AmqpConnectionManager.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export class AmqpConnectionManager {
2727

2828
private async createConnection(): Promise<ChannelModel> {
2929
const connection = await resolveAmqpConnection(this.config)
30+
/* v8 ignore start */
3031
connection.on('error', (err) => {
3132
this.logger.error(`AmqpConnectionManager: Connection error: ${err.message}`)
3233
this.connection = undefined
@@ -42,6 +43,7 @@ export class AmqpConnectionManager {
4243
}
4344
}
4445
})
46+
/* v8 ignore stop */
4547

4648
const promises: Promise<unknown>[] = []
4749

packages/amqp/lib/AmqpQueuePublisherManager.spec.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,5 +124,26 @@ describe('AmqpQueuePublisherManager', () => {
124124
}),
125125
).toThrow(/Error while publishing to AMQP Cannot read properties of undefined/)
126126
})
127+
128+
it('throws when deprecated publish method is called', () => {
129+
const { queuePublisherManager } = diContainer.cradle
130+
131+
expect(() => queuePublisherManager.publish()).toThrow(
132+
'Please use `publishSync` method for AMQP publisher managers',
133+
)
134+
})
135+
136+
it('throws when publishing to unknown queue', () => {
137+
const { queuePublisherManager } = diContainer.cradle
138+
139+
expect(() =>
140+
queuePublisherManager.publishSync('unknown-queue' as any, {
141+
type: 'entity.updated',
142+
payload: {
143+
updatedData: 'msg',
144+
},
145+
}),
146+
).toThrow('No publisher for queue unknown-queue')
147+
})
127148
})
128149
})

packages/amqp/lib/AmqpTopicPublisherManager.spec.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,5 +89,26 @@ describe('AmqpTopicPublisherManager', () => {
8989
expect(fakeConsumer2.messageCounter).toEqual(1)
9090
expect(fakeConsumer3.messageCounter).toEqual(0)
9191
})
92+
93+
it('throws when deprecated publish method is called', () => {
94+
const { topicPublisherManager } = diContainer.cradle
95+
96+
expect(() => topicPublisherManager.publish()).toThrow(
97+
'Please use `publishSync` method for AMQP publisher managers',
98+
)
99+
})
100+
101+
it('throws when publishing to unknown exchange', () => {
102+
const { topicPublisherManager } = diContainer.cradle
103+
104+
expect(() =>
105+
topicPublisherManager.publishSync('unknown-exchange' as any, {
106+
type: 'entity.updated',
107+
payload: {
108+
updatedData: 'msg',
109+
},
110+
}),
111+
).toThrow('No publisher for exchange unknown-exchange')
112+
})
92113
})
93114
})

packages/amqp/lib/amqpConnectionResolver.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,21 @@ export async function resolveAmqpConnection(config: AmqpConfig): Promise<Channel
2626
const connection = await connect(url)
2727
return connection
2828
} catch (_e) {
29+
/* v8 ignore start */
2930
globalLogger.error(
3031
`Failed to connect to AMQP broker at ${config.hostname}:${config.port}. Retrying in ${
3132
retryTime / 1000
3233
} seconds...`,
3334
)
35+
/* v8 ignore stop */
3436
}
37+
/* v8 ignore start */
3538
await setTimeout(retryTime)
3639
counter++
3740

3841
if (counter > MAX_RETRY_ATTEMPTS) {
3942
throw new Error('Failed to resolve AMQP connection')
4043
}
44+
/* v8 ignore stop */
4145
}
4246
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import { InternalError } from '@lokalise/node-core'
2+
import { MessageInvalidFormatError, MessageValidationError } from '@message-queue-toolkit/core'
3+
import { describe, expect, it } from 'vitest'
4+
import { ZodError } from 'zod/v4'
5+
import { AmqpConsumerErrorResolver } from './AmqpConsumerErrorResolver.ts'
6+
7+
describe('AmqpConsumerErrorResolver', () => {
8+
const resolver = new AmqpConsumerErrorResolver()
9+
10+
it('returns MessageInvalidFormatError for SyntaxError', () => {
11+
const syntaxError = new SyntaxError('Unexpected token')
12+
13+
const result = resolver.processError(syntaxError)
14+
15+
expect(result).toBeInstanceOf(MessageInvalidFormatError)
16+
expect(result.message).toBe('Unexpected token')
17+
})
18+
19+
it('returns MessageValidationError for ZodError', () => {
20+
const zodError = new ZodError([
21+
{
22+
code: 'invalid_type',
23+
expected: 'string',
24+
message: 'Expected string, received number',
25+
path: ['id'],
26+
input: 123,
27+
},
28+
])
29+
30+
const result = resolver.processError(zodError)
31+
32+
expect(result).toBeInstanceOf(MessageValidationError)
33+
expect(result.details).toEqual({
34+
error: zodError.issues,
35+
})
36+
})
37+
38+
it('returns InternalError for standardized errors', () => {
39+
// Create an error that matches the StandardizedError interface
40+
// StandardizedError requires: name, message, code, and Symbol.for('StandardizedErrorSymbol') = true
41+
const standardizedError = Object.assign(new Error('Custom error message'), {
42+
code: 'CUSTOM_ERROR',
43+
[Symbol.for('StandardizedErrorSymbol')]: true,
44+
})
45+
46+
const result = resolver.processError(standardizedError)
47+
48+
expect(result).toBeInstanceOf(InternalError)
49+
expect(result.message).toBe('Custom error message')
50+
expect(result.errorCode).toBe('CUSTOM_ERROR')
51+
})
52+
53+
it('returns InternalError with default message for unknown error types', () => {
54+
const unknownError = { someProperty: 'someValue' }
55+
56+
const result = resolver.processError(unknownError)
57+
58+
expect(result).toBeInstanceOf(InternalError)
59+
expect(result.message).toBe('Error processing message')
60+
expect(result.errorCode).toBe('INTERNAL_ERROR')
61+
})
62+
63+
it('returns InternalError with default message for null error', () => {
64+
const result = resolver.processError(null)
65+
66+
expect(result).toBeInstanceOf(InternalError)
67+
expect(result.message).toBe('Error processing message')
68+
expect(result.errorCode).toBe('INTERNAL_ERROR')
69+
})
70+
71+
it('returns InternalError with default message for undefined error', () => {
72+
const result = resolver.processError(undefined)
73+
74+
expect(result).toBeInstanceOf(InternalError)
75+
expect(result.message).toBe('Error processing message')
76+
expect(result.errorCode).toBe('INTERNAL_ERROR')
77+
})
78+
79+
it('returns InternalError with default message for string error', () => {
80+
const result = resolver.processError('some string error')
81+
82+
expect(result).toBeInstanceOf(InternalError)
83+
expect(result.message).toBe('Error processing message')
84+
expect(result.errorCode).toBe('INTERNAL_ERROR')
85+
})
86+
})

0 commit comments

Comments
 (0)