Skip to content

Commit 845d9f2

Browse files
Merge pull request #3843 from RedisInsight/feature/RI-6067-reset-rdi-pipelines
RI-6067 implemented reset/start/stop rdi functionality
2 parents 8955fa4 + 7a8169e commit 845d9f2

File tree

94 files changed

+2975
-322
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

94 files changed

+2975
-322
lines changed

redisinsight/api/src/__mocks__/rdi.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@ import { ApiRdiClient } from 'src/modules/rdi/client/api.rdi.client';
88
import { RdiEntity } from 'src/modules/rdi/entities/rdi.entity';
99
import { EncryptionStrategy } from 'src/modules/encryption/models';
1010
import { RdiDryRunJobDto } from 'src/modules/rdi/dto';
11+
import { sign } from 'jsonwebtoken';
1112

1213
export const mockRdiId = 'rdiId';
1314
export const mockRdiPasswordEncrypted = 'password_ENCRYPTED';
1415

1516
export const mockRdiPasswordPlain = 'some pass';
1617

18+
export const mockedRdiAccessToken = sign({ exp: Math.trunc(Date.now() / 1000) + 3600 }, 'test');
19+
1720
export class MockRdiClient extends ApiRdiClient {
1821
constructor(metadata: RdiClientMetadata, client: any = jest.fn()) {
1922
super(metadata, client);
@@ -31,6 +34,12 @@ export class MockRdiClient extends ApiRdiClient {
3134

3235
public deploy = jest.fn();
3336

37+
public startPipeline = jest.fn();
38+
39+
public stopPipeline = jest.fn();
40+
41+
public resetPipeline = jest.fn();
42+
3443
public deployJob = jest.fn();
3544

3645
public dryRunJob = jest.fn();

redisinsight/api/src/constants/custom-error-codes.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,7 @@ export enum CustomErrorCodes {
6262
RdiValidationError = 11_404,
6363
RdiNotFound = 11_405,
6464
RdiForbidden = 11_406,
65+
RdiResetPipelineFailure = 11_407,
66+
RdiStartPipelineFailure = 11_408,
67+
RdiStopPipelineFailure = 11_409,
6568
}

redisinsight/api/src/constants/error-messages.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ export default {
115115
AI_QUERY_MAX_TOKENS_RATE_LIMIT: 'Token count exceeds the conversation limit',
116116

117117
RDI_DEPLOY_PIPELINE_FAILURE: 'Failed to deploy pipeline',
118+
RDI_RESET_PIPELINE_FAILURE: 'Failed to reset pipeline',
119+
RDI_STOP_PIPELINE_FAILURE: 'Failed to stop pipeline',
120+
RDI_START_PIPELINE_FAILURE: 'Failed to start pipeline',
118121
RDI_TIMEOUT_ERROR: 'Encountered a timeout error while attempting to retrieve data',
119122
RDI_VALIDATION_ERROR: 'Validation error',
120123
INVALID_RDI_INSTANCE_ID: 'Invalid rdi instance id.',

redisinsight/api/src/modules/rdi/client/api.rdi.client.spec.ts

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
import { sign } from 'jsonwebtoken';
1313
import { ApiRdiClient } from './api.rdi.client';
1414
import { RdiDyRunJobStatus, RdiPipeline, RdiStatisticsStatus } from '../models';
15-
import { RdiUrl, TOKEN_THRESHOLD } from '../constants';
15+
import { PipelineActions, RdiUrl, TOKEN_THRESHOLD } from '../constants';
1616

1717
const mockedAxios = axios as jest.Mocked<typeof axios>;
1818
jest.mock('axios');
@@ -164,6 +164,90 @@ describe('ApiRdiClient', () => {
164164
});
165165
});
166166

167+
describe('startPipeline', () => {
168+
it('should start the pipeline and poll for status', async () => {
169+
const actionId = '123';
170+
const postResponse = { data: { action_id: actionId } };
171+
const getResponse = {
172+
data: {
173+
status: 'completed',
174+
data: 'some data',
175+
error: '',
176+
},
177+
};
178+
179+
mockedAxios.post.mockResolvedValueOnce(postResponse);
180+
mockedAxios.get.mockResolvedValueOnce(getResponse);
181+
182+
const result = await client.startPipeline();
183+
184+
expect(mockedAxios.post).toHaveBeenCalledWith(RdiUrl.StartPipeline, expect.any(Object));
185+
expect(result).toEqual(getResponse.data.data);
186+
});
187+
188+
it('should throw an error if start pipeline fails', async () => {
189+
mockedAxios.post.mockRejectedValueOnce(mockRdiUnauthorizedError);
190+
191+
await expect(client.startPipeline()).rejects.toThrow(mockRdiUnauthorizedError.message);
192+
});
193+
});
194+
195+
describe('stopPipeline', () => {
196+
it('should stop the pipeline and poll for status', async () => {
197+
const actionId = '123';
198+
const postResponse = { data: { action_id: actionId } };
199+
const getResponse = {
200+
data: {
201+
status: 'completed',
202+
data: 'some data',
203+
error: '',
204+
},
205+
};
206+
207+
mockedAxios.post.mockResolvedValueOnce(postResponse);
208+
mockedAxios.get.mockResolvedValueOnce(getResponse);
209+
210+
const result = await client.stopPipeline();
211+
212+
expect(mockedAxios.post).toHaveBeenCalledWith(RdiUrl.StopPipeline, expect.any(Object));
213+
expect(result).toEqual(getResponse.data.data);
214+
});
215+
216+
it('should throw an error if stop pipeline fails', async () => {
217+
mockedAxios.post.mockRejectedValueOnce(mockRdiUnauthorizedError);
218+
219+
await expect(client.stopPipeline()).rejects.toThrow(mockRdiUnauthorizedError.message);
220+
});
221+
});
222+
223+
describe('resetPipeline', () => {
224+
it('should reset the pipeline and poll for status', async () => {
225+
const actionId = '123';
226+
const postResponse = { data: { action_id: actionId } };
227+
const getResponse = {
228+
data: {
229+
status: 'completed',
230+
data: 'some data',
231+
error: '',
232+
},
233+
};
234+
235+
mockedAxios.post.mockResolvedValueOnce(postResponse);
236+
mockedAxios.get.mockResolvedValueOnce(getResponse);
237+
238+
const result = await client.resetPipeline();
239+
240+
expect(mockedAxios.post).toHaveBeenCalledWith(RdiUrl.ResetPipeline, expect.any(Object));
241+
expect(result).toEqual(getResponse.data.data);
242+
});
243+
244+
it('should throw an error if reset pipeline fails', async () => {
245+
mockedAxios.post.mockRejectedValueOnce(mockRdiUnauthorizedError);
246+
247+
await expect(client.resetPipeline()).rejects.toThrow(mockRdiUnauthorizedError.message);
248+
});
249+
});
250+
167251
describe('dryRunJob', () => {
168252
it('should call the RDI client with the correct URL and data', async () => {
169253
const mockResponse = {
@@ -332,7 +416,7 @@ describe('ApiRdiClient', () => {
332416
it('should return response data on success', async () => {
333417
mockedAxios.get.mockResolvedValueOnce({ data: { status: 'completed', data: responseData } });
334418

335-
const result = await client['pollActionStatus'](actionId);
419+
const result = await client['pollActionStatus'](actionId, PipelineActions.Deploy);
336420

337421
expect(mockedAxios.get).toHaveBeenCalledWith(`${RdiUrl.Action}/${actionId}`, { signal: undefined });
338422
expect(result).toEqual(responseData);
@@ -341,13 +425,14 @@ describe('ApiRdiClient', () => {
341425
it('should throw an error if action status is failed', async () => {
342426
mockedAxios.get.mockResolvedValueOnce({ data: { status: 'failed', error: { message: 'Test error' } } });
343427

344-
await expect(client['pollActionStatus'](actionId)).rejects.toThrow('Test error');
428+
await expect(client['pollActionStatus'](actionId, PipelineActions.Deploy)).rejects.toThrow('Test error');
345429
});
346430

347431
it('should throw an error if an error occurs during polling', async () => {
348432
mockedAxios.get.mockRejectedValueOnce(mockRdiUnauthorizedError);
349433

350-
await expect(client['pollActionStatus'](actionId)).rejects.toThrow(mockRdiUnauthorizedError.message);
434+
await expect(client['pollActionStatus'](actionId, PipelineActions.Deploy))
435+
.rejects.toThrow(mockRdiUnauthorizedError.message);
351436
expect(mockedAxios.get).toHaveBeenCalledWith(`${RdiUrl.Action}/${actionId}`, { signal: undefined });
352437
});
353438
});

redisinsight/api/src/modules/rdi/client/api.rdi.client.ts

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import axios, { AxiosInstance } from 'axios';
22
import { plainToClass } from 'class-transformer';
3-
import { decode } from 'jsonwebtoken';
43

54
import { RdiClient } from 'src/modules/rdi/client/rdi.client';
65
import {
@@ -10,6 +9,7 @@ import {
109
POLLING_INTERVAL,
1110
MAX_POLLING_TIME,
1211
WAIT_BEFORE_POLLING,
12+
PipelineActions,
1313
} from 'src/modules/rdi/constants';
1414
import {
1515
RdiDryRunJobDto,
@@ -33,6 +33,9 @@ import { convertKeysToCamelCase } from 'src/utils/base.helper';
3333
import { RdiPipelineTimeoutException } from 'src/modules/rdi/exceptions/rdi-pipeline.timeout-error.exception';
3434
import * as https from 'https';
3535
import { convertApiDataToRdiPipeline, convertRdiPipelineToApiPayload } from 'src/modules/rdi/utils/pipeline.util';
36+
import { RdiResetPipelineFailedException } from '../exceptions/rdi-reset-pipeline-failed.exception';
37+
import { RdiStartPipelineFailedException } from '../exceptions/rdi-start-pipeline-failed.exception';
38+
import { RdiStopPipelineFailedException } from '../exceptions/rdi-stop-pipeline-failed.exception';
3639

3740
export class ApiRdiClient extends RdiClient {
3841
protected readonly client: AxiosInstance;
@@ -113,7 +116,46 @@ export class ApiRdiClient extends RdiClient {
113116

114117
const actionId = response.data.action_id;
115118

116-
return await this.pollActionStatus(actionId);
119+
return await this.pollActionStatus(actionId, PipelineActions.Deploy);
120+
} catch (e) {
121+
throw wrapRdiPipelineError(e);
122+
}
123+
}
124+
125+
async stopPipeline(): Promise<void> {
126+
try {
127+
const response = await this.client.post(
128+
RdiUrl.StopPipeline, {},
129+
);
130+
const actionId = response.data.action_id;
131+
132+
return await this.pollActionStatus(actionId, PipelineActions.Stop);
133+
} catch (e) {
134+
throw wrapRdiPipelineError(e);
135+
}
136+
}
137+
138+
async startPipeline(): Promise<void> {
139+
try {
140+
const response = await this.client.post(
141+
RdiUrl.StartPipeline, {},
142+
);
143+
const actionId = response.data.action_id;
144+
145+
return await this.pollActionStatus(actionId, PipelineActions.Start);
146+
} catch (e) {
147+
throw wrapRdiPipelineError(e);
148+
}
149+
}
150+
151+
async resetPipeline(): Promise<void> {
152+
try {
153+
const response = await this.client.post(
154+
RdiUrl.ResetPipeline, {},
155+
);
156+
const actionId = response.data.action_id;
157+
158+
return await this.pollActionStatus(actionId, PipelineActions.Reset);
117159
} catch (e) {
118160
throw wrapRdiPipelineError(e);
119161
}
@@ -199,7 +241,7 @@ export class ApiRdiClient extends RdiClient {
199241
}
200242
}
201243

202-
private async pollActionStatus(actionId: string, abortSignal?: AbortSignal): Promise<any> {
244+
private async pollActionStatus(actionId: string, action: PipelineActions, abortSignal?: AbortSignal): Promise<any> {
203245
await new Promise((resolve) => setTimeout(resolve, WAIT_BEFORE_POLLING));
204246

205247
const startTime = Date.now();
@@ -220,7 +262,18 @@ export class ApiRdiClient extends RdiClient {
220262
const { status, data, error } = response.data;
221263

222264
if (status === 'failed') {
223-
throw new RdiPipelineDeployFailedException(error?.message);
265+
switch (action) {
266+
case PipelineActions.Deploy:
267+
throw new RdiPipelineDeployFailedException(error?.message);
268+
case PipelineActions.Reset:
269+
throw new RdiResetPipelineFailedException(error?.message);
270+
case PipelineActions.Start:
271+
throw new RdiStartPipelineFailedException(error?.message);
272+
case PipelineActions.Stop:
273+
throw new RdiStopPipelineFailedException(error?.message);
274+
default:
275+
throw new RdiPipelineDeployFailedException(error?.message);
276+
}
224277
}
225278

226279
if (status === 'completed') {

redisinsight/api/src/modules/rdi/client/rdi.client.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ export abstract class RdiClient {
3535

3636
abstract deploy(pipeline: RdiPipeline): Promise<void>;
3737

38+
abstract stopPipeline(): Promise<void>;
39+
40+
abstract startPipeline(): Promise<void>;
41+
42+
abstract resetPipeline(): Promise<void>;
43+
3844
abstract dryRunJob(data: RdiDryRunJobDto): Promise<RdiDryRunJobResponseDto>;
3945

4046
abstract testConnections(config: object): Promise<RdiTestConnectionsResponseDto>;

redisinsight/api/src/modules/rdi/constants/index.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ export enum RdiUrl {
88
DryRunJob = 'api/v1/pipelines/jobs/dry-run',
99
JobFunctions = '/api/v1/pipelines/jobs/functions',
1010
Deploy = 'api/v1/pipelines',
11+
StopPipeline = 'api/v1/pipelines/stop',
12+
StartPipeline = 'api/v1/pipelines/start',
13+
ResetPipeline = 'api/v1/pipelines/reset',
1114
TestConnections = 'api/v1/pipelines/targets/dry-run',
1215
GetStatistics = 'api/v1/monitoring/statistics',
1316
GetPipelineStatus = 'api/v1/status',
@@ -22,3 +25,10 @@ export const RDI_SYNC_INTERVAL = 5 * 60 * 1_000; // 5 min
2225
export const POLLING_INTERVAL = 1_000;
2326
export const MAX_POLLING_TIME = 2 * 60 * 1000; // 2 min
2427
export const WAIT_BEFORE_POLLING = 1_000;
28+
29+
export enum PipelineActions {
30+
Deploy = 'Deploy',
31+
Reset = 'Reset',
32+
Start = 'Start',
33+
Stop = 'Stop',
34+
}

redisinsight/api/src/modules/rdi/exceptions/rdi-pipiline.error.handler.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@ import {
77
import { RdiPipelineForbiddenException } from './rdi-pipeline.forbidden.exception';
88

99
export const parseErrorMessage = (error: AxiosError<any>): string => {
10-
const detail = error.response?.data?.detail;
10+
const data = error.response?.data;
11+
if (typeof data === 'string') {
12+
return data;
13+
}
14+
15+
const detail = data?.detail;
1116
if (!detail) return error.message;
1217

1318
if (typeof detail === 'string') return detail;
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import ERROR_MESSAGES from 'src/constants/error-messages';
2+
import { CustomErrorCodes } from 'src/constants';
3+
import { HttpStatus } from '@nestjs/common';
4+
import { RdiResetPipelineFailedException } from './rdi-reset-pipeline-failed.exception';
5+
6+
describe('RdiResetPipelineFailedException', () => {
7+
it('should create an exception with default message and status code', () => {
8+
const exception = new RdiResetPipelineFailedException();
9+
expect(exception.message).toEqual(ERROR_MESSAGES.RDI_RESET_PIPELINE_FAILURE);
10+
expect(exception.getStatus()).toEqual(HttpStatus.BAD_REQUEST);
11+
expect(exception.getResponse()).toEqual({
12+
message: ERROR_MESSAGES.RDI_RESET_PIPELINE_FAILURE,
13+
statusCode: HttpStatus.BAD_REQUEST,
14+
error: 'RdiResetPipelineFailed',
15+
errorCode: CustomErrorCodes.RdiResetPipelineFailure,
16+
errors: [undefined],
17+
});
18+
});
19+
20+
it('should create an exception with custom message and error', () => {
21+
const customMessage = 'Custom error message';
22+
const customError = 'Custom error';
23+
const exception = new RdiResetPipelineFailedException(customMessage, { error: customError });
24+
expect(exception.message).toEqual(customMessage);
25+
expect(exception.getStatus()).toEqual(HttpStatus.BAD_REQUEST);
26+
expect(exception.getResponse()).toEqual({
27+
message: customMessage,
28+
statusCode: HttpStatus.BAD_REQUEST,
29+
error: 'RdiResetPipelineFailed',
30+
errorCode: CustomErrorCodes.RdiResetPipelineFailure,
31+
errors: [customError],
32+
});
33+
});
34+
});
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { HttpException, HttpExceptionOptions, HttpStatus } from '@nestjs/common';
2+
import ERROR_MESSAGES from 'src/constants/error-messages';
3+
import { CustomErrorCodes } from 'src/constants';
4+
5+
export class RdiResetPipelineFailedException extends HttpException {
6+
constructor(
7+
message = ERROR_MESSAGES.RDI_RESET_PIPELINE_FAILURE,
8+
options?: HttpExceptionOptions & { error?: string },
9+
) {
10+
const response = {
11+
message,
12+
statusCode: HttpStatus.BAD_REQUEST,
13+
error: 'RdiResetPipelineFailed',
14+
errorCode: CustomErrorCodes.RdiResetPipelineFailure,
15+
errors: [options?.error],
16+
};
17+
18+
super(response, response.statusCode, options);
19+
}
20+
}

0 commit comments

Comments
 (0)