Skip to content

Commit 64f70b4

Browse files
matheusmartinsInspermatheusmartinsInsper
authored andcommitted
fix: Refactor SQS controller to correct bug in sqs events by instance
- Implement dynamic queue creation based on enabled events - Add method to list existing queues for an instance - Improve error handling and logging for SQS operations - Remove unused queue removal methods - Update set method to handle queue creation/deletion on event changes - Add comments for future feature of forced queue deletion
1 parent 6c1e8de commit 64f70b4

File tree

1 file changed

+112
-58
lines changed

1 file changed

+112
-58
lines changed

src/api/integrations/event/sqs/sqs.controller.ts

Lines changed: 112 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import { PrismaRepository } from '@api/repository/repository.service';
22
import { WAMonitoringService } from '@api/services/monitor.service';
3-
import { SQS } from '@aws-sdk/client-sqs';
3+
import { CreateQueueCommand, DeleteQueueCommand, ListQueuesCommand, SQS } from '@aws-sdk/client-sqs';
44
import { configService, Log, Sqs } from '@config/env.config';
55
import { Logger } from '@config/logger.config';
66

77
import { EmitData, EventController, EventControllerInterface } from '../event.controller';
8+
import { EventDto } from '../event.dto';
89

910
export class SqsController extends EventController implements EventControllerInterface {
1011
private sqs: SQS;
@@ -45,6 +46,39 @@ export class SqsController extends EventController implements EventControllerInt
4546
return this.sqs;
4647
}
4748

49+
override async set(instanceName: string, data: EventDto): Promise<any> {
50+
if (!this.status) {
51+
return;
52+
}
53+
54+
if (!data[this.name]?.enabled) {
55+
data[this.name].events = [];
56+
} else {
57+
if (0 === data[this.name].events.length) {
58+
data[this.name].events = EventController.events;
59+
}
60+
}
61+
62+
await this.saveQueues(instanceName, data[this.name].events, data[this.name]?.enabled);
63+
64+
const payload: any = {
65+
where: {
66+
instanceId: this.monitor.waInstances[instanceName].instanceId,
67+
},
68+
update: {
69+
enabled: data[this.name]?.enabled,
70+
events: data[this.name].events,
71+
},
72+
create: {
73+
enabled: data[this.name]?.enabled,
74+
events: data[this.name].events,
75+
instanceId: this.monitor.waInstances[instanceName].instanceId,
76+
},
77+
};
78+
console.log('*** payload: ', payload);
79+
return this.prisma[this.name].upsert(payload);
80+
}
81+
4882
public async emit({
4983
instanceName,
5084
origin,
@@ -121,70 +155,90 @@ export class SqsController extends EventController implements EventControllerInt
121155
}
122156
}
123157

