Skip to content

Commit e38e827

Browse files
author
Zhen Li
authored
Merge pull request #213 from lutovich/1.2-cleanup-around-session
Cleanup around connections and sessions
2 parents dcb1827 + 8bf582c commit e38e827

File tree

7 files changed

+1068
-270
lines changed

7 files changed

+1068
-270
lines changed

src/v1/driver.js

Lines changed: 13 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717
* limitations under the License.
1818
*/
1919

20-
import Session from "./session";
21-
import Pool from "./internal/pool";
22-
import {connect} from "./internal/connector";
23-
import StreamObserver from "./internal/stream-observer";
24-
import {newError, SERVICE_UNAVAILABLE} from "./error";
20+
import Session from './session';
21+
import Pool from './internal/pool';
22+
import {connect} from './internal/connector';
23+
import StreamObserver from './internal/stream-observer';
24+
import {newError, SERVICE_UNAVAILABLE} from './error';
25+
import {DirectConnectionProvider} from './internal/connection-providers';
2526

2627
const READ = 'READ', WRITE = 'WRITE';
2728
/**
@@ -57,6 +58,7 @@ class Driver {
5758
Driver._validateConnection.bind(this),
5859
config.connectionPoolSize
5960
);
61+
this._connectionProvider = this._createConnectionProvider(url, this._pool);
6062
}
6163

6264
/**
@@ -111,41 +113,15 @@ class Driver {
111113
*/
112114
session(mode) {
113115
const sessionMode = Driver._validateSessionMode(mode);
114-
const connectionPromise = this._acquireConnection(sessionMode);
116+
const connectionPromise = this._connectionProvider.acquireConnection(sessionMode);
115117
connectionPromise.catch((err) => {
116118
if (this.onError && err.code === SERVICE_UNAVAILABLE) {
117119
this.onError(err);
118120
} else {
119121
//we don't need to tell the driver about this error
120122
}
121123
});
122-
return this._createSession(connectionPromise, this._releaseConnection(connectionPromise));
123-
}
124-
125-
/**
126-
* The returned function gets called on Session#close(), and is where we return the pooled 'connection' instance.
127-
* We don't pool Session instances, to avoid users using the Session after they've called close.
128-
* The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference.
129-
* @param {Promise} connectionPromise - promise resolved with the connection.
130-
* @return {function(callback: function)} - function that releases the connection and then executes an optional callback.
131-
* @protected
132-
*/
133-
_releaseConnection(connectionPromise) {
134-
return userDefinedCallback => {
135-
connectionPromise.then(conn => {
136-
// Queue up a 'reset', to ensure the next user gets a clean session to work with.
137-
conn.reset();
138-
conn.sync();
139-
140-
// Return connection to the pool
141-
conn._release();
142-
}).catch(ignoredError => {
143-
});
144-
145-
if (userDefinedCallback) {
146-
userDefinedCallback();
147-
}
148-
};
124+
return this._createSession(connectionPromise);
149125
}
150126

