Skip to content

Commit e86ea51

Browse files
authored
Feat: Kafka publisher connection check on init (#286)
* Improving imports * Implementing publisher connection check * Typo * Improving init * Adding tests * Adding comment
1 parent a0483c8 commit e86ea51

File tree

3 files changed

+44
-9
lines changed

3 files changed

+44
-9
lines changed

packages/kafka/lib/AbstractKafkaPublisher.ts

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ export abstract class AbstractKafkaPublisher<
3535
MessageSchemaContainer<SupportedMessageValuesInput<TopicsConfig>>
3636
>
3737

38-
private producer?: Producer<string, object, string, object>
38+
private readonly producer: Producer<string, object, string, object>
39+
private isInitiated: boolean
3940

4041
constructor(dependencies: KafkaDependencies, options: KafkaPublisherOptions<TopicsConfig>) {
4142
super(dependencies, options)
43+
this.isInitiated = false
4244

4345
this.topicsConfig = options.topicsConfig
4446
if (this.topicsConfig.length === 0) throw new Error('At least one topic must be defined')
@@ -51,10 +53,6 @@ export abstract class AbstractKafkaPublisher<
5153
messageDefinitions: [],
5254
})
5355
}
54-
}
55-
56-
init(): Promise<void> {
57-
if (this.producer) return Promise.resolve()
5856

5957
this.producer = new Producer({
6058
...this.options.kafka,
@@ -66,13 +64,28 @@ export abstract class AbstractKafkaPublisher<
6664
headerValue: jsonSerializer,
6765
},
6866
})
67+
}
6968

70-
return Promise.resolve()
69+
async init(): Promise<void> {
70+
if (this.isInitiated) return
71+
72+
try {
73+
await this.producer.listApis()
74+
this.isInitiated = true
75+
} catch (e) {
76+
throw new InternalError({
77+
message: 'Producer init failed',
78+
errorCode: 'KAFKA_PRODUCER_INIT_ERROR',
79+
cause: e,
80+
})
81+
}
7182
}
7283

7384
async close(): Promise<void> {
74-
await this.producer?.close()
75-
this.producer = undefined
85+
if (!this.isInitiated) return
86+
87+
await this.producer.close()
88+
this.isInitiated = false
7689
}
7790

7891
async publish<Topic extends SupportedTopics<TopicsConfig>>(

packages/kafka/lib/AbstractKafkaService.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ import {
55
resolveGlobalErrorLogObject,
66
stringValueSerializer,
77
} from '@lokalise/node-core'
8-
import type { HandlerSpy, HandlerSpyParams } from '@message-queue-toolkit/core'
98
import {
9+
type HandlerSpy,
10+
type HandlerSpyParams,
1011
type MessageProcessingResult,
1112
type PublicHandlerSpy,
1213
resolveHandlerSpy,

packages/kafka/test/publisher/PermissionPublisher.spec.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { randomUUID } from 'node:crypto'
12
import { InternalError } from '@lokalise/node-core'
23
import {
34
PERMISSION_ADDED_SCHEMA,
@@ -52,6 +53,26 @@ describe('PermissionPublisher - init', () => {
5253
)
5354
})
5455

56+
it('should fail if kafka is not available', async () => {
57+
// Given
58+
publisher = new PermissionPublisher(testContext.cradle, {
59+
kafka: { clientId: randomUUID(), bootstrapBrokers: ['localhost:9090'] },
60+
})
61+
62+
// When - Then
63+
await expect(publisher.init()).rejects.toThrowErrorMatchingInlineSnapshot(
64+
'[InternalError: Producer init failed]',
65+
)
66+
})
67+
68+
it('should not fail on close if publisher is not started ', async () => {
69+
// Given
70+
publisher = new PermissionPublisher(testContext.cradle)
71+
72+
// When - Then
73+
await expect(publisher.close()).resolves.not.toThrow()
74+
})
75+
5576
it('should fail if topic does not exists', async () => {
5677
// Given
5778
publisher = new PermissionPublisher(testContext.cradle, {

0 commit comments

Comments
 (0)