Skip to content

Commit 0321198

Browse files
authored
Merge pull request #215 from lutovich/1.2-decouple-conn-from-session
Decouple session from connection
2 parents e38e827 + 48e0649 commit 0321198

18 files changed

+825
-178
lines changed

neokit

Submodule neokit updated 1 file

src/v1/driver.js

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class Driver {
5858
Driver._validateConnection.bind(this),
5959
config.connectionPoolSize
6060
);
61-
this._connectionProvider = this._createConnectionProvider(url, this._pool);
61+
this._connectionProvider = this._createConnectionProvider(url, this._pool, this._driverOnErrorCallback.bind(this));
6262
}
6363

6464
/**
@@ -113,15 +113,7 @@ class Driver {
113113
*/
114114
session(mode) {
115115
const sessionMode = Driver._validateSessionMode(mode);
116-
const connectionPromise = this._connectionProvider.acquireConnection(sessionMode);
117-
connectionPromise.catch((err) => {
118-
if (this.onError && err.code === SERVICE_UNAVAILABLE) {
119-
this.onError(err);
120-
} else {
121-
//we don't need to tell the driver about this error
122-
}
123-
});
124-
return this._createSession(connectionPromise);
116+
return this._createSession(sessionMode, this._connectionProvider);
125117
}
126118

