Skip to content

Commit b66f7e2

Browse files
committed
Merge with 1.0
1 parent babf275 commit b66f7e2

File tree

14 files changed

+338
-51
lines changed

14 files changed

+338
-51
lines changed

src/v1/driver.js

Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
*/
1919

2020
import Session from './session';
21+
import {Pool} from './internal/pool';
2122
import {connect} from "./internal/connector";
23+
import StreamObserver from './internal/stream-observer';
2224

2325
/**
2426
* A Driver instance is used for mananging {@link Session}s.
@@ -37,24 +39,75 @@ class Driver {
3739
this._openSessions = {};
3840
this._sessionIdGenerator = 0;
3941
this._token = token || {};
42+
this._pool = new Pool(
43+
this._createConnection.bind(this),
44+
this._destroyConnection.bind(this),
45+
this._validateConnection.bind(this)
46+
);
47+
}
48+
49+
/**
50+
* Create a new connection instance.
51+
* @return {Connection} new connector-api session instance, a low level session API.
52+
* @access private
53+
*/
54+
_createConnection( release ) {
55+
let sessionId = this._sessionIdGenerator++;
56+
let streamObserver = new _ConnectionStreamObserver(this);
57+
let conn = connect(this._url);
58+
conn.initialize(this._userAgent, this._token, streamObserver);
59+
conn._id = sessionId;
60+
conn._release = () => release(conn);
61+
62+
this._openSessions[sessionId] = conn;
63+
return conn;
64+
}
65+
66+
/**
67+
* Check that a connection is usable
68+
* @return {boolean} true if the connection is open
69+
* @access private
70+
**/
71+
_validateConnection( conn ) {
72+
return conn.isOpen();
73+
}
74+
75+
/**
76+
* Dispose of a live session, closing any associated resources.
77+
* @return {Session} new session.
78+
* @access private
79+
*/
80+
_destroyConnection( conn ) {
81+
delete this._openSessions[conn._id];
82+
conn.close();
4083
}
4184

4285
/**
4386
* Create and return new session
4487
* @return {Session} new session.
4588
*/
4689
session() {
47-
let sessionId = this._sessionIdGenerator++;
48-
let conn = connect(this._url);
49-
conn.initialize(this._userAgent, this._token);
50-
let _driver = this;
51-
let _session = new Session( conn, () => {
52-
// On close of session, remove it from the list of open sessions
53-
delete _driver._openSessions[sessionId];
54-
});
90+
let conn = this._pool.acquire();
91+
return new Session( conn, (cb) => {
92+
// This gets called on Session#close(), and is where we return
93+
// the pooled 'connection' instance.
94+
95+
// We don't pool Session instances, to avoid users using the Session
96+
// after they've called close. The `Session` object is just a thin
97+
// wrapper around Connection anyway, so it makes little difference.
98+
99+
// Queue up a 'reset', to ensure the next user gets a clean
100+
// session to work with. No need to flush, this will get sent
101+
// along with whatever the next thing the user wants to do with
102+
// this session ends up being, so we save the network round trip.
103+
conn.reset();
104+
105+
// Return connection to the pool
106+
conn._release();
55107

56-
this._openSessions[sessionId] = _session;
57-
return _session;
108+
// Call user callback
109+
if(cb) { cb(); }
110+
});
58111
}
59112

60113
/**
@@ -70,4 +123,22 @@ class Driver {
70123
}
71124
}
72125

126+
/** Internal stream observer used for connection state */
127+
class _ConnectionStreamObserver extends StreamObserver {
128+
constructor(driver) {
129+
super();
130+
this._driver = driver;
131+
this._hasFailed = false;
132+
}
133+
onError(error) {
134+
if (!this._hasFailed) {
135+
super.onError(error);
136+
if(this._driver.onError) {
137+
this._driver.onError(error);
138+
}
139+
this._hasFailed = true;
140+
}
141+
}
142+
}
143+
73144
export default Driver

