@@ -319,14 +319,16 @@ To access the original Kafka `IncomingMessage` object, use the `getMessage()` me
319
319
@MessagePattern (' hero.kill.dragon' )
320
320
killDragon (@Payload () message : KillDragonMessage , @Ctx () context : KafkaContext ) {
321
321
const originalMessage = context .getMessage ();
322
- const { headers, partition, timestamp } = originalMessage ;
322
+ const partition = context .getPartition ();
323
+ const { headers, timestamp } = originalMessage ;
323
324
}
324
325
@@switch
325
326
@Bind (Payload (), Ctx ())
326
327
@MessagePattern (' hero.kill.dragon' )
327
328
killDragon (message , context ) {
328
329
const originalMessage = context .getMessage ();
329
- const { headers, partition, timestamp } = originalMessage ;
330
+ const partition = context .getPartition ();
331
+ const { headers, timestamp } = originalMessage ;
330
332
}
331
333
```
332
334
@@ -441,8 +443,9 @@ Committing offsets is essential when working with Kafka. Per default, messages w
441
443
async handleUserCreated (@Payload () data : IncomingMessage , @Ctx () context : KafkaContext ) {
442
444
// business logic
443
445
444
- const originalMessage = context .getMessage ();
445
- const { topic, partition, offset } = originalMessage ;
446
+ const { offset } = context .getMessage ();
447
+ const partition = context .getPartition ();
448
+ const topic = context .getTopic ();
446
449
await this .client .commitOffsets ([{ topic , partition , offset }])
447
450
}
448
451
@@switch
@@ -451,8 +454,9 @@ async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaC
451
454
async handleUserCreated (data , context ) {
452
455
// business logic
453
456
454
- const originalMessage = context .getMessage ();
455
- const { topic, partition, offset } = originalMessage ;
457
+ const { offset } = context .getMessage ();
458
+ const partition = context .getPartition ();
459
+ const topic = context .getTopic ();
456
460
await this .client .commitOffsets ([{ topic , partition , offset }])
457
461
}
458
462
```
0 commit comments