Skip to content

Commit d650cbf

Browse files
committed
Byte arrays support
This commit makes driver handle writing and reading of byte arrays. They are represented as instances of Int8Array class. Driver will fail-fast when server does not support byte arrays and will not even attempt to send the given array.
1 parent 021e9a5 commit d650cbf

File tree

5 files changed

+201
-22
lines changed

5 files changed

+201
-22
lines changed

src/v1/internal/connection-providers.js

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ export class DirectConnectionProvider extends ConnectionProvider {
5454
}
5555

5656
acquireConnection(mode) {
57-
const connection = this._connectionPool.acquire(this._address);
58-
const connectionPromise = Promise.resolve(connection);
57+
const connectionPromise = acquireConnectionFromPool(this._connectionPool, this._address);
5958
return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback);
6059
}
6160
}
@@ -102,7 +101,7 @@ export class LoadBalancer extends ConnectionProvider {
102101
`Failed to obtain connection towards ${serverName} server. Known routing table is: ${this._routingTable}`,
103102
SESSION_EXPIRED));
104103
}
105-
return this._connectionPool.acquire(address);
104+
return acquireConnectionFromPool(this._connectionPool, address);
106105
}
107106

108107
_freshRoutingTable(accessMode) {
@@ -199,10 +198,9 @@ export class LoadBalancer extends ConnectionProvider {
199198
}
200199

201200
_createSessionForRediscovery(routerAddress) {
202-
const connection = this._connectionPool.acquire(routerAddress);
203201
// initialized connection is required for routing procedure call
204202
// server version needs to be known to decide which routing procedure to use
205-
const initializedConnectionPromise = connection.initializationCompleted();
203+
const initializedConnectionPromise = acquireConnectionFromPool(this._connectionPool, routerAddress);
206204
const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise);
207205
return new Session(READ, connectionProvider);
208206
}
@@ -263,3 +261,19 @@ export class SingleConnectionProvider extends ConnectionProvider {
263261
return connectionPromise;
264262
}
265263
}
264+
265+
// todo: test that all connection providers return initialized connections
266+
267+
/**
268+
* Acquire an initialized connection from the given connection pool for the given address. Returned connection
269+
* promise will be resolved by a connection which completed initialization, i.e. received a SUCCESS response
270+
* for it's INIT message.
271+
* @param {Pool} connectionPool the connection pool to acquire connection from.
272+
* @param {string} address the server address.
273+
* @return {Promise<Connection>} the initialized connection.
274+
*/
275+
function acquireConnectionFromPool(connectionPool, address) {
276+
const connection = connectionPool.acquire(address);
277+
// initialized connection is required to be able to perform subsequent server version checks
278+
return connection.initializationCompleted();
279+
}

src/v1/internal/connector.js

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import {newError} from './../error';
2727
import ChannelConfig from './ch-config';
2828
import {parseHost, parsePort} from './util';
2929
import StreamObserver from './stream-observer';
30+
import {ServerVersion, VERSION_3_2_0} from './server-version';
3031

