Skip to content

Commit 64013cb

Browse files
authored
Merge pull request #2882 from RedisInsight/feature/RI-5228-upload-large-files
Feature/ri 5228 upload large files
2 parents 7e6588e + 3746c48 commit 64013cb

File tree

9 files changed

+137
-142
lines changed

9 files changed

+137
-142
lines changed

redisinsight/api/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
"analytics-node": "^4.0.1",
6161
"axios": "^0.25.0",
6262
"body-parser": "^1.19.0",
63+
"busboy": "^1.6.0",
6364
"class-transformer": "^0.2.3",
6465
"class-validator": "^0.14.0",
6566
"connect-timeout": "^1.9.0",

redisinsight/api/src/modules/bulk-actions/bulk-import.controller.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import {
22
Body,
33
ClassSerializerInterceptor,
4-
Controller, HttpCode, Post,
4+
Controller, HttpCode, Post, Req,
55
UseInterceptors, UsePipes, ValidationPipe,
66
} from '@nestjs/common';
7+
import * as Busboy from 'busboy';
8+
import { Readable } from 'stream';
9+
import { Request } from 'express';
710
import {
811
ApiConsumes, ApiTags,
912
} from '@nestjs/swagger';
1013
import { ApiEndpoint } from 'src/decorators/api-endpoint.decorator';
11-
import { FormDataRequest } from 'nestjs-form-data';
1214
import { BulkImportService } from 'src/modules/bulk-actions/bulk-import.service';
13-
import { UploadImportFileDto } from 'src/modules/bulk-actions/dto/upload-import-file.dto';
1415
import { ClientMetadataParam } from 'src/common/decorators';
1516
import { ClientMetadata } from 'src/common/models';
1617
import { IBulkActionOverview } from 'src/modules/bulk-actions/interfaces/bulk-action-overview.interface';
@@ -26,7 +27,6 @@ export class BulkImportController {
2627
@Post('import')
2728
@ApiConsumes('multipart/form-data')
2829
@HttpCode(200)
29-
@FormDataRequest()
3030
@ApiEndpoint({
3131
description: 'Import data from file',
3232
responses: [
@@ -36,10 +36,21 @@ export class BulkImportController {
3636
],
3737
})
3838
async import(
39-
@Body() dto: UploadImportFileDto,
39+
@Req() req: Request,
4040
@ClientMetadataParam() clientMetadata: ClientMetadata,
4141
): Promise<IBulkActionOverview> {
42-
return this.service.import(clientMetadata, dto);
42+
return new Promise((res, rej) => {
43+
const busboy = Busboy({ headers: req.headers });
44+
45+
busboy.on(
46+
'file',
47+
(_fieldName: string, fileStream: Readable) => {
48+
this.service.import(clientMetadata, fileStream).then(res).catch(rej);
49+
},
50+
);
51+
52+
req.pipe(busboy);
53+
});
4354
}
4455

4556
@Post('import/tutorial-data')

redisinsight/api/src/modules/bulk-actions/bulk-import.service.spec.ts

Lines changed: 38 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import {
77
mockIORedisClient,
88
mockIORedisCluster, MockType,
99
} from 'src/__mocks__';
10-
import { MemoryStoredFile } from 'nestjs-form-data';
1110
import { BulkActionSummary } from 'src/modules/bulk-actions/models/bulk-action-summary';
1211
import { IBulkActionOverview } from 'src/modules/bulk-actions/interfaces/bulk-action-overview.interface';
1312
import { BulkActionStatus, BulkActionType } from 'src/modules/bulk-actions/constants';
@@ -17,6 +16,7 @@ import * as fs from 'fs-extra';
1716
import config from 'src/utils/config';
1817
import { join } from 'path';
1918
import { wrapHttpError } from 'src/common/utils';
19+
import { Readable } from 'stream';
2020

2121
const PATH_CONFIG = config.get('dir_path');
2222

@@ -71,13 +71,7 @@ const mockEmptyImportResult: IBulkActionOverview = {
7171
duration: 0,
7272
};
7373

74-
const mockUploadImportFileDto = {
75-
file: {
76-
originalname: 'filename',
77-
size: 1,
78-
buffer: Buffer.from('SET foo bar'),
79-
} as unknown as MemoryStoredFile,
80-
};
74+
const mockReadableStream = Readable.from(Buffer.from('SET foo bar'));
8175

8276
const mockUploadImportFileByPathDto = {
8377
path: '/some/path',
@@ -152,7 +146,7 @@ describe('BulkImportService', () => {
152146

153147
it('should import data', async () => {
154148
spy.mockResolvedValue(mockSummary);
155-
expect(await service.import(mockClientMetadata, mockUploadImportFileDto)).toEqual({
149+
expect(await service.import(mockClientMetadata, mockReadableStream)).toEqual({
156150
...mockImportResult,
157151
duration: jasmine.anything(),
158152
});
@@ -168,21 +162,17 @@ describe('BulkImportService', () => {
168162
succeed: 10_000,
169163
failed: 0,
170164
}));
171-
expect(await service.import(mockClientMetadata, {
172-
file: {
173-
...mockUploadImportFileDto.file,
174-
buffer: generateNCommandsBuffer(100_000),
175-
} as unknown as MemoryStoredFile,
176-
})).toEqual({
177-
...mockImportResult,
178-
summary: {
179-
processed: 100_000,
180-
succeed: 100_000,
181-
failed: 0,
182-
errors: [],
183-
},
184-
duration: jasmine.anything(),
185-
});
165+
expect(await service.import(mockClientMetadata, Readable.from(generateNCommandsBuffer(100_000))))
166+
.toEqual({
167+
...mockImportResult,
168+
summary: {
169+
processed: 100_000,
170+
succeed: 100_000,
171+
failed: 0,
172+
errors: [],
173+
},
174+
duration: jasmine.anything(),
175+
});
186176
});
187177

188178
it('should import data (10K) from file in batches 10K each', async () => {
@@ -191,21 +181,17 @@ describe('BulkImportService', () => {
191181
succeed: 10_000,
192182
failed: 0,
193183
}));
194-
expect(await service.import(mockClientMetadata, {
195-
file: {
196-
...mockUploadImportFileDto.file,
197-
buffer: generateNCommandsBuffer(10_000),
198-
} as unknown as MemoryStoredFile,
199-
})).toEqual({
200-
...mockImportResult,
201-
summary: {
202-
processed: 10_000,
203-
succeed: 10_000,
204-
failed: 0,
205-
errors: [],
206-
},
207-
duration: jasmine.anything(),
208-
});
184+
expect(await service.import(mockClientMetadata, Readable.from(generateNCommandsBuffer(10_000))))
185+
.toEqual({
186+
...mockImportResult,
187+
summary: {
188+
processed: 10_000,
189+
succeed: 10_000,
190+
failed: 0,
191+
errors: [],
192+
},
193+
duration: jasmine.anything(),
194+
});
209195
});
210196

211197
it('should not import any data due to parse error', async () => {
@@ -214,12 +200,10 @@ describe('BulkImportService', () => {
214200
succeed: 0,
215201
failed: 0,
216202
}));
217-
expect(await service.import(mockClientMetadata, {
218-
file: {
219-
...mockUploadImportFileDto.file,
220-
buffer: Buffer.from('{"incorrectdata"}\n{"incorrectdata"}'),
221-
} as unknown as MemoryStoredFile,
222-
})).toEqual({
203+
expect(await service.import(
204+
mockClientMetadata,
205+
Readable.from(Buffer.from('{"incorrectdata"}\n{"incorrectdata"}')),
206+
)).toEqual({
223207
...mockImportResult,
224208
summary: {
225209
processed: 2,
@@ -233,21 +217,19 @@ describe('BulkImportService', () => {
233217
});
234218

235219
it('should ignore blank lines', async () => {
236-
await service.import(mockClientMetadata, {
237-
file: {
238-
...mockUploadImportFileDto.file,
239-
buffer: Buffer.from('\n SET foo bar \n \n SET foo bar \n '),
240-
} as unknown as MemoryStoredFile,
241-
})
242-
expect(spy).toBeCalledWith(mockIORedisClient, [['set', ['foo', 'bar']], ['set', ['foo', 'bar']]])
220+
await service.import(
221+
mockClientMetadata,
222+
Readable.from(Buffer.from('\n SET foo bar \n \n SET foo bar \n ')),
223+
);
224+
expect(spy).toBeCalledWith(mockIORedisClient, [['set', ['foo', 'bar']], ['set', ['foo', 'bar']]]);
243225
expect(mockIORedisClient.disconnect).toHaveBeenCalled();
244226
});
245227

246228
it('should throw an error in case of global error', async () => {
247229
try {
248230
databaseConnectionService.createClient.mockRejectedValueOnce(new NotFoundException());
249231

250-
await service.import(mockClientMetadata, mockUploadImportFileDto);
232+
await service.import(mockClientMetadata, mockReadableStream);
251233

252234
fail();
253235
} catch (e) {
@@ -275,15 +257,15 @@ describe('BulkImportService', () => {
275257

276258
await service.uploadFromTutorial(mockClientMetadata, mockUploadImportFileByPathDto);
277259

278-
expect(mockedFs.readFile).toHaveBeenCalledWith(join(PATH_CONFIG.homedir, mockUploadImportFileByPathDto.path));
260+
expect(mockedFs.createReadStream).toHaveBeenCalledWith(join(PATH_CONFIG.homedir, mockUploadImportFileByPathDto.path));
279261
});
280262

281263
it('should import file by path with static', async () => {
282264
mockedFs.pathExists.mockImplementationOnce(async () => true);
283265

284266
await service.uploadFromTutorial(mockClientMetadata, { path: '/static/guides/_data.file' });
285267

286-
expect(mockedFs.readFile).toHaveBeenCalledWith(join(PATH_CONFIG.homedir, '/guides/_data.file'));
268+
expect(mockedFs.createReadStream).toHaveBeenCalledWith(join(PATH_CONFIG.homedir, '/guides/_data.file'));
287269
});
288270

289271
it('should normalize path before importing and not search for file outside home folder', async () => {
@@ -293,7 +275,7 @@ describe('BulkImportService', () => {
293275
path: '/../../../danger',
294276
});
295277

296-
expect(mockedFs.readFile).toHaveBeenCalledWith(join(PATH_CONFIG.homedir, 'danger'));
278+
expect(mockedFs.createReadStream).toHaveBeenCalledWith(join(PATH_CONFIG.homedir, 'danger'));
297279
});
298280

299281
it('should normalize path before importing and throw an error when search for file outside home folder (relative)', async () => {
@@ -324,19 +306,5 @@ describe('BulkImportService', () => {
324306
expect(e.message).toEqual('Data file was not found');
325307
}
326308
});
327-
328-
it('should throw BadRequest when file size is greater then 100MB', async () => {
329-
mockedFs.pathExists.mockImplementationOnce(async () => true);
330-
mockedFs.stat.mockImplementationOnce(async () => ({ size: 100 * 1024 * 1024 + 1 } as fs.Stats));
331-
332-
try {
333-
await service.uploadFromTutorial(mockClientMetadata, mockUploadImportFileByPathDto);
334-
335-
fail();
336-
} catch (e) {
337-
expect(e).toBeInstanceOf(BadRequestException);
338-
expect(e.message).toEqual('Maximum file size is 100MB');
339-
}
340-
});
341309
});
342310
});

redisinsight/api/src/modules/bulk-actions/bulk-import.service.ts

Lines changed: 20 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import { BadRequestException, Injectable, Logger } from '@nestjs/common';
44
import { Readable } from 'stream';
55
import * as readline from 'readline';
66
import { wrapHttpError } from 'src/common/utils';
7-
import { UploadImportFileDto } from 'src/modules/bulk-actions/dto/upload-import-file.dto';
87
import { DatabaseConnectionService } from 'src/modules/database/database-connection.service';
98
import { ClientMetadata } from 'src/common/models';
109
import { splitCliCommandLine } from 'src/utils/cli-helper';
@@ -14,7 +13,6 @@ import { BulkActionStatus, BulkActionType } from 'src/modules/bulk-actions/const
1413
import { BulkActionsAnalyticsService } from 'src/modules/bulk-actions/bulk-actions-analytics.service';
1514
import { UploadImportFileByPathDto } from 'src/modules/bulk-actions/dto/upload-import-file-by-path.dto';
1615
import config from 'src/utils/config';
17-
import { MemoryStoredFile } from 'nestjs-form-data';
1816

1917
const BATCH_LIMIT = 10_000;
2018
const PATH_CONFIG = config.get('dir_path');
@@ -63,9 +61,9 @@ export class BulkImportService {
6361

6462
/**
6563
* @param clientMetadata
66-
* @param dto
64+
* @param fileStream
6765
*/
68-
public async import(clientMetadata: ClientMetadata, dto: UploadImportFileDto): Promise<IBulkActionOverview> {
66+
public async import(clientMetadata: ClientMetadata, fileStream: Readable): Promise<IBulkActionOverview> {
6967
const startTime = Date.now();
7068
const result: IBulkActionOverview = {
7169
id: 'empty',
@@ -92,18 +90,20 @@ export class BulkImportService {
9290
try {
9391
client = await this.databaseConnectionService.createClient(clientMetadata);
9492

95-
const stream = Readable.from(dto.file.buffer);
9693
let batch = [];
9794

98-
const batchResults: Promise<BulkActionSummary>[] = [];
95+
const batchResults: BulkActionSummary[] = [];
96+
97+
try {
98+
const rl = readline.createInterface({
99+
input: fileStream,
100+
});
99101

100-
await new Promise((res) => {
101-
const rl = readline.createInterface(stream);
102-
rl.on('line', (line) => {
102+
for await (const line of rl) {
103103
try {
104104
const [command, ...args] = splitCliCommandLine((line.trim()));
105105
if (batch.length >= BATCH_LIMIT) {
106-
batchResults.push(this.executeBatch(client, batch));
106+
batchResults.push(await this.executeBatch(client, batch));
107107
batch = [];
108108
}
109109
if (command) {
@@ -112,20 +112,16 @@ export class BulkImportService {
112112
} catch (e) {
113113
parseErrors += 1;
114114
}
115-
});
116-
rl.on('error', (error) => {
117-
result.summary.errors.push(error);
118-
result.status = BulkActionStatus.Failed;
119-
this.analyticsService.sendActionFailed(result, error);
120-
res(null);
121-
});
122-
rl.on('close', () => {
123-
batchResults.push(this.executeBatch(client, batch));
124-
res(null);
125-
});
126-
});
115+
}
116+
} catch (e) {
117+
result.summary.errors.push(e);
118+
result.status = BulkActionStatus.Failed;
119+
this.analyticsService.sendActionFailed(result, e);
120+
}
121+
122+
batchResults.push(await this.executeBatch(client, batch));
127123

128-
(await Promise.all(batchResults)).forEach((batchResult) => {
124+
batchResults.forEach((batchResult) => {
129125
result.summary.processed += batchResult.getOverview().processed;
130126
result.summary.succeed += batchResult.getOverview().succeed;
131127
result.summary.failed += batchResult.getOverview().failed;
@@ -176,15 +172,7 @@ export class BulkImportService {
176172
throw new BadRequestException('Data file was not found');
177173
}
178174

179-
if ((await fs.stat(path))?.size > 100 * 1024 * 1024) {
180-
throw new BadRequestException('Maximum file size is 100MB');
181-
}
182-
183-
const buffer = await fs.readFile(path);
184-
185-
return this.import(clientMetadata, {
186-
file: { buffer } as MemoryStoredFile,
187-
});
175+
return this.import(clientMetadata, fs.createReadStream(path));
188176
} catch (e) {
189177
this.logger.error('Unable to process an import file path from tutorial', e);
190178
throw wrapHttpError(e);

redisinsight/api/src/modules/bulk-actions/dto/upload-import-file.dto.ts

Lines changed: 0 additions & 19 deletions
This file was deleted.

0 commit comments

Comments
 (0)