Skip to content

Commit 1a5ea02

Browse files
authored
Kafka handling non json messages successfully (#291)
* Using proper docker image * TDD adding test with error case * Adding safeJsonDeserializer * Release prepare * Closing producer
1 parent b3a4fb5 commit 1a5ea02

File tree

7 files changed

+84
-4
lines changed

7 files changed

+84
-4
lines changed

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ services:
3838
restart: on-failure
3939

4040
kafka:
41-
image: apache/kafka:latest
41+
image: apache/kafka:3.7.1
4242
container_name: kafka
4343
ports:
4444
- 9092:9092

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ import {
1212
type ConsumerOptions,
1313
type Message,
1414
type MessagesStream,
15-
jsonDeserializer,
1615
stringDeserializer,
1716
} from '@platformatic/kafka'
1817
import { AbstractKafkaService, type BaseKafkaOptions } from './AbstractKafkaService.ts'
1918
import { KafkaHandlerContainer } from './handler-container/KafkaHandlerContainer.ts'
2019
import type { KafkaHandlerRouting } from './handler-container/KafkaHandlerRoutingBuilder.ts'
2120
import type { KafkaHandler, RequestContext } from './handler-container/index.ts'
2221
import type { KafkaConfig, KafkaDependencies, TopicConfig } from './types.ts'
22+
import { safeJsonDeserializer } from './utils/safeJsonDeserializer.js'
2323

2424
export type KafkaConsumerDependencies = KafkaDependencies &
2525
Pick<QueueConsumerDependencies, 'transactionObservabilityManager'>
@@ -66,7 +66,7 @@ export abstract class AbstractKafkaConsumer<
6666
autocommit: false, // Handling commits manually
6767
deserializers: {
6868
key: stringDeserializer,
69-
value: jsonDeserializer,
69+
value: safeJsonDeserializer,
7070
headerKey: stringDeserializer,
7171
headerValue: stringDeserializer,
7272
},
@@ -102,6 +102,9 @@ export abstract class AbstractKafkaConsumer<
102102
}
103103

104104
private async consume(message: Message<string, object, string, string>): Promise<void> {
105+
// message.value can be undefined if the message is not JSON-serializable
106+
if (!message.value) return message.commit()
107+
105108
const handler = this.handlerContainer.resolveHandler(message.topic, message.value)
106109
// if there is no handler for the message, we ignore it (simulating subscription)
107110
if (!handler) return message.commit()
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { stringValueSerializer } from '@lokalise/node-core'
2+
import { safeJsonDeserializer } from './safeJsonDeserializer.js'
3+
4+
describe('safeJsonDeserializer', () => {
5+
it('should deserialize valid JSON strings', () => {
6+
const validJson = JSON.stringify({ key: 'value' })
7+
const result = safeJsonDeserializer(validJson)
8+
expect(result).toEqual({ key: 'value' })
9+
})
10+
11+
it('should return undefined for invalid JSON strings', () => {
12+
const invalidJson = stringValueSerializer({ key: 'value' })
13+
const result = safeJsonDeserializer(invalidJson)
14+
expect(result).toBeUndefined()
15+
})
16+
17+
it('should deserialize for valid buffer inputs', () => {
18+
const buffer = Buffer.from(JSON.stringify({ key: 'value' }))
19+
const result = safeJsonDeserializer(buffer)
20+
expect(result).toEqual({ key: 'value' })
21+
})
22+
23+
it('should return undefined for invalid buffer inputs', () => {
24+
const invalidBuffer = Buffer.from('invalid json')
25+
const result = safeJsonDeserializer(invalidBuffer)
26+
expect(result).toBeUndefined()
27+
})
28+
29+
it('should return undefined for invalid inputs', () => {
30+
const result = safeJsonDeserializer(1 as any)
31+
expect(result).toBeUndefined()
32+
})
33+
34+
it('should return undefined for undefined inputs', () => {
35+
const result = safeJsonDeserializer(undefined)
36+
expect(result).toBeUndefined()
37+
})
38+
})
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
export const safeJsonDeserializer = (data?: string | Buffer): object | undefined => {
2+
if (!data) return undefined
3+
// Checking to be safe
4+
if (!Buffer.isBuffer(data) && typeof data !== 'string') return undefined
5+
6+
try {
7+
return JSON.parse(data.toString('utf-8'))
8+
} catch (_) {
9+
return undefined
10+
}
11+
}

packages/kafka/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/kafka",
3-
"version": "0.1.2",
3+
"version": "0.1.3",
44
"engines": {
55
"node": ">= 22.14.0"
66
},

packages/kafka/test/consumer/PermissionConsumer.spec.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import { randomUUID } from 'node:crypto'
2+
import { waitAndRetry } from '@lokalise/universal-ts-utils/node'
3+
import { Producer, stringSerializers } from '@platformatic/kafka'
24
import { afterAll, expect } from 'vitest'
35
import z from 'zod/v3'
46
import { KafkaHandlerConfig, type RequestContext } from '../../lib/index.js'
@@ -244,6 +246,31 @@ describe('PermissionConsumer', () => {
244246
const spy = await consumer.handlerSpy.waitForMessageWithId('1', 'error')
245247
expect(spy.processingResult).toMatchObject({ errorReason: 'invalidMessage' })
246248
})
249+
250+
it('should ignore non json messages', async () => {
251+
// Given
252+
const errorSpy = vi.spyOn(testContext.cradle.errorReporter, 'report')
253+
254+
const producer = new Producer<string, string, string, string>({
255+
...testContext.cradle.kafkaConfig,
256+
clientId: randomUUID(),
257+
autocreateTopics: true,
258+
serializers: stringSerializers,
259+
})
260+
consumer = new PermissionConsumer(testContext.cradle)
261+
await consumer.init()
262+
263+
// When
264+
await producer.send({
265+
messages: [{ topic: 'permission-general', value: 'not valid json' }],
266+
})
267+
268+
// Then
269+
await waitAndRetry(() => errorSpy.mock.calls.length > 0, 10, 100)
270+
expect(errorSpy).not.toHaveBeenCalled()
271+
272+
await producer.close()
273+
})
247274
})
248275

249276
describe('observability - request context', () => {

packages/kafka/test/utils/testContext.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ const resolveDIConfig = (awilixManager: AwilixManager): DiConfig => ({
7373
({
7474
report: () => {},
7575
}) satisfies ErrorReporter,
76+
SINGLETON_CONFIG,
7677
),
7778
transactionObservabilityManager: asFunction(
7879
() =>

0 commit comments

Comments
 (0)