diff --git a/package-lock.json b/package-lock.json index 1af48c5..2fdc986 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,22 +11,24 @@ "dependencies": { "@aws-sdk/client-eventbridge": "^3.474.0", "@aws-sdk/client-s3": "^3.474.0", - "@chunkd/fs": "^11.2.0", "@chunkd/fs-aws": "^11.2.1", "@linzjs/geojson": "^6.9.1", "@linzjs/lambda": "^4.1.0", "aws-cdk-lib": "^2.184.1", - "fflate": "^0.8.1", - "stac-ts": "^1.0.3" + "stac-ts": "^1.0.3", + "unzipper": "^0.12.3" }, "devDependencies": { "@linzjs/style": "^5.4.0", "@types/aws-lambda": "^8.10.83", "@types/geojson": "^7946.0.8", "@types/node": "^20.10.4", + "@types/unzipper": "^0.10.11", + "@types/yazl": "^3.3.0", "aws-cdk": "^2.1004.0", "constructs": "^10.4.2", - "esbuild": "^0.19.9" + "esbuild": "^0.19.9", + "yazl": "^3.3.1" } }, "node_modules/@aws-cdk/asset-awscli-v1": { @@ -2745,6 +2747,26 @@ "undici-types": "~5.26.4" } }, + "node_modules/@types/unzipper": { + "version": "0.10.11", + "resolved": "https://registry.npmjs.org/@types/unzipper/-/unzipper-0.10.11.tgz", + "integrity": "sha512-D25im2zjyMCcgL9ag6N46+wbtJBnXIr7SI4zHf9eJD2Dw2tEB5e+p5MYkrxKIVRscs5QV0EhtU9rgXSPx90oJg==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, + "node_modules/@types/yazl": { + "version": "3.3.0", + "resolved": "https://registry.npmjs.org/@types/yazl/-/yazl-3.3.0.tgz", + "integrity": "sha512-mFL6lGkk2N5u5nIxpNV/K5LW3qVSbxhJrMxYGOOxZndWxMgCamr/iCsq/1t9kd8pEwhuNP91LC5qZm/qS9pOEw==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@typescript-eslint/eslint-plugin": { "version": "7.18.0", "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-7.18.0.tgz", @@ -3601,6 +3623,12 @@ } ] }, + "node_modules/bluebird": { + "version": "3.7.2", + "resolved": "https://registry.npmjs.org/bluebird/-/bluebird-3.7.2.tgz", + "integrity": "sha512-XpNj6GDQzdfW+r2Wnn7xiSAd7TM3jzkxGXBGTtWKuSXv1xUV+azxAm8jdWZN06QTQk+2N2XB9jRDkvbmQmcRtg==", + "license": "MIT" + }, "node_modules/bowser": { "version": "2.11.0", "resolved": "https://registry.npmjs.org/bowser/-/bowser-2.11.0.tgz", @@ -3638,6 +3666,16 @@ "ieee754": "^1.1.4" } }, + "node_modules/buffer-crc32": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/buffer-crc32/-/buffer-crc32-1.0.0.tgz", + "integrity": "sha512-Db1SbgBS/fg/392AblrMJk97KggmvYhr4pB5ZIMTWtaivCPMWLkmb7m21cJvpvgK+J3nsU2CmmixNBZx4vFj/w==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/call-bind": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/call-bind/-/call-bind-1.0.8.tgz", @@ -3748,6 +3786,12 @@ "integrity": "sha512-wsNxBlAott2qg8Zv87q3eYZYgheb9lchtBfjHzzLHtXbttwSrHPs1NNQbBrmbb1YZvYg2+Vh0Dor76w4mFxJkA==", "license": "Apache-2.0" }, + "node_modules/core-util-is": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.3.tgz", + "integrity": "sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ==", + "license": "MIT" + }, "node_modules/cross-spawn": { "version": "7.0.6", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", @@ -3919,6 +3963,51 @@ "node": ">= 0.4" } }, + "node_modules/duplexer2": { + "version": "0.1.4", + "resolved": "https://registry.npmjs.org/duplexer2/-/duplexer2-0.1.4.tgz", + "integrity": "sha512-asLFVfWWtJ90ZyOUHMqk7/S2w2guQKxUI2itj3d92ADHhxUSbCMGi1f1cBcJ7xM1To+pE/Khbwo1yuNbMEPKeA==", + "license": "BSD-3-Clause", + "dependencies": { + "readable-stream": "^2.0.2" + } + }, + "node_modules/duplexer2/node_modules/isarray": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/isarray/-/isarray-1.0.0.tgz", + "integrity": "sha512-VLghIWNM6ELQzo7zwmcg0NmTVyWKYjvIeM83yjp0wRDTmUnrM678fQbcKBo6n2CJEF0szoG//ytg+TKla89ALQ==", + "license": "MIT" + }, + "node_modules/duplexer2/node_modules/readable-stream": { + "version": "2.3.8", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-2.3.8.tgz", + "integrity": "sha512-8p0AUk4XODgIewSi0l8Epjs+EVnWiK7NoDIEGU0HhE7+ZyY8D1IMY7odu5lRrFXGg71L15KG8QrPmum45RTtdA==", + "license": "MIT", + "dependencies": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.3", + "isarray": "~1.0.0", + "process-nextick-args": "~2.0.0", + "safe-buffer": "~5.1.1", + "string_decoder": "~1.1.1", + "util-deprecate": "~1.0.1" + } + }, + "node_modules/duplexer2/node_modules/safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", + "license": "MIT" + }, + "node_modules/duplexer2/node_modules/string_decoder": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz", + "integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==", + "license": "MIT", + "dependencies": { + "safe-buffer": "~5.1.0" + } + }, "node_modules/es-abstract": { "version": "1.23.9", "resolved": "https://registry.npmjs.org/es-abstract/-/es-abstract-1.23.9.tgz", @@ -4589,11 +4678,6 @@ "reusify": "^1.0.4" } }, - "node_modules/fflate": { - "version": "0.8.1", - "resolved": "https://registry.npmjs.org/fflate/-/fflate-0.8.1.tgz", - "integrity": "sha512-/exOvEuc+/iaUm105QIiOt4LpBdMTWsXxqR0HDF35vx3fmaKzw7354gTilCh5rkzEt8WYyG//ku3h3nRmd7CHQ==" - }, "node_modules/file-entry-cache": { "version": "6.0.1", "resolved": "https://registry.npmjs.org/file-entry-cache/-/file-entry-cache-6.0.1.tgz", @@ -4688,6 +4772,20 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/fs-extra": { + "version": "11.3.1", + "resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-11.3.1.tgz", + "integrity": "sha512-eXvGGwZ5CL17ZSwHWd3bbgk7UUpF6IFHtP57NYYakPvHOs8GDgDe5KJI36jIJzDkJ6eJjuzRA8eBQb6SkKue0g==", + "license": "MIT", + "dependencies": { + "graceful-fs": "^4.2.0", + "jsonfile": "^6.0.1", + "universalify": "^2.0.0" + }, + "engines": { + "node": ">=14.14" + } + }, "node_modules/fs.realpath": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz", @@ -4934,6 +5032,12 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/graceful-fs": { + "version": "4.2.11", + "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.11.tgz", + "integrity": "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==", + "license": "ISC" + }, "node_modules/graphemer": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/graphemer/-/graphemer-1.4.0.tgz", @@ -5585,6 +5689,18 @@ "dev": true, "license": "MIT" }, + "node_modules/jsonfile": { + "version": "6.2.0", + "resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-6.2.0.tgz", + "integrity": "sha512-FGuPw30AdOIUTRMC2OMRtQV+jkVj2cfPqSeWXv1NEAJ1qZ5zb1X6z1mFhbfOB/iy3ssJCD+3KuZ8r8C3uVFlAg==", + "license": "MIT", + "dependencies": { + "universalify": "^2.0.0" + }, + "optionalDependencies": { + "graceful-fs": "^4.1.6" + } + }, "node_modules/jsx-ast-utils": { "version": "3.3.5", "resolved": "https://registry.npmjs.org/jsx-ast-utils/-/jsx-ast-utils-3.3.5.tgz", @@ -5731,6 +5847,12 @@ "dev": true, "license": "MIT" }, + "node_modules/node-int64": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz", + "integrity": "sha512-O5lz91xSOeoXP6DulyHfllpq+Eg00MWitZIbtPfoSEvqIHdl5gfcY6hYzDWnj0qD5tz52PI08u9qUvSVeUBeHw==", + "license": "MIT" + }, "node_modules/object-assign": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz", @@ -6137,6 +6259,12 @@ "node": ">= 0.6.0" } }, + "node_modules/process-nextick-args": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-2.0.1.tgz", + "integrity": "sha512-3ouUOpQhtgrbOa17J7+uxOTpITYWaGP7/AhoR3+A+/1e9skrzelGi/dXzEYyvbxubEF6Wn2ypscTKiKJFFn1ag==", + "license": "MIT" + }, "node_modules/process-warning": { "version": "2.3.2", "resolved": "https://registry.npmjs.org/process-warning/-/process-warning-2.3.2.tgz", @@ -7033,6 +7161,28 @@ "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==", "dev": true }, + "node_modules/universalify": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/universalify/-/universalify-2.0.1.tgz", + "integrity": "sha512-gptHNQghINnc/vTGIk0SOFGFNXw7JVrlRUtConJRlvaw6DuX0wO5Jeko9sWrMBhh+PsYAZ7oXAiOnf/UKogyiw==", + "license": "MIT", + "engines": { + "node": ">= 10.0.0" + } + }, + "node_modules/unzipper": { + "version": "0.12.3", + "resolved": "https://registry.npmjs.org/unzipper/-/unzipper-0.12.3.tgz", + "integrity": "sha512-PZ8hTS+AqcGxsaQntl3IRBw65QrBI6lxzqDEL7IAo/XCEqRTKGfOX56Vea5TH9SZczRVxuzk1re04z/YjuYCJA==", + "license": "MIT", + "dependencies": { + "bluebird": "~3.7.2", + "duplexer2": "~0.1.4", + "fs-extra": "^11.2.0", + "graceful-fs": "^4.2.2", + "node-int64": "^0.4.0" + } + }, "node_modules/uri-js": { "version": "4.4.1", "resolved": "https://registry.npmjs.org/uri-js/-/uri-js-4.4.1.tgz", @@ -7170,6 +7320,16 @@ "dev": true, "license": "ISC" }, + "node_modules/yazl": { + "version": "3.3.1", + "resolved": "https://registry.npmjs.org/yazl/-/yazl-3.3.1.tgz", + "integrity": "sha512-BbETDVWG+VcMUle37k5Fqp//7SDOK2/1+T7X8TD96M3D9G8jK5VLUdQVdVjGi8im7FGkazX7kk5hkU8X4L5Bng==", + "dev": true, + "license": "MIT", + "dependencies": { + "buffer-crc32": "^1.0.0" + } + }, "node_modules/yocto-queue": { "version": "0.1.0", "resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-0.1.0.tgz", diff --git a/package.json b/package.json index 39273f2..adf17b0 100644 --- a/package.json +++ b/package.json @@ -17,9 +17,12 @@ "@types/aws-lambda": "^8.10.83", "@types/geojson": "^7946.0.8", "@types/node": "^20.10.4", + "@types/unzipper": "^0.10.11", + "@types/yazl": "^3.3.0", "aws-cdk": "^2.1004.0", "constructs": "^10.4.2", - "esbuild": "^0.19.9" + "esbuild": "^0.19.9", + "yazl": "^3.3.1" }, "scripts": { "build": "tsc", @@ -40,7 +43,7 @@ "@linzjs/geojson": "^6.9.1", "@linzjs/lambda": "^4.1.0", "aws-cdk-lib": "^2.184.1", - "fflate": "^0.8.1", - "stac-ts": "^1.0.3" + "stac-ts": "^1.0.3", + "unzipper": "^0.12.3" } } diff --git a/src/__test__/storage.test.ts b/src/__test__/storage.test.ts new file mode 100644 index 0000000..5008ea7 --- /dev/null +++ b/src/__test__/storage.test.ts @@ -0,0 +1,33 @@ +import assert from 'node:assert'; +import { createHash } from 'node:crypto'; +import { Readable } from 'node:stream'; +import { describe, it } from 'node:test'; + +import { HashTransform } from '@chunkd/fs/build/src/hash.stream.js'; +import type { LogType } from '@linzjs/lambda'; +import { ZipFile } from 'yazl'; + +describe('StorageTest', () => { + it('should unzip geopackage file', async () => { + process.env['KX_API_KEY'] = 'test'; + const { extractAndWritePackage } = await import('../storage.ts'); + + const zipFile = new ZipFile(); + zipFile.addBuffer(Buffer.from('hello'), 'test.gpkg'); + zipFile.end(); + + const stream = Readable.from(zipFile.outputStream); + const targetFileUri = new URL('file:///tmp/output.gpkg'); + const ht = new HashTransform('sha256'); + const datasetId = 0; + const log: Partial = { + debug: () => {}, + info: () => {}, + }; + + await extractAndWritePackage(stream, targetFileUri, ht, datasetId, log as LogType); + + const expectedHash = createHash('sha256').update('hello').digest('hex'); + assert.strictEqual(ht.digestHex, expectedHash); + }); +}); diff --git a/src/storage.ts b/src/storage.ts index 0ddb7e2..35c3909 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -1,13 +1,13 @@ -import { PassThrough } from 'node:stream'; -import type { ReadableStream } from 'node:stream/web'; import { createGzip } from 'node:zlib'; import { S3Client } from '@aws-sdk/client-s3'; import { fsa } from '@chunkd/fs'; import { HashTransform } from '@chunkd/fs/build/src/hash.stream.js'; import { FsAwsS3 } from '@chunkd/fs-aws'; -import type { LambdaRequest } from '@linzjs/lambda'; -import * as fflate from 'fflate'; +import type { LambdaRequest, LogType } from '@linzjs/lambda'; +import { Readable } from 'stream'; +import type { Entry, ParseStream } from 'unzipper'; +import { Parse } from 'unzipper'; import { CachePrefix, ExportLayerId, kx } from './config.ts'; import type { KxDatasetExport, KxDatasetVersionDetail } from './kx.ts'; @@ -49,6 +49,37 @@ export async function listStacFiles(): Promise { return _fileList; } +export function extractAndWritePackage( + stream: Readable, + targetFileUri: URL, + ht: HashTransform, + datasetId: number, + log: LogType, +): Promise { + const unzipperParser: ParseStream = Parse(); + + return stream + .pipe(unzipperParser) + .on('entry', (entry: Entry) => { + log.debug({ datasetId, path: entry.path }, 'Export:Zip:File'); + if (entry.path.endsWith(PackageExtension)) { + log.info({ datasetId, path: entry.path, target: targetFileUri.href }, 'Ingest:Read:Start'); + const gzipOut = entry.pipe(ht).pipe(createGzip({ level: 9 })); + void fsa.write(targetFileUri, gzipOut, { + contentType: 'application/geopackage+vnd.sqlite3', + contentEncoding: 'gzip', + }); + gzipOut.on('finish', () => { + unzipperParser.end(); + stream.destroy(); + }); + } else { + entry.autodrain(); + } + }) + .promise(); +} + /** Ingest the export into our cache */ export async function ingest( req: LambdaRequest, @@ -98,49 +129,19 @@ export async function ingest( return false; } - const pt = new PassThrough(); - const ht = new HashTransform('sha256'); - const gzipOut = pt.pipe(ht).pipe(createGzip({ level: 9 })); - - let writeProm: Promise | undefined; - let fileName: string | null = null; - - const unzip = new fflate.Unzip((file) => { - req.log.debug({ datasetId: dataset.id, datasetUrl: datasetUrl.href, path: file.name }, 'Export:Zip:File'); - if (!file.name.endsWith(PackageExtension)) return; - if (fileName != null) throw Error(`Duplicate export package: ${fileName} vs ${file.name}`); - fileName = file.name; - file.ondata = (err, data, final): void => { - if (err) throw err; - if (writeProm == null) { - req.log.info( - { datasetId: dataset.id, datasetUrl: datasetUrl.href, path: file.name, target: targetFileUri.href }, - 'Ingest:Read:Start', - ); - writeProm = fsa.write(targetFileUri, gzipOut, { - contentType: 'application/geopackage+vnd.sqlite3', - contentEncoding: 'gzip', - }); - } - if (data) pt.write(data); - if (final) pt.end(); - }; - - file.start(); - }); - unzip.register(fflate.UnzipInflate); - req.log.info( { datasetId: dataset.id, datasetUrl: datasetUrl.href, source: nextLocation, status: source.status }, 'Ingest:Read:Http', ); - for await (const chunk of source.body as ReadableStream) unzip.push(chunk); + const ht = new HashTransform('sha256'); + const stream = Readable.fromWeb(source.body); + await extractAndWritePackage(stream, targetFileUri, ht, dataset.id, req.log); + req.log.info( { datasetId: dataset.id, datasetUrl: datasetUrl.href, target: targetFileUri.href }, 'Ingest:Read:Complete', ); - if (writeProm != null) await writeProm; req.log.info( { datasetId: dataset.id,