Skip to content

Commit 0d33b21

Browse files
update kafka fix producer
1 parent 8cb6651 commit 0d33b21

File tree

4 files changed

+14
-25
lines changed

4 files changed

+14
-25
lines changed

src/configs/kafka.config.ts

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,33 @@ export default registerAs(
1515
consumerEnable: process.env.KAFKA_CONSUMER_ENABLE === 'true',
1616
consumer: {
1717
groupId: process.env.KAFKA_CONSUMER_GROUP || 'nestjs.ack',
18-
sessionTimeout: ms('30s'), // 30s
19-
rebalanceTimeout: ms('60s'), //60s
20-
heartbeatInterval: ms('5s'), // 5s
18+
sessionTimeout: ms('60s'), // 6000 .. 300000
19+
rebalanceTimeout: ms('90s'), // 300000
20+
heartbeatInterval: ms('3s'), // 3000
2121

2222
maxBytesPerPartition: bytes('1mb'), // 1mb
23-
maxBytes: bytes('5mb'), // 5mb
23+
maxBytes: bytes('10mb'), // 5mb
2424
maxWaitTimeInMs: ms('5s'), // 5s
2525

2626
retry: {
27-
maxRetryTime: ms('30s'), // 30s
28-
initialRetryTime: ms('3s'), // 3s
27+
maxRetryTime: ms('60s'), // 30s
28+
initialRetryTime: ms('0.3s'), // 3s
2929
retries: 5,
3030
},
3131
},
3232
consumerSubscribe: {
33-
fromBeginning: true,
33+
fromBeginning: false,
3434
},
3535

3636
// producer
3737
producerEnable: process.env.KAFKA_PRODUCER_ENABLE === 'true',
3838
producer: {
3939
createPartitioner: Partitioners.LegacyPartitioner,
40-
transactionTimeout: ms('60s'), //60s
40+
transactionTimeout: ms('100s'), // 30000 .. 60000
41+
4142
retry: {
42-
maxRetryTime: ms('30s'), // 30s
43-
initialRetryTime: ms('3s'), // 3s
43+
maxRetryTime: ms('60s'), // 30s
44+
initialRetryTime: ms('0.3s'), // 3s
4445
retries: 5,
4546
},
4647
},

src/kafka/interceptors/kafka.response.interceptor.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export class KafkaResponseInterceptor implements NestInterceptor<Promise<any>> {
2323
if (response) {
2424
delete response.__class;
2525
delete response.__function;
26+
2627
return JSON.stringify({
2728
headers,
2829
key,

src/kafka/kafka.module.ts

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,6 @@ import { KafkaProducerService } from './services/kafka.producer.service';
3838
'kafka.allowAutoTopicCreation'
3939
),
4040
},
41-
subscribe: {
42-
topics: KAFKA_TOPICS_REPLY,
43-
...configService.get<ConsumerSubscribeTopics>(
44-
'kafka.consumerSubscribe'
45-
),
46-
},
47-
send: {
48-
timeout: configService.get<number>(
49-
'kafka.producerSend.timeout'
50-
),
51-
acks: -1,
52-
},
53-
producerOnlyMode: true,
5441
},
5542
}),
5643
},

src/kafka/services/kafka.producer.service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ export class KafkaProducerService
6262

6363
const send = await firstValueFrom(
6464
this.clientKafka
65-
.send<any, IKafkaMessage<T>>(topic, message)
65+
.send<any, string>(topic, JSON.stringify(message))
6666
.pipe(timeout(this.timeout))
6767
);
6868

@@ -85,7 +85,7 @@ export class KafkaProducerService
8585
};
8686

8787
this.clientKafka
88-
.emit<any, IKafkaMessage<T>>(topic, message)
88+
.emit<any, string>(topic, JSON.stringify(message))
8989
.pipe(timeout(this.timeout));
9090

9191
return;

0 commit comments

Comments
 (0)