Skip to content

Commit 0bc6416

Browse files
committed
docs(microservices): add commit offsets doc for kafka
This is the associated documentation for PR #9270 of nest.
1 parent 14e5d47 commit 0bc6416

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed

content/microservices/kafka.md

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,3 +389,59 @@ onModuleInit() {
389389
```
390390

391391
> info **Hint** Kafka reply topic naming conventions can be customized by extending `ClientKafka` in your own custom provider and overriding the `getResponsePatternName` method.
392+
393+
#### Commit offsets
394+
395+
Comitting offsets is essential when working with kafka. Per default messages will be automatically committed after a specific time. For more information visit [kafkajs docs](https://kafka.js.org/docs/consuming#autocommit). `ClientKafka` offers a way to manually commit offsets that functions like the [native kafkajs implementation](https://kafka.js.org/docs/consuming#manual-committing).
396+
397+
```typescript
398+
@@filename()
399+
400+
constructor(@Inject(KAFKA) client: ClientKafka) {}
401+
402+
@EventPattern('user_created')
403+
async handleUserCreated(@Payload() data: IncomingMessage) {
404+
// business logic
405+
406+
await this.client.commitOffsets([ { topic: data.topic, partition: data.partition, offset: data.offset } ])
407+
}
408+
@@switch
409+
410+
constructor(@Inject(KAFKA) client: ClientKafka) {}
411+
412+
@EventPattern('user_created')
413+
async handleUserCreated(data) {
414+
// business logic
415+
416+
await this.client.commitOffsets([ { topic: data.topic, partition: data.partition, offset: data.offset } ])
417+
}
418+
```
419+
420+
To disable auto-committing of messages set `autoCommit: false` in the `run` config.
421+
422+
```typescript
423+
@@filename(main)
424+
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
425+
transport: Transport.KAFKA,
426+
options: {
427+
client: {
428+
brokers: ['localhost:9092'],
429+
},
430+
run: {
431+
autoCommit: false
432+
}
433+
}
434+
});
435+
@@switch
436+
const app = await NestFactory.createMicroservice(AppModule, {
437+
transport: Transport.KAFKA,
438+
options: {
439+
client: {
440+
brokers: ['localhost:9092'],
441+
},
442+
run: {
443+
autoCommit: false
444+
}
445+
}
446+
});
447+
```

0 commit comments

Comments
 (0)