Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/apps/chatwoot/api/chatwoot.webhook.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ export class ChatwootWebhookController {
return { success: true };
}

// Ignore messages with status "read" (from update_last_seen endpoint)
if (body.status === 'read') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no such field :(
The only message_updated we need - when agent clicks on "Retry" button in CHatWoot message.
Found a bit weird way to identify it, hopefully it'll work 🀞

return { success: true };
}

const data: InboxData = {
session: session,
app: id,
Expand Down
6 changes: 6 additions & 0 deletions src/apps/chatwoot/chatwoot.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { WAHAMessageAnyConsumer } from './consumers/waha/message.any';
import { WAHAMessageEditedConsumer } from './consumers/waha/message.edited';
import { WAHAMessageReactionConsumer } from './consumers/waha/message.reaction';
import { WAHAMessageRevokedConsumer } from './consumers/waha/message.revoked';
import { WAHAMessageReadConsumer } from './consumers/waha/message.ack';
import { WAHASessionStatusConsumer } from './consumers/waha/session.status';
import { ChatWootQueueService } from './services/ChatWootQueueService';
import { ChatWootScheduleService } from './services/ChatWootScheduleService';
Expand Down Expand Up @@ -55,6 +56,10 @@ const IMPORTS = lodash.flatten([
name: QueueName.WAHA_MESSAGE_REVOKED,
defaultJobOptions: merge(ExponentialRetriesJobOptions, JobRemoveOptions),
}),
RegisterAppQueue({
name: QueueName.WAHA_MESSAGE_READ,
defaultJobOptions: merge(ExponentialRetriesJobOptions, JobRemoveOptions),
}),
RegisterAppQueue({
name: QueueName.WAHA_SESSION_STATUS,
defaultJobOptions: merge(ExponentialRetriesJobOptions, JobRemoveOptions),
Expand Down Expand Up @@ -98,6 +103,7 @@ const PROVIDERS = [
WAHAMessageReactionConsumer,
WAHAMessageEditedConsumer,
WAHAMessageRevokedConsumer,
WAHAMessageReadConsumer,
MessageCleanupConsumer,
CheckVersionConsumer,
ChatWootWAHAQueueService,
Expand Down
9 changes: 9 additions & 0 deletions src/apps/chatwoot/client/ContactConversationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,13 @@ export class ContactConversationService {
}
}
}

public async markConversationAsRead(conversationId: number, sourceId: string): Promise<void> {
await this.conversationService.markAsRead(conversationId, sourceId);
}

public async getSourceIdByChatId(chatId: string): Promise<string | null> {
const contact = await this.contactService.searchByAnyID(chatId);
return contact ? contact.sourceId : null;
}
}
27 changes: 27 additions & 0 deletions src/apps/chatwoot/client/ConversationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,31 @@ export class ConversationService {
);
return conversation;
}

async markAsRead(conversationId: number, sourceId: string): Promise<void> {
try {
// Call the update_last_seen endpoint using direct HTTP request
// since the SDK doesn't have this method
const axios = require('axios');
const response = await axios.post(
`${this.config.url}/public/api/v1/inboxes/${this.config.inboxIdentifier}/contacts/${sourceId}/conversations/${conversationId}/update_last_seen`,
{},
{
headers: {
'api_access_token': this.config.inboxIdentifier,
'Content-Type': 'application/json',
},
}
);

this.logger.info(
`Marked conversation.id: ${conversationId} as read in inbox: ${this.config.inboxIdentifier} for contact: ${sourceId}`,
);
} catch (err) {
this.logger.error(
`Error marking conversation.id: ${conversationId} as read: ${err.message}`,
);
throw err;
}
}
}
1 change: 1 addition & 0 deletions src/apps/chatwoot/consumers/QueueName.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export enum QueueName {
WAHA_MESSAGE_REACTION = 'chatwoot.waha | message.reaction',
WAHA_MESSAGE_EDITED = 'chatwoot.waha | message.edited',
WAHA_MESSAGE_REVOKED = 'chatwoot.waha | message.revoked',
WAHA_MESSAGE_READ = 'chatwoot.waha | message.read',
//
// ChatWoot Events - Real
//
Expand Down
1 change: 1 addition & 0 deletions src/apps/chatwoot/consumers/waha/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export function ListenEventsForChatWoot() {
WAHAEvents.MESSAGE_REACTION,
WAHAEvents.MESSAGE_EDITED,
WAHAEvents.MESSAGE_REVOKED,
WAHAEvents.MESSAGE_ACK,
WAHAEvents.SESSION_STATUS,
];
}
Expand Down
97 changes: 97 additions & 0 deletions src/apps/chatwoot/consumers/waha/message.ack.ts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use event in consumer name - so it'd be message.ack.ts because it process message.ack event.

Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { Processor } from '@nestjs/bullmq';
import { JOB_CONCURRENCY } from '@waha/apps/app_sdk/constants';
import { QueueName } from '@waha/apps/chatwoot/consumers/QueueName';
import { EventData } from '@waha/apps/chatwoot/consumers/types';
import {
ChatWootWAHABaseConsumer,
IMessageInfo,
} from '@waha/apps/chatwoot/consumers/waha/base';
import { WAHASessionAPI } from '@waha/apps/chatwoot/session/WAHASelf';
import { WhatsAppContactInfo } from '@waha/apps/chatwoot/contacts/WhatsAppContactInfo';
import { SessionManager } from '@waha/core/abc/manager.abc';
import { RMutexService } from '@waha/modules/rmutex/rmutex.service';
import { WAHAEvents } from '@waha/structures/enums.dto';
import { WAHAWebhookMessageAck } from '@waha/structures/webhooks.dto';
import { Job } from 'bullmq';
import { PinoLogger } from 'nestjs-pino';

