Skip to content

Commit 6500880

Browse files
Merge pull request #3050 from esahin90/sahin/fix-kafka-commit-offsets
docs(microservices/kafka): Update commit offsets to use KafkaContext
2 parents 96afe18 + 55a266b commit 6500880

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

content/microservices/kafka.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ throw new KafkaRetriableException('...');
435435
436436
#### Commit offsets
437437

438-
Committing offsets is essential when working with Kafka. Per default, messages will be automatically committed after a specific time. For more information visit [KafkaJS docs](https://kafka.js.org/docs/consuming#autocommit). `ClientKafka` offers a way to manually commit offsets that work like the [native KafkaJS implementation](https://kafka.js.org/docs/consuming#manual-committing).
438+
Committing offsets is essential when working with Kafka. Per default, messages will be automatically committed after a specific time. For more information visit [KafkaJS docs](https://kafka.js.org/docs/consuming#autocommit). `KafkaContext` offers a way to access the active consumer for manually committing offsets. The consumer is the KafkaJS consumer and works as the [native KafkaJS implementation](https://kafka.js.org/docs/consuming#manual-committing).
439439

440440
```typescript
441441
@@filename()
@@ -446,7 +446,8 @@ async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaC
446446
const { offset } = context.getMessage();
447447
const partition = context.getPartition();
448448
const topic = context.getTopic();
449-
await this.client.commitOffsets([{ topic, partition, offset }])
449+
const consumer = context.getConsumer();
450+
await consumer.commitOffsets([{ topic, partition, offset }])
450451
}
451452
@@switch
452453
@Bind(Payload(), Ctx())
@@ -457,7 +458,8 @@ async handleUserCreated(data, context) {
457458
const { offset } = context.getMessage();
458459
const partition = context.getPartition();
459460
const topic = context.getTopic();
460-
await this.client.commitOffsets([{ topic, partition, offset }])
461+
const consumer = context.getConsumer();
462+
await consumer.commitOffsets([{ topic, partition, offset }])
461463
}
462464
```
463465

0 commit comments

Comments
 (0)