127119
static _validateSessionMode(rawMode) {
@@ -133,13 +125,22 @@ class Driver {
133125
}
134126

135127
//Extension point
136-
_createConnectionProvider(address, connectionPool) {
137-
return new DirectConnectionProvider(address, connectionPool);
128+
_createConnectionProvider(address, connectionPool, driverOnErrorCallback) {
129+
return new DirectConnectionProvider(address, connectionPool, driverOnErrorCallback);
138130
}
139131

140132
//Extension point
141-
_createSession(connectionPromise) {
142-
return new Session(connectionPromise);
133+
_createSession(mode, connectionProvider) {
134+
return new Session(mode, connectionProvider);
135+
}
136+
137+
_driverOnErrorCallback(error) {
138+
const userDefinedOnErrorCallback = this.onError;
139+
if (userDefinedOnErrorCallback && error.code === SERVICE_UNAVAILABLE) {
140+
userDefinedOnErrorCallback(error);
141+
} else {
142+
// we don't need to tell the driver about this error
143+
}
143144
}
144145

145146
/**

src/v1/internal/connection-holder.js

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import {newError} from '../error';
21+
22+
/**
23+
* Utility to lazily initialize connections and return them back to the pool when unused.
24+
*/
25+
export default class ConnectionHolder {
26+
27+
/**
28+
* @constructor
29+
* @param {string} mode - the access mode for new connection holder.
30+
* @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from.
31+
*/
32+
constructor(mode, connectionProvider) {
33+
this._mode = mode;
34+
this._connectionProvider = connectionProvider;
35+
this._referenceCount = 0;
36+
this._connectionPromise = Promise.resolve(null);
37+
}
38+
39+
/**
40+
* Make this holder initialize new connection if none exists already.
41+
* @return {undefined}
42+
*/
43+
initializeConnection() {
44+
if (this._referenceCount === 0) {
45+
this._connectionPromise = this._connectionProvider.acquireConnection(this._mode);
46+
}
47+
this._referenceCount++;
48+
}
49+
50+
/**
51+
* Get the current connection promise.
52+
* @return {Promise<Connection>} promise resolved with the current connection.
53+
*/
54+
getConnection() {
55+
return this._connectionPromise;
56+
}
57+
58+
/**
59+
* Notify this holder that single party does not require current connection any more.
60+
* @return {Promise<Connection>} promise resolved with the current connection.
61+
*/
62+
releaseConnection() {
63+
if (this._referenceCount === 0) {
64+
return this._connectionPromise;
65+
}
66+
67+
this._referenceCount--;
68+
if (this._referenceCount === 0) {
69+
// release a connection without muting ACK_FAILURE, this is the last action on this connection
70+
return this._releaseConnection(true);
71+
}
72+
return this._connectionPromise;
73+
}
74+
75+
/**
76+
* Closes this holder and releases current connection (if any) despite any existing users.
77+
* @return {Promise<Connection>} promise resolved when current connection is released to the pool.
78+
*/
79+
close() {
80+
if (this._referenceCount === 0) {
81+
return this._connectionPromise;
82+
}
83+
this._referenceCount = 0;
84+
// release a connection and mute ACK_FAILURE, this might be called concurrently with other
85+
// operations and thus should ignore failure handling
86+
return this._releaseConnection(false);
87+
}
88+
89+
/**
90+
* Return the current pooled connection instance to the connection pool.
91+
* We don't pool Session instances, to avoid users using the Session after they've called close.
92+
* The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference.
93+
* @return {Promise} - promise resolved then connection is returned to the pool.
94+
* @private
95+
*/
96+
_releaseConnection(sync) {
97+
this._connectionPromise = this._connectionPromise.then(connection => {
98+
if (connection) {
99+
if(sync) {
100+
connection.reset();
101+
} else {
102+
connection.resetAsync();
103+
}
104+
connection.sync();
105+
connection._release();
106+
}
107+
}).catch(ignoredError => {
108+
});
109+
110+
return this._connectionPromise;
111+
}
112+
}
113+
114+
class EmptyConnectionHolder extends ConnectionHolder {
115+
116+
initializeConnection() {
117+
// nothing to initialize
118+
}
119+
120+
getConnection() {
121+
return Promise.reject(newError('This connection holder does not serve connections'));
122+
}
123+
124+
releaseConnection() {
125+
return Promise.resolve();
126+
}
127+
128+
close() {
129+
return Promise.resolve();
130+
}
131+
}
132+
133+
/**
134+
* Connection holder that does not manage any connections.
135+
* @type {ConnectionHolder}
136+
*/
137+
export const EMPTY_CONNECTION_HOLDER = new EmptyConnectionHolder();

src/v1/internal/connection-providers.js

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,32 +29,46 @@ class ConnectionProvider {
2929
acquireConnection(mode) {
3030
throw new Error('Abstract method');
3131
}
32+
33+
_withAdditionalOnErrorCallback(connectionPromise, driverOnErrorCallback) {
34+
// install error handler from the driver on the connection promise; this callback is installed separately
35+
// so that it does not handle errors, instead it is just an additional error reporting facility.
36+
connectionPromise.catch(error => {
37+
driverOnErrorCallback(error)
38+
});
39+
// return the original connection promise
40+
return connectionPromise;
41+
}
3242
}
3343

3444
export class DirectConnectionProvider extends ConnectionProvider {
3545

36-
constructor(address, connectionPool) {
46+
constructor(address, connectionPool, driverOnErrorCallback) {
3747
super();
3848
this._address = address;
3949
this._connectionPool = connectionPool;
50+
this._driverOnErrorCallback = driverOnErrorCallback;
4051
}
4152

4253
acquireConnection(mode) {
43-
return Promise.resolve(this._connectionPool.acquire(this._address));
54+
const connection = this._connectionPool.acquire(this._address);
55+
const connectionPromise = Promise.resolve(connection);
56+
return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback);
4457
}
4558
}
4659

4760
export class LoadBalancer extends ConnectionProvider {
4861

49-
constructor(address, connectionPool) {
62+
constructor(address, connectionPool, driverOnErrorCallback) {
5063
super();
5164
this._routingTable = new RoutingTable(new RoundRobinArray([address]));
5265
this._rediscovery = new Rediscovery();
5366
this._connectionPool = connectionPool;
67+
this._driverOnErrorCallback = driverOnErrorCallback;
5468
}
5569

5670
acquireConnection(mode) {
57-
return this._freshRoutingTable().then(routingTable => {
71+
const connectionPromise = this._freshRoutingTable().then(routingTable => {
5872
if (mode === READ) {
5973
return this._acquireConnectionToServer(routingTable.readers, 'read');
6074
} else if (mode === WRITE) {
@@ -63,6 +77,7 @@ export class LoadBalancer extends ConnectionProvider {
6377
throw newError('Illegal mode ' + mode);
6478
}
6579
});
80+
return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback);
6681
}
6782

6883
forget(address) {
@@ -132,7 +147,8 @@ export class LoadBalancer extends ConnectionProvider {
132147
_createSessionForRediscovery(routerAddress) {
133148
const connection = this._connectionPool.acquire(routerAddress);
134149
const connectionPromise = Promise.resolve(connection);
135-
return new Session(connectionPromise);
150+
const connectionProvider = new SingleConnectionProvider(connectionPromise);
151+
return new Session(READ, connectionProvider);
136152
}
137153

138154
_updateRoutingTable(newRoutingTable) {
@@ -153,3 +169,17 @@ export class LoadBalancer extends ConnectionProvider {
153169
}
154170
}
155171
}
172+
173+
export class SingleConnectionProvider extends ConnectionProvider {
174+
175+
constructor(connectionPromise) {
176+
super();
177+
this._connectionPromise = connectionPromise;
178+
}
179+
180+
acquireConnection(mode) {
181+
const connectionPromise = this._connectionPromise;
182+
this._connectionPromise = null;
183+
return connectionPromise;
184+
}
185+
}

src/v1/internal/connector.js

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ function log(actor, msg) {
9696
}
9797
}
9898

99-
10099
function NO_OP(){}
101100

102101
let NO_OP_OBSERVER = {
@@ -384,9 +383,9 @@ class Connection {
384383
this._chunker.messageBoundary();
385384
}
386385

387-
/** Queue a RESET-message to be sent to the database */
388-
reset( observer ) {
389-
log("C", "RESET");
386+
/** Queue a RESET-message to be sent to the database. Mutes failure handling. */
387+
resetAsync( observer ) {
388+
log("C", "RESET_ASYNC");
390389
this._isHandlingFailure = true;
391390
let self = this;
392391
let wrappedObs = {
@@ -404,6 +403,14 @@ class Connection {
404403
this._chunker.messageBoundary();
405404
}
406405

406+
/** Queue a RESET-message to be sent to the database */
407+
reset(observer) {
408+
log('C', 'RESET');
409+
this._queueObserver(observer);
410+
this._packer.packStruct(RESET, [], (err) => this._handleFatalError(err));
411+
this._chunker.messageBoundary();
412+
}
413+
407414
/** Queue a ACK_FAILURE-message to be sent to the database */
408415
_ackFailure( observer ) {
409416
log("C", "ACK_FAILURE");

src/v1/internal/get-servers-util.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
* limitations under the License.
1818
*/
1919

20-
import RoundRobinArray from "./round-robin-array";
21-
import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from "../error";
22-
import Integer, {int} from "../integer";
20+
import RoundRobinArray from './round-robin-array';
21+
import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from '../error';
22+
import Integer, {int} from '../integer';
2323

2424
const PROCEDURE_CALL = 'CALL dbms.cluster.routing.getServers';
2525
const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound';

0 commit comments

Comments
 (0)