Skip to content

Commit 589a084

Browse files
Merge pull request #3455 from RedisInsight/be/feature/RI-5803_abort_controller
#RI-5803 - add abort controller
2 parents c411b8a + d4c9280 commit 589a084

File tree

4 files changed

+31
-12
lines changed

4 files changed

+31
-12
lines changed

redisinsight/api/src/modules/rdi/client/api.rdi.client.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import axios, { AxiosInstance } from 'axios';
22
import { plainToClass } from 'class-transformer';
33
import { decode } from 'jsonwebtoken';
4+
import { Request } from 'express';
45

56
import { RdiClient } from 'src/modules/rdi/client/rdi.client';
67
import {
@@ -102,13 +103,21 @@ export class ApiRdiClient extends RdiClient {
102103
}
103104
}
104105

105-
async testConnections(config: string): Promise<RdiTestConnectionsResponseDto> {
106+
async testConnections(config: string, req: Request): Promise<RdiTestConnectionsResponseDto> {
106107
try {
107-
const response = await this.client.post(RdiUrl.TestConnections, config);
108+
const abortController = new AbortController();
109+
req.socket.on('close', () => {
110+
abortController.abort();
111+
});
112+
const response = await this.client.post(
113+
RdiUrl.TestConnections,
114+
config,
115+
{ signal: abortController.signal },
116+
);
108117

109118
const actionId = response.data.action_id;
110119

111-
return this.pollActionStatus(actionId);
120+
return this.pollActionStatus(actionId, abortController.signal);
112121
} catch (e) {
113122
throw wrapRdiPipelineError(e);
114123
}
@@ -169,15 +178,21 @@ export class ApiRdiClient extends RdiClient {
169178
}
170179
}
171180

172-
private async pollActionStatus(actionId: string): Promise<any> {
181+
private async pollActionStatus(actionId: string, abortSignal: AbortSignal): Promise<any> {
173182
const startTime = Date.now();
174183
while (true) {
184+
if (abortSignal.aborted) {
185+
throw new RdiPipelineInternalServerErrorException();
186+
}
175187
if (Date.now() - startTime > MAX_POLLING_TIME) {
176188
throw new RdiPipelineTimeoutException();
177189
}
178190

179191
try {
180-
const response = await this.client.get(`${RdiUrl.Action}/${actionId}`);
192+
const response = await this.client.get(
193+
`${RdiUrl.Action}/${actionId}`,
194+
{ signal: abortSignal },
195+
);
181196
const { status, data, error } = response.data;
182197

183198
if (status === 'failed') {

redisinsight/api/src/modules/rdi/client/rdi.client.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
import { Request } from 'express';
12
import {
23
Rdi,
3-
RdiClientMetadata, RdiJob, RdiPipeline, RdiStatisticsResult,
4+
RdiClientMetadata, RdiPipeline, RdiStatisticsResult,
45
} from 'src/modules/rdi/models';
56
import {
67
RdiDryRunJobDto, RdiDryRunJobResponseDto, RdiTestConnectionsResponseDto,
@@ -36,7 +37,7 @@ export abstract class RdiClient {
3637

3738
abstract dryRunJob(data: RdiDryRunJobDto): Promise<RdiDryRunJobResponseDto>;
3839

39-
abstract testConnections(config: string): Promise<RdiTestConnectionsResponseDto>;
40+
abstract testConnections(config: string, req: Request): Promise<RdiTestConnectionsResponseDto>;
4041

4142
abstract getStatistics(sections?: string): Promise<RdiStatisticsResult>;
4243

redisinsight/api/src/modules/rdi/rdi-pipeline.controller.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import {
22
Body,
33
ClassSerializerInterceptor, Controller, Get, Post, UseInterceptors, UsePipes, ValidationPipe,
4-
Query,
4+
Query, Req,
55
} from '@nestjs/common';
66
import { Rdi, RdiPipeline, RdiClientMetadata } from 'src/modules/rdi/models';
77
import { ApiTags } from '@nestjs/swagger';
8+
import { Request } from 'express';
89
import { ApiEndpoint } from 'src/decorators/api-endpoint.decorator';
910
import { RdiPipelineService } from 'src/modules/rdi/rdi-pipeline.service';
1011
import { RequestRdiClientMetadata } from 'src/modules/rdi/decorators';
@@ -72,10 +73,11 @@ export class RdiPipelineController {
7273
responses: [{ status: 200, type: RdiTestConnectionsResponseDto }],
7374
})
7475
async testConnections(
75-
@RequestRdiClientMetadata() rdiClientMetadata: RdiClientMetadata,
76+
@Req() req: Request,
77+
@RequestRdiClientMetadata() rdiClientMetadata: RdiClientMetadata,
7678
@Body() config: string,
7779
): Promise<RdiTestConnectionsResponseDto> {
78-
return this.rdiPipelineService.testConnections(rdiClientMetadata, config);
80+
return this.rdiPipelineService.testConnections(rdiClientMetadata, config, req);
7981
}
8082

8183
@Get('/strategies')

redisinsight/api/src/modules/rdi/rdi-pipeline.service.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Injectable, Logger } from '@nestjs/common';
2+
import { Request } from 'express';
23
import { RdiClientMetadata, RdiPipeline } from 'src/modules/rdi/models';
34
import { RdiClientProvider } from 'src/modules/rdi/providers/rdi.client.provider';
45
import { RdiDryRunJobDto, RdiTestConnectionsResponseDto } from 'src/modules/rdi/dto';
@@ -69,12 +70,12 @@ export class RdiPipelineService {
6970
}
7071
}
7172

72-
async testConnections(rdiClientMetadata: RdiClientMetadata, config: string): Promise<RdiTestConnectionsResponseDto> {
73+
async testConnections(rdiClientMetadata: RdiClientMetadata, config: string, req: Request): Promise<RdiTestConnectionsResponseDto> {
7374
this.logger.log('Trying to test connections');
7475

7576
const client = await this.rdiClientProvider.getOrCreate(rdiClientMetadata);
7677

77-
return await client.testConnections(config);
78+
return await client.testConnections(config, req);
7879
}
7980

8081
async getStrategies(rdiClientMetadata: RdiClientMetadata): Promise<object> {

0 commit comments

Comments
 (0)