@Processor(QueueName.WAHA_MESSAGE_READ, { concurrency: JOB_CONCURRENCY })
export class WAHAMessageReadConsumer extends ChatWootWAHABaseConsumer {
constructor(
protected readonly manager: SessionManager,
log: PinoLogger,
rmutex: RMutexService,
) {
super(manager, log, rmutex, 'WAHAMessageReadConsumer');
}

GetChatId(event: WAHAWebhookMessageAck): string {
return event.payload.from;
}

async Process(
job: Job<EventData, any, WAHAEvents>,
info: IMessageInfo,
): Promise<any> {
const container = await this.DIContainer(job, job.data.app);
const event: WAHAWebhookMessageAck = job.data.event as any;
const session = new WAHASessionAPI(event.session, container.WAHASelf());
const handler = new MessageReadHandler(
job,
container.ContactConversationService(),
container.Logger(),
info,
session,
container.Locale(),
);
return await handler.handle(event);
}
}

class MessageReadHandler {
constructor(
private job: Job,
private contactConversationService: any,
private logger: any,
private info: IMessageInfo,
private session: WAHASessionAPI,
private l: any,
) {}

async handle(event: WAHAWebhookMessageAck) {
const payload = event.payload;

// Only process READ acknowledgments
if (payload.ack !== 3) { // WAMessageAck.READ = 3
this.logger.debug(`Ignoring non-READ ack: ${payload.ack} for message ${payload.id}`);
return;
}

try {
// Create contact info from the chat ID
const contactInfo = WhatsAppContactInfo(this.session, payload.from, this.l);

// Get or create conversation for this contact
const conversation = await this.contactConversationService.ConversationByContact(contactInfo);

// Get the sourceId by searching for the contact
const sourceId = await this.contactConversationService.getSourceIdByChatId(payload.from);
if (!sourceId) {
this.logger.error(`Contact not found for chatId: ${payload.from}`);
return;
}

// Mark the conversation as read in Chatwoot
await this.contactConversationService.markConversationAsRead(conversation.conversationId, sourceId);

this.logger.info(
`Marked conversation ${conversation.conversationId} as read for chatId: ${payload.from} (message: ${payload.id}, sourceId: ${sourceId})`,
);
} catch (err) {
this.logger.error(
`Error marking conversation as read for chatId ${payload.from}: ${err.message}`,
);
throw err;
}
}
}
4 changes: 4 additions & 0 deletions src/apps/chatwoot/services/ChatWootWAHAQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ export class ChatWootWAHAQueueService {
private readonly queueMessageEdited: Queue,
@InjectQueue(QueueName.WAHA_MESSAGE_REVOKED)
private readonly queueMessageRevoked: Queue,
@InjectQueue(QueueName.WAHA_MESSAGE_READ)
private readonly queueMessageRead: Queue,
@InjectQueue(QueueName.WAHA_SESSION_STATUS)
private readonly queueSessionStatus: Queue,
) {}
Expand All @@ -42,6 +44,8 @@ export class ChatWootWAHAQueueService {
return this.queueMessageEdited;
case WAHAEvents.MESSAGE_REVOKED:
return this.queueMessageRevoked;
case WAHAEvents.MESSAGE_ACK:
return this.queueMessageRead;
case WAHAEvents.SESSION_STATUS:
return this.queueSessionStatus;
default:
Expand Down