Skip to content

Commit 648d402

Browse files
committed
Simplify connection release when closing session
This commit makes `Session#close()` responsible for releasing the underlying connection to the pool. Previously this behaviour was passed in as a function from the driver.
1 parent 709da0e commit 648d402

File tree

4 files changed

+127
-73
lines changed

4 files changed

+127
-73
lines changed

src/v1/driver.js

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
* limitations under the License.
1818
*/
1919

20-
import Session from "./session";
21-
import Pool from "./internal/pool";
22-
import {connect} from "./internal/connector";
23-
import StreamObserver from "./internal/stream-observer";
24-
import {newError, SERVICE_UNAVAILABLE} from "./error";
20+
import Session from './session';
21+
import Pool from './internal/pool';
22+
import {connect} from './internal/connector';
23+
import StreamObserver from './internal/stream-observer';
24+
import {newError, SERVICE_UNAVAILABLE} from './error';
2525

2626
const READ = 'READ', WRITE = 'WRITE';
2727
/**
@@ -119,33 +119,7 @@ class Driver {
119119
//we don't need to tell the driver about this error
120120
}
121121
});
122-
return this._createSession(connectionPromise, this._releaseConnection(connectionPromise));
123-
}
124-
125-
/**
126-
* The returned function gets called on Session#close(), and is where we return the pooled 'connection' instance.
127-
* We don't pool Session instances, to avoid users using the Session after they've called close.
128-
* The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference.
129-
* @param {Promise} connectionPromise - promise resolved with the connection.
130-
* @return {function(callback: function)} - function that releases the connection and then executes an optional callback.
131-
* @protected
132-
*/
133-
_releaseConnection(connectionPromise) {
134-
return userDefinedCallback => {
135-
connectionPromise.then(conn => {
136-
// Queue up a 'reset', to ensure the next user gets a clean session to work with.
137-
conn.reset();
138-
conn.sync();
139-
140-
// Return connection to the pool
141-
conn._release();
142-
}).catch(ignoredError => {
143-
});
144-
145-
if (userDefinedCallback) {
146-
userDefinedCallback();
147-
}
148-
};
122+
return this._createSession(connectionPromise);
149123
}
150124

