Skip to content

Commit 096f33b

Browse files
Merge pull request #2230 from davidschuette/kafka-commit-offsets
docs(microservices): add commit offsets doc for kafka
2 parents adb865f + 22f8487 commit 096f33b

File tree

1 file changed

+55
-0
lines changed

1 file changed

+55
-0
lines changed

content/microservices/kafka.md

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,3 +426,58 @@ throw new KafkaRetriableException('...');
426426
```
427427

428428
> info **Hint** `KafkaRetriableException` class is exported from the `@nestjs/microservices` package.
429+
430+
#### Commit offsets
431+
432+
Committing offsets is essential when working with Kafka. Per default, messages will be automatically committed after a specific time. For more information visit [KafkaJS docs](https://kafka.js.org/docs/consuming#autocommit). `ClientKafka` offers a way to manually commit offsets that work like the [native KafkaJS implementation](https://kafka.js.org/docs/consuming#manual-committing).
433+
434+
```typescript
435+
@@filename()
436+
@EventPattern('user.created')
437+
async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext) {
438+
// business logic
439+
440+
const originalMessage = context.getMessage();
441+
const { topic, partition, offset } = originalMessage;
442+
await this.client.commitOffsets([{ topic, partition, offset }])
443+
}
444+
@@switch
445+
@Bind(Payload(), Ctx())
446+
@EventPattern('user.created')
447+
async handleUserCreated(data, context) {
448+
// business logic
449+
450+
const originalMessage = context.getMessage();
451+
const { topic, partition, offset } = originalMessage;
452+
await this.client.commitOffsets([{ topic, partition, offset }])
453+
}
454+
```
455+
456+
To disable auto-committing of messages set `autoCommit: false` in the `run` configuration, as follows:
457+
458+
```typescript
459+
@@filename(main)
460+
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
461+
transport: Transport.KAFKA,
462+
options: {
463+
client: {
464+
brokers: ['localhost:9092'],
465+
},
466+
run: {
467+
autoCommit: false
468+
}
469+
}
470+
});
471+
@@switch
472+
const app = await NestFactory.createMicroservice(AppModule, {
473+
transport: Transport.KAFKA,
474+
options: {
475+
client: {
476+
brokers: ['localhost:9092'],
477+
},
478+
run: {
479+
autoCommit: false
480+
}
481+
}
482+
});
483+
```

0 commit comments

Comments
 (0)