11import { z } from 'zod' ;
22import {
3- ListObjectsV2Command ,
43 S3Client ,
54 SelectObjectContentCommand ,
65 SelectObjectContentEventStream ,
7- _Object ,
86} from '@aws-sdk/client-s3' ;
97import {
108 $TemplateCompletedEventV1 ,
119 $TemplateDeletedEventV1 ,
1210 $TemplateDraftedEventV1 ,
1311} from '@nhsdigital/nhs-notify-event-schemas-template-management' ;
12+ import { differenceInSeconds , addHours } from 'date-fns' ;
13+ import { S3Helper } from '../s3-helper' ;
1414
1515const $NHSNotifyTemplateEvent = z . discriminatedUnion ( 'type' , [
1616 $TemplateCompletedEventV1 ,
@@ -32,57 +32,21 @@ export class EventCacheHelper {
3232 return [ ] ;
3333 }
3434
35- const files = await this . getAllS3items ( this . buildEventCachePrefix ( from ) ) ;
35+ const files = await Promise . all (
36+ this . filePaths ( from ) . map ( ( path ) => {
37+ return S3Helper . listAll ( this . bucketName , path ) ;
38+ } )
39+ ) ;
3640
37- const filteredFiles = this . filterAndSortFiles ( files , from ) ;
41+ const filteredFiles = S3Helper . filterAndSort ( files . flat ( ) , from ) ;
3842
3943 const eventPromises = filteredFiles . map ( ( file ) =>
4044 this . queryFileForEvents ( file . Key ! , templateIds )
4145 ) ;
4246
4347 const results = await Promise . all ( eventPromises ) ;
4448
45- return results
46- . flat ( )
47- . sort (
48- ( a , b ) =>
49- new Date ( a . data . updatedAt ) . getTime ( ) -
50- new Date ( b . data . updatedAt ) . getTime ( )
51- ) ;
52- }
53-
54- private filterAndSortFiles ( files : _Object [ ] , from : Date ) : _Object [ ] {
55- return files
56- . filter (
57- ( { LastModified } ) => ( LastModified ?. getTime ( ) ?? 0 ) > from . getTime ( )
58- )
59- . sort (
60- ( a , b ) =>
61- ( b . LastModified ?. getTime ( ) ?? 0 ) - ( a . LastModified ?. getTime ( ) ?? 0 )
62- ) ;
63- }
64-
65- private async getAllS3items ( prefix : string ) : Promise < _Object [ ] > {
66- const allItems : _Object [ ] = [ ] ;
67- let continuationToken : string | undefined ;
68-
69- do {
70- const command = new ListObjectsV2Command ( {
71- Bucket : this . bucketName ,
72- Prefix : prefix ,
73- ContinuationToken : continuationToken ,
74- } ) ;
75-
76- const response = await this . s3 . send ( command ) ;
77-
78- if ( response . Contents ) {
79- allItems . push ( ...response . Contents ) ;
80- }
81-
82- continuationToken = response . NextContinuationToken ;
83- } while ( continuationToken ) ;
84-
85- return allItems ;
49+ return results . flat ( ) ;
8650 }
8751
8852 private async queryFileForEvents (
@@ -109,10 +73,10 @@ export class EventCacheHelper {
10973 return [ ] ;
11074 }
11175
112- return await this . processS3SelectResponse ( fileName , response . Payload ) ;
76+ return await this . parse ( fileName , response . Payload ) ;
11377 }
11478
115- private async processS3SelectResponse (
79+ private async parse (
11680 fileName : string ,
11781 payload : AsyncIterable < SelectObjectContentEventStream >
11882 ) : Promise < NHSNotifyTemplateEvent [ ] > {
@@ -152,6 +116,27 @@ export class EventCacheHelper {
152116 return events ;
153117 }
154118
119+ /*
120+ * Get files paths for the current hour
121+ * and next hour if the different in seconds is greater than toleranceInSeconds
122+ *
123+ * The way firehose stores files is yyyy/mm/dd/hh.
124+ * On a boundary of 15:59:58 you'll find files in both 15 and 16 hour folders
125+ */
126+ private filePaths ( start : Date , toleranceInSeconds = 30 ) : string [ ] {
127+ const paths = [ this . getEventCachePrefix ( start ) ] ;
128+
129+ const end = addHours ( start , 1 ) ;
130+
131+ const difference = differenceInSeconds ( end , start ) ;
132+
133+ if ( difference >= toleranceInSeconds ) {
134+ paths . push ( this . getEventCachePrefix ( end ) ) ;
135+ }
136+
137+ return paths ;
138+ }
139+
155140 private buildS3Query ( templateIds : string [ ] ) : string {
156141 const likeConditions = templateIds
157142 . map ( ( id ) => `s.Message LIKE '%${ id } %'` )
@@ -160,7 +145,7 @@ export class EventCacheHelper {
160145 return `SELECT * FROM S3Object s WHERE ${ likeConditions } ` ;
161146 }
162147
163- private buildEventCachePrefix ( date : Date ) : string {
148+ private getEventCachePrefix ( date : Date ) : string {
164149 return date
165150 . toISOString ( )
166151 . slice ( 0 , 13 )
0 commit comments