151125
static _validateSessionMode(rawMode) {
@@ -162,8 +136,8 @@ class Driver {
162136
}
163137

164138
//Extension point
165-
_createSession(connectionPromise, cb) {
166-
return new Session(connectionPromise, cb);
139+
_createSession(connectionPromise) {
140+
return new Session(connectionPromise);
167141
}
168142

169143
/**

src/v1/routing-driver.js

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717
* limitations under the License.
1818
*/
1919

20-
import Session from "./session";
21-
import {Driver, READ, WRITE} from "./driver";
22-
import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from "./error";
23-
import RoundRobinArray from "./internal/round-robin-array";
24-
import RoutingTable from "./internal/routing-table";
25-
import Rediscovery from "./internal/rediscovery";
20+
import Session from './session';
21+
import {Driver, READ, WRITE} from './driver';
22+
import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from './error';
23+
import RoundRobinArray from './internal/round-robin-array';
24+
import RoutingTable from './internal/routing-table';
25+
import Rediscovery from './internal/rediscovery';
2626

2727
/**
2828
* A driver that supports routing in a core-edge cluster.
@@ -35,8 +35,8 @@ class RoutingDriver extends Driver {
3535
this._rediscovery = new Rediscovery();
3636
}
3737

38-
_createSession(connectionPromise, cb) {
39-
return new RoutingSession(connectionPromise, cb, (error, conn) => {
38+
_createSession(connectionPromise) {
39+
return new RoutingSession(connectionPromise, (error, conn) => {
4040
if (error.code === SERVICE_UNAVAILABLE || error.code === SESSION_EXPIRED) {
4141
if (conn) {
4242
this._forget(conn.url)
@@ -132,7 +132,7 @@ class RoutingDriver extends Driver {
132132
// error transformer here is a no-op unlike the one in a regular session, this is so because errors are
133133
// handled in the rediscovery promise chain and we do not need to do this in the error transformer
134134
const errorTransformer = error => error;
135-
return new RoutingSession(connectionPromise, this._releaseConnection(connectionPromise), errorTransformer);
135+
return new RoutingSession(connectionPromise, errorTransformer);
136136
}
137137

138138
_forget(url) {
@@ -160,8 +160,8 @@ class RoutingDriver extends Driver {
160160
}
161161

162162
class RoutingSession extends Session {
163-
constructor(connectionPromise, onClose, onFailedConnection) {
164-
super(connectionPromise, onClose);
163+
constructor(connectionPromise, onFailedConnection) {
164+
super(connectionPromise);
165165
this._onFailedConnection = onFailedConnection;
166166
}
167167

src/v1/session.js

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
import StreamObserver from "./internal/stream-observer";
20-
import Result from "./result";
21-
import Transaction from "./transaction";
22-
import {newError} from "./error";
23-
import {assertString} from "./internal/util";
19+
import StreamObserver from './internal/stream-observer';
20+
import Result from './result';
21+
import Transaction from './transaction';
22+
import {newError} from './error';
23+
import {assertString} from './internal/util';
2424

2525
/**
2626
* A Session instance is used for handling the connection and
@@ -32,11 +32,10 @@ class Session {
3232
/**
3333
* @constructor
3434
* @param {Promise.<Connection>} connectionPromise - Promise of a connection to use
35-
* @param {function()} onClose - Function to be called on connection close
3635
*/
37-
constructor( connectionPromise, onClose ) {
36+
constructor(connectionPromise) {
3837
this._connectionPromise = connectionPromise;
39-
this._onClose = onClose;
38+
this._open = true;
4039
this._hasTx = false;
4140
}
4241

@@ -102,25 +101,41 @@ class Session {
102101

103102
/**
104103
* Close this session.
105-
* @param {function()} cb - Function to be called after the session has been closed
104+
* @param {function()} callback - Function to be called after the session has been closed
106105
* @return
107106
*/
108-
close(cb=(()=>null)) {
109-
if(this._onClose) {
110-
try {
111-
this._onClose(cb);
112-
} finally {
113-
this._onClose = null;
114-
}
107+
close(callback = (() => null)) {
108+
if (this._open) {
109+
this._open = false;
110+
this._releaseCurrentConnection().then(callback);
115111
} else {
116-
cb();
112+
callback();
117113
}
118114
}
119115

120116
//Can be overridden to add error callback on RUN
121117
_onRunFailure() {
122118
return (err) => {return err};
123119
}
120+
121+
/**
122+
* Return the current pooled connection instance to the connection pool.
123+
* We don't pool Session instances, to avoid users using the Session after they've called close.
124+
* The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference.
125+
* @return {Promise} - promise resolved then connection is returned to the pool.
126+
* @private
127+
*/
128+
_releaseCurrentConnection() {
129+
return this._connectionPromise.then(conn => {
130+
// Queue up a 'reset', to ensure the next user gets a clean session to work with.
131+
conn.reset();
132+
conn.sync();
133+
134+
// Return connection to the pool
135+
conn._release();
136+
}).catch(ignoredError => {
137+
});
138+
}
124139
}
125140

126141
/** Internal stream observer used for transactional results*/

test/v1/session.test.js

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,56 @@ describe('session', () => {
4545
driver.close();
4646
});
4747

48-
it('close should be idempotent ', () => {
49-
// Given
50-
let counter = 0;
51-
const _session = new Session(null, () => {
52-
counter++;
48+
it('close should invoke callback ', done => {
49+
const connection = new FakeConnection();
50+
const session = new Session(Promise.resolve(connection));
51+
52+
session.close(done);
53+
});
54+
55+
it('close should invoke callback even when already closed ', done => {
56+
const connection = new FakeConnection();
57+
const session = new Session(Promise.resolve(connection));
58+
59+
session.close(() => {
60+
session.close(() => {
61+
session.close(() => {
62+
done();
63+
});
64+
});
65+
});
66+
});
67+
68+
it('close should be idempotent ', done => {
69+
const connection = new FakeConnection();
70+
const session = new Session(Promise.resolve(connection));
71+
72+
session.close(() => {
73+
expect(connection.closedOnce()).toBeTruthy();
74+
75+
session.close(() => {
76+
expect(connection.closedOnce()).toBeTruthy();
77+
78+
session.close(() => {
79+
expect(connection.closedOnce()).toBeTruthy();
80+
done();
81+
});
82+
});
83+
});
84+
});
85+
86+
it('should be possible to close driver after closing session with failed tx ', done => {
87+
const driver = neo4j.driver('bolt://localhost', neo4j.auth.basic('neo4j', 'neo4j'));
88+
const session = driver.session();
89+
const tx = session.beginTransaction();
90+
tx.run('INVALID QUERY').catch(() => {
91+
tx.rollback().catch(() => {
92+
session.close(() => {
93+
driver.close();
94+
done();
95+
});
96+
});
5397
});
54-
_session.close();
55-
expect(counter).toBe(1);
56-
_session.close();
57-
expect(counter).toBe(1);
5898
});
5999

60100
it('should expose basic run/subscribe ', done => {
@@ -74,7 +114,7 @@ describe('session', () => {
74114
});
75115
});
76116

77-
it('should keep context in subscribe methods ', function (done) {
117+
it('should keep context in subscribe methods ', done => {
78118
// Given
79119
function MyObserver() {
80120
this.local = 'hello';
@@ -87,7 +127,7 @@ describe('session', () => {
87127
expect(privateLocal).toBe('hello');
88128
expect(this.local).toBe('hello');
89129
done();
90-
}
130+
};
91131
}
92132

93133
// When & Then
@@ -400,6 +440,31 @@ describe('session', () => {
400440
it('should fail for illegal session mode', () => {
401441
expect(() => driver.session('ILLEGAL_MODE')).toThrow();
402442
});
443+
444+
class FakeConnection {
445+
446+
constructor() {
447+
this.resetInvoked = 0;
448+
this.syncInvoked = 0;
449+
this.releaseInvoked = 0;
450+
}
451+
452+
reset() {
453+
this.resetInvoked++;
454+
}
455+
456+
sync() {
457+
this.syncInvoked++;
458+
}
459+
460+
_release() {
461+
this.releaseInvoked++;
462+
}
463+
464+
closedOnce() {
465+
return this.resetInvoked === 1 && this.syncInvoked === 1 && this.releaseInvoked === 1;
466+
}
467+
}
403468
});
404469

405470

0 commit comments

Comments
 (0)