Skip to content

Commit c219e70

Browse files
committed
fix: add timeouts
1 parent dd5035f commit c219e70

File tree

6 files changed

+59
-27
lines changed

6 files changed

+59
-27
lines changed

package-lock.json

Lines changed: 0 additions & 17 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"lint": "npx eslint --fix --quiet \"{src,apps,libs,test}/**/*.ts\"",
1515
"lint:check": "npx eslint \"{src,apps,libs,test}/**/*.ts\"",
1616
"start": "nest start",
17-
"start:dev": "npx dotenv nest start --watch",
17+
"start:dev": "nest start --watch",
1818
"start:debug": "nest start --debug --watch",
1919
"start:prod": "node dist/main",
2020
"test": "jest",
@@ -60,7 +60,6 @@
6060
"@types/supertest": "^6.0.2",
6161
"@typescript-eslint/eslint-plugin": "^8.18.2",
6262
"@typescript-eslint/parser": "^8.18.2",
63-
"dotenv-cli": "^8.0.0",
6463
"eslint": "^9.17.0",
6564
"eslint-config-prettier": "^9.1.0",
6665
"eslint-plugin-prettier": "^5.2.1",

src/__saga__/models/saga.model.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ import { SagaStep } from './saga-step.model';
1010
import { DomainMessage } from 'src/__lib__/domain-message';
1111
import { MessageOrmEntity } from 'src/__relay__/message.orm-entity';
1212

13+
export enum SagaStatusEnum {
14+
PENDING = 'PENDING',
15+
COMPENSATION = 'COMPENSATION',
16+
COMPLETE = 'COMPLETE',
17+
}
18+
1319
@Entity('sagas')
1420
export class Saga {
1521
@PrimaryGeneratedColumn('uuid')
@@ -22,7 +28,7 @@ export class Saga {
2228
sagaType: string;
2329

2430
@Column()
25-
status: string; // InProgress, Completed, Compensated, Failed
31+
status: SagaStatusEnum;
2632

2733
@CreateDateColumn()
2834
startedAt: Date;
@@ -33,6 +39,12 @@ export class Saga {
3339
@Column({ default: 0 })
3440
retries: number;
3541

42+
@CreateDateColumn({ type: 'timestamptz' }) // Добавляем метку времени создания
43+
createdAt: Date;
44+
45+
@UpdateDateColumn({ type: 'timestamptz' }) // Добавляем метку времени обновления
46+
updatedAt: Date;
47+
3648
@Column({ nullable: true })
3749
currentStep: string;
3850

@@ -64,6 +76,7 @@ export class Saga {
6476

6577
const newStep = new SagaStep();
6678
newStep.createCompensationStep(event);
79+
this.status = SagaStatusEnum.COMPENSATION;
6780

6881
if (this.steps === undefined) {
6982
this.steps = [];
@@ -76,6 +89,8 @@ export class Saga {
7689
const newStep = new SagaStep();
7790
newStep.createCompleteStep(event);
7891

92+
this.status = SagaStatusEnum.COMPLETE;
93+
7994
if (this.steps === undefined) {
8095
this.steps = [];
8196
}

src/__saga__/registrator.controller.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { RegistatorService } from './registrator.service';
33
import { DomainMessage } from 'src/__lib__/domain-message';
44
import { config } from 'src/config';
55
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
6+
import { Cron } from '@nestjs/schedule';
67

78
export class CreateSagaDto {
89
correlationId: string;
@@ -25,4 +26,9 @@ export class RegistatorController {
2526
async createSaga(@Body() payload: CreateSagaDto) {
2627
return this.registatorService.createSaga(payload.correlationId);
2728
}
29+
30+
@Cron('* * * * *')
31+
async timeOut() {
32+
await this.registatorService.checkSagaTimeout();
33+
}
2834
}

src/__saga__/registrator.repository.ts

Lines changed: 0 additions & 4 deletions
This file was deleted.

src/__saga__/registrator.service.ts

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import { Injectable, Logger } from '@nestjs/common';
22
import { InjectDataSource, InjectRepository } from '@nestjs/typeorm';
3-
import { DataSource, Repository } from 'typeorm';
4-
import { Saga } from './models/saga.model';
3+
import { DataSource, LessThanOrEqual, Repository } from 'typeorm';
4+
import { Saga, SagaStatusEnum } from './models/saga.model';
55
import { DomainMessage } from 'src/__lib__/domain-message';
6+
import { SagaCompensationEvent } from './saga-compensation.event';
7+
import { randomUUID } from 'crypto';
68

79
@Injectable()
810
export class RegistatorService {
@@ -44,6 +46,37 @@ export class RegistatorService {
4446
return saved;
4547
}
4648

49+
async checkSagaTimeout() {
50+
const twelveHoursAgo = new Date(Date.now() - 12 * 60 * 60 * 1000);
51+
const sagas = await this.sagaRepository.find({
52+
where: {
53+
status: SagaStatusEnum.PENDING,
54+
updatedAt: LessThanOrEqual(twelveHoursAgo),
55+
},
56+
take: 10,
57+
});
58+
59+
const allMessages = [];
60+
for (const saga of sagas) {
61+
const messages = saga.compensate(
62+
new SagaCompensationEvent({
63+
aggregateId: saga.id,
64+
saga: {
65+
correlationId: randomUUID(),
66+
sagaId: saga.id,
67+
},
68+
}),
69+
);
70+
for (const msg of messages) {
71+
allMessages.push(msg);
72+
}
73+
}
74+
return this.dataSource.transaction(async (transactionalEntityManager) => {
75+
await transactionalEntityManager.save(allMessages);
76+
return transactionalEntityManager.save(sagas);
77+
});
78+
}
79+
4780
async createSaga(correlationId: string): Promise<string> {
4881
this.logger.debug(`Create saga for correlationId ${correlationId}`);
4982
const saga = await this.sagaRepository.findOne({
@@ -55,7 +88,7 @@ export class RegistatorService {
5588
const newSaga = new Saga();
5689
newSaga.correlationId = correlationId;
5790
newSaga.sagaType = 'compensation';
58-
newSaga.status = 'In Progress';
91+
newSaga.status = SagaStatusEnum.PENDING;
5992
newSaga.steps = [];
6093
const created = await this.sagaRepository.save(newSaga);
6194
return created.id;

0 commit comments

Comments
 (0)