@@ -26,65 +26,99 @@ public async Task Run([QueueTrigger("%FileExtractQueueName%")] FileExtractQueueM
2626
2727 await using var transaction = await serviceLayerDbContext . Database . BeginTransactionAsync ( ) ;
2828
29+ var file = await GetFileAsync ( message . FileId ) ;
30+ if ( file == null )
31+ {
32+ logger . LogWarning ( "Exiting function." ) ;
33+ return ;
34+ }
35+
36+ if ( ! IsFileSuitableForExtraction ( file ) )
37+ {
38+ logger . LogWarning ( "Exiting function." ) ;
39+ return ;
40+ }
41+
42+ await UpdateFileStatusForExtraction ( file ) ;
43+ await transaction . CommitAsync ( ) ;
44+
45+ try
46+ {
47+ await ProcessFileExtraction ( file , message ) ;
48+ }
49+ catch ( Exception ex )
50+ {
51+ await HandleExtractionError ( file , message , ex ) ;
52+ }
53+ }
54+
55+ private async Task < MeshFile ? > GetFileAsync ( string fileId )
56+ {
2957 var file = await serviceLayerDbContext . MeshFiles
30- . FirstOrDefaultAsync ( f => f . FileId == message . FileId ) ;
58+ . FirstOrDefaultAsync ( f => f . FileId == fileId ) ;
3159
3260 if ( file == null )
3361 {
34- logger . LogWarning ( "File with id: {fileId} not found in MeshFiles table. Exiting function." , message . FileId ) ;
35- return ;
62+ logger . LogWarning ( "File with id: {fileId} not found in MeshFiles table." , fileId ) ;
3663 }
3764
65+ return file ;
66+ }
67+
68+ private bool IsFileSuitableForExtraction ( MeshFile file )
69+ {
3870 // We only want to extract files if they are in a Discovered state,
3971 // or are in an Extracting state and were last touched over 12 hours ago.
4072 var expectedStatuses = new [ ] { MeshFileStatus . Discovered , MeshFileStatus . Extracting } ;
4173 if ( ! expectedStatuses . Contains ( file . Status ) ||
4274 ( file . Status == MeshFileStatus . Extracting && file . LastUpdatedUtc > DateTime . UtcNow . AddHours ( - 12 ) ) )
4375 {
4476 logger . LogWarning (
45- "File with id: {fileId} found in MeshFiles table but has unexpected Status: {status}, LastUpdatedUtc: {lastUpdatedUtc}. Exiting function." ,
46- message . FileId ,
47- file . Status ,
48- file . LastUpdatedUtc ) ;
49- return ;
77+ "File with id: {fileId} found in MeshFiles table but has unexpected status: {status}." ,
78+ file . FileId ,
79+ file . Status ) ;
80+ return false ;
5081 }
82+ return true ;
83+ }
5184
85+ private async Task UpdateFileStatusForExtraction ( MeshFile file )
86+ {
5287 file . Status = MeshFileStatus . Extracting ;
5388 file . LastUpdatedUtc = DateTime . UtcNow ;
54-
5589 await serviceLayerDbContext . SaveChangesAsync ( ) ;
56- await transaction . CommitAsync ( ) ;
90+ }
5791
58- try
92+ private async Task ProcessFileExtraction ( MeshFile file , FileExtractQueueMessage message )
93+ {
94+ var meshResponse = await meshInboxService . GetMessageByIdAsync ( configuration . NbssMeshMailboxId , file . FileId ) ;
95+ if ( ! meshResponse . IsSuccessful )
5996 {
60- var meshResponse = await meshInboxService . GetMessageByIdAsync ( configuration . NbssMeshMailboxId , file . FileId ) ;
61- if ( ! meshResponse . IsSuccessful )
62- {
63- throw new InvalidOperationException ( $ "Mesh extraction failed: { meshResponse . Error } ") ;
64- }
65-
66- var blobPath = await meshFileBlobStore . UploadAsync ( file , meshResponse . Response . FileAttachment . Content ) ;
67-
68- var meshAcknowledgementResponse = await meshInboxService . AcknowledgeMessageByIdAsync ( configuration . NbssMeshMailboxId , message . FileId ) ;
69- if ( ! meshAcknowledgementResponse . IsSuccessful )
70- {
71- throw new InvalidOperationException ( $ "Mesh acknowledgement failed: { meshAcknowledgementResponse . Error } ") ;
72- }
73-
74- file . BlobPath = blobPath ;
75- file . Status = MeshFileStatus . Extracted ;
76- file . LastUpdatedUtc = DateTime . UtcNow ;
77- await serviceLayerDbContext . SaveChangesAsync ( ) ;
78-
79- await fileTransformQueueClient . EnqueueFileTransformAsync ( file ) ;
97+ throw new InvalidOperationException ( $ "Mesh extraction failed: { meshResponse . Error } ") ;
8098 }
81- catch ( Exception ex )
99+
100+ var blobPath = await meshFileBlobStore . UploadAsync ( file , meshResponse . Response . FileAttachment . Content ) ;
101+
102+ var meshAcknowledgementResponse = await meshInboxService . AcknowledgeMessageByIdAsync ( configuration . NbssMeshMailboxId , message . FileId ) ;
103+ if ( ! meshAcknowledgementResponse . IsSuccessful )
82104 {
83- logger . LogError ( ex , "An exception occurred during file extraction for fileId: {fileId}" , message . FileId ) ;
84- file . Status = MeshFileStatus . FailedExtract ;
85- file . LastUpdatedUtc = DateTime . UtcNow ;
86- await serviceLayerDbContext . SaveChangesAsync ( ) ;
87- await fileExtractQueueClient . SendToPoisonQueueAsync ( message ) ;
105+ throw new InvalidOperationException ( $ "Mesh acknowledgement failed: { meshAcknowledgementResponse . Error } ") ;
88106 }
107+
108+ file . BlobPath = blobPath ;
109+ file . Status = MeshFileStatus . Extracted ;
110+ file . LastUpdatedUtc = DateTime . UtcNow ;
111+ await serviceLayerDbContext . SaveChangesAsync ( ) ;
112+
113+ await fileTransformQueueClient . EnqueueFileTransformAsync ( file ) ;
114+ }
115+
116+ private async Task HandleExtractionError ( MeshFile file , FileExtractQueueMessage message , Exception ex )
117+ {
118+ logger . LogError ( ex , "An exception occurred during file extraction for fileId: {fileId}" , message . FileId ) ;
119+ file . Status = MeshFileStatus . FailedExtract ;
120+ file . LastUpdatedUtc = DateTime . UtcNow ;
121+ await serviceLayerDbContext . SaveChangesAsync ( ) ;
122+ await fileExtractQueueClient . SendToPoisonQueueAsync ( message ) ;
89123 }
90124}
0 commit comments