Skip to content

Commit e016a87

Browse files
committed
Revert "Revert "Merge pull request #11 from neo4j/chunking-fixes""
This reverts commit 5da2482.
1 parent 5da2482 commit e016a87

File tree

4 files changed

+120
-21
lines changed

4 files changed

+120
-21
lines changed

lib/internal/buf.js

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,18 @@ class HeapBuffer extends BaseBuffer {
390390
}
391391

392392
getSlice ( start, length ) {
393-
return new HeapBuffer( this._buffer.slice( start, start + length ) );
393+
if( this._buffer.slice ) {
394+
return new HeapBuffer( this._buffer.slice( start, start + length ) );
395+
} else {
396+
// Some platforms (eg. phantomjs) don't support slice, so fall back to a copy
397+
// We do this rather than return a SliceBuffer, because sliceBuffer cannot
398+
// be passed to native network write ops etc - we need ArrayBuffer for that
399+
let copy = new HeapBuffer(length);
400+
for (var i = 0; i < length; i++) {
401+
copy.putUInt8( i, this.getUInt8( i + start ) );
402+
};
403+
return copy;
404+
}
394405
}
395406

396407
/**
@@ -463,20 +474,28 @@ class CombinedBuffer extends BaseBuffer {
463474
return buffer.getUInt8(position);
464475
}
465476
};
466-
};
477+
}
467478

468479
getInt8 ( position ) {
469480
// Surely there's a faster way to do this.. some sort of lookup table thing?
470481
for (let i = 0; i < this._buffers.length; i++) {
471482
let buffer = this._buffers[i];
472483
// If the position is not in the current buffer, skip the current buffer
473-
if( position > buffer.length ) {
484+
if( position >= buffer.length ) {
474485
position -= buffer.length;
475486
} else {
476487
return buffer.getInt8(position);
477488
}
478489
};
479-
};
490+
}
491+
492+
getFloat64 ( position ) {
493+
// At some point, a more efficient impl. For now, we copy the 8 bytes
494+
// we want to read and depend on the platform impl of IEEE 754.
495+
let b = alloc(8);
496+
for (var i = 0; i < 8; i++) { b.putUInt8(i, this.getUInt8( position + i )); };
497+
return b.getFloat64(0);
498+
}
480499
}
481500

482501
/**
@@ -492,23 +511,23 @@ class NodeBuffer extends BaseBuffer {
492511

493512
getUInt8 (position) {
494513
return this._buffer.readUInt8( position );
495-
};
514+
}
496515

497516
getInt8 (position) {
498517
return this._buffer.readInt8( position );
499-
};
518+
}
500519

501520
getFloat64 (position) {
502521
return this._buffer.readDoubleBE(position);
503-
};
522+
}
504523

505524
putUInt8 (position, val) {
506525
this._buffer.writeUInt8( val, position );
507-
};
526+
}
508527

509528
putInt8 (position, val) {
510529
this._buffer.writeInt8( val, position );
511-
};
530+
}
512531

513532
putFloat64 ( position, val ) {
514533
this._buffer.writeDoubleBE( val, position );
@@ -530,7 +549,7 @@ class NodeBuffer extends BaseBuffer {
530549

531550
getSlice ( start, length ) {
532551
return new NodeBuffer( this._buffer.slice( start, start + length ) );
533-
};
552+
}
534553
}
535554

536555
// Use HeapBuffer by default, unless Buffer API is available, see below

lib/internal/chunking.js

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -156,19 +156,14 @@ class Dechunker {
156156
}
157157

158158
IN_CHUNK ( buf ) {
159-
160-
if ( this._chunkSize < buf.remaining() ) {
161-
// Current packet is larger than current chunk, slice of the chunk
159+
if ( this._chunkSize <= buf.remaining() ) {
160+
// Current packet is larger than current chunk, or same size:
162161
this._currentMessage.push( buf.readSlice( this._chunkSize ) );
163162
return this.AWAITING_CHUNK;
164-
} else if ( this._chunkSize == buf.remaining() ) {
165-
// Current packet perfectly maps to current chunk
166-
this._currentMessage.push( buf.readSlice( buf.length ) );
167-
return this.AWAITING_CHUNK;
168163
} else {
169164
// Current packet is smaller than the chunk we're reading, split the current chunk itself up
170-
this._chunkSize -= data.remaining();
171-
this._currentMessage.push( buf.readSlice( buf.length ) );
165+
this._chunkSize -= buf.remaining();
166+
this._currentMessage.push( buf.readSlice( buf.remaining() ) );
172167
return this.IN_CHUNK;
173168
}
174169
}

test/internal/buf.test.js

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919

2020
var alloc = require('../../build/node/internal/buf').alloc;
21+
var CombinedBuffer = require('../../build/node/internal/buf').CombinedBuffer;
2122
var utf8 = require('../../build/node/internal/utf8');
2223
var Unpacker = require("../../build/node/internal/packstream.js").Unpacker;
2324

@@ -63,19 +64,55 @@ describe('buffers', function() {
6364
});
6465

6566
it('should decode list correctly', function() {
66-
//Given
67-
67+
// Given
6868
var b = alloc(5);
6969
b.writeUInt8(0x92);
7070
b.writeUInt8(0x81);
7171
b.writeUInt8(0x61);
7272
b.writeUInt8(0x81);
7373
b.writeUInt8(0x62);
7474
b.reset();
75+
76+
// When
7577
var data = new Unpacker().unpack( b );
78+
79+
// Then
7680
expect(data[0]).toBe('a');
7781
expect(data[1]).toBe('b');
82+
});
83+
});
84+
85+
describe('CombinedBuffer', function() {
86+
it('should read int8', function() {
87+
// Given
88+
var b1 = alloc(1);
89+
var b2 = alloc(1);
90+
b1.putInt8(0, 1);
91+
b2.putInt8(0, 2);
92+
93+
var b = new CombinedBuffer([b1,b2]);
7894

95+
// When
96+
var first = b.readInt8();
97+
var second = b.readInt8();
98+
99+
// Then
100+
expect(first).toBe(1);
101+
expect(second).toBe(2);
102+
});
103+
104+
it('should read divided float64', function() {
105+
// Given
106+
var inner = alloc(8);
107+
inner.putFloat64(0, 0.1);
108+
109+
var b = new CombinedBuffer([inner.readSlice(4),inner.readSlice(4)]);
110+
111+
// When
112+
var read = b.readFloat64();
113+
114+
// Then
115+
expect(read).toBe(0.1);
79116
});
80117
});
81118

test/internal/chunking.test.js

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
var Chunker = require('../../build/node/internal/chunking').Chunker;
2121
var Dechunker = require('../../build/node/internal/chunking').Dechunker;
2222
var alloc = require('../../build/node/internal/buf').alloc;
23+
var CombinedBuffer = require('../../build/node/internal/buf').CombinedBuffer;
2324

2425
describe('Chunker', function() {
2526
it('should chunk simple data', function() {
@@ -83,6 +84,41 @@ describe('Dechunker', function() {
8384
expect( messages.length ).toBe( 1 );
8485
expect( messages[0].toHex() ).toBe( "00 01 00 02 00 03 " );
8586
});
87+
88+
it('should handle message split at any point', function() {
89+
// Given
90+
var ch = new TestChannel();
91+
var chunker = new Chunker(ch);
92+
93+
// And given the following message
94+
chunker.writeInt8(1);
95+
chunker.writeInt16(2);
96+
chunker.writeInt32(3);
97+
chunker.writeUInt8(4);
98+
chunker.writeUInt32(5);
99+
chunker.messageBoundary();
100+
chunker.flush();
101+
102+
var chunked = ch.toBuffer();
103+
104+
// When I try splitting this chunked data at every possible position
105+
// into two separate buffers, and send those to the dechunker
106+
for (var i = 1; i < chunked.length; i++) {
107+
var slice1 = chunked.getSlice( 0, i );
108+
var slice2 = chunked.getSlice( i, chunked.length - i );
109+
110+
// Dechunk the slices
111+
var messages = [];
112+
var dechunker = new Dechunker();
113+
dechunker.onmessage = function(buffer) { messages.push(buffer); };
114+
dechunker.write( slice1 );
115+
dechunker.write( slice2 );
116+
117+
// Then, the output should be correct
118+
expect( messages.length ).toBe( 1 );
119+
expect( messages[0].toHex() ).toBe( "01 00 02 00 00 00 03 04 00 00 00 05 " );
120+
};
121+
});
86122
});
87123

88124
function TestChannel() {
@@ -101,6 +137,18 @@ TestChannel.prototype.toHex = function() {
101137
return out;
102138
};
103139

140+
TestChannel.prototype.toBuffer = function() {
141+
return new CombinedBuffer( this._written );
142+
};
143+
144+
TestChannel.prototype.toHex = function() {
145+
var out = "";
146+
for( var i=0; i<this._written.length; i++ ) {
147+
out += this._written[i].toHex();
148+
}
149+
return out;
150+
};
151+
104152
function bytes() {
105153
var b = alloc( arguments.length );
106154
for( var i=0; i<arguments.length; i++ ) {

0 commit comments

Comments
 (0)