Skip to content

Commit 2dc47b3

Browse files
committed
fixup! WIP compressed write
1 parent eacfdff commit 2dc47b3

File tree

2 files changed

+48
-7
lines changed

2 files changed

+48
-7
lines changed

lib/protocol/PacketWriter.js

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@ var Buffer = require('safe-buffer').Buffer;
1212
var BufferList = require('./BufferList');
1313
var EventEmitter = require('events').EventEmitter;
1414
var Util = require('util');
15+
var Zlib = require('zlib');
1516

1617
module.exports = PacketWriter;
1718
function PacketWriter() {
18-
this._buffer = null;
19-
this._offset = 0;
20-
this._sync = false;
19+
this._buffer = null;
20+
this._deflateQueue = [];
21+
this._deflating = false;
22+
this._offset = 0;
23+
this._sync = false;
2124
}
2225
Util.inherits(PacketWriter, EventEmitter);
2326

@@ -48,10 +51,18 @@ PacketWriter.prototype.finalize = function finalize(parser) {
4851

4952
if (parser._compressed) {
5053
num = parser.incrementCompressedPacketNumber();
51-
buf = this._toCompressedPacket(num, buf);
54+
55+
if (this._sync) {
56+
buf = this._toCompressedPacket(num, buf);
57+
} else {
58+
this._toCompressedPacketAsync(num, buf);
59+
buf = null;
60+
}
5261
}
5362

54-
this.emit('data', buf);
63+
if (buf) {
64+
this.emit('data', buf);
65+
}
5566
}
5667
};
5768

@@ -239,7 +250,31 @@ PacketWriter.prototype._allocate = function _allocate(bytes) {
239250
oldBuffer.copy(this._buffer);
240251
};
241252

242-
PacketWriter.prototype._toCompressedPacket = function _toCompressedPacket(num, buf) {
253+
PacketWriter.prototype._deflateNextPacket = function _deflateNextPacket() {
254+
if (this._deflating) {
255+
return;
256+
}
257+
258+
var item = this._deflateQueue.shift();
259+
var buf = item[1];
260+
var num = item[0];
261+
var len = buf.length;
262+
var self = this;
263+
264+
this._deflating = true;
265+
Zlib.deflate(buf, function (err, data) {
266+
if (err) {
267+
self.emit('error', err);
268+
return;
269+
}
270+
271+
self._deflating = false;
272+
self.emit('data', self._toCompressedPacket(num, data, len));
273+
self._deflateNextPacket();
274+
});
275+
};
276+
277+
PacketWriter.prototype._toCompressedPacket = function _toCompressedPacket(num, buf, len) {
243278
var origBuffer = this._buffer;
244279
var origOffset = this._offset;
245280

@@ -248,7 +283,7 @@ PacketWriter.prototype._toCompressedPacket = function _toCompressedPacket(num, b
248283

249284
this.writeUnsignedNumber(3, buf.length);
250285
this.writeUnsignedNumber(1, num);
251-
this.writeUnsignedNumber(3, 0);
286+
this.writeUnsignedNumber(3, (len || 0));
252287
this.writeBuffer(buf);
253288

254289
var packet = this._buffer;
@@ -259,6 +294,11 @@ PacketWriter.prototype._toCompressedPacket = function _toCompressedPacket(num, b
259294
return packet;
260295
};
261296

297+
PacketWriter.prototype._toCompressedPacketAsync = function _toCompressedPacketAsync(num, buf) {
298+
this._deflateQueue.push(buf);
299+
this._deflateNextPacket();
300+
};
301+
262302
PacketWriter.prototype._toPacket = function _toPacket(num, buf) {
263303
var origBuffer = this._buffer;
264304
var origOffset = this._offset;

lib/protocol/Protocol.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ Protocol.prototype._emitPacket = function(packet) {
308308
var packetWriter = new PacketWriter();
309309
var protocol = this;
310310

311+
packetWriter.on('error', this.handleParserError.bind(this));
311312
packetWriter.on('data', function (data) {
312313
protocol.emit('data', data);
313314
});

0 commit comments

Comments
 (0)