Skip to content

Commit 4875876

Browse files
Merge branch 'stage'
2 parents 69ed453 + cf9da7f commit 4875876

File tree

4 files changed

+35
-29
lines changed

4 files changed

+35
-29
lines changed

package-lock.json

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
"class-validator": "^0.14.0",
5858
"compression": "^1.7.4",
5959
"cookie-parser": "^1.4.5",
60-
"kafkajs": "^1.15.0",
60+
"kafkajs": "^2.2.4",
6161
"lodash": "^4.17.20",
6262
"mqtt": "^4.3.7",
6363
"njwt": "^1.0.0",

src/resources/device-model-schema.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ export const deviceModelSchema = {
134134
description:
135135
"Property. Model:'https://schema.org/Text'. Device's manufacturer name.",
136136
},
137+
name: {
138+
type: "string",
139+
description:
140+
"Property. Model:'https://schema.org/Text'. Device's model name in Portal"
141+
}
137142
},
138143
},
139144
],
@@ -145,5 +150,6 @@ export const deviceModelSchema = {
145150
"manufacturerName",
146151
"brandName",
147152
"modelName",
153+
"name"
148154
],
149155
};

src/services/kafka/kafka.service.ts

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
3030
const kafkaConfig = {
3131
clientId: process.env.KAFKA_CLIENTID || "os2iot-client",
3232
brokers: [
33-
`${process.env.KAFKA_HOSTNAME || "localhost"}:${
33+
`${ process.env.KAFKA_HOSTNAME || "localhost" }:${
3434
process.env.KAFKA_PORT || "9093"
3535
}`,
3636
],
@@ -66,8 +66,8 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
6666
await this.consumer.on(this.consumer.events.STOP, () => {
6767
this.logger.debug("STOP ... ");
6868
});
69-
await this.consumer.on(this.consumer.events.CRASH, ({ error }) => {
70-
this.logger.debug("CRASH ... " + error);
69+
await this.consumer.on(this.consumer.events.CRASH, ({ payload }) => {
70+
this.logger.debug("CRASH ... " + payload);
7171
});
7272
await this.consumer.on(this.consumer.events.DISCONNECT, () => {
7373
this.logger.debug("DISCONNECT ... ");
@@ -96,16 +96,16 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
9696
autoCommit: true,
9797
autoCommitThreshold: 1,
9898
eachMessage: async ({
99-
topic,
100-
message,
101-
}: {
99+
topic,
100+
message,
101+
}: {
102102
topic: string;
103103
message: KafkaMessage;
104104
}) => {
105105
try {
106106
const arr = SUBSCRIBER_COMBINED_REF_MAP.get(topic);
107107
this.logger.debug(
108-
`Got kafka message, have ${arr.length} receivers ...`
108+
`Got kafka message, have ${ arr.length } receivers ...`
109109
);
110110
arr.forEach(async tuple => {
111111
const object = tuple[0];
@@ -118,8 +118,8 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
118118
await fn.apply(object, [msg]);
119119
});
120120
} catch (err) {
121-
this.logger.error(`Error occurred in eachMessage: ${err}`);
122-
this.logger.error(`${JSON.stringify(err)}`);
121+
this.logger.error(`Error occurred in eachMessage: ${ err }`);
122+
this.logger.error(`${ JSON.stringify(err) }`);
123123
}
124124
},
125125
})
@@ -132,18 +132,18 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
132132
kafkaTopic: string,
133133
kafkaMessage: KafkaPayload
134134
): Promise<void | RecordMetadata[]> {
135-
this.logger.debug(`Connecting producer ...`);
136-
await this.producer.connect();
137-
this.logger.debug(`Connected ...`);
135+
const message = {
136+
topic: kafkaTopic,
137+
messages: [{ value: JSON.stringify(kafkaMessage) }],
138+
};
138139
const metadata = await this.producer
139-
.send({
140-
topic: kafkaTopic,
141-
messages: [{ value: JSON.stringify(kafkaMessage) }],
142-
})
143-
.catch(e => this.logger.error(e.message, e));
144-
this.logger.debug(`Sent from producer`);
145-
await this.producer.disconnect();
146-
this.logger.debug(`Disconnected ...`);
140+
.send(message)
141+
.catch(async e => {
142+
await this.producer.connect()
143+
this.logger.warn("Kafka failed sending message, retrying", e)
144+
await this.producer.send(message).catch(e => this.logger.error(e.message, e)
145+
)
146+
});
147147
return metadata;
148148
}
149149
}

0 commit comments

Comments
 (0)