Skip to content

Commit fc852ca

Browse files
add optional metrics args
1 parent bd1f17b commit fc852ca

File tree

3 files changed

+109
-21
lines changed

3 files changed

+109
-21
lines changed

README.md

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,52 @@ it if the promise is rejected. Atomics may be nested.
7777

7878
Install a database `Session` on the domain `d`.
7979

80-
#### `ConnPairFn := Function → Promise({connection, release})`
80+
##### `Options`
81+
82+
Sessions accept the following options:
83+
84+
* `maxConcurrency`: An integer specifying the maximum number of connections a
85+
given session will make at a time. `0` is treated as `Infinity`. Defaults to
86+
`Infinity`. *Note:* this number is implicitly bound by the size of the `pg`
87+
connection pool. For example, even if the limit is set at `200`, if `pg`'s
88+
pool size is limited to `10`, the upper limit will effectively be `10`.
89+
* `onSessionIdle()`: A function that is called whenever all requests for
90+
connections have been satisfied. Note that this may happen while connections
91+
are still open.
92+
* `onConnectionRequest(baton)`: A function accepting a baton object that is
93+
called when a request for a connection is made.
94+
* `onConnectionStart(baton)`: A function acccepting a baton object that is
95+
called when a request for a connection is fulfilled. The baton will be the
96+
same object that was passed to a previous call to `onConnectionRequest`,
97+
suitable for associating timing information.
98+
* `onConnectionFinish(baton, err)`: A function accepting a baton object and
99+
an optional `err` parameter that will be called when a connection is released
100+
back to the session.
101+
* `onTransactionRequest(baton, operation, args)`: A function accepting a baton,
102+
function, and array of arguments representing the request for a transaction
103+
session. Called coincident with `onConnectionRequest`.
104+
* `onTransactionStart(baton, operation, args)`: A function accepting a baton,
105+
function, and array of arguments representing the fulfillment of a request
106+
for a transaction session. Called before `BEGIN`, coincident with
107+
`onConnectionStart`.
108+
* `onTransactionFinish(baton, operation, args, PromiseInspection)`:
109+
A function accepting a baton, function, array of arguments, and a
110+
[`PromiseInspection`][bluebird-inspection] representing the state of the
111+
transaction. Called coincident with `onConnectionFinish`.
112+
* `onAtomicRequest(baton, operation, args)`: A function accepting a baton,
113+
function, and array of arguments representing the request for an atomic
114+
session.
115+
* `onAtomicStart(baton, operation, args)`: A function accepting a baton,
116+
function, and array of arguments representing the fulfillment of a request
117+
for an atomic session.
118+
* `onAtomicFinish(baton, operation, args, PromiseInspection)`:
119+
A function accepting a baton, function, array of arguments, and a
120+
[`PromiseInspection`][bluebird-inspection] representing the state of the
121+
atomic transaction.
122+
123+
All functions will default to `noop` if not provided.
124+
125+
##### `ConnPairFn := Function → Promise({connection, release})`
81126

82127
A function that returns a `Promise` for an object with `connection` and `release`
83128
properties, corresponding to the `client` and `done` parameters handed back by

db-session.js

Lines changed: 59 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,27 @@ class NoSessionAvailable extends Error {
1414
}
1515
}
1616

