11import {
22 Sequelize ,
33 fn ,
4- cast ,
5- col ,
64 literal ,
75 Op ,
8- where ,
96} from 'sequelize' ;
10- import { Readable } from 'stream' ;
117import { v4 as uuidv4 } from 'uuid' ;
128import { FileInfo as FTPFileInfo , FileListing } from '../stream/sftp' ;
139import { SchemaNode } from '../stream/xml' ;
@@ -197,32 +193,18 @@ const getNextFileToProcess = async (
197193 } ,
198194 ) ;
199195
200- // Find the next import file to process
196+ // Find the next import file to process without join and locking mechanism
201197 const importFile = await ImportFile . findOne ( {
202198 attributes : [
203- [ 'id' , 'importFileId' ] ,
199+ 'id' ,
204200 'fileId' ,
205201 'status' ,
206202 'processAttempts' ,
207- ] ,
208- include : [
209- {
210- model : File ,
211- as : 'file' ,
212- attributes : [
213- 'key' ,
214- ] ,
215- } ,
216- {
217- model : Import ,
218- as : 'import' ,
219- attributes : [
220- 'definitions' ,
221- ] ,
222- } ,
203+ 'importId' ,
223204 ] ,
224205 where : {
225206 importId,
207+ fileId : { [ Op . ne ] : null } , // Ensure fileId is not null
226208 [ Op . or ] : [
227209 // New Work
228210 { status : IMPORT_STATUSES . COLLECTED } , // Import file is in the "collected" status
@@ -240,9 +222,38 @@ const getNextFileToProcess = async (
240222 ] ,
241223 limit : 1 , // Limit the result to 1 record
242224 lock : true , // Lock the row for update to prevent race conditions
225+ raw : true ,
243226 } ) ;
244227
245- return importFile ;
228+ if ( ! importFile ) {
229+ return null ;
230+ }
231+
232+ // Fetch the associated File data
233+ const file = await File . findOne ( {
234+ attributes : [ 'key' ] ,
235+ where : {
236+ id : importFile . fileId ,
237+ } ,
238+ raw : true ,
239+ } ) ;
240+
241+ // Fetch the associated Import data
242+ const importData = await Import . findOne ( {
243+ attributes : [ 'definitions' ] ,
244+ where : {
245+ id : importFile . importId ,
246+ } ,
247+ raw : true ,
248+ } ) ;
249+ return {
250+ importFileId : importFile . id ,
251+ fileId : importFile . fileId ,
252+ status : importFile . status ,
253+ processAttempts : importFile . processAttempts ,
254+ fileKey : file ?. key ,
255+ importDefinitions : importData ?. definitions ,
256+ } ;
246257} ;
247258
248259/**
@@ -381,11 +392,10 @@ const recordAvailableDataFiles = async (
381392 } ) ;
382393
383394 const fileMatches = ( currentImportDataFile , availableFile ) => (
384- importFileId === currentImportDataFile . importFileId
395+ importFileId === currentImportDataFile ? .importFileId
385396 && availableFile . path === currentImportDataFile . fileInfo . path
386397 && availableFile . name === currentImportDataFile . fileInfo . name
387398 ) ;
388-
389399 // Separate the available files into new, matched, and removed files
390400 // New files are those that are not already recorded in the database
391401 const newFiles = availableFiles
@@ -508,13 +518,10 @@ const logFileToBeCollected = async (
508518} > => {
509519 let key ;
510520
511- // Find the import file record based on the import ID and available file information
521+ // Step 1: Find and lock the import file record based on the import ID and available
522+ // file information
512523 const importFile = await ImportFile . findOne ( {
513- attributes : [
514- 'id' ,
515- 'fileId' ,
516- 'downloadAttempts' ,
517- ] ,
524+ attributes : [ 'id' , 'fileId' , 'downloadAttempts' ] ,
518525 where : {
519526 importId,
520527 ftpFileInfo : {
@@ -524,15 +531,16 @@ const logFileToBeCollected = async (
524531 } ,
525532 } ,
526533 } ,
527- include : [ {
528- model : File ,
529- as : 'file' ,
530- attributes : [ 'key' ] ,
531- require : false ,
532- } ] ,
533534 lock : true , // Lock the row for update to prevent race conditions
535+ raw : true ,
534536 } ) ;
535537
538+ if ( ! importFile ) {
539+ throw new Error ( 'Import file not found' ) ;
540+ }
541+
542+ const downloadAttempts = importFile . downloadAttempts + 1 ;
543+
536544 if ( ! importFile . fileId ) {
537545 // Generate a unique key for the file using the import ID, a UUID, and the file extension
538546 const uuid : string = uuidv4 ( ) ;
@@ -551,39 +559,35 @@ const logFileToBeCollected = async (
551559 await ImportFile . update (
552560 {
553561 fileId : fileRecord . id ,
554- downloadAttempts : importFile . dataValues . downloadAttempts + 1 ,
562+ downloadAttempts,
555563 status : IMPORT_STATUSES . COLLECTING ,
556564 } ,
557565 {
558566 where : {
559- importId,
560- ftpFileInfo : {
561- [ Op . contains ] : {
562- path : availableFile . fileInfo . path ,
563- name : availableFile . fileInfo . name ,
564- } ,
565- } ,
567+ id : importFile . id ,
566568 } ,
567569 lock : true , // Lock the row for update to prevent race conditions
568570 } ,
569571 ) ;
570572 } else {
573+ // Step 2: Fetch the associated file record
574+ const file = await File . findOne ( {
575+ attributes : [ 'key' ] ,
576+ where : {
577+ id : importFile . fileId ,
578+ } ,
579+ } ) ;
580+
571581 // Retrieve the key from the existing import file record
572- key = importFile . file . dataValues . key ;
582+ key = file ? file . key : null ;
573583 await ImportFile . update (
574584 {
575- downloadAttempts : importFile . dataValues . downloadAttempts + 1 ,
585+ downloadAttempts,
576586 status : IMPORT_STATUSES . COLLECTING ,
577587 } ,
578588 {
579589 where : {
580- importId,
581- ftpFileInfo : {
582- [ Op . contains ] : {
583- path : availableFile . fileInfo . path ,
584- name : availableFile . fileInfo . name ,
585- } ,
586- } ,
590+ id : importFile . id ,
587591 } ,
588592 lock : true , // Lock the row for update to prevent race conditions
589593 } ,
@@ -593,7 +597,7 @@ const logFileToBeCollected = async (
593597 return {
594598 importFileId : importFile . id ,
595599 key,
596- attempts : importFile . dataValues . downloadAttempts ,
600+ attempts : downloadAttempts ,
597601 } ;
598602} ;
599603
0 commit comments