Skip to content

Commit dd4b7fb

Browse files
committed
feat(CronJob|SQS): adds a cronjob to listen SQS and insert person payload into queue
1 parent a9a0a20 commit dd4b7fb

File tree

7 files changed

+114
-35
lines changed

7 files changed

+114
-35
lines changed

src/api/modules/v1/components/Commands/Slinger.ts

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ import { FileService } from '$service/file/FileService.ts';
22
import { PersonService } from "$service/person/PersonService.ts";
33
import {
44
everyMinute,
5-
every15Minute,
6-
start
5+
hourly,
6+
cron,
7+
start,
78
} from '$deps';
89

910
export class Slinger {
@@ -12,21 +13,23 @@ export class Slinger {
1213
}
1314

1415
private init() {
15-
// this.handleDisareByMinute();
16-
this.handleDispareByFiftyMinute();
16+
this.dispareFiles();
17+
this.disparePersonIntoQueue();
18+
this.dispareFilesPayloadIntoQueue();
1719
start();
20+
this.init();
21+
}
22+
23+
private dispareFiles() {
24+
cron("* * * * * *", async () => await new FileService().listenFilesFromDB());
1825
}
1926

20-
private handleDisareByMinute() {
21-
everyMinute(async () => {
22-
await new FileService().listenFiles();
23-
});
27+
private disparePersonIntoQueue() {
28+
cron("* * * * * *", async () => await new PersonService().listenAndInsertPersonFromQueue());
2429
}
2530

26-
private handleDispareByFiftyMinute() {
27-
everyMinute(async () => {
28-
await new PersonService().listenAndInsertPersonFromQueue();
29-
});
31+
private dispareFilesPayloadIntoQueue() {
32+
cron("* * * * * *", async () => await new FileService().handlerPersonFromObjectIntoSQS());
3033
}
3134
}
3235

src/api/modules/v1/repository/File/FileRespository.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,27 @@ class FileRepository extends File {
5555
);
5656
}
5757

58+
public async pendingFilesByName(name: string) {
59+
return this.foundPendingFileByName(name);
60+
}
61+
62+
private async foundPendingFileByName(name: string): Promise<Array<Partial<IFileDTO>>> {
63+
return this.mysql.buildQuery(
64+
`
65+
SELECT
66+
id,
67+
name,
68+
CASE
69+
WHEN status = 0 THEN 'PENDING'
70+
WHEN status = 1 THEN 'LAUNCHED'
71+
END as status
72+
FROM ${this.table}
73+
WHERE name = ?
74+
AND status = ?;
75+
`, [name, FilenameEnum.PENDING]
76+
)
77+
}
78+
5879
public updatedAfterListenAll(id: string) {
5980
return this.updateStatusAfterListenAll(id);
6081
}

src/api/modules/v1/services/file/FileService.ts

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,28 @@ import {
44
FilenameEnum,
55
IFileDTO,
66
IFileService,
7+
log,
78
} from "$common";
89
import { FileRespository } from "$repositories";
910
import { PersonService } from "$service/person/PersonService.ts";
11+
import { IObjectS3DTO, IPersonDTO } from '$interfaces';
12+
import { SimpleQueueService } from "$component/AWS/sqs.component.ts";
1013

1114
export class FileService implements IFileService {
12-
private s3: typeof S3;
1315
private fileRepository: typeof FileRespository;
14-
private personService: PersonService = new PersonService()
16+
private personService: PersonService;
17+
private s3: typeof S3;
18+
private sQs: SimpleQueueService;
19+
private status = {
20+
"PENDING": "PENDING",
21+
"LAUNCHED": "LAUNCHED"
22+
};
1523

1624
constructor() {
1725
this.s3 = S3;
26+
this.sQs = new SimpleQueueService();
1827
this.fileRepository = FileRespository;
28+
this.personService = new PersonService();
1929
}
2030

2131
public async handlerFilesPerson(files: Array<FormDataFile>): Promise<string> {
@@ -57,40 +67,76 @@ export class FileService implements IFileService {
5767
return this.fileRepository.list();
5868
}
5969

60-
public async listenFiles() {
61-
const files = await this.fileRepository.pendingFiles();
70+
public async listenFilesFromDB() {
71+
const filesPending = await this.fileRepository.pendingFiles();
6272

63-
if (!files) {
73+
if (!filesPending) {
6474
return;
6575
}
6676

67-
const pendings = files.filter(
68-
file => file.status === FilenameEnum.PENDING
69-
);
77+
for (const filePending of filesPending) {
78+
await this.personService.listenAndCreatePerson(filePending.name);
79+
await this.fileRepository.updatedAfterListenAll(filePending.id)
7080

71-
for (const pending of pendings) {
72-
if (pending.status === FilenameEnum.PENDING) {
73-
this.personService.listenAndCreatePerson(pending.name);
74-
await this.fileRepository.updatedAfterListenAll(pending.id)
75-
continue;
76-
};
81+
continue;
7782
}
7883

79-
return files;
84+
return filesPending;
8085
}
8186

82-
public async listAllObjectsFromBucket() {
87+
public async listAllObjectsFromBucket(): Promise<Array<IObjectS3DTO>> {
8388
const objects = await this.s3.listObjects() as Array<_Object>;
8489

85-
const objectsFiltered = objects.map(object => {
86-
return {
87-
name: object.Key,
88-
size: object.Size,
89-
}
90+
const objectsFiltered: Array<IObjectS3DTO> = objects.map(object => {
91+
return {
92+
name: object.Key,
93+
size: object.Size,
94+
}
9095
});
9196

9297
return objectsFiltered;
9398
}
99+
100+
public async handlerPersonFromObjectIntoSQS() {
101+
const objects = await this.listAllObjectsFromBucket();
102+
103+
const personList: Partial<IPersonDTO>[] = []
104+
105+
for (const object of objects) {
106+
const pendingsFiles = await this.fileRepository.pendingFilesByName(object.name as string);
107+
108+
if (!pendingsFiles) {
109+
return;
110+
}
111+
112+
pendingsFiles.forEach(async file => {
113+
if (String(file.status) !== this.status.PENDING) {
114+
log.success("Done process with success!");
115+
return;
116+
}
117+
118+
const getS3Object = await this.s3.readFileFromS3(object.name as string) as Array<string>;
119+
120+
for (const result of getS3Object) {
121+
const [name, age, sex, size, weight] = result.split(',');
122+
const person: Partial<IPersonDTO> = {
123+
name,
124+
age: Number(age),
125+
sex,
126+
size: Number(size),
127+
weight: Number(weight)
128+
}
129+
130+
await this.sQs.handleQueue(person);
131+
continue;
132+
}
133+
134+
await this.fileRepository.updatedAfterListenAll(file.id as string);
135+
});
136+
}
137+
138+
return personList;
139+
}
94140
}
95141

96142
export default new FileService()

src/api/modules/v1/services/person/PersonService.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,15 @@ export class PersonService {
114114
const personObject: Partial<IPersonDTO> = JSON.parse(payload);
115115

116116
await this.personRepository.createPersons(personObject);
117-
await this.fileRepository.updatedAfterListenAll();
117+
118118
person.push(personObject);
119119
continue;
120120
}
121121

122+
files.forEach(
123+
async file => await this.fileRepository.updatedAfterListenAll(file.id as string)
124+
)
125+
122126
return person;
123127
}
124128
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export interface IObjectS3DTO {
2+
name: string | null | undefined;
3+
size: number | null | undefined;
4+
}

src/common/interfaces/IPerson.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { PersonEnum } from "../enums/index.ts";
22

33
export interface IPersonDTO {
4-
id: string;
4+
id?: string;
55
name: string;
66
age: number;
77
sex: string;

src/common/interfaces/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ export * from "./IPerson.ts";
22
export * from "./IFleService.ts";
33
export * from "./IFIleController.ts";
44
export * from './IFileDTO.ts';
5+
export * from './IObjectS3DTO.ts';

0 commit comments

Comments
 (0)