Skip to content
11 changes: 11 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@
"@smithy/shared-ini-file-loader": "^2.2.8",
"@smithy/util-retry": "^2.2.0",
"@vscode/debugprotocol": "^1.57.0",
"@zip.js/zip.js": "^2.7.41",
"adm-zip": "^0.5.10",
"amazon-states-language-service": "^1.11.0",
"archiver": "^7.0.1",
Expand Down
148 changes: 115 additions & 33 deletions packages/core/src/shared/utilities/zipStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,70 +2,152 @@
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

import archiver from 'archiver'
import { WritableStreamBuffer } from 'stream-buffers'
import crypto from 'crypto'
import { getLogger } from '../logger'
import { readFileAsString } from '../filesystemUtilities'
// Use require() instead of a static import: @zip.js/zip.js does not ship a CommonJS build, which breaks `import` in our CommonJS output
const { ZipWriter, TextReader } = require('@zip.js/zip.js')
import { getLogger } from '../logger/logger'

export interface ZipStreamResult {
sizeInBytes: number
md5: string
hash: string
streamBuffer: WritableStreamBuffer
}

/**
 * Tuning options for {@link ZipStream}. Callers may supply any subset;
 * omitted fields fall back to `defaultProps` (merged in the constructor).
 */
export type ZipStreamProps = {
    /** Digest algorithm applied to every emitted zip chunk; the final digest is reported as `hash` on the result. */
    hashAlgorithm: 'md5' | 'sha256'
    /** Maximum number of files compressed concurrently; additional files wait in an internal queue. */
    maxNumberOfFileStreams: number
    /** Forwarded to zip.js as its `level` option when constructing the ZipWriter. */
    compressionLevel: number
}

// Defaults applied in the ZipStream constructor; any caller-supplied fields
// override these via object spread (`{ ...defaultProps, ...props }`).
const defaultProps: ZipStreamProps = {
    hashAlgorithm: 'sha256',
    maxNumberOfFileStreams: 100,
    compressionLevel: 1,
}

