
Commit b17b6c5

Fix Kafka retries on error by not committing the message offset. Because that behaviour can break some workflows, add a commit-offset interceptor that commits on the first run
1 parent ae3a597 commit b17b6c5
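
The behaviour this commit introduces is easiest to see side by side. Below is a minimal sketch reusing the decorators added in this diff; the controller and handler names are placeholders and the handler bodies are illustrative only.

```typescript
import { Controller } from '@nestjs/common';
import { ENUM_KAFKA_TOPICS } from 'src/kafka/constants/kafka.topic.constant';
import {
    MessageCommitOffsetInFirstRunning,
    MessageTopic,
    MessageValue,
} from 'src/kafka/decorators/kafka.decorator';

@Controller()
export class ExampleKafkaController {
    // Default behaviour after this commit: a thrown error leaves the offset
    // uncommitted, so Kafka redelivers (retries) the message.
    @MessageTopic(ENUM_KAFKA_TOPICS.ACK_SUCCESS)
    async retryOnError(@MessageValue() value: Record<string, any>): Promise<void> {
        // ... process value; throwing here triggers a retry
    }

    // Opt-in behaviour: the interceptor behind this decorator commits the offset
    // before the handler runs, so an error here does not cause redelivery.
    @MessageCommitOffsetInFirstRunning()
    @MessageTopic(ENUM_KAFKA_TOPICS.ACK_ERROR)
    async commitFirst(@MessageValue() value: Record<string, any>): Promise<void> {
        // ... process value; failures on this topic are not retried
    }
}
```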

File tree

13 files changed: 122 additions, 55 deletions


.env.example

Lines changed: 0 additions & 1 deletion

@@ -54,6 +54,5 @@ AWS_S3_BUCKET=baibay-development
 KAFKA_CLIENT_ID=KAFKA_ACK
 KAFKA_ADMIN_CLIENT_ID=KAFKA_ADMIN_ACK
 KAFKA_BROKERS=localhost:9092
-KAFKA_PRODUCER_ENABLE=true
 KAFKA_CONSUMER_ENABLE=true
 KAFKA_CONSUMER_GROUP=nestjs.ack

README.md

Lines changed: 22 additions & 1 deletion

@@ -8,7 +8,28 @@ Fork from [ack-nestjs-boilerplate](https://github.com/andrechristikan/ack-nestjs
 
 ## Instructions
 
-Before run, you must to create the topics with `kafka-topics cli`. Or you can run `yarn kafka:create-topics`.
+Before running, you must create the topics with the `kafka-topics` CLI, or run `yarn kafka:create-topics`. (See point 9 under Behaviour.)
+
+
+## Behaviour
+
+1. Producer and consumer use `nestjs/microservice`.
+2. The producer is always imported as a global module.
+3. Message offsets are not committed when a handler errors.
+4. Committing the offset on the first run is optional.
+5. By default, Kafka creates topics with 3 partitions and a replication factor of 3.
+6. To guarantee sequential processing (see the sketch after this diff):
+    - Create the topic with `1 partition` and `1 replication factor`.
+    - Set `maxInFlightRequests` to `1` in the consumer config.
+    - Use `producerSendSequential` or `produceEmitSequential`.
+7. `emit` means the proxy will immediately try to deliver the event, whether or not you explicitly subscribe to the returned observable.
+8. `send` means you have to explicitly subscribe to it before the message is sent.
+9. `acks` depends on the Kafka setting. Unfortunately, NestJS cannot set `acks` when producing a message.
+10. Topics in `./src/kafka/constants/kafka.topic.constant.ts` must be created before running.
+11. Every Kafka request is validated with `class-validator` if you set the DTO class.
+12. Put the KafkaController into `./src/kafka/router/kafka.router.module.ts`, so you can use `env.KAFKA_CONSUMER_ENABLE` to switch the consumer on/off.
+
+
 
 ## Next
 
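Behaviour points 6–8 above are easier to read as code. The sketch below assumes the `KafkaService` API used elsewhere in this commit (`produceSend` appears in `kafka.test.controller.ts`); `produceEmitSequential` is only mentioned in the README, so its `(topic, value)` signature, the reply-topic behaviour (inferred from the `topicReply` constant), and the `ExampleProducer` class are assumptions.

```typescript
import { Injectable } from '@nestjs/common';
import { ENUM_KAFKA_TOPICS } from 'src/kafka/constants/kafka.topic.constant';
import { KafkaService } from 'src/kafka/services/kafka.service';

@Injectable()
export class ExampleProducer {
    constructor(private readonly kafkaService: KafkaService) {}

    async requestReply(): Promise<unknown> {
        // "send" semantics: the message only goes out once the underlying
        // observable is subscribed to; produceSend wraps that and resolves
        // with the reply (presumably published on the `<topic>.reply` topic).
        return this.kafkaService.produceSend(ENUM_KAFKA_TOPICS.ACK_SUCCESS, {
            test: 'test',
        });
    }

    async fireAndForgetInOrder(): Promise<void> {
        // "emit" semantics: delivery is attempted immediately and no reply is
        // awaited. The *Sequential variants pair with a 1-partition topic and
        // maxInFlightRequests = 1 to keep strict ordering (point 6).
        await this.kafkaService.produceEmitSequential(
            ENUM_KAFKA_TOPICS.ACK_SUCCESS,
            { test: 'test' }
        );
    }
}
```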

src/common/common.module.ts

Lines changed: 0 additions & 1 deletion

@@ -158,7 +158,6 @@ import { APP_LANGUAGE } from 'src/app/constants/app.constant';
             KAFKA_BROKERS: Joi.string()
                 .default('localhost:9092')
                 .required(),
-            KAFKA_PRODUCER_ENABLE: Joi.boolean().default(true).required(),
             KAFKA_CONSUMER_ENABLE: Joi.boolean().default(true).required(),
             KAFKA_CONSUMER_GROUP: Joi.string()
                 .default('nestjs.ack')

src/configs/kafka.config.ts

Lines changed: 0 additions & 1 deletion

@@ -36,7 +36,6 @@ export default registerAs(
         },
 
         // producer
-        producerEnable: process.env.KAFKA_PRODUCER_ENABLE === 'true',
         producer: {
             createPartitioner: Partitioners.LegacyPartitioner,
             transactionTimeout: ms('100s'), // 30000 .. 60000
Lines changed: 3 additions & 1 deletion

@@ -1,11 +1,13 @@
 import { IKafkaCreateTopic } from 'src/kafka/interfaces/kafka.interface';
 import { ENUM_KAFKA_TOPICS } from './kafka.topic.constant';
 
-export const KAFKA_PRODUCER_SERVICE_NAME = 'KAFKA_PRODUCER_SERVICE';
+export const KAFKA_SERVICE_NAME = 'KAFKA_SERVICE';
 
 export const KafkaCreateTopis: IKafkaCreateTopic[] = Object.values(
     ENUM_KAFKA_TOPICS
 ).map((val) => ({
     topic: val,
     topicReply: `${val}.reply`,
+    // partition?: number;
+    // replicationFactor?: number;
 }));
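The commented-out fields above suggest each `IKafkaCreateTopic` entry can override the partition count and replication factor. A minimal sketch of such an override for a strictly ordered topic (README Behaviour point 6); the `KafkaOrderedTopic` constant and the reuse of `ACK_SUCCESS` are illustrative only, and the optional fields are assumed to exist on the interface.

```typescript
import { IKafkaCreateTopic } from 'src/kafka/interfaces/kafka.interface';
import { ENUM_KAFKA_TOPICS } from './kafka.topic.constant';

// A single partition with replication factor 1 keeps every message for this
// topic in one ordered log, which is what the sequential produce/consume
// helpers rely on.
export const KafkaOrderedTopic: IKafkaCreateTopic = {
    topic: ENUM_KAFKA_TOPICS.ACK_SUCCESS,
    topicReply: `${ENUM_KAFKA_TOPICS.ACK_SUCCESS}.reply`,
    partition: 1,
    replicationFactor: 1,
};
```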

src/kafka/controllers/kafka.controller.ts

Lines changed: 3 additions & 0 deletions

@@ -4,6 +4,7 @@ import { Logger } from 'src/common/logger/decorators/logger.decorator';
 import { IResponse } from 'src/common/response/interfaces/response.interface';
 import { ENUM_KAFKA_TOPICS } from 'src/kafka/constants/kafka.topic.constant';
 import {
+    MessageCommitOffsetInFirstRunning,
     MessageTopic,
     MessageValue,
 } from 'src/kafka/decorators/kafka.decorator';
@@ -12,6 +13,7 @@ import { KafkaDto } from 'src/kafka/dtos/kafka.dto';
 @Controller()
 export class KafkaController {
     @Logger(ENUM_LOGGER_ACTION.TEST, { tags: ['helloKafka'] })
+    @MessageCommitOffsetInFirstRunning()
     @MessageTopic(ENUM_KAFKA_TOPICS.ACK_SUCCESS)
     async helloKafka(
         @MessageValue() value: Record<string, any>
@@ -20,6 +22,7 @@ export class KafkaController {
     }
 
     @Logger(ENUM_LOGGER_ACTION.TEST, { tags: ['helloKafkaError'] })
+    @MessageCommitOffsetInFirstRunning()
     @MessageTopic(ENUM_KAFKA_TOPICS.ACK_ERROR)
     async errorKafka(@MessageValue() value: KafkaDto): Promise<IResponse> {
         return value;

src/kafka/controllers/kafka.test.controller.ts

Lines changed: 7 additions & 9 deletions

@@ -1,29 +1,27 @@
-import { Controller, Get, Optional, VERSION_NEUTRAL } from '@nestjs/common';
+import { Controller, Get, VERSION_NEUTRAL } from '@nestjs/common';
 import { ApiExcludeController } from '@nestjs/swagger';
 import { Types } from 'mongoose';
 import { ENUM_LOGGER_ACTION } from 'src/common/logger/constants/logger.enum.constant';
 import { Logger } from 'src/common/logger/decorators/logger.decorator';
 import { Response } from 'src/common/response/decorators/response.decorator';
 import { IResponse } from 'src/common/response/interfaces/response.interface';
 import { ENUM_KAFKA_TOPICS } from 'src/kafka/constants/kafka.topic.constant';
-import { KafkaException } from 'src/kafka/error/exceptions/kafka.exception';
-import { KafkaProducerService } from 'src/kafka/services/kafka.producer.service';
+import { KafkaHttpException } from 'src/kafka/error/exceptions/kafka.http-exception';
+import { KafkaService } from 'src/kafka/services/kafka.service';
 
 @ApiExcludeController()
 @Controller({
     version: VERSION_NEUTRAL,
     path: '/kafka',
 })
 export class KafkaTestController {
-    constructor(
-        @Optional() private readonly kafkaProducerService: KafkaProducerService
-    ) {}
+    constructor(private readonly kafkaService: KafkaService) {}
 
     @Response('test.helloKafka')
     @Logger(ENUM_LOGGER_ACTION.TEST, { tags: ['testKafka'] })
     @Get('/')
     async helloKafka(): Promise<IResponse> {
-        const response = await this.kafkaProducerService.send(
+        const response = await this.kafkaService.produceSend(
             ENUM_KAFKA_TOPICS.ACK_SUCCESS,
             {
                 test: 'test',
@@ -53,7 +51,7 @@ export class KafkaTestController {
     @Get('/error')
     async helloKafkaError(): Promise<IResponse> {
         try {
-            const response = await this.kafkaProducerService.send(
+            const response = await this.kafkaService.produceSend(
                 ENUM_KAFKA_TOPICS.ACK_ERROR,
                 {
                     testNumber: [],
@@ -78,7 +76,7 @@ export class KafkaTestController {
 
             return response;
         } catch (err: any) {
-            throw new KafkaException(err);
+            throw new KafkaHttpException(err);
         }
     }
 }

src/kafka/decorators/kafka.decorator.ts

Lines changed: 5 additions & 0 deletions

@@ -16,6 +16,7 @@ import { KafkaErrorFilter } from '../error/filters/kafka.error.filter';
 import { KafkaResponseInterceptor } from '../interceptors/kafka.response.interceptor';
 import { KafkaResponseTimeoutInterceptor } from '../interceptors/kafka.response.timeout.interceptor';
 import { KafkaValidationPipe } from '../pipes/kafka.validation.pipe';
+import { KafkaCommitOffsetFirstInterceptor } from 'src/kafka/interceptors/kafka.commit-offset-first.interceptor';
 
 export function MessageTopic(topic: string): any {
     return applyDecorators(
@@ -46,3 +47,7 @@ export const MessageKey = createParamDecorator<string>(
         return key;
     }
 );
+
+export function MessageCommitOffsetInFirstRunning(): any {
+    return applyDecorators(UseInterceptors(KafkaCommitOffsetFirstInterceptor));
+}

src/kafka/error/exceptions/kafka.exception.ts renamed to src/kafka/error/exceptions/kafka.http-exception.ts

Lines changed: 1 addition & 1 deletion

@@ -2,7 +2,7 @@ import { HttpException, HttpStatus } from '@nestjs/common';
 import { ENUM_ERROR_STATUS_CODE_ERROR } from 'src/common/error/constants/error.status-code.constant';
 import { IKafkaErrorException } from 'src/kafka/interfaces/kafka.interface';
 
-export class KafkaException extends HttpException {
+export class KafkaHttpException extends HttpException {
     constructor(exception: IKafkaErrorException) {
         if (
             'message' in exception &&

src/kafka/interceptors/kafka.commit-offset-first.interceptor.ts

Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
+import {
+    Injectable,
+    NestInterceptor,
+    ExecutionContext,
+    CallHandler,
+} from '@nestjs/common';
+import { RpcArgumentsHost } from '@nestjs/common/interfaces';
+import { KafkaContext } from '@nestjs/microservices';
+import { Observable } from 'rxjs';
+import { map } from 'rxjs/operators';
+import { KafkaService } from 'src/kafka/services/kafka.service';
+
+@Injectable()
+export class KafkaCommitOffsetFirstInterceptor
+    implements NestInterceptor<Promise<any>>
+{
+    constructor(private readonly kafkaService: KafkaService) {}
+
+    async intercept(
+        context: ExecutionContext,
+        next: CallHandler
+    ): Promise<Observable<Promise<any> | string>> {
+        const ctx: RpcArgumentsHost = context.switchToRpc();
+        const kafkaContext = ctx.getContext<KafkaContext>();
+
+        try {
+            await this.kafkaService.commitOffsets(kafkaContext);
+        } catch (error: unknown) {}
+
+        return next.handle();
+    }
+}