3132
let Channel;
3233
if( NodeChannel.available ) {
@@ -472,8 +473,17 @@ class Connection {
472473
return this._packer.packable(value, (err) => this._handleFatalError(err));
473474
}
474475

475-
setServerVersion(version) {
476-
this.server.version = version;
476+
/**
477+
* @protected
478+
*/
479+
_markInitialized(metadata) {
480+
const serverVersion = metadata.server;
481+
if (!this.server.version) {
482+
this.server.version = serverVersion;
483+
if (ServerVersion.fromString(serverVersion).compareTo(VERSION_3_2_0) < 0) {
484+
this._packer.disableByteArrays();
485+
}
486+
}
477487
}
478488
}
479489

@@ -524,7 +534,7 @@ class ConnectionState {
524534
},
525535
onCompleted: metaData => {
526536
if (metaData && metaData.server) {
527-
this._connection.setServerVersion(metaData.server);
537+
this._connection._markInitialized(metaData);
528538
}
529539
this._initialized = true;
530540
if (this._resolvePromise) {

src/v1/internal/packstream.js

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
import utf8 from "./utf8";
20-
import Integer, {int, isInt} from "../integer";
21-
import {newError} from "./../error";
19+
import utf8 from './utf8';
20+
import Integer, {int, isInt} from '../integer';
21+
import {newError} from './../error';
2222

2323
const TINY_STRING = 0x80;
2424
const TINY_LIST = 0x90;
@@ -38,6 +38,9 @@ const STRING_32 = 0xD2;
3838
const LIST_8 = 0xD4;
3939
const LIST_16 = 0xD5;
4040
const LIST_32 = 0xD6;
41+
const BYTES_8 = 0xCC;
42+
const BYTES_16 = 0xCD;
43+
const BYTES_32 = 0xCE;
4144
const MAP_8 = 0xD8;
4245
const MAP_16 = 0xD9;
4346
const MAP_32 = 0xDA;
@@ -74,6 +77,7 @@ class Structure {
7477
class Packer {
7578
constructor (channel) {
7679
this._ch = channel;
80+
this._byteArraysSupported = true;
7781
}
7882

7983
/**
@@ -95,6 +99,8 @@ class Packer {
9599
return () => this.packString(x, onError);
96100
} else if (isInt(x)) {
97101
return () => this.packInteger( x );
102+
} else if (x instanceof Int8Array) {
103+
return () => this.packBytes(x, onError);
98104
} else if (x instanceof Array) {
99105
return () => {
100106
this.packListHeader(x.length, onError);
@@ -225,6 +231,36 @@ class Packer {
225231
}
226232
}
227233

234+
packBytes(array, onError) {
235+
if(this._byteArraysSupported) {
236+
this.packBytesHeader(array.length, onError);
237+
for (let i = 0; i < array.length; i++) {
238+
this._ch.writeInt8(array[i]);
239+
}
240+
}else {
241+
onError(newError("Byte arrays are not supported by the database this driver is connected to"));
242+
}
243+
}
244+
245+
packBytesHeader(size, onError) {
246+
if (size < 0x100) {
247+
this._ch.writeUInt8(BYTES_8);
248+
this._ch.writeUInt8(size);
249+
} else if (size < 0x10000) {
250+
this._ch.writeUInt8(BYTES_16);
251+
this._ch.writeUInt8((size / 256 >> 0) % 256);
252+
this._ch.writeUInt8(size % 256);
253+
} else if (size < 0x100000000) {
254+
this._ch.writeUInt8(BYTES_32);
255+
this._ch.writeUInt8((size / 16777216 >> 0) % 256);
256+
this._ch.writeUInt8((size / 65536 >> 0) % 256);
257+
this._ch.writeUInt8((size / 256 >> 0) % 256);
258+
this._ch.writeUInt8(size % 256);
259+
} else {
260+
onError(newError('Byte arrays of size ' + size + ' are not supported'));
261+
}
262+
}
263+
228264
packMapHeader (size, onError) {
229265
if (size < 0x10) {
230266
this._ch.writeUInt8(TINY_MAP | size);
@@ -262,6 +298,10 @@ class Packer {
262298
onError(newError("Structures of size " + size + " are not supported"));
263299
}
264300
}
301+
302+
disableByteArrays() {
303+
this._byteArraysSupported = false;
304+
}
265305
}
266306

267307
/**
@@ -284,6 +324,14 @@ class Unpacker {
284324
return value;
285325
}
286326

327+
unpackBytes(size, buffer) {
328+
const value = new Int8Array(size);
329+
for (let i = 0; i < size; i++) {
330+
value[i] = buffer.readInt8();
331+
}
332+
return value;
333+
}
334+
287335
unpackMap (size, buffer) {
288336
let value = {};
289337
for(let i = 0; i < size; i++) {
@@ -344,6 +392,12 @@ class Unpacker {
344392
return this.unpackList(buffer.readUInt16(), buffer);
345393
} else if (marker == LIST_32) {
346394
return this.unpackList(buffer.readUInt32(), buffer);
395+
} else if (marker == BYTES_8) {
396+
return this.unpackBytes(buffer.readUInt8(), buffer);
397+
} else if (marker == BYTES_16) {
398+
return this.unpackBytes(buffer.readUInt16(), buffer);
399+
} else if (marker == BYTES_32) {
400+
return this.unpackBytes(buffer.readUInt32(), buffer);
347401
} else if (marker == MAP_8) {
348402
return this.unpackMap(buffer.readUInt8(), buffer);
349403
} else if (marker == MAP_16) {

test/internal/shared-neo4j.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ const password = 'password';
9595
const authToken = neo4j.auth.basic(username, password);
9696

9797
const neoCtrlVersionParam = '-e';
98-
const defaultNeo4jVersion = '3.1.3';
98+
const defaultNeo4jVersion = '3.2.0';
9999
const defaultNeoCtrlArgs = `${neoCtrlVersionParam} ${defaultNeo4jVersion}`;
100100

101101
function neo4jCertPath(dir) {

test/v1/types.test.js

Lines changed: 111 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import neo4j from '../../src/v1';
2121
import sharedNeo4j from '../internal/shared-neo4j';
22+
import _ from 'lodash';
23+
import {ServerVersion, VERSION_3_2_0} from '../../src/v1/internal/server-version';
2224

2325
describe('floating point values', () => {
2426
it('should support float 1.0 ', testValue(1));
@@ -136,18 +138,117 @@ describe('path values', () => {
136138
});
137139
});
138140

139-
function testValue(actual, expected) {
140-
return done => {
141+
describe('byte arrays', () => {
142+
143+
let originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL;
144+
let serverSupportsByteArrays = false;
145+
146+
beforeEach(done => {
147+
jasmine.DEFAULT_TIMEOUT_INTERVAL = 60000;
148+
141149
const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken);
142150
const session = driver.session();
151+
session.run('RETURN 1').then(result => {
152+
driver.close();
153+
const serverVersion = ServerVersion.fromString(result.summary.server.version);
154+
serverSupportsByteArrays = serverVersion.compareTo(VERSION_3_2_0) >= 0;
155+
done();
156+
});
157+
});
143158

144-
session.run('RETURN {val} as v', {val: actual})
145-
.then(result => {
146-
expect(result.records[0].get('v')).toEqual(expected || actual);
147-
driver.close();
148-
done();
149-
}).catch(err => {
150-
console.log(err);
159+
afterEach(() => {
160+
jasmine.DEFAULT_TIMEOUT_INTERVAL = originalTimeout;
161+
});
162+
163+
it('should support returning empty byte array', done => {
164+
if(!serverSupportsByteArrays) {
165+
done();
166+
return;
167+
}
168+
169+
testValue(new Int8Array(0))(done);
170+
});
171+
172+
it('should support returning empty byte array', conditionalTestValues(serverSupportsByteArrays, new Int8Array(0)));
173+
174+
it('should support returning short byte arrays', conditionalTestValues(serverSupportsByteArrays, randomByteArrays(100, 1, 255)));
175+
176+
it('should support returning medium byte arrays', conditionalTestValues(serverSupportsByteArrays, randomByteArrays(50, 256, 65535)));
177+
178+
it('should support returning long byte arrays', conditionalTestValues(serverSupportsByteArrays, randomByteArrays(10, 65536, 2 * 65536)));
179+
180+
it('should fail to return byte array', done => {
181+
if (serverSupportsByteArrays) {
182+
done();
183+
return;
184+
}
185+
186+
const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken);
187+
const session = driver.session();
188+
session.run('RETURN {array}', {array: randomByteArray(42)}).catch(error => {
189+
driver.close();
190+
expect(error.message).toEqual('Byte arrays are not supported by the database this driver is connected to');
191+
done();
192+
});
193+
});
194+
});
195+
196+
function conditionalTestValues(condition, values) {
197+
if (!condition) {
198+
return done => done();
199+
}
200+
201+
const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken);
202+
const queriesPromise = values.reduce((acc, value) =>
203+
acc.then(() => runReturnQuery(driver, value)), Promise.resolve());
204+
return asTestFunction(queriesPromise, driver);
205+
}
206+
207+
function testValue(actual, expected) {
208+
const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken);
209+
const queryPromise = runReturnQuery(driver, actual, expected);
210+
return asTestFunction(queryPromise, driver);
211+
}
212+
213+
function testValues(values) {
214+
const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken);
215+
const queriesPromise = values.reduce((acc, value) =>
216+
acc.then(() => runReturnQuery(driver, value)), Promise.resolve());
217+
return asTestFunction(queriesPromise, driver);
218+
}
219+
220+
function runReturnQuery(driver, actual, expected) {
221+
const session = driver.session();
222+
return new Promise((resolve, reject) => {
223+
session.run('RETURN {val} as v', {val: actual}).then(result => {
224+
expect(result.records[0].get('v')).toEqual(expected || actual);
225+
session.close();
226+
resolve();
227+
}).catch(error => {
228+
reject(error);
151229
});
152-
};
230+
});
231+
}
232+
233+
function asTestFunction(promise, driver) {
234+
return done =>
235+
promise.then(() => {
236+
driver.close();
237+
done();
238+
}).catch(error => {
239+
driver.close();
240+
console.log(error);
241+
});
242+
}
243+
244+
function randomByteArrays(count, minLength, maxLength) {
245+
return _.range(count).map(() => {
246+
const length = _.random(minLength, maxLength);
247+
return randomByteArray(length);
248+
});
249+
}
250+
251+
function randomByteArray(length) {
252+
const array = _.range(length).map(() => _.random(-128, 127));
253+
return new Int8Array(array);
153254
}

0 commit comments

Comments
 (0)