Skip to content

Commit bd0c43f

Browse files
Merge pull request #2017 from Vitordotpy/fix/enhanced-chatwoot-database-connection
Fix Chatwoot DB Connection Instability and Implement Stale Conversation Cache Handling
2 parents d8268b0 + 5dc1d02 commit bd0c43f

File tree

1 file changed

+139
-60
lines changed

1 file changed

+139
-60
lines changed

src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts

Lines changed: 139 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ export class ChatwootService {
5353
private readonly cache: CacheService,
5454
) {}
5555

56-
private pgClient = postgresClient.getChatwootConnection();
56+
private async getPgClient() {
57+
return postgresClient.getChatwootConnection();
58+
}
5759

5860
private async getProvider(instance: InstanceDto): Promise<ChatwootModel | null> {
5961
const cacheKey = `${instance.instanceName}:getProvider`;
@@ -382,7 +384,8 @@ export class ChatwootService {
382384
if (!uri) return false;
383385

384386
const sqlTags = `SELECT id, taggings_count FROM tags WHERE name = $1 LIMIT 1`;
385-
const tagData = (await this.pgClient.query(sqlTags, [nameInbox]))?.rows[0];
387+
const pgClient = await this.getPgClient();
388+
const tagData = (await pgClient.query(sqlTags, [nameInbox]))?.rows[0];
386389
let tagId = tagData?.id;
387390
const taggingsCount = tagData?.taggings_count || 0;
388391

@@ -392,18 +395,18 @@ export class ChatwootService {
392395
DO UPDATE SET taggings_count = tags.taggings_count + 1
393396
RETURNING id`;
394397

395-
tagId = (await this.pgClient.query(sqlTag, [nameInbox, taggingsCount + 1]))?.rows[0]?.id;
398+
tagId = (await pgClient.query(sqlTag, [nameInbox, taggingsCount + 1]))?.rows[0]?.id;
396399

397400
const sqlCheckTagging = `SELECT 1 FROM taggings
398401
WHERE tag_id = $1 AND taggable_type = 'Contact' AND taggable_id = $2 AND context = 'labels' LIMIT 1`;
399402

400-
const taggingExists = (await this.pgClient.query(sqlCheckTagging, [tagId, contactId]))?.rowCount > 0;
403+
const taggingExists = (await pgClient.query(sqlCheckTagging, [tagId, contactId]))?.rowCount > 0;
401404

402405
if (!taggingExists) {
403406
const sqlInsertLabel = `INSERT INTO taggings (tag_id, taggable_type, taggable_id, context, created_at)
404407
VALUES ($1, 'Contact', $2, 'labels', NOW())`;
405408

406-
await this.pgClient.query(sqlInsertLabel, [tagId, contactId]);
409+
await pgClient.query(sqlInsertLabel, [tagId, contactId]);
407410
}
408411

409412
return true;
@@ -861,6 +864,7 @@ export class ChatwootService {
861864
messageBody?: any,
862865
sourceId?: string,
863866
quotedMsg?: MessageModel,
867+
messageBodyForRetry?: any,
864868
) {
865869
const client = await this.clientCw(instance);
866870

@@ -869,32 +873,86 @@ export class ChatwootService {
869873
return null;
870874
}
871875

872-
const replyToIds = await this.getReplyToIds(messageBody, instance);
876+
const doCreateMessage = async (convId: number) => {
877+
const replyToIds = await this.getReplyToIds(messageBody, instance);
873878

874-
const sourceReplyId = quotedMsg?.chatwootMessageId || null;
879+
const sourceReplyId = quotedMsg?.chatwootMessageId || null;
875880

876-
const message = await client.messages.create({
877-
accountId: this.provider.accountId,
878-
conversationId: conversationId,
879-
data: {
880-
content: content,
881-
message_type: messageType,
882-
attachments: attachments,
883-
private: privateMessage || false,
884-
source_id: sourceId,
885-
content_attributes: {
886-
...replyToIds,
881+
const message = await client.messages.create({
882+
accountId: this.provider.accountId,
883+
conversationId: convId,
884+
data: {
885+
content: content,
886+
message_type: messageType,
887+
attachments: attachments,
888+
private: privateMessage || false,
889+
source_id: sourceId,
890+
content_attributes: {
891+
...replyToIds,
892+
},
893+
source_reply_id: sourceReplyId ? sourceReplyId.toString() : null,
887894
},
888-
source_reply_id: sourceReplyId ? sourceReplyId.toString() : null,
889-
},
890-
});
895+
});
891896

892-
if (!message) {
893-
this.logger.warn('message not found');
894-
return null;
897+
if (!message) {
898+
this.logger.warn('message not found');
899+
return null;
900+
}
901+
902+
return message;
903+
};
904+
905+
try {
906+
return await doCreateMessage(conversationId);
907+
} catch (error) {
908+
return this.handleStaleConversationError(
909+
error,
910+
instance,
911+
conversationId,
912+
messageBody,
913+
messageBodyForRetry,
914+
'createMessage',
915+
(newConvId) => doCreateMessage(newConvId),
916+
);
895917
}
918+
}
896919

897-
return message;
920+
private async handleStaleConversationError(
921+
error: any,
922+
instance: InstanceDto,
923+
conversationId: number,
924+
messageBody: any,
925+
messageBodyForRetry: any,
926+
functionName: string,
927+
originalFunction: (newConversationId: number) => Promise<any>,
928+
) {
929+
if (axios.isAxiosError(error) && error.response?.status === 404) {
930+
this.logger.warn(
931+
`Conversation ${conversationId} not found in Chatwoot. Retrying operation from ${functionName}...`,
932+
);
933+
const bodyForRetry = messageBodyForRetry || messageBody;
934+
935+
if (!bodyForRetry || !bodyForRetry.key?.remoteJid) {
936+
this.logger.error(`Cannot retry ${functionName} without a message body for context.`);
937+
return null;
938+
}
939+
940+
const { remoteJid } = bodyForRetry.key;
941+
const cacheKey = `${instance.instanceName}:createConversation-${remoteJid}`;
942+
await this.cache.delete(cacheKey);
943+
944+
const newConversationId = await this.createConversation(instance, bodyForRetry);
945+
if (!newConversationId) {
946+
this.logger.error(`Failed to create new conversation for ${remoteJid} during retry.`);
947+
return null;
948+
}
949+
950+
this.logger.log(`Retrying ${functionName} for ${remoteJid} with new conversation ${newConversationId}`);
951+
return await originalFunction(newConversationId);
952+
} else {
953+
this.logger.error(`Error in ${functionName}: ${error}`);
954+
throw error;
955+
}
898956
}
899957

900958
public async getOpenConversationByContact(
@@ -987,6 +1045,7 @@ export class ChatwootService {
9871045
messageBody?: any,
9881046
sourceId?: string,
9891047
quotedMsg?: MessageModel,
1048+
messageBodyForRetry?: any,
9901049
) {
9911050
if (sourceId && this.isImportHistoryAvailable()) {
9921051
const messageAlreadySaved = await chatwootImport.getExistingSourceIds([sourceId], conversationId);
@@ -997,54 +1056,65 @@ export class ChatwootService {
9971056
}
9981057
}
9991058
}
1000-
const data = new FormData();
1059+
const doSendData = async (convId: number) => {
1060+
const data = new FormData();
10011061

1002-
if (content) {
1003-
data.append('content', content);
1004-
}
1062+
if (content) {
1063+
data.append('content', content);
1064+
}
10051065

1006-
data.append('message_type', messageType);
1066+
data.append('message_type', messageType);
10071067

1008-
data.append('attachments[]', fileStream, { filename: fileName });
1068+
data.append('attachments[]', fileStream, { filename: fileName });
10091069

1010-
const sourceReplyId = quotedMsg?.chatwootMessageId || null;
1070+
const sourceReplyId = quotedMsg?.chatwootMessageId || null;
10111071

1012-
if (messageBody && instance) {
1013-
const replyToIds = await this.getReplyToIds(messageBody, instance);
1072+
if (messageBody && instance) {
1073+
const replyToIds = await this.getReplyToIds(messageBody, instance);
10141074

1015-
if (replyToIds.in_reply_to || replyToIds.in_reply_to_external_id) {
1016-
const content = JSON.stringify({
1017-
...replyToIds,
1018-
});
1019-
data.append('content_attributes', content);
1075+
if (replyToIds.in_reply_to || replyToIds.in_reply_to_external_id) {
1076+
const content = JSON.stringify({
1077+
...replyToIds,
1078+
});
1079+
data.append('content_attributes', content);
1080+
}
10201081
}
1021-
}
10221082

1023-
if (sourceReplyId) {
1024-
data.append('source_reply_id', sourceReplyId.toString());
1025-
}
1083+
if (sourceReplyId) {
1084+
data.append('source_reply_id', sourceReplyId.toString());
1085+
}
10261086

1027-
if (sourceId) {
1028-
data.append('source_id', sourceId);
1029-
}
1087+
if (sourceId) {
1088+
data.append('source_id', sourceId);
1089+
}
10301090

1031-
const config = {
1032-
method: 'post',
1033-
maxBodyLength: Infinity,
1034-
url: `${this.provider.url}/api/v1/accounts/${this.provider.accountId}/conversations/${conversationId}/messages`,
1035-
headers: {
1036-
api_access_token: this.provider.token,
1037-
...data.getHeaders(),
1038-
},
1039-
data: data,
1091+
const config = {
1092+
method: 'post',
1093+
maxBodyLength: Infinity,
1094+
url: `${this.provider.url}/api/v1/accounts/${this.provider.accountId}/conversations/${convId}/messages`,
1095+
headers: {
1096+
api_access_token: this.provider.token,
1097+
...data.getHeaders(),
1098+
},
1099+
data: data,
1100+
};
1101+
1102+
const { data: responseData } = await axios.request(config);
1103+
return responseData;
10401104
};
10411105

10421106
try {
1043-
const { data } = await axios.request(config);
1044-
1045-
return data;
1107+
return await doSendData(conversationId);
10461108
} catch (error) {
1047-
this.logger.error(error);
1109+
return this.handleStaleConversationError(
1110+
error,
1111+
instance,
1112+
conversationId,
1113+
messageBody,
1114+
messageBodyForRetry,
1115+
'sendData',
1116+
(newConvId) => doSendData(newConvId),
1117+
);
10481118
}
10491119
}
10501120

@@ -2032,6 +2102,7 @@ export class ChatwootService {
20322102
body,
20332103
'WAID:' + body.key.id,
20342104
quotedMsg,
2105+
null,
20352106
);
20362107

20372108
if (!send) {
@@ -2051,6 +2122,7 @@ export class ChatwootService {
20512122
body,
20522123
'WAID:' + body.key.id,
20532124
quotedMsg,
2125+
null,
20542126
);
20552127

20562128
if (!send) {
@@ -2076,6 +2148,7 @@ export class ChatwootService {
20762148
},
20772149
'WAID:' + body.key.id,
20782150
quotedMsg,
2151+
body,
20792152
);
20802153
if (!send) {
20812154
this.logger.warn('message not sent');
@@ -2132,6 +2205,8 @@ export class ChatwootService {
21322205
instance,
21332206
body,
21342207
'WAID:' + body.key.id,
2208+
quotedMsg,
2209+
null,
21352210
);
21362211

21372212
if (!send) {
@@ -2173,6 +2248,7 @@ export class ChatwootService {
21732248
body,
21742249
'WAID:' + body.key.id,
21752250
quotedMsg,
2251+
null,
21762252
);
21772253

21782254
if (!send) {
@@ -2192,6 +2268,7 @@ export class ChatwootService {
21922268
body,
21932269
'WAID:' + body.key.id,
21942270
quotedMsg,
2271+
null,
21952272
);
21962273

21972274
if (!send) {
@@ -2262,6 +2339,7 @@ export class ChatwootService {
22622339
},
22632340
'WAID:' + body.key.id,
22642341
null,
2342+
body,
22652343
);
22662344
if (!send) {
22672345
this.logger.warn('edited message not sent');
@@ -2515,7 +2593,8 @@ export class ChatwootService {
25152593
and created_at >= now() - interval '6h'
25162594
order by created_at desc`;
25172595

2518-
const messagesData = (await this.pgClient.query(sqlMessages))?.rows;
2596+
const pgClient = await this.getPgClient();
2597+
const messagesData = (await pgClient.query(sqlMessages))?.rows;
25192598
const ids: string[] = messagesData
25202599
.filter((message) => !!message.source_id)
25212600
.map((message) => message.source_id.replace('WAID:', ''));

0 commit comments

Comments
 (0)