Skip to content

Commit 2d7d05f

Browse files
authored
Merge pull request #1961 from RedisInsight/feature/RI-4290-upload_data_in_bulk
Feature/ri 4290 upload data in bulk
2 parents cf97e38 + def29a1 commit 2d7d05f

File tree

59 files changed

+12327
-461
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+12327
-461
lines changed

redisinsight/api/src/__mocks__/redis.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import IORedis from 'ioredis';
22

3-
const getRedisCommanderMockFunctions = () => ({
3+
export const mockIORedisClientExec = jest.fn();
4+
const getRedisCommanderMockFunctions = jest.fn(() => ({
45
sendCommand: jest.fn(),
56
info: jest.fn(),
67
monitor: jest.fn(),
@@ -12,9 +13,11 @@ const getRedisCommanderMockFunctions = () => ({
1213
unsubscribe: jest.fn(),
1314
punsubscribe: jest.fn(),
1415
publish: jest.fn(),
16+
pipeline: jest.fn().mockReturnThis(),
17+
exec: mockIORedisClientExec,
1518
cluster: jest.fn(),
1619
quit: jest.fn(),
17-
});
20+
}));
1821

1922
export const mockIORedisClient = {
2023
...Object.create(IORedis.prototype),

redisinsight/api/src/app.routes.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { SlowLogModule } from 'src/modules/slow-log/slow-log.module';
66
import { PubSubModule } from 'src/modules/pub-sub/pub-sub.module';
77
import { ClusterMonitorModule } from 'src/modules/cluster-monitor/cluster-monitor.module';
88
import { DatabaseAnalysisModule } from 'src/modules/database-analysis/database-analysis.module';
9+
import { BulkActionsModule } from 'src/modules/bulk-actions/bulk-actions.module';
910

1011
export const routes: Routes = [
1112
{
@@ -39,6 +40,10 @@ export const routes: Routes = [
3940
path: '/:dbInstance',
4041
module: DatabaseAnalysisModule,
4142
},
43+
{
44+
path: '/:dbInstance',
45+
module: BulkActionsModule,
46+
},
4247
],
4348
},
4449
];

redisinsight/api/src/modules/bulk-actions/bulk-actions-analytics.service.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ export class BulkActionsAnalyticsService extends TelemetryBaseService {
3131
type: overview.type,
3232
duration: overview.duration,
3333
filter: {
34-
match: overview.filter.match === '*' ? '*' : 'PATTERN',
35-
type: overview.filter.type,
34+
match: overview.filter?.match === '*' ? '*' : 'PATTERN',
35+
type: overview.filter?.type,
3636
},
3737
progress: {
38-
scanned: overview.progress.scanned,
39-
total: overview.progress.total,
38+
scanned: overview.progress?.scanned,
39+
total: overview.progress?.total,
4040
},
4141
},
4242
);
@@ -54,16 +54,16 @@ export class BulkActionsAnalyticsService extends TelemetryBaseService {
5454
type: overview.type,
5555
duration: overview.duration,
5656
filter: {
57-
match: overview.filter.match === '*' ? '*' : 'PATTERN',
58-
type: overview.filter.type,
57+
match: overview.filter?.match === '*' ? '*' : 'PATTERN',
58+
type: overview.filter?.type,
5959
},
6060
progress: {
61-
scanned: overview.progress.scanned,
62-
total: overview.progress.total,
61+
scanned: overview.progress?.scanned,
62+
total: overview.progress?.total,
6363
},
6464
summary: {
65-
processed: overview.summary.processed,
66-
succeed: overview.summary.succeed,
65+
processed: overview.summary?.processed,
66+
succeed: overview.summary?.succeed,
6767
failed: overview.summary.failed,
6868
},
6969
},

redisinsight/api/src/modules/bulk-actions/bulk-actions.module.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,17 @@ import { BulkActionsService } from 'src/modules/bulk-actions/bulk-actions.servic
33
import { BulkActionsProvider } from 'src/modules/bulk-actions/providers/bulk-actions.provider';
44
import { BulkActionsGateway } from 'src/modules/bulk-actions/bulk-actions.gateway';
55
import { BulkActionsAnalyticsService } from 'src/modules/bulk-actions/bulk-actions-analytics.service';
6+
import { BulkImportController } from 'src/modules/bulk-actions/bulk-import.controller';
7+
import { BulkImportService } from 'src/modules/bulk-actions/bulk-import.service';
68

79
@Module({
10+
controllers: [BulkImportController],
811
providers: [
912
BulkActionsGateway,
1013
BulkActionsService,
1114
BulkActionsProvider,
1215
BulkActionsAnalyticsService,
16+
BulkImportService,
1317
],
1418
})
1519
export class BulkActionsModule {}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import {
2+
Body,
3+
ClassSerializerInterceptor,
4+
Controller, HttpCode, Post,
5+
UseInterceptors, UsePipes, ValidationPipe,
6+
} from '@nestjs/common';
7+
import {
8+
ApiConsumes, ApiTags,
9+
} from '@nestjs/swagger';
10+
import { ApiEndpoint } from 'src/decorators/api-endpoint.decorator';
11+
import { FormDataRequest } from 'nestjs-form-data';
12+
import { BulkImportService } from 'src/modules/bulk-actions/bulk-import.service';
13+
import { UploadImportFileDto } from 'src/modules/bulk-actions/dto/upload-import-file.dto';
14+
import { ClientMetadataParam } from 'src/common/decorators';
15+
import { ClientMetadata } from 'src/common/models';
16+
import { IBulkActionOverview } from 'src/modules/bulk-actions/interfaces/bulk-action-overview.interface';
17+
18+
@UsePipes(new ValidationPipe({ transform: true }))
19+
@UseInterceptors(ClassSerializerInterceptor)
20+
@ApiTags('Bulk Actions')
21+
@Controller('/bulk-actions')
22+
export class BulkImportController {
23+
constructor(private readonly service: BulkImportService) {}
24+
25+
@Post('import')
26+
@ApiConsumes('multipart/form-data')
27+
@HttpCode(200)
28+
@FormDataRequest()
29+
@ApiEndpoint({
30+
description: 'Import data from file',
31+
responses: [
32+
{
33+
type: Object,
34+
},
35+
],
36+
})
37+
async import(
38+
@Body() dto: UploadImportFileDto,
39+
@ClientMetadataParam() clientMetadata: ClientMetadata,
40+
): Promise<IBulkActionOverview> {
41+
return this.service.import(clientMetadata, dto);
42+
}
43+
}
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
import { Test, TestingModule } from '@nestjs/testing';
2+
import { BulkImportService } from 'src/modules/bulk-actions/bulk-import.service';
3+
import { DatabaseConnectionService } from 'src/modules/database/database-connection.service';
4+
import {
5+
mockClientMetadata,
6+
mockDatabaseConnectionService,
7+
mockIORedisClient,
8+
mockIORedisCluster, MockType
9+
} from 'src/__mocks__';
10+
import { MemoryStoredFile } from 'nestjs-form-data';
11+
import { BulkActionSummary } from 'src/modules/bulk-actions/models/bulk-action-summary';
12+
import { IBulkActionOverview } from 'src/modules/bulk-actions/interfaces/bulk-action-overview.interface';
13+
import { BulkActionStatus, BulkActionType } from 'src/modules/bulk-actions/constants';
14+
import { NotFoundException } from '@nestjs/common';
15+
import { BulkActionsAnalyticsService } from 'src/modules/bulk-actions/bulk-actions-analytics.service';
16+
17+
const generateNCommandsBuffer = (n: number) => Buffer.from(
18+
(new Array(n)).fill(1).map(() => ['set', ['foo', 'bar']]).join('\n'),
19+
);
20+
const generateNBatchCommands = (n: number) => (new Array(n)).fill(1).map(() => ['set', ['foo', 'bar']]);
21+
const generateNBatchCommandsResults = (n: number) => (new Array(n)).fill(1).map(() => [null, 'OK']);
22+
const mockBatchCommands = generateNBatchCommands(100);
23+
const mockBatchCommandsResult = generateNBatchCommandsResults(100);
24+
const mockBatchCommandsResultWithErrors = [...(new Array(99)).fill(1).map(() => [null, 'OK']), ['ReplyError']];
25+
const mockSummary: BulkActionSummary = Object.assign(new BulkActionSummary(), {
26+
processed: 100,
27+
succeed: 100,
28+
failed: 0,
29+
errors: [],
30+
});
31+
32+
const mockSummaryWithErrors = Object.assign(new BulkActionSummary(), {
33+
processed: 100,
34+
succeed: 99,
35+
failed: 1,
36+
errors: [],
37+
});
38+
39+
const mockImportResult: IBulkActionOverview = {
40+
id: 'empty',
41+
databaseId: mockClientMetadata.databaseId,
42+
type: BulkActionType.Import,
43+
summary: mockSummary.getOverview(),
44+
progress: null,
45+
filter: null,
46+
status: BulkActionStatus.Completed,
47+
duration: 100,
48+
};
49+
50+
const mockUploadImportFileDto = {
51+
file: {
52+
originalname: 'filename',
53+
size: 1,
54+
buffer: Buffer.from('SET foo bar'),
55+
} as unknown as MemoryStoredFile,
56+
};
57+
58+
describe('BulkImportService', () => {
59+
let service: BulkImportService;
60+
let databaseConnectionService: MockType<DatabaseConnectionService>;
61+
let analytics: MockType<BulkActionsAnalyticsService>;
62+
63+
beforeEach(async () => {
64+
jest.clearAllMocks();
65+
66+
const module: TestingModule = await Test.createTestingModule({
67+
providers: [
68+
BulkImportService,
69+
{
70+
provide: DatabaseConnectionService,
71+
useFactory: mockDatabaseConnectionService,
72+
},
73+
{
74+
provide: BulkActionsAnalyticsService,
75+
useFactory: () => ({
76+
sendActionStarted: jest.fn(),
77+
sendActionStopped: jest.fn(),
78+
}),
79+
},
80+
],
81+
}).compile();
82+
83+
service = module.get(BulkImportService);
84+
databaseConnectionService = module.get(DatabaseConnectionService);
85+
analytics = module.get(BulkActionsAnalyticsService);
86+
});
87+
88+
describe('executeBatch', () => {
89+
it('should execute batch in pipeline for standalone', async () => {
90+
mockIORedisClient.exec.mockResolvedValueOnce(mockBatchCommandsResult);
91+
expect(await service['executeBatch'](mockIORedisClient, mockBatchCommands)).toEqual(mockSummary);
92+
});
93+
it('should execute batch in pipeline for standalone with errors', async () => {
94+
mockIORedisClient.exec.mockResolvedValueOnce(mockBatchCommandsResultWithErrors);
95+
expect(await service['executeBatch'](mockIORedisClient, mockBatchCommands)).toEqual(mockSummaryWithErrors);
96+
});
97+
it('should return all failed in case of global error', async () => {
98+
mockIORedisClient.exec.mockRejectedValueOnce(new Error());
99+
expect(await service['executeBatch'](mockIORedisClient, mockBatchCommands)).toEqual({
100+
...mockSummary.getOverview(),
101+
succeed: 0,
102+
failed: mockSummary.getOverview().processed,
103+
});
104+
});
105+
it('should execute batch of commands without pipeline for cluster', async () => {
106+
mockIORedisCluster.call.mockRejectedValueOnce(new Error());
107+
mockIORedisCluster.call.mockResolvedValue('OK');
108+
expect(await service['executeBatch'](mockIORedisCluster, mockBatchCommands)).toEqual(mockSummaryWithErrors);
109+
});
110+
});
111+
112+
describe('import', () => {
113+
let spy;
114+
115+
beforeEach(() => {
116+
spy = jest.spyOn(service as any, 'executeBatch');
117+
});
118+
119+
it('should import data', async () => {
120+
spy.mockResolvedValue(mockSummary);
121+
expect(await service.import(mockClientMetadata, mockUploadImportFileDto)).toEqual({
122+
...mockImportResult,
123+
duration: jasmine.anything(),
124+
});
125+
expect(analytics.sendActionStopped).toHaveBeenCalledWith({
126+
...mockImportResult,
127+
duration: jasmine.anything(),
128+
});
129+
});
130+
131+
it('should import data (100K) from file in batches 10K each', async () => {
132+
spy.mockResolvedValue(Object.assign(new BulkActionSummary(), {
133+
processed: 10_000,
134+
succeed: 10_000,
135+
failed: 0,
136+
}));
137+
expect(await service.import(mockClientMetadata, {
138+
file: {
139+
...mockUploadImportFileDto.file,
140+
buffer: generateNCommandsBuffer(100_000),
141+
} as unknown as MemoryStoredFile,
142+
})).toEqual({
143+
...mockImportResult,
144+
summary: {
145+
processed: 100_000,
146+
succeed: 100_000,
147+
failed: 0,
148+
errors: [],
149+
},
150+
duration: jasmine.anything(),
151+
});
152+
});
153+
154+
it('should import data (10K) from file in batches 10K each', async () => {
155+
spy.mockResolvedValue(Object.assign(new BulkActionSummary(), {
156+
processed: 10_000,
157+
succeed: 10_000,
158+
failed: 0,
159+
}));
160+
expect(await service.import(mockClientMetadata, {
161+
file: {
162+
...mockUploadImportFileDto.file,
163+
buffer: generateNCommandsBuffer(10_000),
164+
} as unknown as MemoryStoredFile,
165+
})).toEqual({
166+
...mockImportResult,
167+
summary: {
168+
processed: 10_000,
169+
succeed: 10_000,
170+
failed: 0,
171+
errors: [],
172+
},
173+
duration: jasmine.anything(),
174+
});
175+
});
176+
177+
it('should not import any data due to parse error', async () => {
178+
spy.mockResolvedValue(Object.assign(new BulkActionSummary(), {
179+
processed: 0,
180+
succeed: 0,
181+
failed: 0,
182+
}));
183+
expect(await service.import(mockClientMetadata, {
184+
file: {
185+
...mockUploadImportFileDto.file,
186+
buffer: Buffer.from('{"incorrectdata"}\n{"incorrectdata"}'),
187+
} as unknown as MemoryStoredFile,
188+
})).toEqual({
189+
...mockImportResult,
190+
summary: {
191+
processed: 2,
192+
succeed: 0,
193+
failed: 2,
194+
errors: [],
195+
},
196+
duration: jasmine.anything(),
197+
});
198+
});
199+
200+
it('should throw an error in case of global error', async () => {
201+
try {
202+
databaseConnectionService.createClient.mockRejectedValueOnce(new NotFoundException());
203+
204+
await service.import(mockClientMetadata, mockUploadImportFileDto);
205+
206+
fail();
207+
} catch (e) {
208+
expect(e).toBeInstanceOf(NotFoundException);
209+
}
210+
});
211+
});
212+
});

0 commit comments

Comments
 (0)