Skip to content

Commit b01d369

Browse files
committed
Make session aware of connection provider
So it could later use it to obtain connections on demand rather than hold a single connection permanently.
1 parent f0605b9 commit b01d369

File tree

6 files changed

+86
-42
lines changed

6 files changed

+86
-42
lines changed

src/v1/driver.js

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class Driver {
5858
Driver._validateConnection.bind(this),
5959
config.connectionPoolSize
6060
);
61-
this._connectionProvider = this._createConnectionProvider(url, this._pool);
61+
this._connectionProvider = this._createConnectionProvider(url, this._pool, this._driverOnErrorCallback.bind(this));
6262
}
6363

6464
/**
@@ -113,15 +113,7 @@ class Driver {
113113
*/
114114
session(mode) {
115115
const sessionMode = Driver._validateSessionMode(mode);
116-
const connectionPromise = this._connectionProvider.acquireConnection(sessionMode);
117-
connectionPromise.catch((err) => {
118-
if (this.onError && err.code === SERVICE_UNAVAILABLE) {
119-
this.onError(err);
120-
} else {
121-
//we don't need to tell the driver about this error
122-
}
123-
});
124-
return this._createSession(connectionPromise);
116+
return this._createSession(sessionMode, this._connectionProvider);
125117
}
126118

