Skip to content

Commit 94e6236

Browse files
authored
Kafka zod 4 (#317)
* Deps update * Zod import update * Minor error fixes * Migration fixes * Import fix * More fixes * More fixes * Fixing zod issues * Fixing types * Release prepare * Fixing tests * Lint fixes
1 parent cf5fc8a commit 94e6236

15 files changed

+121
-101
lines changed

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,16 @@ import {
1919
import { AbstractKafkaService, type BaseKafkaOptions } from './AbstractKafkaService.ts'
2020
import { KafkaHandlerContainer } from './handler-container/KafkaHandlerContainer.ts'
2121
import type { KafkaHandlerRouting } from './handler-container/KafkaHandlerRoutingBuilder.ts'
22-
import type { KafkaHandler, RequestContext } from './handler-container/index.ts'
23-
import type { KafkaConfig, KafkaDependencies, TopicConfig } from './types.ts'
24-
import { ILLEGAL_GENERATION, REBALANCE_IN_PROGRESS, UNKNOWN_MEMBER_ID } from './utils/errorCodes.js'
25-
import { safeJsonDeserializer } from './utils/safeJsonDeserializer.js'
22+
import type { KafkaHandler } from './handler-container/index.ts'
23+
import type {
24+
KafkaConfig,
25+
KafkaDependencies,
26+
RequestContext,
27+
SupportedMessageValues,
28+
TopicConfig,
29+
} from './types.ts'
30+
import { ILLEGAL_GENERATION, REBALANCE_IN_PROGRESS, UNKNOWN_MEMBER_ID } from './utils/errorCodes.ts'
31+
import { safeJsonDeserializer } from './utils/safeJsonDeserializer.ts'
2632

2733
export type KafkaConsumerDependencies = KafkaDependencies &
2834
Pick<QueueConsumerDependencies, 'transactionObservabilityManager'>
@@ -128,7 +134,11 @@ export abstract class AbstractKafkaConsumer<
128134
})
129135
}
130136

131-
this.consumerStream.on('data', (message) => this.consume(message))
137+
this.consumerStream.on('data', (message) =>
138+
this.consume(
139+
message as Message<string, SupportedMessageValues<TopicsConfig>, string, string>,
140+
),
141+
)
132142
this.consumerStream.on('error', (error) => this.handlerError(error))
133143
}
134144

@@ -140,7 +150,9 @@ export abstract class AbstractKafkaConsumer<
140150
await this.consumer.close()
141151
}
142152

143-
private async consume(message: Message<string, object, string, string>): Promise<void> {
153+
private async consume(
154+
message: Message<string, SupportedMessageValues<TopicsConfig>, string, string>,
155+
): Promise<void> {
144156
// message.value can be undefined if the message is not JSON-serializable
145157
if (!message.value) return this.commitMessage(message)
146158

@@ -260,7 +272,9 @@ export abstract class AbstractKafkaConsumer<
260272
}
261273
}
262274

