Skip to content

Commit 9fd40a4

Browse files
Merge pull request #1726 from Santosl2/feat/add-async-lock
feat: add BaileysMessageProcessor for improved message handling
2 parents e321609 + d98fa52 commit 9fd40a4

File tree

4 files changed

+80
-1
lines changed

4 files changed

+80
-1
lines changed

package-lock.json

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
"qrcode": "^1.5.4",
9696
"qrcode-terminal": "^0.12.0",
9797
"redis": "^4.7.0",
98+
"rxjs": "^7.8.2",
9899
"sharp": "^0.34.2",
99100
"socket.io": "^4.8.1",
100101
"socket.io-client": "^4.8.1",
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { Logger } from '@config/logger.config';
2+
import { BaileysEventMap, MessageUpsertType, proto } from 'baileys';
3+
import { catchError, concatMap, delay, EMPTY, from, retryWhen, Subject, Subscription, take, tap } from 'rxjs';
4+
5+
type MessageUpsertPayload = BaileysEventMap['messages.upsert'];
6+
type MountProps = {
7+
onMessageReceive: (payload: MessageUpsertPayload, settings: any) => Promise<void>;
8+
};
9+
10+
export class BaileysMessageProcessor {
11+
private processorLogs = new Logger('BaileysMessageProcessor');
12+
private subscription?: Subscription;
13+
14+
protected messageSubject = new Subject<{
15+
messages: proto.IWebMessageInfo[];
16+
type: MessageUpsertType;
17+
requestId?: string;
18+
settings: any;
19+
}>();
20+
21+
mount({ onMessageReceive }: MountProps) {
22+
this.subscription = this.messageSubject
23+
.pipe(
24+
tap(({ messages }) => {
25+
this.processorLogs.log(`Processing batch of ${messages.length} messages`);
26+
}),
27+
concatMap(({ messages, type, requestId, settings }) =>
28+
from(onMessageReceive({ messages, type, requestId }, settings)).pipe(
29+
retryWhen((errors) =>
30+
errors.pipe(
31+
tap((error) => this.processorLogs.warn(`Retrying message batch due to error: ${error.message}`)),
32+
delay(1000), // 1 segundo de delay
33+
take(3), // Máximo 3 tentativas
34+
),
35+
),
36+
),
37+
),
38+
catchError((error) => {
39+
this.processorLogs.error(`Error processing message batch: ${error}`);
40+
return EMPTY;
41+
}),
42+
)
43+
.subscribe({
44+
error: (error) => {
45+
this.processorLogs.error(`Message stream error: ${error}`);
46+
},
47+
});
48+
}
49+
50+
processMessage(payload: MessageUpsertPayload, settings: any) {
51+
const { messages, type, requestId } = payload;
52+
this.messageSubject.next({ messages, type, requestId, settings });
53+
}
54+
55+
onDestroy() {
56+
this.subscription?.unsubscribe();
57+
this.messageSubject.complete();
58+
}
59+
}

src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ import sharp from 'sharp';
148148
import { PassThrough, Readable } from 'stream';
149149
import { v4 } from 'uuid';
150150

151+
import { BaileysMessageProcessor } from './baileysMessage.processor';
151152
import { useVoiceCallsBaileys } from './voiceCalls/useVoiceCallsBaileys';
152153

153154
const groupMetadataCache = new CacheService(new CacheEngine(configService, 'groups').getEngine());
@@ -213,6 +214,8 @@ async function getVideoDuration(input: Buffer | string | Readable): Promise<numb
213214
}
214215

215216
export class BaileysStartupService extends ChannelStartupService {
217+
private messageProcessor = new BaileysMessageProcessor();
218+
216219
constructor(
217220
public readonly configService: ConfigService,
218221
public readonly eventEmitter: EventEmitter2,
@@ -224,6 +227,9 @@ export class BaileysStartupService extends ChannelStartupService {
224227
) {
225228
super(configService, eventEmitter, prismaRepository, chatwootCache);
226229
this.instance.qrcode = { count: 0 };
230+
this.messageProcessor.mount({
231+
onMessageReceive: this.messageHandle['messages.upsert'].bind(this), // Bind the method to the current context
232+
});
227233

228234
this.authStateProvider = new AuthStateProvider(this.providerFiles);
229235
}
@@ -243,6 +249,7 @@ export class BaileysStartupService extends ChannelStartupService {
243249
}
244250

245251
public async logoutInstance() {
252+
this.messageProcessor.onDestroy();
246253
await this.client?.logout('Log out instance: ' + this.instanceName);
247254

248255
this.client?.ws?.close();
@@ -1653,7 +1660,9 @@ export class BaileysStartupService extends ChannelStartupService {
16531660

16541661
if (events['messages.upsert']) {
16551662
const payload = events['messages.upsert'];
1656-
this.messageHandle['messages.upsert'](payload, settings);
1663+
1664+
this.messageProcessor.processMessage(payload, settings);
1665+
// this.messageHandle['messages.upsert'](payload, settings);
16571666
}
16581667

16591668
if (events['messages.update']) {

0 commit comments

Comments
 (0)