17+
function noop () {
18+
}
19+
1720
const api = module.exports = {
1821
install (domain, getConnection, opts) {
19-
opts = opts || {}
22+
opts = Object.assign({
23+
maxConcurrency: Infinity,
24+
onSessionIdle: noop,
25+
onConnectionRequest: noop,
26+
onConnectionStart: noop,
27+
onConnectionFinish: noop,
28+
onTransactionRequest: noop,
29+
onTransactionStart: noop,
30+
onTransactionFinish: noop,
31+
onAtomicRequest: noop,
32+
onAtomicStart: noop,
33+
onAtomicFinish: noop
34+
}, opts || {})
2035
DOMAIN_TO_SESSION.set(domain, new Session(
2136
getConnection,
22-
opts.maxConcurrency
37+
opts
2338
))
2439
},
2540

@@ -63,15 +78,29 @@ const api = module.exports = {
6378
// 3. atomic — grouped set of operations — parent transaction treats all connections performed
6479
// as a single operation
6580
class Session {
66-
constructor (getConnection, maxConcurrency) {
81+
constructor (getConnection, opts) {
6782
this._getConnection = getConnection
68-
this._activeConnections = 0
69-
this._maxConcurrency = maxConcurrency || Infinity
83+
this.activeConnections = 0
84+
this.maxConcurrency = opts.maxConcurrency || Infinity
85+
this.metrics = {
86+
onSessionIdle: opts.onSessionIdle,
87+
onConnectionRequest: opts.onConnectionRequest,
88+
onConnectionStart: opts.onConnectionStart,
89+
onConnectionFinish: opts.onConnectionFinish,
90+
onTransactionRequest: opts.onTransactionRequest,
91+
onTransactionStart: opts.onTransactionStart,
92+
onTransactionFinish: opts.onTransactionFinish,
93+
onAtomicRequest: opts.onAtomicRequest,
94+
onAtomicStart: opts.onAtomicStart,
95+
onAtomicFinish: opts.onAtomicFinish
96+
}
7097
this.pending = []
7198
}
7299

73100
getConnection () {
74-
if (this._activeConnections === this._maxConcurrency) {
101+
const baton = {}
102+
this.metrics.onConnectionRequest(baton)
103+
if (this.activeConnections === this.maxConcurrency) {
75104
// not using Promise.defer() here in case it gets deprecated by
76105
// bluebird.
77106
const pending = _defer()
@@ -80,24 +109,29 @@ class Session {
80109
}
81110

82111
const connPair = Promise.resolve(this._getConnection())
83-
++this._activeConnections
112+
++this.activeConnections
84113

85-
return connPair.then(
86-
pair => new SessionConnectionPair(pair, this)
87-
)
114+
return connPair.then(pair => {
115+
this.metrics.onConnectionStart(baton)
116+
return new SessionConnectionPair(pair, this, baton)
117+
})
88118
}
89119

90120
transaction (operation, args) {
121+
const baton = {}
91122
const getConnPair = this.getConnection()
92-
const getResult = Session$RunWrapped(connPair => {
93-
return new TransactionSession(connPair)
123+
this.metrics.onTransactionRequest(baton, operation, args)
124+
const getResult = Session$RunWrapped(this, connPair => {
125+
this.metrics.onTransactionStart(baton, operation, args)
126+
return new TransactionSession(connPair, this.metrics)
94127
}, getConnPair, `BEGIN`, {
95128
success: `COMMIT`,
96129
failure: `ROLLBACK`
97130
}, operation, args)
98131

99132
const releasePair = getConnPair.then(pair => {
100133
return getResult.reflect().then(result => {
134+
this.metrics.onTransactionFinish(baton, operation, args, result)
101135
return result.isFulfilled()
102136
? pair.release()
103137
: pair.release(result.reason())
@@ -114,16 +148,17 @@ class Session {
114148
}
115149

116150
releasePair (pair, err) {
117-
--this._activeConnections
151+
--this.activeConnections
118152
pair.release(err)
119153
}
120154
}
121155

122156
class TransactionSession {
123-
constructor (connPair) {
157+
constructor (connPair, metrics) {
124158
this.connectionPair = connPair
125159
this.inactive = false
126160
this.operation = Promise.resolve(true)
161+
this.metrics = metrics
127162
}
128163

129164
getConnection () {
@@ -148,17 +183,21 @@ class TransactionSession {
148183
}
149184

150185
atomic (operation, args) {
186+
const baton = {}
151187
const atomicConnPair = this.getConnection()
152188
const savepointName = getSavepointName(operation)
153-
const getResult = Session$RunWrapped(connPair => {
154-
return new AtomicSession(connPair, savepointName)
189+
this.metrics.onAtomicRequest(baton, operation, args)
190+
const getResult = Session$RunWrapped(this, connPair => {
191+
this.metrics.onAtomicStart(baton, operation, args)
192+
return new AtomicSession(connPair, this.metrics, savepointName)
155193
}, atomicConnPair, `SAVEPOINT ${savepointName}`, {
156194
success: `RELEASE SAVEPOINT ${savepointName}`,
157195
failure: `ROLLBACK TO SAVEPOINT ${savepointName}`
158196
}, operation, args)
159197

160198
const releasePair = atomicConnPair.then(pair => {
161199
return getResult.reflect().then(result => {
200+
this.metrics.onAtomicFinish(baton, operation, args, result)
162201
return result.isFulfilled()
163202
? pair.release()
164203
: pair.release(result.reason())
@@ -170,13 +209,14 @@ class TransactionSession {
170209
}
171210

172211
class AtomicSession extends TransactionSession {
173-
constructor (connection, name) {
174-
super(connection)
212+
constructor (connection, metrics, name) {
213+
super(connection, metrics)
175214
this.name = name
176215
}
177216
}
178217

179-
function Session$RunWrapped (createSession,
218+
function Session$RunWrapped (parent,
219+
createSession,
180220
getConnPair,
181221
before,
182222
after,

lib/session-connpair.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
'use strict'
22

33
module.exports = class SessionConnectionPair {
4-
constructor (connPair, session) {
4+
constructor (connPair, session, baton) {
55
this.pair = connPair
66
this.session = session
7+
this.baton = baton
78

89
// tightly bind "release", because we don't know
910
// who will be calling it.
@@ -20,13 +21,15 @@ module.exports = class SessionConnectionPair {
2021
// the connection entirely. This lets us limit the concurrency
2122
// per-request, instead of globally.
2223
function release (conn, err) {
24+
conn.session.metrics.onConnectionFinish(conn.baton, err)
2325
if (err) {
2426
return handleError(conn, err)
2527
}
2628
const next = conn.session.pending.shift()
2729
if (next) {
2830
return next.resolve(conn)
2931
}
32+
conn.session.metrics.onSessionIdle()
3033
conn.session.releasePair(conn.pair, null)
3134
}
3235

0 commit comments

Comments
 (0)