src/v1/graph-types.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ class PathSegment {
146146
* @param {Relationship} rel - Relationship segment
147147
* @param {string} end - Identity of end Node
148148
*/
149-
constructor( start, rel, end ) {
149+
constructor(start, rel, end) {
150150
this.start = start;
151151
this.relationship = rel;
152152
this.end = end;
@@ -159,12 +159,14 @@ class PathSegment {
159159
class Path {
160160
/**
161161
* @constructor
162+
* @param {Node} start - start node
163+
* @param {Node} end - end node
162164
* @param {Array} segments - Array of Segments
163165
*/
164-
constructor(segments) {
166+
constructor(start, end, segments) {
167+
this.start = start;
168+
this.end = end;
165169
this.segments = segments;
166-
this.start = segments[0].start;
167-
this.end = segments[segments.length - 1].end;
168170
this.length = segments.length;
169171
}
170172
}

src/v1/internal/ch-node.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ class NodeChannel {
6060
_self.onmessage( new NodeBuffer( buffer ) );
6161
}
6262
});
63+
64+
this._conn.on('error', function(err){
65+
if( _self.onerror ) {
66+
_self.onerror(err);
67+
}
68+
});
6369
}
6470

6571
/**

src/v1/internal/ch-websocket.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ class WebSocketChannel {
5454
self.onmessage( b );
5555
}
5656
};
57+
58+
this._ws.onerror = (err) => {
59+
if( self.onerror ) {
60+
self.onerror(err);
61+
}
62+
}
5763
}
5864

5965
/**

src/v1/internal/connector.js

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ else {
4040
let
4141
// Signature bytes for each message type
4242
INIT = 0x01, // 0000 0001 // INIT <user_agent>
43-
ACK_FAILURE = 0x0F, // 0000 1111 // ACK_FAILURE
43+
ACK_FAILURE = 0x0D, // 0000 1101 // ACK_FAILURE
44+
RESET = 0x0F, // 0000 1111 // RESET
4445
RUN = 0x10, // 0001 0000 // RUN <statement> <parameters>
4546
DISCARD_ALL = 0x2F, // 0010 1111 // DISCARD *
4647
PULL_ALL = 0x3F, // 0011 1111 // PULL *
@@ -80,7 +81,7 @@ function LOG(err) {
8081
let NO_OP_OBSERVER = {
8182
onNext : NO_OP,
8283
onCompleted : NO_OP,
83-
onError : LOG
84+
onError : NO_OP
8485
}
8586

8687
/** Maps from packstream structures to Neo4j domain objects */
@@ -114,6 +115,7 @@ let _mappers = {
114115
sequence = unpacker.unpack(buf);
115116
let prevNode = nodes[0],
116117
segments = [];
118+
117119
for (let i = 0; i < sequence.length; i += 2) {
118120
let relIndex = sequence[i],
119121
nextNode = nodes[sequence[i + 1]],
@@ -138,9 +140,9 @@ let _mappers = {
138140
segments.push( new GraphType.PathSegment( prevNode, rel, nextNode ) );
139141
prevNode = nextNode;
140142
}
141-
return new GraphType.Path( segments );
143+
return new GraphType.Path(nodes[0], nodes[nodes.length - 1], segments );
142144
}
143-
}
145+
};
144146

145147
/**
146148
* A connection manages sending and recieving messages over a channel. A
@@ -176,6 +178,9 @@ class Connection {
176178
this._unpacker = new packstream.Unpacker();
177179
this._isHandlingFailure = false;
178180

181+
// Set to true on fatal errors, to get this out of session pool.
182+
this._isBroken = false;
183+
179184
// For deserialization, explain to the unpacker how to unpack nodes, rels, paths;
180185
this._unpacker.structMappers[NODE] = _mappers.node;
181186
this._unpacker.structMappers[RELATIONSHIP] = _mappers.rel;
@@ -197,14 +202,21 @@ class Connection {
197202
}
198203

199204
} else {
200-
// TODO: Report error
205+
this._isBroken = true;
201206
console.log("FATAL, unknown protocol version:", proposed)
202207
}
203208
};
204209

210+
this._ch.onerror = (error) => {
211+
self._isBroken = true;
212+
if(this._currentObserver.onError) {
213+
this._currentObserver.onError(error);
214+
}
215+
}
216+
205217
this._dechunker.onmessage = (buf) => {
206218
self._handleMessage( self._unpacker.unpack( buf ) );
207-
}
219+
};
208220

209221
let handshake = alloc( 5 * 4 );
210222
//magic preamble
@@ -259,15 +271,16 @@ class Connection {
259271
break;
260272
case IGNORED:
261273
try {
262-
if (this._errorMsg)
274+
if (this._errorMsg && this._currentObserver.onError)
263275
this._currentObserver.onError(this._errorMsg);
264-
else
276+
else if(this._currentObserver.onError)
265277
this._currentObserver.onError(msg);
266278
} finally {
267279
this._currentObserver = this._pendingObservers.shift();
268280
}
269281
break;
270282
default:
283+
this._isBroken = true;
271284
console.log("UNKNOWN MESSAGE: ", msg);
272285
}
273286
}
@@ -300,6 +313,13 @@ class Connection {
300313
this._chunker.messageBoundary();
301314
}
302315

316+
/** Queue a RESET-message to be sent to the database */
317+
reset( observer ) {
318+
this._queueObserver(observer);
319+
this._packer.packStruct( RESET );
320+
this._chunker.messageBoundary();
321+
}
322+
303323
/** Queue a ACK_FAILURE-message to be sent to the database */
304324
_ackFailure( observer ) {
305325
this._queueObserver(observer);
@@ -324,6 +344,11 @@ class Connection {
324344
this._chunker.flush();
325345
}
326346

347+
/** Check if this connection is in working condition */
348+
isOpen() {
349+
return !this._isBroken && this._ch._open;
350+
}
351+
327352
/**
328353
* Call close on the channel.
329354
* @param {function} cb - Function to call on close.

src/v1/internal/pool.js

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
class Pool {
21+
/**
22+
* @param create an allocation function that creates a new resource. It's given
23+
* a single argument, a function that will return the resource to
24+
* the pool if invoked, which is meant to be called on .dispose
25+
* or .close or whatever mechanism the resource uses to finalize.
26+
* @param destroy called with the resource when it is evicted from this pool
27+
* @param validate called at various times (like when an instance is acquired and
28+
* when it is returned). If this returns false, the resource will
29+
* be evicted
30+
* @param maxIdle the max number of resources that are allowed idle in the pool at
31+
* any time. If this threshold is exceeded, resources will be evicted.
32+
*/
33+
constructor(create, destroy=(()=>true), validate=(()=>true), maxIdle=50) {
34+
this._create = create;
35+
this._destroy = destroy;
36+
this._validate = validate;
37+
this._maxIdle = maxIdle;
38+
this._pool = [];
39+
this._release = this._release.bind(this);
40+
}
41+
42+
acquire() {
43+
if( this._pool.length > 0 ) {
44+
return this._pool.pop();
45+
} else {
46+
return this._create( this._release );
47+
}
48+
}
49+
50+
_release(resource) {
51+
if( this._pool.length >= this._maxIdle || !this._validate(resource) ) {
52+
this._destroy(resource);
53+
} else {
54+
this._pool.push(resource);
55+
}
56+
}
57+
}
58+
59+
export default {
60+
Pool
61+
}

src/v1/result-summary.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class ProfiledPlan {
8989
/**
9090
* Create a ProfiledPlan instance
9191
* @constructor
92-
* @param {Object} plan - Object with plan data
92+
* @param {Object} profile - Object with profile data
9393
*/
9494
constructor(profile) {
9595
this.operatorType = profile.operatorType;
@@ -234,10 +234,11 @@ class Notification {
234234
this.code = notification.code;
235235
this.title = notification.title;
236236
this.description = notification.description;
237-
this.position = this._constructPosition(notification.position);
237+
this.severity = notification.severity;
238+
this.position = Notification._constructPosition(notification.position);
238239
}
239240

240-
_constructPosition(pos) {
241+
static _constructPosition(pos) {
241242
if(!pos) {
242243
return {};
243244
}
@@ -254,7 +255,7 @@ const statementType = {
254255
READ_WRITE: 'rw',
255256
WRITE_ONLY: 'w',
256257
SCHEMA_WRITE: 's'
257-
}
258+
};
258259

259260
export default {
260261
ResultSummary,

0 commit comments

Comments
 (0)