263-
private buildTransactionName(message: Message<string, object, string, string>) {
275+
private buildTransactionName(
276+
message: Message<string, SupportedMessageValues<TopicsConfig>, string, string>,
277+
) {
264278
const messageType = this.resolveMessageType(message.value)
265279

266280
let name = `kafka:${this.constructor.name}:${message.topic}`

packages/kafka/lib/AbstractKafkaPublisher.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ import {
88
stringSerializer,
99
} from '@platformatic/kafka'
1010
import { AbstractKafkaService, type BaseKafkaOptions } from './AbstractKafkaService.ts'
11-
import type { RequestContext } from './handler-container/index.js'
1211
import type {
1312
KafkaDependencies,
14-
SupportedMessageValuesInput,
13+
RequestContext,
14+
SupportedMessageValuesForTopic,
1515
SupportedMessageValuesInputForTopic,
1616
SupportedTopics,
1717
TopicConfig,
@@ -31,10 +31,7 @@ export abstract class AbstractKafkaPublisher<
3131
TopicsConfig extends TopicConfig[],
3232
> extends AbstractKafkaService<TopicsConfig, KafkaPublisherOptions<TopicsConfig>> {
3333
private readonly topicsConfig: TopicsConfig
34-
private readonly schemaContainers: Record<
35-
string,
36-
MessageSchemaContainer<SupportedMessageValuesInput<TopicsConfig>>
37-
>
34+
private readonly schemaContainers: Record<string, MessageSchemaContainer<object>>
3835

3936
private readonly producer: Producer<string, object, string, string>
4037
private isInitiated: boolean
@@ -97,14 +94,17 @@ export abstract class AbstractKafkaPublisher<
9794
): Promise<void> {
9895
const messageProcessingStartTimestamp = Date.now()
9996

100-
const schemaResult = this.schemaContainers[topic]?.resolveSchema(message)
97+
const schemaResult = this.schemaContainers[topic]?.resolveSchema(message as object)
10198
if (!schemaResult) throw new Error(`Message schemas not found for topic: ${topic}`)
10299
if (schemaResult.error) throw schemaResult.error
103100

104101
await this.init() // lazy init
105102

106103
try {
107-
const parsedMessage = schemaResult.result.parse(message)
104+
const parsedMessage = schemaResult.result.parse(message) as SupportedMessageValuesForTopic<
105+
TopicsConfig,
106+
Topic
107+
>
108108

109109
const headers = {
110110
...options?.headers,

packages/kafka/lib/AbstractKafkaService.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,11 @@ export type BaseKafkaOptions = {
3636
logMessages?: boolean
3737
} & Omit<BaseOptions, keyof KafkaConfig> // Exclude properties that are already in KafkaConfig
3838

39-
type ProcessedMessage = MayOmit<
40-
Pick<Message<string, object, string, string>, 'topic' | 'value' | 'timestamp'>,
39+
type ProcessedMessage<TopicsConfig extends TopicConfig[]> = MayOmit<
40+
Pick<
41+
Message<string, SupportedMessageValues<TopicsConfig>, string, string>,
42+
'topic' | 'value' | 'timestamp'
43+
>,
4144
'timestamp'
4245
>
4346

@@ -76,6 +79,7 @@ export abstract class AbstractKafkaService<
7679

7780
protected resolveMessageType(message: SupportedMessageValues<TopicsConfig>): string | undefined {
7881
if (!this.options.messageTypeField) return undefined
82+
// @ts-expect-error
7983
return message[this.options.messageTypeField] as string | undefined
8084
}
8185

@@ -89,7 +93,7 @@ export abstract class AbstractKafkaService<
8993
}
9094

9195
protected handleMessageProcessed(params: {
92-
message: ProcessedMessage
96+
message: ProcessedMessage<TopicsConfig>
9397
processingResult: MessageProcessingResult
9498
messageProcessingStartTimestamp: number
9599
}) {

packages/kafka/lib/handler-container/KafkaHandlerConfig.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
1-
import type { CommonLogger } from '@lokalise/node-core'
21
import type { Message } from '@platformatic/kafka'
3-
import type { ZodSchema, ZodTypeDef } from 'zod'
4-
5-
export interface RequestContext {
6-
logger: CommonLogger
7-
reqId: string
8-
}
2+
import type { ZodSchema } from 'zod/v4'
3+
import type { RequestContext } from '../types.js'
94

105
export type KafkaHandler<MessageValue extends object, ExecutionContext> = (
116
message: Message<string, MessageValue, string, string>,
@@ -14,11 +9,13 @@ export type KafkaHandler<MessageValue extends object, ExecutionContext> = (
149
) => Promise<void> | void
1510

1611
export class KafkaHandlerConfig<MessageValue extends object, ExecutionContext> {
17-
public readonly schema: ZodSchema<MessageValue, ZodTypeDef, unknown>
12+
// biome-ignore lint/suspicious/noExplicitAny: Input for schema is flexible
13+
public readonly schema: ZodSchema<MessageValue, any>
1814
public readonly handler: KafkaHandler<MessageValue, ExecutionContext>
1915

2016
constructor(
21-
schema: ZodSchema<MessageValue, ZodTypeDef, unknown>,
17+
// biome-ignore lint/suspicious/noExplicitAny: Input for schema is flexible
18+
schema: ZodSchema<MessageValue, any>,
2219
handler: KafkaHandler<MessageValue, ExecutionContext>,
2320
) {
2421
this.schema = schema

packages/kafka/lib/handler-container/KafkaHandlerContainer.spec.ts

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import z from 'zod/v3'
1+
import z from 'zod/v4'
22
import type { TopicConfig } from '../types.ts'
33
import { KafkaHandlerConfig } from './KafkaHandlerConfig.ts'
44
import { KafkaHandlerContainer } from './KafkaHandlerContainer.ts'
5-
import type { KafkaHandlerRouting } from './KafkaHandlerRoutingBuilder.ts'
5+
import { KafkaHandlerRoutingBuilder } from './KafkaHandlerRoutingBuilder.ts'
66

77
const CREATE_SCHEMA = z.object({
88
type: z.literal('create'),
@@ -44,32 +44,33 @@ describe('KafkaHandlerContainer', () => {
4444

4545
// When & Then
4646
expect(
47-
() => new KafkaHandlerContainer(topicHandlers1, 'type'),
47+
() => new KafkaHandlerContainer(topicHandlers1 as any, 'type'),
4848
).toThrowErrorMatchingInlineSnapshot('[Error: Duplicate handler for topic create]')
49-
expect(() => new KafkaHandlerContainer(topicHandlers2)).toThrowErrorMatchingInlineSnapshot(
50-
'[Error: Duplicate handler for topic empty]',
51-
)
49+
expect(
50+
() => new KafkaHandlerContainer(topicHandlers2 as any),
51+
).toThrowErrorMatchingInlineSnapshot('[Error: Duplicate handler for topic empty]')
5252
})
5353

5454
it('should resolve handler with message type', () => {
5555
// Given
56-
const topicHandlers: KafkaHandlerRouting<TopicsConfig, any, any> = {
57-
all: [
58-
new KafkaHandlerConfig(CREATE_SCHEMA, () => Promise.resolve()),
59-
new KafkaHandlerConfig(UPDATE_SCHEMA, () => Promise.resolve()),
60-
new KafkaHandlerConfig(EMPTY_SCHEMA, () => Promise.resolve()),
61-
],
62-
create: [new KafkaHandlerConfig(CREATE_SCHEMA, () => Promise.resolve())],
63-
empty: [new KafkaHandlerConfig(EMPTY_SCHEMA, () => Promise.resolve())],
64-
}
56+
const routing = new KafkaHandlerRoutingBuilder<TopicsConfig, any>()
57+
.addConfig('all', new KafkaHandlerConfig(CREATE_SCHEMA, () => Promise.resolve()))
58+
.addConfig('all', new KafkaHandlerConfig(UPDATE_SCHEMA, () => Promise.resolve()))
59+
.addConfig('all', new KafkaHandlerConfig(EMPTY_SCHEMA, () => Promise.resolve()))
60+
.addConfig('create', new KafkaHandlerConfig(CREATE_SCHEMA, () => Promise.resolve()))
61+
.addConfig('empty', new KafkaHandlerConfig(EMPTY_SCHEMA, () => Promise.resolve()))
6562

6663
// When
67-
const container = new KafkaHandlerContainer(topicHandlers, 'type')
64+
const container = new KafkaHandlerContainer(routing.build(), 'type')
6865

6966
// Then
70-
expect(container.resolveHandler('all', { type: 'create' })?.schema).toBe(CREATE_SCHEMA)
67+
expect(container.resolveHandler('all', { type: 'create', prop: 1 })?.schema).toBe(
68+
CREATE_SCHEMA,
69+
)
7170
expect(container.resolveHandler('all', { type: 'update' })?.schema).toBe(UPDATE_SCHEMA)
72-
expect(container.resolveHandler('all', { type: 'non-existing' })?.schema).toBe(EMPTY_SCHEMA)
71+
expect(container.resolveHandler('all', { type: 'non-existing' as any })?.schema).toBe(
72+
EMPTY_SCHEMA,
73+
)
7374
expect(container.resolveHandler('all', {})?.schema).toBe(EMPTY_SCHEMA)
7475

7576
expect(container.resolveHandler('create', { type: 'create', prop: 1 })?.schema).toBe(
@@ -81,18 +82,19 @@ describe('KafkaHandlerContainer', () => {
8182
expect(container.resolveHandler('create', {} as any)?.schema).toBe(undefined)
8283

8384
expect(container.resolveHandler('empty', {} as any)?.schema).toBe(EMPTY_SCHEMA)
84-
expect(container.resolveHandler('empty', { type: 'create' })?.schema).toBe(EMPTY_SCHEMA)
85+
expect(container.resolveHandler('empty', { type: 'create' } as any)?.schema).toBe(
86+
EMPTY_SCHEMA,
87+
)
8588
})
8689

8790
it('should resolve handler without message type', () => {
8891
// Given
89-
const topicHandlers: KafkaHandlerRouting<TopicsConfig, any, any> = {
90-
create: [new KafkaHandlerConfig(CREATE_SCHEMA, () => Promise.resolve())],
91-
empty: [new KafkaHandlerConfig(EMPTY_SCHEMA, () => Promise.resolve())],
92-
}
92+
const routing = new KafkaHandlerRoutingBuilder<TopicsConfig, any>()
93+
.addConfig('create', new KafkaHandlerConfig(CREATE_SCHEMA, () => Promise.resolve()))
94+
.addConfig('empty', new KafkaHandlerConfig(EMPTY_SCHEMA, () => Promise.resolve()))
9395

9496
// When
95-
const container = new KafkaHandlerContainer(topicHandlers)
97+
const container = new KafkaHandlerContainer(routing.build())
9698

9799
// Then
98100
expect(container.resolveHandler('create', { type: 'create', prop: 1 })?.schema).toBe(
@@ -104,7 +106,9 @@ describe('KafkaHandlerContainer', () => {
104106
expect(container.resolveHandler('create', {} as any)?.schema).toBe(CREATE_SCHEMA)
105107

106108
expect(container.resolveHandler('empty', {} as any)?.schema).toBe(EMPTY_SCHEMA)
107-
expect(container.resolveHandler('empty', { type: 'create' })?.schema).toBe(EMPTY_SCHEMA)
109+
expect(container.resolveHandler('empty', { type: 'create' } as any)?.schema).toBe(
110+
EMPTY_SCHEMA,
111+
)
108112
})
109113
})
110114

@@ -119,19 +123,15 @@ describe('KafkaHandlerContainer', () => {
119123

120124
it('should return all topics', () => {
121125
// Given
122-
const topicHandlers: KafkaHandlerRouting<TopicsConfig, any, any> = {
123-
all: [
124-
new KafkaHandlerConfig(CREATE_SCHEMA, () => Promise.resolve()),
125-
new KafkaHandlerConfig(UPDATE_SCHEMA, () => Promise.resolve()),
126-
new KafkaHandlerConfig(EMPTY_SCHEMA, () => Promise.resolve()),
127-
],
128-
create: [new KafkaHandlerConfig(CREATE_SCHEMA, () => Promise.resolve())],
129-
empty: [new KafkaHandlerConfig(EMPTY_SCHEMA, () => Promise.resolve())],
130-
another: [],
131-
}
126+
const routing = new KafkaHandlerRoutingBuilder<TopicsConfig, any>()
127+
.addConfig('all', new KafkaHandlerConfig(CREATE_SCHEMA, () => Promise.resolve()))
128+
.addConfig('all', new KafkaHandlerConfig(UPDATE_SCHEMA, () => Promise.resolve()))
129+
.addConfig('all', new KafkaHandlerConfig(EMPTY_SCHEMA, () => Promise.resolve()))
130+
.addConfig('create', new KafkaHandlerConfig(CREATE_SCHEMA, () => Promise.resolve()))
131+
.addConfig('empty', new KafkaHandlerConfig(EMPTY_SCHEMA, () => Promise.resolve()))
132132

133133
// When
134-
const container = new KafkaHandlerContainer(topicHandlers, 'type')
134+
const container = new KafkaHandlerContainer({ ...routing.build(), another: [] }, 'type')
135135

136136
// Then
137137
expect(container.topics).toEqual(['all', 'create', 'empty'])

packages/kafka/lib/handler-container/KafkaHandlerContainer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ export class KafkaHandlerContainer<TopicsConfig extends TopicConfig[], Execution
6565
if (!handlers) return undefined
6666

6767
let messageValueType: string | undefined = undefined
68+
// @ts-expect-error
6869
if (this.messageTypeField) messageValueType = messageValue[this.messageTypeField]
6970

7071
return messageValueType

packages/kafka/lib/handler-container/KafkaHandlerRoutingBuilder.spec.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import type { Message } from '@platformatic/kafka'
22
import { expectTypeOf } from 'vitest'
3-
import z from 'zod'
4-
import type { TopicConfig } from '../types.ts'
5-
import { KafkaHandlerConfig, type RequestContext } from './KafkaHandlerConfig.ts'
3+
import z from 'zod/v4'
4+
import type { RequestContext, TopicConfig } from '../types.ts'
5+
import { KafkaHandlerConfig } from './KafkaHandlerConfig.ts'
66
import { KafkaHandlerRoutingBuilder } from './KafkaHandlerRoutingBuilder.ts'
77

88
const CREATE_SCHEMA = z.object({ type: z.literal('create') })

packages/kafka/lib/handler-container/KafkaHandlerRoutingBuilder.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@ import type {
66
} from '../types.ts'
77
import type { KafkaHandlerConfig } from './KafkaHandlerConfig.ts'
88

9-
export type KafkaHandlerRouting<
10-
TopicsConfig extends TopicConfig[],
11-
ExecutionContext,
12-
MessageValue extends SupportedMessageValues<TopicsConfig> = SupportedMessageValues<TopicsConfig>,
13-
> = Record<string, KafkaHandlerConfig<MessageValue, ExecutionContext>[]>
9+
export type KafkaHandlerRouting<TopicsConfig extends TopicConfig[], ExecutionContext> = Record<
10+
string,
11+
KafkaHandlerConfig<SupportedMessageValues<TopicsConfig>, ExecutionContext>[]
12+
>
1413

1514
export class KafkaHandlerRoutingBuilder<
1615
const TopicsConfig extends TopicConfig[],
@@ -23,7 +22,9 @@ export class KafkaHandlerRoutingBuilder<
2322
MessageValue extends SupportedMessageValuesForTopic<TopicsConfig, Topic>,
2423
>(topic: Topic, config: KafkaHandlerConfig<MessageValue, ExecutionContext>): this {
2524
this.configs[topic] ??= []
26-
this.configs[topic].push(config)
25+
this.configs[topic].push(
26+
config as KafkaHandlerConfig<SupportedMessageValues<TopicsConfig>, ExecutionContext>,
27+
)
2728

2829
return this
2930
}

packages/kafka/lib/types.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
import type { CommonLogger } from '@lokalise/node-core'
12
import type { QueueDependencies } from '@message-queue-toolkit/core'
23
import type { ConnectionOptions } from '@platformatic/kafka'
3-
import type { ZodSchema } from 'zod'
4-
import type z from 'zod/v3'
4+
import type { ZodSchema, z } from 'zod/v4'
5+
6+
export interface RequestContext {
7+
logger: CommonLogger
8+
reqId: string
9+
}
510

611
export type KafkaDependencies = QueueDependencies
712

@@ -12,7 +17,7 @@ export type KafkaConfig = {
1217

1318
export type TopicConfig<Topic extends string = string> = {
1419
topic: Topic
15-
schemas: ZodSchema[]
20+
schemas: ZodSchema<object>[]
1621
}
1722

1823
export type SupportedTopics<TopicsConfig extends TopicConfig[]> = TopicsConfig[number]['topic']
@@ -34,6 +39,6 @@ type MessageSchemas<TopicsConfig extends TopicConfig[]> = TopicsConfig[number]['
3439
export type SupportedMessageValuesInput<TopicsConfig extends TopicConfig[]> = z.input<
3540
MessageSchemas<TopicsConfig>
3641
>
37-
export type SupportedMessageValues<TopicsConfig extends TopicConfig[]> = z.input<
42+
export type SupportedMessageValues<TopicsConfig extends TopicConfig[]> = z.output<
3843
MessageSchemas<TopicsConfig>
3944
>

packages/kafka/lib/utils/safeJsonDeserializer.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { stringValueSerializer } from '@lokalise/node-core'
2-
import { safeJsonDeserializer } from './safeJsonDeserializer.js'
2+
import { safeJsonDeserializer } from './safeJsonDeserializer.ts'
33

44
describe('safeJsonDeserializer', () => {
55
it('should deserialize valid JSON strings', () => {

0 commit comments

Comments
 (0)