Skip to content

Commit 2eb8bbc

Browse files
authored
Feat: Kafka execution context support (#292)
* Handler container supporting execution context * Test fixes * Adding execution context to the consumer * Lint fix * Release prepare
1 parent 1a5ea02 commit 2eb8bbc

9 files changed

+134
-87
lines changed

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@ import { safeJsonDeserializer } from './utils/safeJsonDeserializer.js'
2424
export type KafkaConsumerDependencies = KafkaDependencies &
2525
Pick<QueueConsumerDependencies, 'transactionObservabilityManager'>
2626

27-
export type KafkaConsumerOptions<TopicsConfig extends TopicConfig[]> = BaseKafkaOptions &
27+
export type KafkaConsumerOptions<
28+
TopicsConfig extends TopicConfig[],
29+
ExecutionContext,
30+
> = BaseKafkaOptions &
2831
Omit<
2932
ConsumerOptions<string, object, string, string>,
3033
'deserializers' | 'autocommit' | keyof KafkaConfig
3134
> &
3235
Omit<ConsumeOptions<string, object, string, string>, 'topics'> & {
33-
handlers: KafkaHandlerRouting<TopicsConfig>
36+
handlers: KafkaHandlerRouting<TopicsConfig, ExecutionContext>
3437
}
3538

3639
/*
@@ -41,24 +44,28 @@ const MAX_IN_MEMORY_RETRIES = 3
4144

4245
export abstract class AbstractKafkaConsumer<
4346
TopicsConfig extends TopicConfig[],
44-
> extends AbstractKafkaService<TopicsConfig, KafkaConsumerOptions<TopicsConfig>> {
47+
ExecutionContext,
48+
> extends AbstractKafkaService<TopicsConfig, KafkaConsumerOptions<TopicsConfig, ExecutionContext>> {
4549
private readonly consumer: Consumer<string, object, string, string>
4650
private consumerStream?: MessagesStream<string, object, string, string>
4751

4852
private readonly transactionObservabilityManager: TransactionObservabilityManager
49-
private readonly handlerContainer: KafkaHandlerContainer<TopicsConfig>
53+
private readonly handlerContainer: KafkaHandlerContainer<TopicsConfig, ExecutionContext>
54+
private readonly executionContext: ExecutionContext
5055

5156
constructor(
5257
dependencies: KafkaConsumerDependencies,
53-
options: KafkaConsumerOptions<TopicsConfig>,
58+
options: KafkaConsumerOptions<TopicsConfig, ExecutionContext>,
59+
executionContext: ExecutionContext,
5460
) {
5561
super(dependencies, options)
5662

5763
this.transactionObservabilityManager = dependencies.transactionObservabilityManager
58-
this.handlerContainer = new KafkaHandlerContainer<TopicsConfig>(
64+
this.handlerContainer = new KafkaHandlerContainer<TopicsConfig, ExecutionContext>(
5965
options.handlers,
6066
options.messageTypeField,
6167
)
68+
this.executionContext = executionContext
6269

6370
this.consumer = new Consumer({
6471
...this.options.kafka,
@@ -169,11 +176,11 @@ export abstract class AbstractKafkaConsumer<
169176

170177
private async tryToConsume<MessageValue extends object>(
171178
message: Message<string, MessageValue, string, string>,
172-
handler: KafkaHandler<MessageValue>,
179+
handler: KafkaHandler<MessageValue, ExecutionContext>,
173180
requestContext: RequestContext,
174181
): Promise<boolean> {
175182
try {
176-
await handler(message, requestContext)
183+
await handler(message, this.executionContext, requestContext)
177184
return true
178185
} catch (error) {
179186
this.handlerError(error, {

packages/kafka/lib/handler-container/KafkaHandlerConfig.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,20 @@ export interface RequestContext {
77
reqId: string
88
}
99

10-
export type KafkaHandler<MessageValue extends object> = (
10+
export type KafkaHandler<MessageValue extends object, ExecutionContext> = (
1111
message: Message<string, MessageValue, string, string>,
12+
context: ExecutionContext,
1213
requestContext: RequestContext,
1314
) => Promise<void> | void
1415

15-
export class KafkaHandlerConfig<MessageValue extends object> {
16+
export class KafkaHandlerConfig<MessageValue extends object, ExecutionContext> {
1617
public readonly schema: ZodSchema<MessageValue>
17-
public readonly handler: KafkaHandler<MessageValue>
18+
public readonly handler: KafkaHandler<MessageValue, ExecutionContext>
1819

19-
constructor(schema: ZodSchema<MessageValue>, handler: KafkaHandler<MessageValue>) {
20+
constructor(
21+
schema: ZodSchema<MessageValue>,
22+
handler: KafkaHandler<MessageValue, ExecutionContext>,
23+
) {
2024
this.schema = schema
2125
this.handler = handler
2226
}

packages/kafka/lib/handler-container/KafkaHandlerContainer.spec.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ describe('KafkaHandlerContainer', () => {
3737
new KafkaHandlerConfig(EMPTY_SCHEMA, () => Promise.resolve()),
3838
new KafkaHandlerConfig(EMPTY_SCHEMA, () => Promise.resolve()),
3939
],
40-
} satisfies KafkaHandlerRouting<typeof topicsConfig>
40+
}
4141

4242
// When & Then
4343
expect(
@@ -50,7 +50,7 @@ describe('KafkaHandlerContainer', () => {
5050

5151
it('should resolve handler with message type', () => {
5252
// Given
53-
const topicHandlers: KafkaHandlerRouting<TopicsConfig, any> = {
53+
const topicHandlers: KafkaHandlerRouting<TopicsConfig, any, any> = {
5454
all: [
5555
new KafkaHandlerConfig(CREATE_SCHEMA, () => Promise.resolve()),
5656
new KafkaHandlerConfig(UPDATE_SCHEMA, () => Promise.resolve()),
@@ -61,7 +61,7 @@ describe('KafkaHandlerContainer', () => {
6161
}
6262

6363
// When
64-
const container = new KafkaHandlerContainer<TopicsConfig>(topicHandlers, 'type')
64+
const container = new KafkaHandlerContainer(topicHandlers, 'type')
6565

6666
// Then
6767
expect(container.resolveHandler('all', { type: 'create' })?.schema).toBe(CREATE_SCHEMA)
@@ -79,7 +79,7 @@ describe('KafkaHandlerContainer', () => {
7979

8080
it('should resolve handler without message type', () => {
8181
// Given
82-
const topicHandlers: KafkaHandlerRouting<TopicsConfig, any> = {
82+
const topicHandlers: KafkaHandlerRouting<TopicsConfig, any, any> = {
8383
create: [new KafkaHandlerConfig(CREATE_SCHEMA, () => Promise.resolve())],
8484
empty: [new KafkaHandlerConfig(EMPTY_SCHEMA, () => Promise.resolve())],
8585
}
@@ -102,15 +102,15 @@ describe('KafkaHandlerContainer', () => {
102102
describe('topics', () => {
103103
it('should not fail with empty topics', () => {
104104
// When
105-
const container = new KafkaHandlerContainer<TopicsConfig>({})
105+
const container = new KafkaHandlerContainer({})
106106

107107
// Then
108108
expect(container.topics).toEqual([])
109109
})
110110

111111
it('should return all topics', () => {
112112
// Given
113-
const topicHandlers: KafkaHandlerRouting<TopicsConfig, any> = {
113+
const topicHandlers: KafkaHandlerRouting<TopicsConfig, any, any> = {
114114
all: [
115115
new KafkaHandlerConfig(CREATE_SCHEMA, () => Promise.resolve()),
116116
new KafkaHandlerConfig(UPDATE_SCHEMA, () => Promise.resolve()),
@@ -122,7 +122,7 @@ describe('KafkaHandlerContainer', () => {
122122
}
123123

124124
// When
125-
const container = new KafkaHandlerContainer<TopicsConfig>(topicHandlers, 'type')
125+
const container = new KafkaHandlerContainer(topicHandlers, 'type')
126126

127127
// Then
128128
expect(container.topics).toEqual(['all', 'create', 'empty'])

packages/kafka/lib/handler-container/KafkaHandlerContainer.ts

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,30 @@ import type { KafkaHandlerRouting } from './KafkaHandlerRoutingBuilder.ts'
99

1010
const DEFAULT_HANDLER_KEY = Symbol('default-handler')
1111

12-
type Handlers<TopicsConfig extends TopicConfig[]> = Record<
12+
type Handlers<TopicsConfig extends TopicConfig[], ExecutionContext> = Record<
1313
string,
14-
Record<string | symbol, KafkaHandlerConfig<SupportedMessageValues<TopicsConfig>>>
14+
Record<
15+
string | symbol,
16+
KafkaHandlerConfig<SupportedMessageValues<TopicsConfig>, ExecutionContext>
17+
>
1518
>
1619

17-
export class KafkaHandlerContainer<TopicsConfig extends TopicConfig[]> {
18-
private readonly handlers: Handlers<TopicsConfig>
20+
export class KafkaHandlerContainer<TopicsConfig extends TopicConfig[], ExecutionContext> {
21+
private readonly handlers: Handlers<TopicsConfig, ExecutionContext>
1922
private readonly messageTypeField?: string
2023

21-
constructor(topicHandlers: KafkaHandlerRouting<TopicsConfig>, messageTypeField?: string) {
24+
constructor(
25+
topicHandlers: KafkaHandlerRouting<TopicsConfig, ExecutionContext>,
26+
messageTypeField?: string,
27+
) {
2228
this.messageTypeField = messageTypeField
2329
this.handlers = this.mapTopicHandlers(topicHandlers)
2430
}
2531

2632
private mapTopicHandlers(
27-
topicHandlerRouting: KafkaHandlerRouting<TopicsConfig>,
28-
): Handlers<TopicsConfig> {
29-
const result: Handlers<TopicsConfig> = {}
33+
topicHandlerRouting: KafkaHandlerRouting<TopicsConfig, ExecutionContext>,
34+
): Handlers<TopicsConfig, ExecutionContext> {
35+
const result: Handlers<TopicsConfig, ExecutionContext> = {}
3036

3137
for (const [topic, topicHandlers] of Object.entries(topicHandlerRouting)) {
3238
if (!topicHandlers.length) continue
@@ -52,7 +58,9 @@ export class KafkaHandlerContainer<TopicsConfig extends TopicConfig[]> {
5258
resolveHandler<Topic extends SupportedTopics<TopicsConfig>>(
5359
topic: Topic,
5460
messageValue: SupportedMessageValuesForTopic<TopicsConfig, Topic>,
55-
): KafkaHandlerConfig<SupportedMessageValuesForTopic<TopicsConfig, Topic>> | undefined {
61+
):
62+
| KafkaHandlerConfig<SupportedMessageValuesForTopic<TopicsConfig, Topic>, ExecutionContext>
63+
| undefined {
5664
const handlers = this.handlers[topic]
5765
if (!handlers) return undefined
5866

packages/kafka/lib/handler-container/KafkaHandlerRoutingBuilder.spec.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,37 @@ const topicsConfig = [
1515
] as const satisfies TopicConfig[]
1616
type TopicsConfig = typeof topicsConfig
1717

18+
type ExecutionContext = {
19+
hello: string
20+
}
21+
1822
describe('KafkaHandlerRoutingBuilder', () => {
1923
it('should build routing config', () => {
2024
type ExpectedMessage<MessageValue> = Message<string, MessageValue, string, string>
2125

2226
// Given
23-
const builder = new KafkaHandlerRoutingBuilder<TopicsConfig>()
27+
const builder = new KafkaHandlerRoutingBuilder<TopicsConfig, ExecutionContext>()
2428
.addConfig(
2529
'all',
26-
new KafkaHandlerConfig(CREATE_SCHEMA, (message, requestContext) => {
30+
new KafkaHandlerConfig(CREATE_SCHEMA, (message, executionContext, requestContext) => {
2731
expectTypeOf(message).toEqualTypeOf<ExpectedMessage<z.infer<typeof CREATE_SCHEMA>>>()
32+
expectTypeOf(executionContext).toEqualTypeOf<ExecutionContext>()
2833
expectTypeOf(requestContext).toEqualTypeOf<RequestContext>()
2934
}),
3035
)
3136
.addConfig(
3237
'all',
33-
new KafkaHandlerConfig(UPDATE_SCHEMA, (message, requestContext) => {
38+
new KafkaHandlerConfig(UPDATE_SCHEMA, (message, executionContext, requestContext) => {
3439
expectTypeOf(message).toEqualTypeOf<ExpectedMessage<z.infer<typeof UPDATE_SCHEMA>>>()
40+
expectTypeOf(executionContext).toEqualTypeOf<ExecutionContext>()
3541
expectTypeOf(requestContext).toEqualTypeOf<RequestContext>()
3642
}),
3743
)
3844
.addConfig(
3945
'empty',
40-
new KafkaHandlerConfig(EMPTY_SCHEMA, (message, requestContext) => {
46+
new KafkaHandlerConfig(EMPTY_SCHEMA, (message, executionContext, requestContext) => {
4147
expectTypeOf(message).toEqualTypeOf<ExpectedMessage<z.infer<typeof EMPTY_SCHEMA>>>()
48+
expectTypeOf(executionContext).toEqualTypeOf<ExecutionContext>()
4249
expectTypeOf(requestContext).toEqualTypeOf<RequestContext>()
4350
}),
4451
)

packages/kafka/lib/handler-container/KafkaHandlerRoutingBuilder.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,27 @@ import type { KafkaHandlerConfig } from './KafkaHandlerConfig.ts'
88

99
export type KafkaHandlerRouting<
1010
TopicsConfig extends TopicConfig[],
11+
ExecutionContext,
1112
MessageValue extends SupportedMessageValues<TopicsConfig> = SupportedMessageValues<TopicsConfig>,
12-
> = Record<string, KafkaHandlerConfig<MessageValue>[]>
13+
> = Record<string, KafkaHandlerConfig<MessageValue, ExecutionContext>[]>
1314

14-
export class KafkaHandlerRoutingBuilder<TopicsConfig extends TopicConfig[]> {
15-
private readonly configs: KafkaHandlerRouting<TopicsConfig> = {}
15+
export class KafkaHandlerRoutingBuilder<
16+
const TopicsConfig extends TopicConfig[],
17+
ExecutionContext,
18+
> {
19+
private readonly configs: KafkaHandlerRouting<TopicsConfig, ExecutionContext> = {}
1620

1721
addConfig<
1822
Topic extends SupportedTopics<TopicsConfig>,
1923
MessageValue extends SupportedMessageValuesForTopic<TopicsConfig, Topic>,
20-
>(topic: Topic, config: KafkaHandlerConfig<MessageValue>): this {
24+
>(topic: Topic, config: KafkaHandlerConfig<MessageValue, ExecutionContext>): this {
2125
this.configs[topic] ??= []
2226
this.configs[topic].push(config)
2327

2428
return this
2529
}
2630

27-
build(): KafkaHandlerRouting<TopicsConfig> {
31+
build(): KafkaHandlerRouting<TopicsConfig, ExecutionContext> {
2832
return this.configs
2933
}
3034
}

packages/kafka/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/kafka",
3-
"version": "0.1.3",
3+
"version": "0.2.0",
44
"engines": {
55
"node": ">= 22.14.0"
66
},

packages/kafka/test/consumer/PermissionConsumer.spec.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -324,10 +324,10 @@ describe('PermissionConsumer', () => {
324324
consumer = new PermissionConsumer(testContext.cradle, {
325325
handlers: {
326326
'permission-general': [
327-
new KafkaHandlerConfig(PERMISSION_ADDED_SCHEMA, (message, requestContext) => {
327+
new KafkaHandlerConfig(PERMISSION_ADDED_SCHEMA, (message, _, requestContext) => {
328328
handlerCalls.push({ messageValue: message.value, requestContext })
329329
}),
330-
new KafkaHandlerConfig(PERMISSION_SCHEMA, (message, requestContext) => {
330+
new KafkaHandlerConfig(PERMISSION_SCHEMA, (message, _, requestContext) => {
331331
handlerCalls.push({ messageValue: message.value, requestContext })
332332
}),
333333
],
@@ -373,7 +373,7 @@ describe('PermissionConsumer', () => {
373373
headerRequestIdField,
374374
handlers: {
375375
'permission-general': [
376-
new KafkaHandlerConfig(PERMISSION_ADDED_SCHEMA, (message, requestContext) => {
376+
new KafkaHandlerConfig(PERMISSION_ADDED_SCHEMA, (message, _, requestContext) => {
377377
handlerCalls.push({ messageValue: message.value, requestContext })
378378
}),
379379
],

0 commit comments

Comments
 (0)