124-
public async initQueues(instanceName: string, events: string[]) {
125-
if (!events || !events.length) return;
158+
private async saveQueues(instanceName: string, events: string[], enable: boolean) {
159+
if (enable) {
160+
const eventsFinded = await this.listQueuesByInstance(instanceName);
161+
console.log('eventsFinded', eventsFinded);
126162

127-
const queues = events.map((event) => {
128-
return `${event.replace(/_/g, '_').toLowerCase()}`;
129-
});
163+
for (const event of events) {
164+
const normalizedEvent = event.toLowerCase();
165+
if (eventsFinded.includes(normalizedEvent)) {
166+
this.logger.info(`A queue para o evento "${normalizedEvent}" já existe. Ignorando criação.`);
167+
continue;
168+
}
130169

131-
queues.forEach((event) => {
132-
const queueName = `${instanceName}_${event}.fifo`;
170+
const queueName = `${instanceName}_${normalizedEvent}.fifo`;
133171

134-
this.sqs.createQueue(
135-
{
136-
QueueName: queueName,
137-
Attributes: {
138-
FifoQueue: 'true',
139-
},
140-
},
141-
(err, data) => {
142-
if (err) {
143-
this.logger.error(`Error creating queue ${queueName}: ${err.message}`);
144-
} else {
145-
this.logger.info(`Queue ${queueName} created: ${data.QueueUrl}`);
146-
}
147-
},
148-
);
149-
});
172+
try {
173+
const createCommand = new CreateQueueCommand({
174+
QueueName: queueName,
175+
Attributes: {
176+
FifoQueue: 'true',
177+
},
178+
});
179+
const data = await this.sqs.send(createCommand);
180+
this.logger.info(`Queue ${queueName} criada: ${data.QueueUrl}`);
181+
} catch (err: any) {
182+
this.logger.error(`Erro ao criar queue ${queueName}: ${err.message}`);
183+
}
184+
}
185+
}
150186
}
151187

152-
public async removeQueues(instanceName: string, events: any) {
153-
const eventsArray = Array.isArray(events) ? events.map((event) => String(event)) : [];
154-
if (!events || !eventsArray.length) return;
188+
private async listQueuesByInstance(instanceName: string) {
189+
let existingQueues: string[] = [];
190+
try {
191+
const listCommand = new ListQueuesCommand({
192+
QueueNamePrefix: `${instanceName}_`,
193+
});
194+
const listData = await this.sqs.send(listCommand);
195+
if (listData.QueueUrls && listData.QueueUrls.length > 0) {
196+
// Extrai o nome da fila a partir da URL
197+
existingQueues = listData.QueueUrls.map((queueUrl) => {
198+
const parts = queueUrl.split('/');
199+
return parts[parts.length - 1];
200+
});
201+
}
202+
} catch (error: any) {
203+
this.logger.error(`Erro ao listar filas para a instância ${instanceName}: ${error.message}`);
204+
return;
205+
}
155206

156-
const queues = eventsArray.map((event) => {
157-
return `${event.replace(/_/g, '_').toLowerCase()}`;
158-
});
207+
// Mapeia os eventos já existentes nas filas: remove o prefixo e o sufixo ".fifo"
208+
return existingQueues
209+
.map((queueName) => {
210+
// Espera-se que o nome seja `${instanceName}_${event}.fifo`
211+
if (queueName.startsWith(`${instanceName}_`) && queueName.endsWith('.fifo')) {
212+
return queueName.substring(instanceName.length + 1, queueName.length - 5).toLowerCase();
213+
}
214+
return '';
215+
})
216+
.filter((event) => event !== '');
217+
}
159218

160-
queues.forEach((event) => {
161-
const queueName = `${instanceName}_${event}.fifo`;
219+
private async removeQueuesByInstance(instanceName: string) {
220+
try {
221+
const listCommand = new ListQueuesCommand({
222+
QueueNamePrefix: `${instanceName}_`,
223+
});
224+
const listData = await this.sqs.send(listCommand);
162225

163-
this.sqs.getQueueUrl(
164-
{
165-
QueueName: queueName,
166-
},
167-
(err, data) => {
168-
if (err) {
169-
this.logger.error(`Error getting queue URL for ${queueName}: ${err.message}`);
170-
} else {
171-
const queueUrl = data.QueueUrl;
172-
173-
this.sqs.deleteQueue(
174-
{
175-
QueueUrl: queueUrl,
176-
},
177-
(deleteErr) => {
178-
if (deleteErr) {
179-
this.logger.error(`Error deleting queue ${queueName}: ${deleteErr.message}`);
180-
} else {
181-
this.logger.info(`Queue ${queueName} deleted`);
182-
}
183-
},
184-
);
185-
}
186-
},
187-
);
188-
});
226+
if (!listData.QueueUrls || listData.QueueUrls.length === 0) {
227+
this.logger.info(`No queues found for instance ${instanceName}`);
228+
return;
229+
}
230+
231+
for (const queueUrl of listData.QueueUrls) {
232+
try {
233+
const deleteCommand = new DeleteQueueCommand({ QueueUrl: queueUrl });
234+
await this.sqs.send(deleteCommand);
235+
this.logger.info(`Queue ${queueUrl} deleted`);
236+
} catch (err: any) {
237+
this.logger.error(`Error deleting queue ${queueUrl}: ${err.message}`);
238+
}
239+
}
240+
} catch (err: any) {
241+
this.logger.error(`Error listing queues for instance ${instanceName}: ${err.message}`);
242+
}
189243
}
190244
}

0 commit comments

Comments
 (0)