Skip to content

Commit ad40851

Browse files
committed
Extract routing table handling logic in a class
This commit introduces a `ConnectionProvider` abstraction that is mainly responsible for acquiring connections from the connection pool. Implementation for the direct driver does only that while implementation for routing driver (`LoadBalancer`) handles routing table refreshes and rediscovery. So all this logic was moved to the `LoadBalancer` class. This refactoring allows easier testing and is the groundwork to decouple session from connection. Idea is that sessions will hold `ConnectionProvider` and use it to obtain connections when necessary.
1 parent 648d402 commit ad40851

File tree

5 files changed

+780
-116
lines changed

5 files changed

+780
-116
lines changed

src/v1/driver.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import Pool from './internal/pool';
2222
import {connect} from './internal/connector';
2323
import StreamObserver from './internal/stream-observer';
2424
import {newError, SERVICE_UNAVAILABLE} from './error';
25+
import {DirectConnectionProvider} from './internal/connection-providers';
2526

2627
const READ = 'READ', WRITE = 'WRITE';
2728
/**
@@ -57,6 +58,7 @@ class Driver {
5758
Driver._validateConnection.bind(this),
5859
config.connectionPoolSize
5960
);
61+
this._connectionProvider = this._createConnectionProvider(url, this._pool);
6062
}
6163

6264
/**
@@ -111,7 +113,7 @@ class Driver {
111113
*/
112114
session(mode) {
113115
const sessionMode = Driver._validateSessionMode(mode);
114-
const connectionPromise = this._acquireConnection(sessionMode);
116+
const connectionPromise = this._connectionProvider.acquireConnection(sessionMode);
115117
connectionPromise.catch((err) => {
116118
if (this.onError && err.code === SERVICE_UNAVAILABLE) {
117119
this.onError(err);
@@ -131,8 +133,8 @@ class Driver {
131133
}
132134

133135
//Extension point
134-
_acquireConnection(mode) {
135-
return Promise.resolve(this._pool.acquire(this._url));
136+
_createConnectionProvider(address, connectionPool) {
137+
return new DirectConnectionProvider(address, connectionPool);
136138
}
137139

138140
//Extension point
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../error';
21+
import {READ, WRITE} from '../driver';
22+
import Session from '../session';
23+
import RoundRobinArray from './round-robin-array';
24+
import RoutingTable from './routing-table';
25+
import Rediscovery from './rediscovery';
26+
27+
class ConnectionProvider {
28+
29+
acquireConnection(mode) {
30+
throw new Error('Abstract method');
31+
}
32+
}
33+
34+
export class DirectConnectionProvider extends ConnectionProvider {
35+
36+
constructor(address, connectionPool) {
37+
super();
38+
this._address = address;
39+
this._connectionPool = connectionPool;
40+
}
41+
42+
acquireConnection(mode) {
43+
return Promise.resolve(this._connectionPool.acquire(this._address));
44+
}
45+
}
46+
47+
export class LoadBalancer extends ConnectionProvider {
48+
49+
constructor(address, connectionPool) {
50+
super();
51+
this._routingTable = new RoutingTable(new RoundRobinArray([address]));
52+
this._rediscovery = new Rediscovery();
53+
this._connectionPool = connectionPool;
54+
}
55+
56+
acquireConnection(mode) {
57+
return this._freshRoutingTable().then(routingTable => {
58+
if (mode === READ) {
59+
return this._acquireConnectionToServer(routingTable.readers, 'read');
60+
} else if (mode === WRITE) {
61+
return this._acquireConnectionToServer(routingTable.writers, 'write');
62+
} else {
63+
throw newError('Illegal mode ' + mode);
64+
}
65+
});
66+
}
67+
68+
forget(address) {
69+
this._routingTable.forget(address);
70+
}
71+
72+
forgetWriter(address) {
73+
this._routingTable.forgetWriter(address);
74+
}
75+
76+
_acquireConnectionToServer(serversRoundRobinArray, serverName) {
77+
const address = serversRoundRobinArray.next();
78+
if (!address) {
79+
return Promise.reject(newError('No ' + serverName + ' servers available', SESSION_EXPIRED));
80+
}
81+
return this._connectionPool.acquire(address);
82+
}
83+
84+
_freshRoutingTable() {
85+
const currentRoutingTable = this._routingTable;
86+
87+
if (!currentRoutingTable.isStale()) {
88+
return Promise.resolve(currentRoutingTable);
89+
}
90+
return this._refreshRoutingTable(currentRoutingTable);
91+
}
92+
93+
_refreshRoutingTable(currentRoutingTable) {
94+
const knownRouters = currentRoutingTable.routers.toArray();
95+
96+
const refreshedTablePromise = knownRouters.reduce((refreshedTablePromise, currentRouter, currentIndex) => {
97+
return refreshedTablePromise.then(newRoutingTable => {
98+
if (newRoutingTable) {
99+
if (!newRoutingTable.writers.isEmpty()) {
100+
// valid routing table was fetched - just return it, try next router otherwise
101+
return newRoutingTable;
102+
}
103+
} else {
104+
// returned routing table was undefined, this means a connection error happened and we need to forget the
105+
// previous router and try the next one
106+
const previousRouter = knownRouters[currentIndex - 1];
107+
if (previousRouter) {
108+
currentRoutingTable.forgetRouter(previousRouter);
109+
}
110+
}
111+
112+
// try next router
113+
const session = this._createSessionForRediscovery(currentRouter);
114+
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter);
115+
});
116+
}, Promise.resolve(null));
117+
118+
return refreshedTablePromise.then(newRoutingTable => {
119+
if (newRoutingTable && !newRoutingTable.writers.isEmpty()) {
120+
this._updateRoutingTable(newRoutingTable);
121+
return newRoutingTable;
122+
}
123+
throw newError('Could not perform discovery. No routing servers available.', SERVICE_UNAVAILABLE);
124+
});
125+
}
126+
127+
_createSessionForRediscovery(routerAddress) {
128+
const connection = this._connectionPool.acquire(routerAddress);
129+
const connectionPromise = Promise.resolve(connection);
130+
return new Session(connectionPromise);
131+
}
132+
133+
_updateRoutingTable(newRoutingTable) {
134+
const currentRoutingTable = this._routingTable;
135+
136+
// close old connections to servers not present in the new routing table
137+
const staleServers = currentRoutingTable.serversDiff(newRoutingTable);
138+
staleServers.forEach(server => this._connectionPool.purge(server));
139+
140+
// make this driver instance aware of the new table
141+
this._routingTable = newRoutingTable;
142+
}
143+
}

src/v1/routing-driver.js

Lines changed: 9 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,9 @@
1818
*/
1919

2020
import Session from './session';
21-
import {Driver, READ, WRITE} from './driver';
21+
import {Driver} from './driver';
2222
import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from './error';
23-
import RoundRobinArray from './internal/round-robin-array';
24-
import RoutingTable from './internal/routing-table';
25-
import Rediscovery from './internal/rediscovery';
23+
import {LoadBalancer} from './internal/connection-providers';
2624

2725
/**
2826
* A driver that supports routing in a core-edge cluster.
@@ -31,8 +29,10 @@ class RoutingDriver extends Driver {
3129

3230
constructor(url, userAgent, token = {}, config = {}) {
3331
super(url, userAgent, token, RoutingDriver._validateConfig(config));
34-
this._routingTable = new RoutingTable(new RoundRobinArray([url]));
35-
this._rediscovery = new Rediscovery();
32+
}
33+
34+
_createConnectionProvider(address, connectionPool) {
35+
return new LoadBalancer(address, connectionPool);
3636
}
3737

3838
_createSession(connectionPromise) {
@@ -50,10 +50,10 @@ class RoutingDriver extends Driver {
5050
let url = 'UNKNOWN';
5151
if (conn) {
5252
url = conn.url;
53-
this._routingTable.forgetWriter(conn.url);
53+
this._connectionProvider.forgetWriter(conn.url);
5454
} else {
5555
connectionPromise.then((conn) => {
56-
this._routingTable.forgetWriter(conn.url);
56+
this._connectionProvider.forgetWriter(conn.url);
5757
}).catch(() => {/*ignore*/});
5858
}
5959
return newError('No longer possible to write to server at ' + url, SESSION_EXPIRED);
@@ -63,94 +63,11 @@ class RoutingDriver extends Driver {
6363
});
6464
}
6565

