Skip to content

Commit ae3a597

Browse files
update sequential
1 parent 0d33b21 commit ae3a597

File tree

9 files changed

+101
-43
lines changed

9 files changed

+101
-43
lines changed

src/configs/kafka.config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ export default registerAs(
2323
maxBytes: bytes('10mb'), // 5mb
2424
maxWaitTimeInMs: ms('5s'), // 5s
2525

26+
maxInFlightRequests: null, // set this to make customer guaranteed sequential
27+
2628
retry: {
2729
maxRetryTime: ms('60s'), // 30s
2830
initialRetryTime: ms('0.3s'), // 3s

src/kafka.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { ConfigService } from '@nestjs/config';
33
import { NestApplication } from '@nestjs/core';
44
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
55
import { ConsumerConfig, ConsumerSubscribeTopics } from 'kafkajs';
6-
import { KAFKA_TOPICS } from 'src/kafka/constants/kafka.constant';
6+
import { ENUM_KAFKA_TOPICS } from 'src/kafka/constants/kafka.topic.constant';
77

88
export default async function (app: NestApplication) {
99
const configService = app.get(ConfigService);
@@ -22,7 +22,7 @@ export default async function (app: NestApplication) {
2222
'kafka.allowAutoTopicCreation'
2323
);
2424
const subscribe: ConsumerSubscribeTopics = {
25-
topics: KAFKA_TOPICS,
25+
topics: Object.values(ENUM_KAFKA_TOPICS),
2626
...configService.get<ConsumerSubscribeTopics>(
2727
'kafka.consumerSubscribe'
2828
),
Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1+
import { IKafkaCreateTopic } from 'src/kafka/interfaces/kafka.interface';
12
import { ENUM_KAFKA_TOPICS } from './kafka.topic.constant';
23

34
export const KAFKA_PRODUCER_SERVICE_NAME = 'KAFKA_PRODUCER_SERVICE';
45

5-
export const KAFKA_TOPICS: string[] = [
6-
...new Set(Object.values(ENUM_KAFKA_TOPICS)),
7-
];
8-
9-
export const KAFKA_TOPICS_REPLY: string[] = [
10-
...new Set(Object.values(ENUM_KAFKA_TOPICS).map((val) => `${val}.reply`)),
11-
];
6+
export const KafkaCreateTopis: IKafkaCreateTopic[] = Object.values(
7+
ENUM_KAFKA_TOPICS
8+
).map((val) => ({
9+
topic: val,
10+
topicReply: `${val}.reply`,
11+
}));
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
export enum ENUM_KAFKA_TOPICS {
2-
ACK_SUCCESS = 'nestjs.ack.success2',
3-
ACK_ERROR = 'nestjs.ack.error2',
2+
ACK_SUCCESS = 'nestjs.ack.success',
3+
ACK_ERROR = 'nestjs.ack.error',
44
}

src/kafka/interfaces/kafka.interface.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,10 @@ export interface IKafkaErrorException {
2626
errors: ValidationError;
2727
statusHttp: HttpStatus;
2828
}
29+
30+
export interface IKafkaCreateTopic {
31+
topic: string;
32+
topicReply: string;
33+
partition?: number;
34+
replicationFactor?: number;
35+
}

src/kafka/interfaces/kafka.producer-service.interface.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,17 @@ export interface IKafkaProducerService {
1717
options?: IKafkaProducerMessageOptions
1818
): void;
1919

20+
sendSequential<T, N>(
21+
topic: string,
22+
data: T,
23+
options?: IKafkaProducerSendMessageOptions
24+
): Promise<IKafkaMessage<N> | N>;
25+
26+
emitSequential<T>(
27+
topic: string,
28+
data: T,
29+
options?: IKafkaProducerMessageOptions
30+
): void;
31+
2032
createId(): string;
2133
}

src/kafka/kafka.module.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
import { DynamicModule, Global, Module } from '@nestjs/common';
22
import { ConfigModule, ConfigService } from '@nestjs/config';
33
import { ClientsModule, Transport } from '@nestjs/microservices';
4-
import { ConsumerSubscribeTopics, ProducerConfig } from 'kafkajs';
5-
import {
6-
KAFKA_PRODUCER_SERVICE_NAME,
7-
KAFKA_TOPICS_REPLY,
8-
} from './constants/kafka.constant';
4+
import { ProducerConfig } from 'kafkajs';
5+
import { KAFKA_PRODUCER_SERVICE_NAME } from './constants/kafka.constant';
96
import { KafkaRouterModule } from './router/kafka.router.module';
107
import { KafkaAdminService } from './services/kafka.admin.service';
118
import { KafkaProducerService } from './services/kafka.producer.service';

src/kafka/services/kafka.admin.service.ts

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@ import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
22
import { Admin, ITopicConfig, Kafka, KafkaConfig } from 'kafkajs';
33
import { Logger } from '@nestjs/common/services/logger.service';
44
import { ConfigService } from '@nestjs/config';
5-
import { KAFKA_TOPICS, KAFKA_TOPICS_REPLY } from '../constants/kafka.constant';
65
import { IKafkaAdminService } from 'src/kafka/interfaces/kafka.admin-service.interface';
6+
import { IKafkaCreateTopic } from 'src/kafka/interfaces/kafka.interface';
7+
import { KafkaCreateTopis } from 'src/kafka/constants/kafka.constant';
78

89
@Injectable()
910
export class KafkaAdminService
1011
implements IKafkaAdminService, OnModuleInit, OnModuleDestroy
1112
{
1213
private readonly kafka: Kafka;
1314
private readonly admin: Admin;
14-
private readonly topics: string[];
15-
private readonly topicsReply: string[];
15+
private readonly topics: IKafkaCreateTopic[];
1616
private readonly brokers: string[];
1717
private readonly clientId: string;
1818
private readonly kafkaOptions: KafkaConfig;
@@ -24,8 +24,7 @@ export class KafkaAdminService
2424
this.clientId = this.configService.get<string>('kafka.admin.clientId');
2525
this.brokers = this.configService.get<string[]>('kafka.brokers');
2626

27-
this.topics = KAFKA_TOPICS;
28-
this.topicsReply = KAFKA_TOPICS_REPLY;
27+
this.topics = KafkaCreateTopis;
2928

3029
this.kafkaOptions = {
3130
clientId: this.clientId,
@@ -74,27 +73,30 @@ export class KafkaAdminService
7473

7574
async createTopics(): Promise<boolean> {
7675
this.logger.log(`Topics ${this.topics}`);
77-
this.logger.log(`Topics Reply ${this.topicsReply}`);
7876

7977
const currentTopic: string[] = await this.getAllTopicUnique();
8078
const data: ITopicConfig[] = [];
8179

8280
for (const topic of this.topics) {
83-
if (!currentTopic.includes(topic)) {
81+
const partition: number = topic.partition ?? this.defaultPartition;
82+
const replicationFactor: number =
83+
topic.replicationFactor &&
84+
topic.replicationFactor <= this.brokers.length
85+
? topic.replicationFactor
86+
: this.brokers.length;
87+
if (!currentTopic.includes(topic.topic)) {
8488
data.push({
85-
topic,
86-
numPartitions: this.defaultPartition,
87-
replicationFactor: this.brokers.length,
89+
topic: topic.topic,
90+
numPartitions: partition,
91+
replicationFactor: replicationFactor,
8892
});
8993
}
90-
}
9194

92-
for (const replyTopic of this.topicsReply) {
93-
if (!currentTopic.includes(replyTopic)) {
95+
if (!currentTopic.includes(topic.topicReply)) {
9496
data.push({
95-
topic: replyTopic,
96-
numPartitions: this.defaultPartition,
97-
replicationFactor: this.brokers.length,
97+
topic: topic.topicReply,
98+
numPartitions: partition,
99+
replicationFactor,
98100
});
99101
}
100102
}
@@ -117,14 +119,12 @@ export class KafkaAdminService
117119
const data = [];
118120

119121
for (const topic of this.topics) {
120-
if (currentTopic.includes(topic)) {
121-
data.push(topic);
122+
if (currentTopic.includes(topic.topic)) {
123+
data.push(topic.topic);
122124
}
123-
}
124125

125-
for (const replyTopic of this.topicsReply) {
126-
if (currentTopic.includes(replyTopic)) {
127-
data.push(replyTopic);
126+
if (currentTopic.includes(topic.topicReply)) {
127+
data.push(topic.topicReply);
128128
}
129129
}
130130

src/kafka/services/kafka.producer.service.ts

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@ import {
1515
IKafkaProducerSendMessageOptions,
1616
} from 'src/kafka/interfaces/kafka.interface';
1717
import { IKafkaProducerService } from 'src/kafka/interfaces/kafka.producer-service.interface';
18-
import {
19-
KAFKA_PRODUCER_SERVICE_NAME,
20-
KAFKA_TOPICS,
21-
} from '../constants/kafka.constant';
18+
import { KAFKA_PRODUCER_SERVICE_NAME } from '../constants/kafka.constant';
19+
import { ENUM_KAFKA_TOPICS } from 'src/kafka/constants/kafka.topic.constant';
2220

2321
@Injectable()
2422
export class KafkaProducerService
@@ -40,7 +38,7 @@ export class KafkaProducerService
4038
}
4139

4240
async onApplicationBootstrap(): Promise<void> {
43-
KAFKA_TOPICS.forEach((topic) =>
41+
Object.values(ENUM_KAFKA_TOPICS).forEach((topic) =>
4442
this.clientKafka.subscribeToResponseOf(topic)
4543
);
4644

@@ -91,6 +89,48 @@ export class KafkaProducerService
9189
return;
9290
}
9391

92+
async sendSequential<T, N>(
93+
topic: string,
94+
data: T,
95+
options?: IKafkaProducerSendMessageOptions
96+
): Promise<IKafkaMessage<N> | N> {
97+
const message: IKafkaMessage<T> = {
98+
key: `${topic}-sequential-key`,
99+
value: data,
100+
headers: options && options.headers ? options.headers : undefined,
101+
};
102+
103+
const send = await firstValueFrom(
104+
this.clientKafka
105+
.send<any, string>(topic, JSON.stringify(message))
106+
.pipe(timeout(this.timeout))
107+
);
108+
109+
if (send.error) {
110+
throw send.error;
111+
}
112+
113+
return options && options.raw ? send : send.value;
114+
}
115+
116+
emitSequential<T>(
117+
topic: string,
118+
data: T,
119+
options?: IKafkaProducerMessageOptions
120+
): void {
121+
const message: IKafkaMessage<T> = {
122+
key: `${topic}-sequential-key`,
123+
value: data,
124+
headers: options && options.headers ? options.headers : undefined,
125+
};
126+
127+
this.clientKafka
128+
.emit<any, string>(topic, JSON.stringify(message))
129+
.pipe(timeout(this.timeout));
130+
131+
return;
132+
}
133+
94134
createId(): string {
95135
const rand: string = this.helperStringService.random(10);
96136
const timestamp = `${this.helperDateService.timestamp()}`;

0 commit comments

Comments
 (0)