@@ -5,9 +5,10 @@ import { fsa } from '@chunkd/fs';
55import { HashTransform } from '@chunkd/fs/build/src/hash.stream.js' ;
66import { FsAwsS3 } from '@chunkd/fs-aws' ;
77import type { LambdaRequest , LogType } from '@linzjs/lambda' ;
8+ import { rmSync } from 'fs' ;
89import { Readable } from 'stream' ;
9- import type { Entry , ParseStream } from 'unzipper ' ;
10- import { Parse } from 'unzipper ' ;
10+ import type { Entry } from 'yauzl ' ;
11+ import yauzl from 'yauzl ' ;
1112
1213import { CachePrefix , ExportLayerId , kx } from './config.ts' ;
1314import type { KxDatasetExport , KxDatasetVersionDetail } from './kx.ts' ;
@@ -56,33 +57,51 @@ export async function extractAndWritePackage(
5657 datasetId : number ,
5758 log : LogType ,
5859) : Promise < void > {
59- const unzipperParser : ParseStream = Parse ( ) ;
60+ const tmpZipFile = fsa . toUrl ( `/tmp/${ datasetId } .zip` ) ;
61+ try {
62+ await fsa . write ( tmpZipFile , stream ) ;
6063
61- let writeProm : Promise < void > | undefined ;
62- let fileName : string | null = null ;
64+ await new Promise < void > ( ( resolve , reject ) => {
65+ yauzl . open ( tmpZipFile . pathname , { lazyEntries : true } , ( err , zipFile ) => {
66+ if ( err ) return reject ( new Error ( `Failed to open zip file ${ tmpZipFile . pathname } ` ) ) ;
6367
64- await stream
65- . pipe ( unzipperParser )
66- . on ( 'entry' , ( entry : Entry ) => {
67- log . debug ( { datasetId, path : entry . path } , 'Export:Zip:File' ) ;
68- if ( entry . path . endsWith ( PackageExtension ) ) {
69- log . info ( { datasetId, path : entry . path , target : targetFileUri . href } , 'Ingest:Read:Start' ) ;
70-
71- if ( fileName != null ) throw Error ( `Duplicate export package: ${ fileName } vs ${ entry . path } ` ) ;
72- fileName = entry . path ;
68+ zipFile . once ( 'end' , ( ) => {
69+ reject ( new Error ( 'Did not find geopackage entry' ) ) ;
70+ } ) ;
7371
74- const gzipOut = entry . pipe ( ht ) . pipe ( createGzip ( { level : 9 } ) ) ;
75- writeProm = fsa . write ( targetFileUri , gzipOut , {
76- contentType : 'application/geopackage+vnd.sqlite3' ,
77- contentEncoding : 'gzip' ,
72+ zipFile . once ( 'error' , reject ) ;
73+
74+ zipFile . on ( 'entry' , ( entry : Entry ) => {
75+ log . debug ( { datasetId, path : entry . fileName } , 'Export:Zip:File' ) ;
76+ if ( entry . fileName . endsWith ( PackageExtension ) ) {
77+ log . info ( { datasetId, path : entry . fileName , target : targetFileUri . href } , 'Ingest:Read:Start' ) ;
78+
79+ zipFile . openReadStream ( entry , ( err , readStream ) => {
80+ if ( err ) return reject ( new Error ( `Failed to read zip entry ${ entry . fileName } ` ) ) ;
81+
82+ const gzipOut = readStream . pipe ( ht ) . pipe ( createGzip ( { level : 9 } ) ) ;
83+ fsa
84+ . write ( targetFileUri , gzipOut , {
85+ contentType : 'application/geopackage+vnd.sqlite3' ,
86+ contentEncoding : 'gzip' ,
87+ } )
88+ . then ( ( ) => {
89+ zipFile . close ( ) ;
90+ resolve ( ) ;
91+ } )
92+ . catch ( reject ) ;
93+ } ) ;
94+ } else {
95+ zipFile . readEntry ( ) ;
96+ }
7897 } ) ;
79- } else {
80- entry . autodrain ( ) ;
81- }
82- } )
83- . promise ( ) ;
8498
85- if ( writeProm != null ) await writeProm ;
99+ zipFile . readEntry ( ) ;
100+ } ) ;
101+ } ) ;
102+ } finally {
103+ rmSync ( tmpZipFile ) ;
104+ }
86105}
87106
88107/** Ingest the export into our cache */
0 commit comments