Commit a0483c8

Kafka publisher (#284)
* dep update
* Adding initial AbstractKafkaService
* Fixing types
* Exposing handler spy resolver
* Avoid abstract queue service
* Adding AbstractKafkaPublisher
* Adjusting test context + adding test schemas
* Publisher multi topic support
* Fixing init issue + adjusting test context
* Adding test publisher + fixing some issues on publish
* Adding publisher tests
* Improving publish error handling
* Improving test coverage
* Coverage improvements
* Narrowing down publish type
* Test updated
* Improving types
* Removing redundant config
* Improving coverage
* Removing unused type
* Lint fix
* Removing not needed override
* Moving topics config
* MessageSchemaContainer making messageTypeField optional
* Publisher messageTypeField can be undefined
* Adding missing tests
* Error fix
* Test fix
* Reverting message definition change
* Lint fix
1 parent 5d05a53 commit a0483c8

15 files changed: +645 additions, −34 deletions

packages/core/lib/index.ts

Lines changed: 8 additions & 2 deletions

```diff
@@ -5,6 +5,7 @@ export type {
   TransactionObservabilityManager,
   SchemaMap,
   ExtraParams,
+  MessageProcessingResult,
 } from './types/MessageQueueTypes.ts'

 export {
@@ -42,8 +43,13 @@ export type {
   Prehandler,
   PreHandlingOutputs,
 } from './queues/HandlerContainer.ts'
-export { HandlerSpy } from './queues/HandlerSpy.ts'
-export type { SpyResultInput, HandlerSpyParams, PublicHandlerSpy } from './queues/HandlerSpy.ts'
+export {
+  HandlerSpy,
+  resolveHandlerSpy,
+  type SpyResultInput,
+  type HandlerSpyParams,
+  type PublicHandlerSpy,
+} from './queues/HandlerSpy.ts'

 export { MessageSchemaContainer } from './queues/MessageSchemaContainer.ts'
 export type { MessageSchemaContainerOptions } from './queues/MessageSchemaContainer.ts'
```

packages/core/lib/queues/AbstractQueueService.ts

Lines changed: 3 additions & 4 deletions

```diff
@@ -167,9 +167,9 @@ export abstract class AbstractQueueService<
       .filter((entry) => entry !== undefined)

     return new MessageSchemaContainer<MessagePayloadSchemas>({
+      messageTypeField: options.messageTypeField,
       messageSchemas,
       messageDefinitions,
-      messageTypeField: options.messageTypeField,
     })
   }

@@ -178,12 +178,11 @@ export abstract class AbstractQueueService<
     messageTypeField: string
   }) {
     const messageSchemas = options.messageSchemas
-    const messageDefinitions: readonly CommonEventDefinition[] = []

     return new MessageSchemaContainer<MessagePayloadSchemas>({
-      messageSchemas,
-      messageDefinitions,
       messageTypeField: options.messageTypeField,
+      messageSchemas,
+      messageDefinitions: [],
     })
   }
```

packages/core/lib/queues/MessageSchemaContainer.ts

Lines changed: 28 additions & 8 deletions

```diff
@@ -5,15 +5,23 @@ import type { ZodSchema } from 'zod/v3'
 export type MessageSchemaContainerOptions<MessagePayloadSchemas extends object> = {
   messageDefinitions: readonly CommonEventDefinition[]
   messageSchemas: readonly ZodSchema<MessagePayloadSchemas>[]
-  messageTypeField: string
+  messageTypeField?: string
 }

+const NO_MESSAGE_TYPE = 'NO_MESSAGE_TYPE'
+
 export class MessageSchemaContainer<MessagePayloadSchemas extends object> {
   public readonly messageDefinitions: Record<string, CommonEventDefinition>
   private readonly messageSchemas: Record<string, ZodSchema<MessagePayloadSchemas>>
-  private readonly messageTypeField: string
+  private readonly messageTypeField?: string

   constructor(options: MessageSchemaContainerOptions<MessagePayloadSchemas>) {
+    if (options.messageTypeField === undefined && options.messageSchemas.length > 1) {
+      throw new Error(
+        'if messageTypeField is not provided, messageSchemas must have a single schema',
+      )
+    }
+
     this.messageTypeField = options.messageTypeField
     this.messageSchemas = this.resolveSchemaMap(options.messageSchemas)
     this.messageDefinitions = this.resolveDefinitionsMap(options.messageDefinitions ?? [])
@@ -23,20 +31,27 @@ export class MessageSchemaContainer<MessagePayloadSchemas extends object> {
     // biome-ignore lint/suspicious/noExplicitAny: This is expected
     message: Record<string, any>,
   ): Either<Error, ZodSchema<MessagePayloadSchemas>> {
+    if (!this.messageTypeField) {
+      return this.messageSchemas[NO_MESSAGE_TYPE]
+        ? { result: this.messageSchemas[NO_MESSAGE_TYPE] }
+        : { error: new Error('Unsupported message') }
+    }
+
     const schema = this.messageSchemas[message[this.messageTypeField]]
     if (!schema) {
-      return {
-        error: new Error(`Unsupported message type: ${message[this.messageTypeField]}`),
-      }
-    }
-    return {
-      result: schema,
+      return { error: new Error(`Unsupported message type: ${message[this.messageTypeField]}`) }
     }
+    return { result: schema }
   }

   private resolveSchemaMap(
     supportedSchemas: readonly ZodSchema<MessagePayloadSchemas>[],
   ): Record<string, ZodSchema<MessagePayloadSchemas>> {
+    if (!this.messageTypeField) {
+      if (!supportedSchemas[0]) return {}
+      return { [NO_MESSAGE_TYPE]: supportedSchemas[0] }
+    }
+
     return supportedSchemas.reduce(
       (acc, schema) => {
         // @ts-ignore
@@ -50,6 +65,11 @@ export class MessageSchemaContainer<MessagePayloadSchemas extends object> {
   private resolveDefinitionsMap(
     supportedDefinitions: readonly CommonEventDefinition[],
   ): Record<string, CommonEventDefinition> {
+    if (!this.messageTypeField) {
+      if (!supportedDefinitions[0]) return {}
+      return { [NO_MESSAGE_TYPE]: supportedDefinitions[0] }
+    }
+
     return supportedDefinitions.reduce(
       (acc, definition) => {
         // @ts-ignore
```
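
The practical effect of the change above: a container holding exactly one schema no longer needs a type discriminator, while multi-schema containers still require `messageTypeField`. Below is a minimal sketch of both modes; it is illustrative only and assumes `z` is available from the same `zod/v3` entry point the container types import from.

```ts
import { MessageSchemaContainer } from '@message-queue-toolkit/core'
import { z } from 'zod/v3'

const orderCreatedSchema = z.object({ type: z.literal('order.created'), orderId: z.string() })
const orderCancelledSchema = z.object({ type: z.literal('order.cancelled'), orderId: z.string() })

// Single schema: messageTypeField can be omitted; every message resolves
// against the one registered schema (stored under the internal NO_MESSAGE_TYPE key).
const singleSchemaContainer = new MessageSchemaContainer({
  messageSchemas: [orderCreatedSchema],
  messageDefinitions: [],
})
const resolved = singleSchemaContainer.resolveSchema({ type: 'order.created', orderId: '123' })
if (resolved.result) console.log(resolved.result.parse({ type: 'order.created', orderId: '123' }))

// Multiple schemas: messageTypeField is still required; omitting it here
// would make the constructor throw.
const multiSchemaContainer = new MessageSchemaContainer<
  z.infer<typeof orderCreatedSchema> | z.infer<typeof orderCancelledSchema>
>({
  messageTypeField: 'type',
  messageSchemas: [orderCreatedSchema, orderCancelledSchema],
  messageDefinitions: [],
})
const cancelResolved = multiSchemaContainer.resolveSchema({ type: 'order.cancelled', orderId: '1' })
console.log(cancelResolved.error ?? 'resolved order.cancelled schema')
```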

packages/core/package.json

Lines changed: 1 addition & 1 deletion

```diff
@@ -1,6 +1,6 @@
 {
   "name": "@message-queue-toolkit/core",
-  "version": "21.2.0",
+  "version": "21.2.1",
   "private": false,
   "license": "MIT",
   "description": "Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently",
```

packages/kafka/lib/AbstractKafkaPublisher.ts

Lines changed: 115 additions & 0 deletions

```diff
@@ -0,0 +1,115 @@
+import { InternalError, stringValueSerializer } from '@lokalise/node-core'
+import { MessageSchemaContainer } from '@message-queue-toolkit/core'
+import {
+  type MessageToProduce,
+  type ProduceOptions,
+  Producer,
+  jsonSerializer,
+  stringSerializer,
+} from '@platformatic/kafka'
+import { AbstractKafkaService, type BaseKafkaOptions } from './AbstractKafkaService.js'
+import type {
+  KafkaDependencies,
+  SupportedMessageValuesInput,
+  SupportedMessageValuesInputForTopic,
+  SupportedTopics,
+  TopicConfig,
+} from './types.js'
+
+export type KafkaPublisherOptions<TopicsConfig extends TopicConfig[]> = BaseKafkaOptions &
+  Omit<ProduceOptions<string, object, string, object>, 'serializers'> & {
+    topicsConfig: TopicsConfig
+  }
+
+export type KafkaMessageOptions = Omit<
+  MessageToProduce<string, object, string, object>,
+  'topic' | 'value'
+>
+
+export abstract class AbstractKafkaPublisher<
+  TopicsConfig extends TopicConfig[],
+> extends AbstractKafkaService<TopicsConfig, KafkaPublisherOptions<TopicsConfig>> {
+  private readonly topicsConfig: TopicsConfig
+  private readonly schemaContainers: Record<
+    string,
+    MessageSchemaContainer<SupportedMessageValuesInput<TopicsConfig>>
+  >
+
+  private producer?: Producer<string, object, string, object>
+
+  constructor(dependencies: KafkaDependencies, options: KafkaPublisherOptions<TopicsConfig>) {
+    super(dependencies, options)
+
+    this.topicsConfig = options.topicsConfig
+    if (this.topicsConfig.length === 0) throw new Error('At least one topic must be defined')
+
+    this.schemaContainers = {}
+    for (const { topic, schemas } of this.topicsConfig) {
+      this.schemaContainers[topic] = new MessageSchemaContainer({
+        messageSchemas: schemas,
+        messageTypeField: this.options.messageTypeField,
+        messageDefinitions: [],
+      })
+    }
+  }
+
+  init(): Promise<void> {
+    if (this.producer) return Promise.resolve()
+
+    this.producer = new Producer({
+      ...this.options.kafka,
+      ...this.options,
+      serializers: {
+        key: stringSerializer,
+        value: jsonSerializer,
+        headerKey: stringSerializer,
+        headerValue: jsonSerializer,
+      },
+    })
+
+    return Promise.resolve()
+  }
+
+  async close(): Promise<void> {
+    await this.producer?.close()
+    this.producer = undefined
+  }
+
+  async publish<Topic extends SupportedTopics<TopicsConfig>>(
+    topic: Topic,
+    message: SupportedMessageValuesInputForTopic<TopicsConfig, Topic>,
+    options?: KafkaMessageOptions,
+  ): Promise<void> {
+    const schemaResult = this.schemaContainers[topic]?.resolveSchema(message)
+    if (!schemaResult) throw new Error(`Message schemas not found for topic: ${topic}`)
+    if (schemaResult.error) throw schemaResult.error
+
+    await this.init() // lazy init
+
+    try {
+      const parsedMessage = schemaResult.result.parse(message)
+
+      // biome-ignore lint/style/noNonNullAssertion: Should always exist due to lazy init
+      await this.producer!.send({ messages: [{ ...options, topic, value: parsedMessage }] })
+
+      this.handleMessageProcessed({
+        message: parsedMessage,
+        processingResult: { status: 'published' },
+        topic,
+      })
+    } catch (error) {
+      const errorDetails = {
+        topic,
+        publisher: this.constructor.name,
+        message: stringValueSerializer(message),
+      }
+      this.handlerError(error, errorDetails)
+      throw new InternalError({
+        message: `Error while publishing to Kafka: ${(error as Error).message}`,
+        errorCode: 'KAFKA_PUBLISH_ERROR',
+        cause: error,
+        details: errorDetails,
+      })
+    }
+  }
+}
```
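
To show how these pieces fit together, here is a hedged sketch of a concrete publisher. The `{ topic, schemas }` shape of a topic config entry is taken from the constructor's destructuring above; the package name `@message-queue-toolkit/kafka`, the `bootstrapBrokers`/`clientId` fields of `KafkaConfig`, and all class and topic names are assumptions, since `types.ts` is not part of this diff view, and the dependency object is assumed to carry only the logger and error reporter the base class reads.

```ts
import type { ErrorReporter } from '@lokalise/node-core'
import { globalLogger } from '@lokalise/node-core'
import { AbstractKafkaPublisher, type KafkaDependencies } from '@message-queue-toolkit/kafka'
import { z } from 'zod/v3'

// Hypothetical event schema; the 'type' literal doubles as the messageTypeField value.
const orderCreatedSchema = z.object({
  id: z.string(),
  type: z.literal('order.created'),
  orderId: z.string(),
})

const topicsConfig = [{ topic: 'order-events', schemas: [orderCreatedSchema] }]

class OrderEventsPublisher extends AbstractKafkaPublisher<typeof topicsConfig> {
  constructor(dependencies: KafkaDependencies) {
    super(dependencies, {
      topicsConfig,
      // Assumed KafkaConfig shape; the real fields live in types.ts.
      kafka: { bootstrapBrokers: ['localhost:9092'], clientId: 'order-service' },
      messageTypeField: 'type',
      messageIdField: 'id',
      handlerSpy: true,
    })
  }
}

// Usage: the message is validated against the topic's schema before the send,
// and init() is invoked lazily on the first publish.
const errorReporter: ErrorReporter = { report: () => {} } // no-op reporter for the sketch
const publisher = new OrderEventsPublisher({ logger: globalLogger, errorReporter })
await publisher.publish('order-events', { id: 'msg-1', type: 'order.created', orderId: '42' })
await publisher.close()
```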

packages/kafka/lib/AbstractKafkaService.ts

Lines changed: 101 additions & 0 deletions

```diff
@@ -0,0 +1,101 @@
+import { types } from 'node:util'
+import {
+  type CommonLogger,
+  type ErrorReporter,
+  resolveGlobalErrorLogObject,
+  stringValueSerializer,
+} from '@lokalise/node-core'
+import type { HandlerSpy, HandlerSpyParams } from '@message-queue-toolkit/core'
+import {
+  type MessageProcessingResult,
+  type PublicHandlerSpy,
+  resolveHandlerSpy,
+} from '@message-queue-toolkit/core'
+import type { BaseOptions } from '@platformatic/kafka'
+import type {
+  KafkaConfig,
+  KafkaDependencies,
+  SupportedMessageValues,
+  TopicConfig,
+} from './types.js'
+
+export type BaseKafkaOptions = {
+  kafka: KafkaConfig
+  messageTypeField?: string
+  messageIdField?: string
+  handlerSpy?: HandlerSpy<object> | HandlerSpyParams | boolean
+  logMessages?: boolean
+} & Omit<BaseOptions, keyof KafkaConfig> // Exclude properties that are already in KafkaConfig
+
+export abstract class AbstractKafkaService<
+  TopicsConfig extends TopicConfig[],
+  KafkaOptions extends BaseKafkaOptions,
+> {
+  protected readonly errorReporter: ErrorReporter
+  protected readonly logger: CommonLogger
+
+  protected readonly options: KafkaOptions
+  protected readonly _handlerSpy?: HandlerSpy<SupportedMessageValues<TopicsConfig>>
+
+  constructor(dependencies: KafkaDependencies, options: KafkaOptions) {
+    this.logger = dependencies.logger
+    this.errorReporter = dependencies.errorReporter
+    this.options = options
+
+    this._handlerSpy = resolveHandlerSpy(options)
+  }
+
+  abstract init(): Promise<void>
+  abstract close(): Promise<void>
+
+  get handlerSpy(): PublicHandlerSpy<SupportedMessageValues<TopicsConfig>> {
+    if (this._handlerSpy) return this._handlerSpy
+
+    throw new Error(
+      'HandlerSpy was not instantiated, please pass `handlerSpy` parameter during creation.',
+    )
+  }
+
+  protected resolveMessageType(
+    message: SupportedMessageValues<TopicsConfig> | null | undefined,
+  ): string | undefined {
+    if (!this.options.messageTypeField || !message) return undefined
+    return message[this.options.messageTypeField]
+  }
+
+  protected resolveMessageId(
+    message: SupportedMessageValues<TopicsConfig> | null | undefined,
+  ): string | undefined {
+    if (!this.options.messageIdField || !message) return undefined
+    return message[this.options.messageIdField]
+  }
+
+  protected handleMessageProcessed(params: {
+    message: SupportedMessageValues<TopicsConfig> | null
+    processingResult: MessageProcessingResult
+    topic: string
+  }) {
+    const { message, processingResult, topic } = params
+    const messageId = this.resolveMessageId(message)
+
+    this._handlerSpy?.addProcessedMessage({ message, processingResult }, messageId)
+
+    if (this.options.logMessages) {
+      this.logger.debug(
+        {
+          message: stringValueSerializer(message),
+          topic,
+          processingResult,
+          messageId,
+          messageType: this.resolveMessageType(message),
+        },
+        `Finished processing message ${messageId}`,
+      )
+    }
+  }
+
+  protected handlerError(error: unknown, context: Record<string, unknown> = {}): void {
+    this.logger.error({ ...resolveGlobalErrorLogObject(error), ...context })
+    if (types.isNativeError(error)) this.errorReporter.report({ error, context })
+  }
+}
```
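
The `handlerSpy` getter and `handleMessageProcessed` above mirror the spy support the toolkit's other transports expose, so publish assertions in tests can await the spy instead of polling the broker. A hedged test-side sketch follows; it reuses the hypothetical `OrderEventsPublisher` instance from the earlier sketch and assumes the spy's `waitForMessageWithId` lookup (keyed by the publisher's `messageIdField`) and its result shape, which are not shown in this diff.

```ts
import { expect, it } from 'vitest'

it('registers published messages on the handler spy', async () => {
  // publisher: the OrderEventsPublisher instance from the previous sketch,
  // created with handlerSpy: true and messageIdField: 'id'.
  await publisher.publish('order-events', { id: 'msg-1', type: 'order.created', orderId: '42' })

  // Resolves once handleMessageProcessed() has recorded the message on the spy.
  const spyResult = await publisher.handlerSpy.waitForMessageWithId('msg-1')
  expect(spyResult.processingResult).toEqual({ status: 'published' })
})
```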

packages/kafka/lib/index.ts

Lines changed: 2 additions & 0 deletions

```diff
@@ -0,0 +1,2 @@
+export * from './types.ts'
+export * from './AbstractKafkaPublisher.js'
```

packages/kafka/lib/test-confluentic.spec.ts

Lines changed: 3 additions & 3 deletions

```diff
@@ -2,13 +2,13 @@ import { randomUUID } from 'node:crypto'
 import { KafkaJS } from '@confluentinc/kafka-javascript'
 import { waitAndRetry } from '@lokalise/universal-ts-utils/node'
 import {} from '@platformatic/kafka'
-import { type TestContext, registerDependencies } from '../test/testContext.ts'
+import { type TestContext, createTestContext } from '../test/utils/testContext.ts'

 describe('Test confluentic-kafka', () => {
   let testContext: TestContext

   beforeAll(async () => {
-    testContext = await registerDependencies()
+    testContext = await createTestContext()
   })

   afterAll(async () => {
@@ -25,7 +25,7 @@ describe('Test confluentic-kafka', () => {

     const kafka = new KafkaJS.Kafka({
       'client.id': clientId,
-      'bootstrap.servers': testContext.cradle.kafkaConfig.brokers.join(','),
+      'bootstrap.servers': testContext.cradle.kafkaConfig.bootstrapBrokers.join(','),
     })

     // Topics can be created from producers, but as we will first connect a consumer, we need to create the topic first
```