127119
static _validateSessionMode(rawMode) {
@@ -133,13 +125,22 @@ class Driver {
133125
}
134126

135127
//Extension point
136-
_createConnectionProvider(address, connectionPool) {
137-
return new DirectConnectionProvider(address, connectionPool);
128+
_createConnectionProvider(address, connectionPool, driverOnErrorCallback) {
129+
return new DirectConnectionProvider(address, connectionPool, driverOnErrorCallback);
138130
}
139131

140132
//Extension point
141-
_createSession(connectionPromise) {
142-
return new Session(connectionPromise);
133+
_createSession(mode, connectionProvider) {
134+
return new Session(mode, connectionProvider);
135+
}
136+
137+
_driverOnErrorCallback(error) {
138+
const userDefinedOnErrorCallback = this.onError;
139+
if (userDefinedOnErrorCallback && error.code === SERVICE_UNAVAILABLE) {
140+
userDefinedOnErrorCallback(error);
141+
} else {
142+
// we don't need to tell the driver about this error
143+
}
143144
}
144145

145146
/**

src/v1/internal/connection-providers.js

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,32 +29,43 @@ class ConnectionProvider {
2929
acquireConnection(mode) {
3030
throw new Error('Abstract method');
3131
}
32+
33+
_withAdditionalOnErrorCallback(connectionPromise, driverOnErrorCallback) {
34+
// install error handler from the driver on the connection promise; this callback is installed separately
35+
// so that it does not handle errors, instead it is just an additional error reporting facility.
36+
connectionPromise.catch(error => driverOnErrorCallback(error));
37+
// return the original connection promise
38+
return connectionPromise;
39+
}
3240
}
3341

3442
export class DirectConnectionProvider extends ConnectionProvider {
3543

36-
constructor(address, connectionPool) {
44+
constructor(address, connectionPool, driverOnErrorCallback) {
3745
super();
3846
this._address = address;
3947
this._connectionPool = connectionPool;
48+
this._driverOnErrorCallback = driverOnErrorCallback;
4049
}
4150

4251
acquireConnection(mode) {
43-
return Promise.resolve(this._connectionPool.acquire(this._address));
52+
const connectionPromise = Promise.resolve(this._connectionPool.acquire(this._address));
53+
return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback);
4454
}
4555
}
4656

4757
export class LoadBalancer extends ConnectionProvider {
4858

49-
constructor(address, connectionPool) {
59+
constructor(address, connectionPool, driverOnErrorCallback) {
5060
super();
5161
this._routingTable = new RoutingTable(new RoundRobinArray([address]));
5262
this._rediscovery = new Rediscovery();
5363
this._connectionPool = connectionPool;
64+
this._driverOnErrorCallback = driverOnErrorCallback;
5465
}
5566

5667
acquireConnection(mode) {
57-
return this._freshRoutingTable().then(routingTable => {
68+
const connectionPromise = this._freshRoutingTable().then(routingTable => {
5869
if (mode === READ) {
5970
return this._acquireConnectionToServer(routingTable.readers, 'read');
6071
} else if (mode === WRITE) {
@@ -63,6 +74,7 @@ export class LoadBalancer extends ConnectionProvider {
6374
throw newError('Illegal mode ' + mode);
6475
}
6576
});
77+
return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback);
6678
}
6779

6880
forget(address) {
@@ -132,7 +144,8 @@ export class LoadBalancer extends ConnectionProvider {
132144
_createSessionForRediscovery(routerAddress) {
133145
const connection = this._connectionPool.acquire(routerAddress);
134146
const connectionPromise = Promise.resolve(connection);
135-
return new Session(connectionPromise);
147+
const connectionProvider = new SingleConnectionProvider(connectionPromise);
148+
return new Session(READ, connectionProvider);
136149
}
137150

138151
_updateRoutingTable(newRoutingTable) {
@@ -153,3 +166,17 @@ export class LoadBalancer extends ConnectionProvider {
153166
}
154167
}
155168
}
169+
170+
export class SingleConnectionProvider extends ConnectionProvider {
171+
172+
constructor(connectionPromise) {
173+
super();
174+
this._connectionPromise = connectionPromise;
175+
}
176+
177+
acquireConnection(mode) {
178+
const connectionPromise = this._connectionPromise;
179+
this._connectionPromise = null;
180+
return connectionPromise;
181+
}
182+
}

src/v1/routing-driver.js

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,30 +31,24 @@ class RoutingDriver extends Driver {
3131
super(url, userAgent, token, RoutingDriver._validateConfig(config));
3232
}
3333

34-
_createConnectionProvider(address, connectionPool) {
35-
return new LoadBalancer(address, connectionPool);
34+
_createConnectionProvider(address, connectionPool, driverOnErrorCallback) {
35+
return new LoadBalancer(address, connectionPool, driverOnErrorCallback);
3636
}
3737

38-
_createSession(connectionPromise) {
39-
return new RoutingSession(connectionPromise, (error, conn) => {
38+
_createSession(mode, connectionProvider) {
39+
return new RoutingSession(mode, connectionProvider, (error, conn) => {
4040
if (error.code === SERVICE_UNAVAILABLE || error.code === SESSION_EXPIRED) {
41+
// connection is undefined if error happened before connection was acquired
4142
if (conn) {
4243
this._connectionProvider.forget(conn.url);
43-
} else {
44-
connectionPromise.then((conn) => {
45-
this._connectionProvider.forget(conn.url);
46-
}).catch(() => {/*ignore*/});
4744
}
4845
return error;
4946
} else if (error.code === 'Neo.ClientError.Cluster.NotALeader') {
5047
let url = 'UNKNOWN';
48+
// connection is undefined if error happened before connection was acquired
5149
if (conn) {
5250
url = conn.url;
5351
this._connectionProvider.forgetWriter(conn.url);
54-
} else {
55-
connectionPromise.then((conn) => {
56-
this._connectionProvider.forgetWriter(conn.url);
57-
}).catch(() => {/*ignore*/});
5852
}
5953
return newError('No longer possible to write to server at ' + url, SESSION_EXPIRED);
6054
} else {
@@ -72,8 +66,8 @@ class RoutingDriver extends Driver {
7266
}
7367

7468
class RoutingSession extends Session {
75-
constructor(connectionPromise, onFailedConnection) {
76-
super(connectionPromise);
69+
constructor(mode, connectionProvider, onFailedConnection) {
70+
super(mode, connectionProvider);
7771
this._onFailedConnection = onFailedConnection;
7872
}
7973

src/v1/session.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@ import {assertString} from './internal/util';
3131
class Session {
3232
/**
3333
* @constructor
34-
* @param {Promise.<Connection>} connectionPromise - Promise of a connection to use
34+
* todo: doc params
3535
*/
36-
constructor(connectionPromise) {
37-
this._connectionPromise = connectionPromise;
36+
constructor(mode, connectionProvider) {
37+
this._mode = mode;
38+
this._connectionProvider = connectionProvider;
39+
this._connectionPromise = this._connectionProvider.acquireConnection(this._mode);
3840
this._open = true;
3941
this._hasTx = false;
4042
}

test/internal/connection-providers.test.js

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ import RoundRobinArray from '../../src/v1/internal/round-robin-array';
2525
import {DirectConnectionProvider, LoadBalancer} from '../../src/v1/internal/connection-providers';
2626
import Pool from '../../src/v1/internal/pool';
2727

28+
const NO_OP_DRIVER_CALLBACK = () => {
29+
};
30+
2831
describe('DirectConnectionProvider', () => {
2932

3033
it('acquires connection from the pool', done => {
3134
const pool = newPool();
32-
const connectionProvider = new DirectConnectionProvider('localhost:123', pool);
35+
const connectionProvider = newDirectConnectionProvider('localhost:123', pool);
3336

3437
connectionProvider.acquireConnection(READ).then(connection => {
3538
expect(connection).toBeDefined();
@@ -132,7 +135,7 @@ describe('LoadBalancer', () => {
132135
});
133136

134137
it('initializes routing table with the given router', () => {
135-
const loadBalancer = new LoadBalancer('server-ABC', newPool());
138+
const loadBalancer = new LoadBalancer('server-ABC', newPool(), NO_OP_DRIVER_CALLBACK);
136139

137140
expectRoutingTable(loadBalancer,
138141
['server-ABC'],
@@ -581,8 +584,12 @@ describe('LoadBalancer', () => {
581584

582585
});
583586

587+
function newDirectConnectionProvider(address, pool) {
588+
return new DirectConnectionProvider(address, pool, NO_OP_DRIVER_CALLBACK);
589+
}
590+
584591
function newLoadBalancer(routers, readers, writers, pool = null, expirationTime = Integer.MAX_VALUE, routerToRoutingTable = {}) {
585-
const loadBalancer = new LoadBalancer(null, pool || newPool());
592+
const loadBalancer = new LoadBalancer(null, pool || newPool(), NO_OP_DRIVER_CALLBACK);
586593
loadBalancer._routingTable = new RoutingTable(
587594
new RoundRobinArray(routers),
588595
new RoundRobinArray(readers),

test/v1/session.test.js

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import neo4j from '../../src/v1';
2121
import {statementType} from '../../src/v1/result-summary';
2222
import Session from '../../src/v1/session';
23+
import {READ} from '../../src/v1/driver';
24+
import {SingleConnectionProvider} from '../../src/v1/internal/connection-providers';
2325

2426
describe('session', () => {
2527

@@ -47,14 +49,14 @@ describe('session', () => {
4749

4850
it('close should invoke callback ', done => {
4951
const connection = new FakeConnection();
50-
const session = new Session(Promise.resolve(connection));
52+
const session = newSessionWithConnection(connection);
5153

5254
session.close(done);
5355
});
5456

5557
it('close should invoke callback even when already closed ', done => {
5658
const connection = new FakeConnection();
57-
const session = new Session(Promise.resolve(connection));
59+
const session = newSessionWithConnection(connection);
5860

5961
session.close(() => {
6062
session.close(() => {
@@ -67,7 +69,7 @@ describe('session', () => {
6769

6870
it('close should be idempotent ', done => {
6971
const connection = new FakeConnection();
70-
const session = new Session(Promise.resolve(connection));
72+
const session = newSessionWithConnection(connection);
7173

7274
session.close(() => {
7375
expect(connection.closedOnce()).toBeTruthy();
@@ -441,6 +443,11 @@ describe('session', () => {
441443
expect(() => driver.session('ILLEGAL_MODE')).toThrow();
442444
});
443445

446+
function newSessionWithConnection(connection) {
447+
const connectionProvider = new SingleConnectionProvider(Promise.resolve(connection));
448+
return new Session(READ, connectionProvider);
449+
}
450+
444451
class FakeConnection {
445452

446453
constructor() {
@@ -449,6 +456,12 @@ describe('session', () => {
449456
this.releaseInvoked = 0;
450457
}
451458

459+
run() {
460+
}
461+
462+
discardAll() {
463+
}
464+
452465
reset() {
453466
this.resetInvoked++;
454467
}

0 commit comments

Comments
 (0)