66-
_acquireConnection(mode) {
67-
return this._freshRoutingTable().then(routingTable => {
68-
if (mode === READ) {
69-
return this._acquireConnectionToServer(routingTable.readers, "read");
70-
} else if (mode === WRITE) {
71-
return this._acquireConnectionToServer(routingTable.writers, "write");
72-
} else {
73-
throw newError('Illegal session mode ' + mode);
74-
}
75-
});
76-
}
77-
78-
_acquireConnectionToServer(serversRoundRobinArray, serverName) {
79-
const address = serversRoundRobinArray.next();
80-
if (!address) {
81-
return Promise.reject(newError('No ' + serverName + ' servers available', SESSION_EXPIRED));
82-
}
83-
return this._pool.acquire(address);
84-
}
85-
86-
_freshRoutingTable() {
87-
const currentRoutingTable = this._routingTable;
88-
89-
if (!currentRoutingTable.isStale()) {
90-
return Promise.resolve(currentRoutingTable);
91-
}
92-
return this._refreshRoutingTable(currentRoutingTable);
93-
}
94-
95-
_refreshRoutingTable(currentRoutingTable) {
96-
const knownRouters = currentRoutingTable.routers.toArray();
97-
98-
const refreshedTablePromise = knownRouters.reduce((refreshedTablePromise, currentRouter, currentIndex) => {
99-
return refreshedTablePromise.then(newRoutingTable => {
100-
if (newRoutingTable) {
101-
if (!newRoutingTable.writers.isEmpty()) {
102-
// valid routing table was fetched - just return it, try next router otherwise
103-
return newRoutingTable;
104-
}
105-
} else {
106-
// returned routing table was undefined, this means a connection error happened and we need to forget the
107-
// previous router and try the next one
108-
const previousRouter = knownRouters[currentIndex - 1];
109-
if (previousRouter) {
110-
currentRoutingTable.forgetRouter(previousRouter);
111-
}
112-
}
113-
114-
// try next router
115-
const session = this._createSessionForRediscovery(currentRouter);
116-
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter);
117-
})
118-
}, Promise.resolve(null));
119-
120-
return refreshedTablePromise.then(newRoutingTable => {
121-
if (newRoutingTable && !newRoutingTable.writers.isEmpty()) {
122-
this._updateRoutingTable(newRoutingTable);
123-
return newRoutingTable
124-
}
125-
throw newError('Could not perform discovery. No routing servers available.', SERVICE_UNAVAILABLE);
126-
});
127-
}
128-
129-
_createSessionForRediscovery(routerAddress) {
130-
const connection = this._pool.acquire(routerAddress);
131-
const connectionPromise = Promise.resolve(connection);
132-
// error transformer here is a no-op unlike the one in a regular session, this is so because errors are
133-
// handled in the rediscovery promise chain and we do not need to do this in the error transformer
134-
const errorTransformer = error => error;
135-
return new RoutingSession(connectionPromise, errorTransformer);
136-
}
137-
13866
_forget(url) {
139-
this._routingTable.forget(url);
67+
this._connectionProvider.forget(url);
14068
this._pool.purge(url);
14169
}
14270

143-
_updateRoutingTable(newRoutingTable) {
144-
const currentRoutingTable = this._routingTable;
145-
146-
// close old connections to servers not present in the new routing table
147-
const staleServers = currentRoutingTable.serversDiff(newRoutingTable);
148-
staleServers.forEach(server => this._pool.purge(server));
149-
150-
// make this driver instance aware of the new table
151-
this._routingTable = newRoutingTable;
152-
}
153-
15471
static _validateConfig(config) {
15572
if(config.trust === 'TRUST_ON_FIRST_USE') {
15673
throw newError('The chosen trust mode is not compatible with a routing driver');

0 commit comments

Comments
 (0)