Skip to content

Commit 89d4d34

Browse files
committed
feat: add BaileysMessageProcessor for improved message handling and integrate rxjs for asynchronous processing
1 parent 3960624 commit 89d4d34

File tree

4 files changed

+72
-1
lines changed

4 files changed

+72
-1
lines changed

package-lock.json

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
"qrcode": "^1.5.4",
9696
"qrcode-terminal": "^0.12.0",
9797
"redis": "^4.7.0",
98+
"rxjs": "^7.8.2",
9899
"sharp": "^0.32.6",
99100
"socket.io": "^4.8.1",
100101
"socket.io-client": "^4.8.1",
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { Logger } from '@config/logger.config';
2+
import { BaileysEventMap, MessageUpsertType, proto } from 'baileys';
3+
import { catchError, concatMap, EMPTY, from, Subject, Subscription, tap } from 'rxjs';
4+
5+
type MessageUpsertPayload = BaileysEventMap['messages.upsert'];
6+
type MountProps = {
7+
onMessageReceive: (payload: MessageUpsertPayload, settings: any) => Promise<void>;
8+
};
9+
10+
export class BaileysMessageProcessor {
11+
private processorLogs = new Logger('BaileysMessageProcessor');
12+
private subscription?: Subscription;
13+
14+
protected messageSubject = new Subject<{
15+
messages: proto.IWebMessageInfo[];
16+
type: MessageUpsertType;
17+
requestId?: string;
18+
settings: any;
19+
}>();
20+
21+
mount({ onMessageReceive }: MountProps) {
22+
this.subscription = this.messageSubject
23+
.pipe(
24+
tap(({ messages }) => {
25+
this.processorLogs.log(`Processing batch of ${messages.length} messages`);
26+
}),
27+
concatMap(({ messages, type, requestId, settings }) =>
28+
from(onMessageReceive({ messages, type, requestId }, settings)),
29+
),
30+
catchError((error) => {
31+
this.processorLogs.error(`Error processing message batch: ${error}`);
32+
return EMPTY;
33+
}),
34+
)
35+
.subscribe({
36+
error: (error) => {
37+
this.processorLogs.error(`Message stream error: ${error}`);
38+
},
39+
});
40+
}
41+
42+
processMessage(payload: MessageUpsertPayload, settings: any) {
43+
const { messages, type, requestId } = payload;
44+
this.messageSubject.next({ messages, type, requestId, settings });
45+
}
46+
47+
onDestroy() {
48+
this.subscription?.unsubscribe();
49+
this.messageSubject.complete();
50+
}
51+
}

src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ import sharp from 'sharp';
147147
import { PassThrough, Readable } from 'stream';
148148
import { v4 } from 'uuid';
149149

150+
import { BaileysMessageProcessor } from './baileysMessage.processor';
150151
import { useVoiceCallsBaileys } from './voiceCalls/useVoiceCallsBaileys';
151152

152153
const groupMetadataCache = new CacheService(new CacheEngine(configService, 'groups').getEngine());
@@ -212,6 +213,8 @@ async function getVideoDuration(input: Buffer | string | Readable): Promise<numb
212213
}
213214

214215
export class BaileysStartupService extends ChannelStartupService {
216+
private messageProcessor = new BaileysMessageProcessor();
217+
215218
constructor(
216219
public readonly configService: ConfigService,
217220
public readonly eventEmitter: EventEmitter2,
@@ -223,6 +226,9 @@ export class BaileysStartupService extends ChannelStartupService {
223226
) {
224227
super(configService, eventEmitter, prismaRepository, chatwootCache);
225228
this.instance.qrcode = { count: 0 };
229+
this.messageProcessor.mount({
230+
onMessageReceive: this.messageHandle['messages.upsert'].bind(this), // Bind the method to the current context
231+
});
226232

227233
this.authStateProvider = new AuthStateProvider(this.providerFiles);
228234
}
@@ -242,6 +248,7 @@ export class BaileysStartupService extends ChannelStartupService {
242248
}
243249

244250
public async logoutInstance() {
251+
this.messageProcessor.onDestroy();
245252
await this.client?.logout('Log out instance: ' + this.instanceName);
246253

247254
this.client?.ws?.close();
@@ -1618,7 +1625,9 @@ export class BaileysStartupService extends ChannelStartupService {
16181625

16191626
if (events['messages.upsert']) {
16201627
const payload = events['messages.upsert'];
1621-
this.messageHandle['messages.upsert'](payload, settings);
1628+
1629+
this.messageProcessor.processMessage(payload, settings);
1630+
// this.messageHandle['messages.upsert'](payload, settings);
16221631
}
16231632

16241633
if (events['messages.update']) {

0 commit comments

Comments
 (0)