Skip to content

Commit 29d3780

Browse files
authored
feat: add async chunk compress (#6)
1 parent 36a85bd commit 29d3780

File tree

3 files changed

+104
-86
lines changed

3 files changed

+104
-86
lines changed

index.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ module.exports = {
55
createUncompressStream: function (opts) {
66
return new UncompressStream(opts)
77
}
8-
, createCompressStream: function () {
9-
return new CompressStream()
8+
, createCompressStream: function (opts) {
9+
return new CompressStream(opts)
1010
}
1111
}

lib/compress-stream.js

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ var Transform = require('stream').Transform
2020
, COMPRESSED = bufferFrom([ 0x00 ])
2121
, UNCOMPRESSED = bufferFrom([ 0x01 ])
2222

23-
, CompressStream = function () {
23+
, CompressStream = function (opts) {
2424
if (!(this instanceof CompressStream))
25-
return new CompressStream()
25+
return new CompressStream(opts)
2626

27+
this.asyncCompress = (opts && typeof(opts.asyncCompress) === 'boolean') ? opts.asyncCompress : false
2728
Transform.call(this)
2829

2930
// first push the identifier frame
@@ -61,36 +62,66 @@ CompressStream.prototype._uncompressed = function (chunk) {
6162
/**
6263
* Some compression benchmarks :
6364
*
64-
* i) Chunking in transform with snappy.compressSync (the new implementation)
65-
* ii) Chunking from outside with compressStream.write (using original snappy.compress)
65+
* i) Sync compress via snappy.compressSync ({asyncCompress:false}) default
66+
* ii) Async compress via snappy.compress ({asyncCompress:true})
6667
* iii) No chunking (Original)
6768
*
68-
* | Size | in transform | compressStream.write | orginal (no chunking) |
69-
* |-------------------|--------------|----------------------|-----------------------|
70-
* | 10kb (1 chunk) | 0.0229 ms | 0.0385 ms | 0.0388 ms |
71-
* | 100kb (2 chunks) | 0.0562 ms | 0.1051 ms | 0.0844 ms |
72-
* | 1000kb (16 chunks)| 0.382 ms | 0.7971 ms | 0.1998 ms |
69+
* | Size | sync compress | async compress | original (no chunking) |
70+
* |--------------------|---------------|----------------|------------------------|
71+
* | 10kb (1 chunk) | 0.0229 ms | 0.0385 ms | 0.0388 ms |
72+
* | 100kb (2 chunks) | 0.0562 ms | 0.1051 ms | 0.0844 ms |
73+
* | 1000kb (16 chunks) | 0.382 ms | 0.7971 ms | 0.1998 ms |
7374
*
7475
*/
7576

77+
7678
/**
 * Transform implementation: splits `chunk` into UNCOMPRESSED_CHUNK_SIZE
 * slices and emits each slice as a compressed or an uncompressed frame,
 * depending on whether snappy actually shrank it.
 *
 * When `this.asyncCompress` is true the non-blocking snappy.compress is
 * used, processing slices sequentially via recursion so frames are pushed
 * in order; otherwise the blocking snappy.compressSync loop is used.
 * `callback` fires exactly once: after the last slice, or on first error.
 */
CompressStream.prototype._transform = function (chunk, enc, callback) {
  // Declared with const: the original wrote `self = this` without a
  // declaration, leaking an implicit global. Two streams compressing
  // concurrently would clobber that global between async callbacks and
  // emit frames on the wrong stream.
  const self = this

  // Async path: compress one slice, then recurse on the remainder.
  function asyncCompressNext (startFrom) {
    const endAt = startFrom + Math.min(chunk.length - startFrom, UNCOMPRESSED_CHUNK_SIZE)
    const bytesChunk = chunk.slice(startFrom, endAt)
    snappy.compress(bytesChunk, function (err, compressed) {
      if (err)
        return callback(err)

      // Only emit the compressed frame when it is actually smaller.
      if (compressed.length < bytesChunk.length)
        self._compressed(bytesChunk, compressed)
      else
        self._uncompressed(bytesChunk)

      if (endAt < chunk.length)
        asyncCompressNext(endAt)
      else
        callback()
    })
  }

  // Sync path: identical framing decision, done in one blocking loop.
  function syncCompress () {
    try {
      for (let startFrom = 0; startFrom < chunk.length; startFrom += UNCOMPRESSED_CHUNK_SIZE) {
        const endAt = startFrom + Math.min(chunk.length - startFrom, UNCOMPRESSED_CHUNK_SIZE)
        const bytesChunk = chunk.slice(startFrom, endAt)
        const compressed = snappy.compressSync(bytesChunk)

        if (compressed.length < bytesChunk.length)
          self._compressed(bytesChunk, compressed)
        else
          self._uncompressed(bytesChunk)
      }
      callback()
    } catch (err) {
      return callback(err)
    }
  }

  if (this.asyncCompress)
    asyncCompressNext(0)
  else
    syncCompress()
}
95126

96127
module.exports = CompressStream

test/compress-test.js

Lines changed: 57 additions & 70 deletions
Original file line numberDiff line numberDiff line change
const spawn = require('child_process').spawn
const createCompressStream = require('../').createCompressStream
const test = require('tap').test
const largerInput = require('fs').readFileSync(__filename)

const UNCOMPRESSED_CHUNK_SIZE = 65536

// Build an input larger than one compression chunk by repeating this file.
let superLargeInput = largerInput
for (let i = largerInput.length; i <= UNCOMPRESSED_CHUNK_SIZE; i += largerInput.length) {
  superLargeInput = Buffer.concat([superLargeInput, largerInput])
}

// Every (input, asyncCompress) combination gets its own round-trip test:
// compress through our stream, decompress with python-snappy, compare.
const inputs = [
  ['small', 'beep boop'],
  ['large', largerInput],
  ['super large', superLargeInput]
]

const cases = []
for (const [testName, testString] of inputs)
  for (const asyncCompress of [true, false])
    cases.push({ testName, testString, asyncCompress })

cases.forEach(({ testName, testString, asyncCompress }) => {

  test(`compress ${testName} input - asyncCompress=${asyncCompress}`, function (t) {
    const child = spawn('python', ['-m', 'snappy', '-d'])
    const compressStream = createCompressStream({
      asyncCompress
    })
    let data = ''

    // Collect the decompressed bytes emitted by python-snappy.
    child.stdout.on('data', function (chunk) {
      data = data + chunk.toString()
    })

    // Once the decompressor closes, the round-tripped data must match.
    child.stdout.on('end', function () {
      t.equal(data, testString.toString())
      t.end()
    })

    child.stderr.pipe(process.stderr)

    compressStream.pipe(child.stdin)

    compressStream.write(testString)
    compressStream.end()
  })

})

0 commit comments

Comments
 (0)