151127
static _validateSessionMode(rawMode) {
@@ -157,13 +133,13 @@ class Driver {
157133
}
158134

159135
//Extension point
160-
_acquireConnection(mode) {
161-
return Promise.resolve(this._pool.acquire(this._url));
136+
_createConnectionProvider(address, connectionPool) {
137+
return new DirectConnectionProvider(address, connectionPool);
162138
}
163139

164140
//Extension point
165-
_createSession(connectionPromise, cb) {
166-
return new Session(connectionPromise, cb);
141+
_createSession(connectionPromise) {
142+
return new Session(connectionPromise);
167143
}
168144

169145
/**
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../error';
21+
import {READ, WRITE} from '../driver';
22+
import Session from '../session';
23+
import RoundRobinArray from './round-robin-array';
24+
import RoutingTable from './routing-table';
25+
import Rediscovery from './rediscovery';
26+
27+
class ConnectionProvider {
28+
29+
acquireConnection(mode) {
30+
throw new Error('Abstract method');
31+
}
32+
}
33+
34+
export class DirectConnectionProvider extends ConnectionProvider {
35+
36+
constructor(address, connectionPool) {
37+
super();
38+
this._address = address;
39+
this._connectionPool = connectionPool;
40+
}
41+
42+
acquireConnection(mode) {
43+
return Promise.resolve(this._connectionPool.acquire(this._address));
44+
}
45+
}
46+
47+
export class LoadBalancer extends ConnectionProvider {
48+
49+
constructor(address, connectionPool) {
50+
super();
51+
this._routingTable = new RoutingTable(new RoundRobinArray([address]));
52+
this._rediscovery = new Rediscovery();
53+
this._connectionPool = connectionPool;
54+
}
55+
56+
acquireConnection(mode) {
57+
return this._freshRoutingTable().then(routingTable => {
58+
if (mode === READ) {
59+
return this._acquireConnectionToServer(routingTable.readers, 'read');
60+
} else if (mode === WRITE) {
61+
return this._acquireConnectionToServer(routingTable.writers, 'write');
62+
} else {
63+
throw newError('Illegal mode ' + mode);
64+
}
65+
});
66+
}
67+
68+
forget(address) {
69+
this._routingTable.forget(address);
70+
this._connectionPool.purge(address);
71+
}
72+
73+
forgetWriter(address) {
74+
this._routingTable.forgetWriter(address);
75+
}
76+
77+
_acquireConnectionToServer(serversRoundRobinArray, serverName) {
78+
const address = serversRoundRobinArray.next();
79+
if (!address) {
80+
return Promise.reject(newError('No ' + serverName + ' servers available', SESSION_EXPIRED));
81+
}
82+
return this._connectionPool.acquire(address);
83+
}
84+
85+
_freshRoutingTable() {
86+
const currentRoutingTable = this._routingTable;
87+
88+
if (!currentRoutingTable.isStale()) {
89+
return Promise.resolve(currentRoutingTable);
90+
}
91+
return this._refreshRoutingTable(currentRoutingTable);
92+
}
93+
94+
_refreshRoutingTable(currentRoutingTable) {
95+
const knownRouters = currentRoutingTable.routers.toArray();
96+
97+
const refreshedTablePromise = knownRouters.reduce((refreshedTablePromise, currentRouter, currentIndex) => {
98+
return refreshedTablePromise.then(newRoutingTable => {
99+
if (newRoutingTable) {
100+
if (!newRoutingTable.writers.isEmpty()) {
101+
// valid routing table was fetched - just return it, try next router otherwise
102+
return newRoutingTable;
103+
}
104+
} else {
105+
// returned routing table was undefined, this means a connection error happened and we need to forget the
106+
// previous router and try the next one
107+
const previousRouterIndex = currentIndex - 1;
108+
this._forgetRouter(currentRoutingTable, knownRouters, previousRouterIndex);
109+
}
110+
111+
// try next router
112+
const session = this._createSessionForRediscovery(currentRouter);
113+
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter);
114+
});
115+
}, Promise.resolve(null));
116+
117+
return refreshedTablePromise.then(newRoutingTable => {
118+
if (newRoutingTable && !newRoutingTable.writers.isEmpty()) {
119+
this._updateRoutingTable(newRoutingTable);
120+
return newRoutingTable;
121+
}
122+
123+
// forget the last known router because it did not return a valid routing table
124+
const lastRouterIndex = knownRouters.length - 1;
125+
this._forgetRouter(currentRoutingTable, knownRouters, lastRouterIndex);
126+
127+
// none of the existing routers returned valid routing table, throw exception
128+
throw newError('Could not perform discovery. No routing servers available.', SERVICE_UNAVAILABLE);
129+
});
130+
}
131+
132+
_createSessionForRediscovery(routerAddress) {
133+
const connection = this._connectionPool.acquire(routerAddress);
134+
const connectionPromise = Promise.resolve(connection);
135+
return new Session(connectionPromise);
136+
}
137+
138+
_updateRoutingTable(newRoutingTable) {
139+
const currentRoutingTable = this._routingTable;
140+
141+
// close old connections to servers not present in the new routing table
142+
const staleServers = currentRoutingTable.serversDiff(newRoutingTable);
143+
staleServers.forEach(server => this._connectionPool.purge(server));
144+
145+
// make this driver instance aware of the new table
146+
this._routingTable = newRoutingTable;
147+
}
148+
149+
_forgetRouter(routingTable, routersArray, routerIndex) {
150+
const address = routersArray[routerIndex];
151+
if (address) {
152+
routingTable.forgetRouter(address);
153+
}
154+
}
155+
}

0 commit comments

Comments
 (0)