Commit 9963fb6

Feat: kafka consumer (#285)
* dep update
* Adding initial AbstractKafkaService
* Fixing types
* Exposing handler spy resolver
* Avoid abstact queue service
* Adding AbstractKafkaPublisher
* Adjusting test context + adding test schemas
* Publisher multi topic support
* Fixing init issue + adjusting test context
* Adding test publisher + fixing some issue on publish
* Adding publisher tests
* Improving publish error handling
* Improving test coverage
* Coverage improvements
* Narrowing down publish type
* Test updated
* Improving types
* Removing redundant config
* Improving coverage
* Removing unused type
* Lint fix
* Removing not needed override
* Moving topics config
* MessageSchemaContainer making messageTypeField optional
* Publisher messageTypeField can be undefined
* Adding missing tests
* Error fix
* Test fix
* Reverting message definition change
* Lint fix
* Adding kafka handler container
* Adding initial kafka consumer
* Name improvement
* Narrowing down typing on handler container
* Adding tests to handler container
* Implementing KafkaHandlerRoutingBuilder
* Adding KafkaHandlerRoutingBuilder tests
* Narrowing down type
* Adding index + fixing imports
* Initial consumer implementation
* Initial PermissionConsumer + type error fix
* Typo fix
* Adding some tests
* Using ts expect error
* Preparing handlers for tests
* Adding init consumer tests
* Kafka handler returns full message
* Adjusting publisher to be aligned with consumer behaviour
* Lint fix
* Adding in memory retry
* Fixing bugs
* Adding consumer tests
* Removing platformatic tests
* Improving coverage
* Adding error listener
* Adding ticket
* Adding retry ticket
* Creating test config constant
* Revert "Creating test config constant" (this reverts commit ba09074)
* Improving tests
* Fixing tests
1 parent e86ea51 commit 9963fb6

21 files changed: +952 -166 lines

packages/core/lib/queues/MessageSchemaContainer.ts

Lines changed: 32 additions & 48 deletions
@@ -8,75 +8,59 @@ export type MessageSchemaContainerOptions<MessagePayloadSchemas extends object>
   messageTypeField?: string
 }
 
-const NO_MESSAGE_TYPE = 'NO_MESSAGE_TYPE'
+const DEFAULT_SCHEMA_KEY = Symbol('NO_MESSAGE_TYPE')
 
 export class MessageSchemaContainer<MessagePayloadSchemas extends object> {
-  public readonly messageDefinitions: Record<string, CommonEventDefinition>
-  private readonly messageSchemas: Record<string, ZodSchema<MessagePayloadSchemas>>
+  public readonly messageDefinitions: Record<string | symbol, CommonEventDefinition>
+  private readonly messageSchemas: Record<string | symbol, ZodSchema<MessagePayloadSchemas>>
   private readonly messageTypeField?: string
 
   constructor(options: MessageSchemaContainerOptions<MessagePayloadSchemas>) {
-    if (options.messageTypeField === undefined && options.messageSchemas.length > 1) {
-      throw new Error(
-        'if messageTypeField is not provided, messageSchemas must have a single schema',
-      )
-    }
-
     this.messageTypeField = options.messageTypeField
-    this.messageSchemas = this.resolveSchemaMap(options.messageSchemas)
-    this.messageDefinitions = this.resolveDefinitionsMap(options.messageDefinitions ?? [])
+    this.messageSchemas = this.resolveMap(options.messageSchemas)
+    this.messageDefinitions = this.resolveMap(options.messageDefinitions ?? [])
   }
 
   public resolveSchema(
     // biome-ignore lint/suspicious/noExplicitAny: This is expected
    message: Record<string, any>,
   ): Either<Error, ZodSchema<MessagePayloadSchemas>> {
-    if (!this.messageTypeField) {
-      return this.messageSchemas[NO_MESSAGE_TYPE]
-        ? { result: this.messageSchemas[NO_MESSAGE_TYPE] }
-        : { error: new Error('Unsupported message') }
-    }
+    const messageType = this.messageTypeField ? message[this.messageTypeField] : undefined
 
-    const schema = this.messageSchemas[message[this.messageTypeField]]
+    const schema = this.messageSchemas[messageType ?? DEFAULT_SCHEMA_KEY]
     if (!schema) {
-      return { error: new Error(`Unsupported message type: ${message[this.messageTypeField]}`) }
+      return {
+        error: new Error(
+          `Unsupported message type: ${messageType ?? DEFAULT_SCHEMA_KEY.toString()}`,
+        ),
+      }
     }
     return { result: schema }
   }
 
-  private resolveSchemaMap(
-    supportedSchemas: readonly ZodSchema<MessagePayloadSchemas>[],
-  ): Record<string, ZodSchema<MessagePayloadSchemas>> {
-    if (!this.messageTypeField) {
-      if (!supportedSchemas[0]) return {}
-      return { [NO_MESSAGE_TYPE]: supportedSchemas[0] }
-    }
+  private resolveMap<T extends CommonEventDefinition | ZodSchema<MessagePayloadSchemas>>(
+    array: readonly T[],
+  ): Record<string | symbol, T> {
+    const result: Record<string | symbol, T> = {}
 
-    return supportedSchemas.reduce(
-      (acc, schema) => {
-        // @ts-ignore
-        acc[schema.shape[this.messageTypeField].value] = schema
-        return acc
-      },
-      {} as Record<string, ZodSchema<MessagePayloadSchemas>>,
-    )
-  }
+    for (const item of array) {
+      let type: string | undefined
+
+      if (this.messageTypeField) {
+        type =
+          'publisherSchema' in item
+            ? // @ts-expect-error
+              item.publisherSchema?.shape[this.messageTypeField]?.value
+            : // @ts-expect-error
+              item.shape?.[this.messageTypeField]?.value
+      }
+
+      const key = type ?? DEFAULT_SCHEMA_KEY
+      if (result[key]) throw new Error(`Duplicate schema for type: ${key.toString()}`)
 
-  private resolveDefinitionsMap(
-    supportedDefinitions: readonly CommonEventDefinition[],
-  ): Record<string, CommonEventDefinition> {
-    if (!this.messageTypeField) {
-      if (!supportedDefinitions[0]) return {}
-      return { [NO_MESSAGE_TYPE]: supportedDefinitions[0] }
+      result[key] = item
     }
 
-    return supportedDefinitions.reduce(
-      (acc, definition) => {
-        // @ts-ignore
-        acc[definition.publisherSchema.shape[this.messageTypeField].value] = definition
-        return acc
-      },
-      {} as Record<string, CommonEventDefinition>,
-    )
+    return result
   }
 }

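For orientation, a minimal usage sketch of the refactored container (not part of this diff): it assumes MessageSchemaContainer is exported from @message-queue-toolkit/core and uses an illustrative event schema with a literal `type` field.

import { z } from 'zod'
import { MessageSchemaContainer } from '@message-queue-toolkit/core' // assumed export path

const USER_CREATED = z.object({ type: z.literal('user.created'), userId: z.string() })
const USER_DELETED = z.object({ type: z.literal('user.deleted'), userId: z.string() })

// With messageTypeField set, each schema is keyed by its literal `type` value.
const container = new MessageSchemaContainer({
  messageSchemas: [USER_CREATED, USER_DELETED],
  messageTypeField: 'type',
})

const resolved = container.resolveSchema({ type: 'user.created', userId: '42' })
if (resolved.error) throw resolved.error

// Without messageTypeField, every schema maps to the internal DEFAULT_SCHEMA_KEY symbol,
// so a second schema now fails with the new duplicate-schema error rather than the removed
// constructor guard.
const singleSchemaContainer = new MessageSchemaContainer({ messageSchemas: [USER_CREATED] })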
packages/core/package.json

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 {
   "name": "@message-queue-toolkit/core",
-  "version": "21.2.1",
+  "version": "21.3.0",
   "private": false,
   "license": "MIT",
   "description": "Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently",
Lines changed: 164 additions & 0 deletions
@@ -0,0 +1,164 @@
import { setTimeout } from 'node:timers/promises'
import { InternalError, stringValueSerializer } from '@lokalise/node-core'
import {
  type ConsumeOptions,
  Consumer,
  type ConsumerOptions,
  type Message,
  type MessagesStream,
  jsonDeserializer,
  stringDeserializer,
} from '@platformatic/kafka'
import { AbstractKafkaService, type BaseKafkaOptions } from './AbstractKafkaService.ts'
import { KafkaHandlerContainer } from './handler-container/KafkaHandlerContainer.js'
import type { KafkaHandlerRouting } from './handler-container/KafkaHandlerRoutingBuilder.js'
import type { KafkaHandler } from './handler-container/index.js'
import type { KafkaDependencies, TopicConfig } from './types.ts'

export type KafkaConsumerOptions<TopicsConfig extends TopicConfig[]> = BaseKafkaOptions &
  Omit<
    ConsumerOptions<string, object, string, object>,
    'deserializers' | 'autocommit' | 'bootstrapBrokers'
  > &
  Omit<ConsumeOptions<string, object, string, object>, 'topics'> & {
    handlers: KafkaHandlerRouting<TopicsConfig>
  }

/*
TODO: Proper retry mechanism + DLQ -> https://lokalise.atlassian.net/browse/EDEXP-498
In the meantime, we will retry in memory up to 3 times
*/
const MAX_IN_MEMORY_RETRIES = 3

export abstract class AbstractKafkaConsumer<
  TopicsConfig extends TopicConfig[],
> extends AbstractKafkaService<TopicsConfig, KafkaConsumerOptions<TopicsConfig>> {
  private readonly consumer: Consumer<string, object, string, object>
  private consumerStream?: MessagesStream<string, object, string, object>

  private readonly handlerContainer: KafkaHandlerContainer<TopicsConfig>

  constructor(dependencies: KafkaDependencies, options: KafkaConsumerOptions<TopicsConfig>) {
    super(dependencies, options)

    this.handlerContainer = new KafkaHandlerContainer<TopicsConfig>(
      options.handlers,
      options.messageTypeField,
    )

    this.consumer = new Consumer({
      ...this.options.kafka,
      ...this.options,
      autocommit: false, // Handling commits manually
      deserializers: {
        key: stringDeserializer,
        value: jsonDeserializer,
        headerKey: stringDeserializer,
        headerValue: jsonDeserializer,
      },
    })
  }

  async init(): Promise<void> {
    if (this.consumerStream) return Promise.resolve()
    const topics = this.handlerContainer.topics
    if (topics.length === 0) throw new Error('At least one topic must be defined')

    try {
      const { handlers, ...consumeOptions } = this.options // Handlers cannot be passed to consume method
      this.consumerStream = await this.consumer.consume({ ...consumeOptions, topics })
    } catch (error) {
      throw new InternalError({
        message: 'Consumer init failed',
        errorCode: 'KAFKA_CONSUMER_INIT_ERROR',
        cause: error,
      })
    }

    this.consumerStream.on('data', (message) => this.consume(message))
    this.consumerStream.on('error', (error) => this.handlerError(error))
  }

  async close(): Promise<void> {
    if (!this.consumerStream) return Promise.resolve()

    await new Promise((done) => this.consumerStream?.close(done))
    this.consumerStream = undefined
    await this.consumer.close()
  }

  /*
   TODO: https://lokalise.atlassian.net/browse/EDEXP-493
   - Improve logging with logger child on constructor + add request context?
   - Message logging
   - Observability
  */

  private async consume(message: Message<string, object, string, object>): Promise<void> {
    const handler = this.handlerContainer.resolveHandler(message.topic, message.value)
    // if there is no handler for the message, we ignore it (simulating subscription)
    if (!handler) return message.commit()

    const parseResult = handler.schema.safeParse(message.value)
    if (!parseResult.success) {
      this.handlerError(parseResult.error, {
        topic: message.topic,
        message: stringValueSerializer(message.value),
      })
      this.handleMessageProcessed({
        topic: message.topic,
        message: message.value,
        processingResult: { status: 'error', errorReason: 'invalidMessage' },
      })

      return message.commit()
    }

    const validatedMessage = parseResult.data

    let retries = 0
    let consumed = false
    do {
      // exponential backoff -> 2^(retry-1)
      if (retries > 0) await setTimeout(Math.pow(2, retries - 1))

      consumed = await this.tryToConsume({ ...message, value: validatedMessage }, handler.handler)
      if (consumed) break

      retries++
    } while (retries < MAX_IN_MEMORY_RETRIES)

    if (consumed) {
      this.handleMessageProcessed({
        topic: message.topic,
        message: validatedMessage,
        processingResult: { status: 'consumed' },
      })
    } else {
      this.handleMessageProcessed({
        topic: message.topic,
        message: validatedMessage,
        processingResult: { status: 'error', errorReason: 'handlerError' },
      })
    }

    return message.commit()
  }

  private async tryToConsume<MessageValue extends object>(
    message: Message<string, MessageValue, string, object>,
    handler: KafkaHandler<MessageValue>,
  ): Promise<boolean> {
    try {
      await handler(message)
      return true
    } catch (error) {
      this.handlerError(error, {
        topic: message.topic,
        message: stringValueSerializer(message.value),
      })
    }

    return false
  }
}

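The in-memory retry above (a stopgap until the DLQ work tracked in EDEXP-498) boils down to the following standalone sketch; consumeWithInMemoryRetries and handle are illustrative names, not part of the commit.

import { setTimeout } from 'node:timers/promises'

// Up to three attempts in total: the first runs immediately, each retry waits
// 2^(retry - 1) milliseconds (1 ms, then 2 ms). The consumer commits the offset either way
// and reports 'consumed' on success or 'handlerError' after the last failed attempt.
async function consumeWithInMemoryRetries(
  handle: () => Promise<void>,
  maxRetries = 3,
): Promise<boolean> {
  let retries = 0
  do {
    if (retries > 0) await setTimeout(Math.pow(2, retries - 1))
    try {
      await handle()
      return true
    } catch {
      retries++ // the real implementation also forwards the error to handlerError
    }
  } while (retries < maxRetries)
  return false
}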
packages/kafka/lib/AbstractKafkaPublisher.ts

Lines changed: 2 additions & 2 deletions
@@ -7,14 +7,14 @@ import {
   jsonSerializer,
   stringSerializer,
 } from '@platformatic/kafka'
-import { AbstractKafkaService, type BaseKafkaOptions } from './AbstractKafkaService.js'
+import { AbstractKafkaService, type BaseKafkaOptions } from './AbstractKafkaService.ts'
 import type {
   KafkaDependencies,
   SupportedMessageValuesInput,
   SupportedMessageValuesInputForTopic,
   SupportedTopics,
   TopicConfig,
-} from './types.js'
+} from './types.ts'
 
 export type KafkaPublisherOptions<TopicsConfig extends TopicConfig[]> = BaseKafkaOptions &
   Omit<ProduceOptions<string, object, string, object>, 'serializers'> & {

packages/kafka/lib/AbstractKafkaService.ts

Lines changed: 6 additions & 10 deletions
@@ -18,7 +18,7 @@ import type {
   KafkaDependencies,
   SupportedMessageValues,
   TopicConfig,
-} from './types.js'
+} from './types.ts'
 
 export type BaseKafkaOptions = {
   kafka: KafkaConfig
@@ -57,22 +57,18 @@ export abstract class AbstractKafkaService<
     )
   }
 
-  protected resolveMessageType(
-    message: SupportedMessageValues<TopicsConfig> | null | undefined,
-  ): string | undefined {
-    if (!this.options.messageTypeField || !message) return undefined
+  protected resolveMessageType(message: SupportedMessageValues<TopicsConfig>): string | undefined {
+    if (!this.options.messageTypeField) return undefined
     return message[this.options.messageTypeField]
   }
 
-  protected resolveMessageId(
-    message: SupportedMessageValues<TopicsConfig> | null | undefined,
-  ): string | undefined {
-    if (!this.options.messageIdField || !message) return undefined
+  protected resolveMessageId(message: SupportedMessageValues<TopicsConfig>): string | undefined {
+    if (!this.options.messageIdField) return undefined
    return message[this.options.messageIdField]
   }
 
   protected handleMessageProcessed(params: {
-    message: SupportedMessageValues<TopicsConfig> | null
+    message: SupportedMessageValues<TopicsConfig>
     processingResult: MessageProcessingResult
     topic: string
   }) {
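In effect the helpers now expect a concrete message value; a standalone sketch of the narrowed behaviour, where resolveField is an illustrative stand-in for resolveMessageType/resolveMessageId:

// No null/undefined guard anymore: only "no field configured" yields undefined.
function resolveField(message: Record<string, string>, field?: string): string | undefined {
  if (!field) return undefined
  return message[field]
}

resolveField({ id: 'msg-1', type: 'user.created' }, 'type') // -> 'user.created'
resolveField({ id: 'msg-1', type: 'user.created' }) // -> undefined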
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
import type { Message } from '@platformatic/kafka'
import type { ZodSchema } from 'zod'

export type KafkaHandler<MessageValue extends object> = (
  message: Message<string, MessageValue, string, object>,
) => Promise<void> | void

export class KafkaHandlerConfig<MessageValue extends object> {
  public readonly schema: ZodSchema<MessageValue>
  public readonly handler: KafkaHandler<MessageValue>

  constructor(schema: ZodSchema<MessageValue>, handler: KafkaHandler<MessageValue>) {
    this.schema = schema
    this.handler = handler
  }
}

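A brief usage sketch for the new handler type and KafkaHandlerConfig; the package import path and the permission schema are assumptions for illustration only.

import { z } from 'zod'
import { KafkaHandlerConfig } from '@message-queue-toolkit/kafka' // assumed package/export path

const PERMISSION_ADDED = z.object({ type: z.literal('permission.added'), userId: z.string() })
type PermissionAdded = z.infer<typeof PERMISSION_ADDED>

// The handler receives the full @platformatic/kafka Message; the payload validated by the
// consumer is available on message.value.
const onPermissionAdded = new KafkaHandlerConfig<PermissionAdded>(
  PERMISSION_ADDED,
  async (message) => {
    console.log(`permission added for user ${message.value.userId} on topic ${message.topic}`)
  },
)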