@@ -4,7 +4,6 @@ import { BadRequestException, Injectable, Logger } from '@nestjs/common';
4
4
import { Readable } from 'stream' ;
5
5
import * as readline from 'readline' ;
6
6
import { wrapHttpError } from 'src/common/utils' ;
7
- import { UploadImportFileDto } from 'src/modules/bulk-actions/dto/upload-import-file.dto' ;
8
7
import { DatabaseConnectionService } from 'src/modules/database/database-connection.service' ;
9
8
import { ClientMetadata } from 'src/common/models' ;
10
9
import { splitCliCommandLine } from 'src/utils/cli-helper' ;
@@ -14,7 +13,6 @@ import { BulkActionStatus, BulkActionType } from 'src/modules/bulk-actions/const
14
13
import { BulkActionsAnalyticsService } from 'src/modules/bulk-actions/bulk-actions-analytics.service' ;
15
14
import { UploadImportFileByPathDto } from 'src/modules/bulk-actions/dto/upload-import-file-by-path.dto' ;
16
15
import config from 'src/utils/config' ;
17
- import { MemoryStoredFile } from 'nestjs-form-data' ;
18
16
19
17
const BATCH_LIMIT = 10_000 ;
20
18
const PATH_CONFIG = config . get ( 'dir_path' ) ;
@@ -63,9 +61,9 @@ export class BulkImportService {
63
61
64
62
/**
65
63
* @param clientMetadata
66
- * @param dto
64
+ * @param fileStream
67
65
*/
68
- public async import ( clientMetadata : ClientMetadata , dto : UploadImportFileDto ) : Promise < IBulkActionOverview > {
66
+ public async import ( clientMetadata : ClientMetadata , fileStream : Readable ) : Promise < IBulkActionOverview > {
69
67
const startTime = Date . now ( ) ;
70
68
const result : IBulkActionOverview = {
71
69
id : 'empty' ,
@@ -92,18 +90,20 @@ export class BulkImportService {
92
90
try {
93
91
client = await this . databaseConnectionService . createClient ( clientMetadata ) ;
94
92
95
- const stream = Readable . from ( dto . file . buffer ) ;
96
93
let batch = [ ] ;
97
94
98
- const batchResults : Promise < BulkActionSummary > [ ] = [ ] ;
95
+ const batchResults : BulkActionSummary [ ] = [ ] ;
96
+
97
+ try {
98
+ const rl = readline . createInterface ( {
99
+ input : fileStream ,
100
+ } ) ;
99
101
100
- await new Promise ( ( res ) => {
101
- const rl = readline . createInterface ( stream ) ;
102
- rl . on ( 'line' , ( line ) => {
102
+ for await ( const line of rl ) {
103
103
try {
104
104
const [ command , ...args ] = splitCliCommandLine ( ( line . trim ( ) ) ) ;
105
105
if ( batch . length >= BATCH_LIMIT ) {
106
- batchResults . push ( this . executeBatch ( client , batch ) ) ;
106
+ batchResults . push ( await this . executeBatch ( client , batch ) ) ;
107
107
batch = [ ] ;
108
108
}
109
109
if ( command ) {
@@ -112,20 +112,16 @@ export class BulkImportService {
112
112
} catch ( e ) {
113
113
parseErrors += 1 ;
114
114
}
115
- } ) ;
116
- rl . on ( 'error' , ( error ) => {
117
- result . summary . errors . push ( error ) ;
118
- result . status = BulkActionStatus . Failed ;
119
- this . analyticsService . sendActionFailed ( result , error ) ;
120
- res ( null ) ;
121
- } ) ;
122
- rl . on ( 'close' , ( ) => {
123
- batchResults . push ( this . executeBatch ( client , batch ) ) ;
124
- res ( null ) ;
125
- } ) ;
126
- } ) ;
115
+ }
116
+ } catch ( e ) {
117
+ result . summary . errors . push ( e ) ;
118
+ result . status = BulkActionStatus . Failed ;
119
+ this . analyticsService . sendActionFailed ( result , e ) ;
120
+ }
121
+
122
+ batchResults . push ( await this . executeBatch ( client , batch ) ) ;
127
123
128
- ( await Promise . all ( batchResults ) ) . forEach ( ( batchResult ) => {
124
+ batchResults . forEach ( ( batchResult ) => {
129
125
result . summary . processed += batchResult . getOverview ( ) . processed ;
130
126
result . summary . succeed += batchResult . getOverview ( ) . succeed ;
131
127
result . summary . failed += batchResult . getOverview ( ) . failed ;
@@ -176,15 +172,7 @@ export class BulkImportService {
176
172
throw new BadRequestException ( 'Data file was not found' ) ;
177
173
}
178
174
179
- if ( ( await fs . stat ( path ) ) ?. size > 100 * 1024 * 1024 ) {
180
- throw new BadRequestException ( 'Maximum file size is 100MB' ) ;
181
- }
182
-
183
- const buffer = await fs . readFile ( path ) ;
184
-
185
- return this . import ( clientMetadata , {
186
- file : { buffer } as MemoryStoredFile ,
187
- } ) ;
175
+ return this . import ( clientMetadata , fs . createReadStream ( path ) ) ;
188
176
} catch ( e ) {
189
177
this . logger . error ( 'Unable to process an import file path from tutorial' , e ) ;
190
178
throw wrapHttpError ( e ) ;
0 commit comments