/**
* Creates in-memory zip archives that output to a stream buffer.
*
* Example usage:
* ```ts
* const zipStream = new ZipStream()
 * const zipStream = new ZipStream({
 *     hashAlgorithm: 'sha256',
 *     maxNumberOfFileStreams: 150,
 *     compressionLevel: 1,
 * })
* zipStream.writeString('Hello World', 'file1.txt')
* zipStream.writeFile('/path/to/some/file.txt', 'file2.txt')
* const result = await zipStream.finalize()
* console.log(result) // { sizeInBytes: ..., md5: ..., streamBuffer: ... }
* const result = await zipStream.finalize([optional onProgress handler, called 1x per sec])
* console.log(result) // { sizeInBytes: ..., hash: ..., streamBuffer: ... }
* ```
*/
export class ZipStream {
private _archive: archiver.Archiver
// TypeScript compiler is confused about using ZipWriter as a type
// @ts-ignore
private _zipWriter: ZipWriter<WritableStream>
private _streamBuffer: WritableStreamBuffer
private _hasher: crypto.Hash
private _numberOfFilesToStream: number = 0
private _numberOfFilesSucceeded: number = 0
private _filesToZip: [string, string][] = []
private _filesBeingZipped: number = 0
private _maxNumberOfFileStreams: number
boundFileCompletionCallback: (progress: number, total: number) => void
boundFileStartCallback: (totalBytes: number) => void

constructor(props: Partial<ZipStreamProps> = {}) {
// Allow any user-provided values to override default values
const mergedProps = { ...defaultProps, ...props }
const { hashAlgorithm, compressionLevel, maxNumberOfFileStreams } = mergedProps

this.boundFileCompletionCallback = this.onFinishedCompressingFile.bind(this)
this.boundFileStartCallback = this.onStartCompressingFile.bind(this)

this._zipWriter = new ZipWriter(
new WritableStream({
write: (chunk) => {
this._streamBuffer.write(chunk)
this._hasher.update(chunk)
},
}),
{ level: compressionLevel }
)
this._maxNumberOfFileStreams = maxNumberOfFileStreams

constructor() {
this._archive = archiver('zip')
this._streamBuffer = new WritableStreamBuffer()
this._archive.pipe(this._streamBuffer)
this._hasher = crypto.createHash('md5')

this._archive.on('data', (data) => {
this._hasher.update(data)
})
this._archive.on('error', (err) => {
throw err
})
this._archive.on('warning', (err) => {
getLogger().warn(err)
})
this._hasher = crypto.createHash(hashAlgorithm)
}

/**
 * Bookkeeping callback fired by zip.js when it begins compressing a file
 * (wired up as the `onstart` option of `ZipWriter.add`). Tracks how many
 * file streams are in flight so queued files respect `maxNumberOfFileStreams`.
 *
 * @param totalBytes size reported by zip.js — currently unused, but the
 *                   parameter must stay to match the `onstart` callback shape.
 */
public onStartCompressingFile(totalBytes: number) {
    this._filesBeingZipped++
}

/**
 * Bookkeeping callback fired by zip.js when a file entry has been fully
 * compressed (wired up as the `onend` option of `ZipWriter.add`).
 *
 * Advances the success counter that `finalize()` polls, releases a
 * concurrency slot, and — if files are still queued and we are under
 * `maxNumberOfFileStreams` — starts compressing the next queued file.
 *
 * @param computedSize compressed size reported by zip.js — currently unused.
 */
public onFinishedCompressingFile(computedSize: number) {
    this._numberOfFilesSucceeded++
    this._filesBeingZipped--

    if (this._filesToZip.length > 0 && this._filesBeingZipped < this._maxNumberOfFileStreams) {
        const [fileToZip, path] = this._filesToZip.shift()!
        readFileAsString(fileToZip)
            .then((content) =>
                this._zipWriter.add(path, new TextReader(content), {
                    onend: this.boundFileCompletionCallback,
                    onstart: this.boundFileStartCallback,
                })
            )
            .catch((e) => {
                // Previously this rejection was silently dropped (fire-and-forget
                // `void ...then(...)`), so a failed read/add never incremented
                // `_numberOfFilesSucceeded` and finalize()'s polling loop spun
                // forever. Log the failure and count the file as processed so
                // finalize() can still terminate.
                // NOTE(review): if the failure happens after `onstart` fired, the
                // `_filesBeingZipped` slot is not released here — acceptable for a
                // best-effort failure path, but worth confirming against zip.js.
                getLogger().error('zipStream: failed to zip "%s": %s', path, (e as Error).message)
                this._numberOfFilesSucceeded++
            })
    }
}

public writeString(data: string, path: string) {
this._archive.append(Buffer.from(data, 'utf-8'), { name: path })
return this._zipWriter.add(path, new TextReader(data))
}

public writeFile(file: string, path: string) {
this._archive.file(file, { name: path })
// We use _numberOfFilesToStream to make sure we don't finalize too soon
// (before the progress event has been fired for the last file)
// The problem is that we can't rely on progress.entries.total,
// because files can be added to the queue faster
// than the progress event is fired
this._numberOfFilesToStream++
// console.log('this._numberOfFilesToStream is now', this._numberOfFilesToStream)
// We only start zipping another file if we're under our limit
// of concurrent file streams
if (this._filesBeingZipped < this._maxNumberOfFileStreams) {
void readFileAsString(file).then((content) => {
return this._zipWriter.add(path, new TextReader(content), {
onend: this.boundFileCompletionCallback,
onstart: this.boundFileStartCallback,
})
})
} else {
// Queue it for later (see "write" event)
this._filesToZip.push([file, path])
}
}

public finalize(): Promise<ZipStreamResult> {
return new Promise((resolve, reject) => {
void this._archive.finalize()
this._archive.on('finish', () => {
resolve({
sizeInBytes: this._archive.pointer(),
md5: this._hasher.digest('base64'),
streamBuffer: this._streamBuffer,
})
public async finalize(onProgress?: (percentComplete: number) => void): Promise<ZipStreamResult> {
let finished = false
// We need to poll to check for all the file streams to be completely processed
// -- we are keeping track of this via the "progress" event handler
while (!finished) {
finished = await new Promise((resolve) => {
setTimeout(() => {
getLogger().verbose('success is', this._numberOfFilesSucceeded, '/', this._numberOfFilesToStream)
onProgress?.(Math.floor((100 * this._numberOfFilesSucceeded) / this._numberOfFilesToStream))
resolve(this._numberOfFilesToStream <= this._numberOfFilesSucceeded)
}, 1000)
})
})
}
// We're done streaming all files, so we can close the zip stream

await this._zipWriter.close()
return {
sizeInBytes: this._streamBuffer.size(),
hash: this._hasher.digest('base64'),
streamBuffer: this._streamBuffer,
}
}
}
10 changes: 5 additions & 5 deletions packages/core/src/test/shared/utilities/zipStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ describe('zipStream', function () {
})

it('Should create a zip stream from text content', async function () {
const zipStream = new ZipStream()
zipStream.writeString('foo bar', 'file.txt')
const zipStream = new ZipStream({ hashAlgorithm: 'md5' })
await zipStream.writeString('foo bar', 'file.txt')
const result = await zipStream.finalize()

const zipBuffer = result.streamBuffer.getContents()
Expand All @@ -35,15 +35,15 @@ describe('zipStream', function () {
.createHash('md5')
.update(await fs.readFileBytes(zipPath))
.digest('base64')
assert.strictEqual(result.md5, expectedMd5)
assert.strictEqual(result.hash, expectedMd5)
assert.strictEqual(result.sizeInBytes, (await fs.stat(zipPath)).size)
})

it('Should create a zip stream from file', async function () {
const testFilePath = path.join(tmpDir, 'test.txt')
await fs.writeFile(testFilePath, 'foo bar')

const zipStream = new ZipStream()
const zipStream = new ZipStream({ hashAlgorithm: 'md5' })
zipStream.writeFile(testFilePath, 'file.txt')
const result = await zipStream.finalize()

Expand All @@ -57,7 +57,7 @@ describe('zipStream', function () {
.createHash('md5')
.update(await fs.readFileBytes(zipPath))
.digest('base64')
assert.strictEqual(result.md5, expectedMd5)
assert.strictEqual(result.hash, expectedMd5)
assert.strictEqual(result.sizeInBytes, (await fs.stat(zipPath)).size)
})
})
Loading