Skip to content

Commit 44fc3a7

Browse files
initial code commit
1 parent a15052a commit 44fc3a7

File tree

5 files changed

+371
-0
lines changed

5 files changed

+371
-0
lines changed

db-session.js

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
'use strict'
2+
3+
const DOMAIN_TO_SESSION = new WeakMap()
4+
const Promise = require('bluebird')
5+
const domain = require('domain')
6+
7+
const AtomicSessionConnectionPair = require('./lib/atomic-session-connpair.js')
8+
const TxSessionConnectionPair = require('./lib/tx-session-connpair.js')
9+
const SessionConnectionPair = require('./lib/session-connpair.js')
10+
11+
const api = module.exports = {
12+
install (domain, getConnection, opts) {
13+
opts = opts || {}
14+
DOMAIN_TO_SESSION.set(domain, new Session(
15+
getConnection,
16+
opts.maxConcurrency
17+
))
18+
},
19+
20+
atomic (operation) {
21+
return function atomic$operation () {
22+
const args = [].slice.call(arguments)
23+
return api.session.atomic(operation, args)
24+
}
25+
},
26+
27+
transaction (operation) {
28+
return function transaction$operation () {
29+
const args = [].slice.call(arguments)
30+
return api.session.transaction(operation, args)
31+
}
32+
},
33+
34+
getConnection () {
35+
return DOMAIN_TO_SESSION.get(process.domain).getConnection()
36+
},
37+
38+
get session () {
39+
var current = DOMAIN_TO_SESSION.get(process.domain)
40+
if (!current) {
41+
throw new Error('no session active')
42+
}
43+
while (current.inactive && current.parent) {
44+
current = current.parent
45+
}
46+
return current
47+
}
48+
}
49+
50+
// how does this nest:
51+
// 1. no transaction — session creates connections on-demand up till maxconcurrency
52+
// 2. transaction — session holds one connection, gives it to requesters as-needed, one
53+
// at a time
54+
// 3. atomic — grouped set of operations — parent transaction treats all connections performed
55+
// as a single operation
56+
class Session {
57+
constructor (getConnection, maxConcurrency) {
58+
this._getConnection = getConnection
59+
this._activeConnections = 0
60+
this._maxConcurrency = maxConcurrency || Infinity
61+
this.pending = []
62+
}
63+
64+
getConnection () {
65+
if (this._activeConnections === this._maxConcurrency) {
66+
// not using Promise.defer() here in case it gets deprecated by
67+
// bluebird.
68+
const pending = _defer()
69+
this.pending.push(pending)
70+
return pending.promise
71+
}
72+
73+
const connPair = this._getConnection()
74+
++this._activeConnections
75+
76+
return connPair.then(
77+
pair => new SessionConnectionPair(pair, this)
78+
)
79+
}
80+
81+
transaction (operation, args) {
82+
const getConnPair = this.getConnection()
83+
const getResult = Session$RunWrapped(this, connPair => {
84+
return new TransactionSession(this, connPair)
85+
}, getConnPair, `BEGIN`, {
86+
success: `COMMIT`,
87+
failure: `ROLLBACK`
88+
}, operation, args)
89+
const releasePair = getResult.return(getConnPair).then(
90+
pair => pair.release()
91+
)
92+
93+
return releasePair.return(getResult)
94+
}
95+
96+
atomic (operation, args) {
97+
return this.transaction(() => {
98+
return DOMAIN_TO_SESSION.get(process.domain).atomic(operation, args)
99+
}, args.slice())
100+
}
101+
102+
releasePair (pair, err) {
103+
--this._activeConnections
104+
pair.release(err)
105+
}
106+
}
107+
108+
class TransactionSession {
109+
constructor (parent, connPair) {
110+
this.parent = parent
111+
this.connectionPair = connPair
112+
this.inactive = false
113+
this.operation = Promise.resolve(true)
114+
this.operation.redBalloon = true
115+
}
116+
117+
getConnection () {
118+
if (this.inactive) {
119+
return this.parent.getConnection()
120+
}
121+
// XXX(chrisdickinson): creating a TxConnPair implicitly
122+
// swaps out "this.operation", creating a linked list of
123+
// promises.
124+
return new TxSessionConnectionPair(this).onready
125+
}
126+
127+
transaction (operation, args) {
128+
if (this.inactive) {
129+
return this.parent.transaction(operation, args)
130+
}
131+
return operation.apply(null, args)
132+
}
133+
134+
atomic (operation, args) {
135+
const atomicConnPair = new AtomicSessionConnectionPair(this)
136+
const savepointName = getSavepointName(operation)
137+
const getResult = Session$RunWrapped(this, connPair => {
138+
return new AtomicSession(this, connPair, savepointName)
139+
}, atomicConnPair.onready, `SAVEPOINT ${savepointName}`, {
140+
success: `RELEASE SAVEPOINT ${savepointName}`,
141+
failure: `ROLLBACK TO SAVEPOINT ${savepointName}`
142+
}, operation, args)
143+
144+
return getResult.then(() => {
145+
setImmediate(() => {
146+
atomicConnPair.close()
147+
})
148+
}).return(getResult)
149+
}
150+
}
151+
152+
class AtomicSession extends TransactionSession {
153+
constructor (parent, connection, name) {
154+
super(parent, connection)
155+
this.name = name
156+
}
157+
}
158+
159+
function Session$RunWrapped (parent,
160+
createSession,
161+
getConnPair, before, after, operation, args) {
162+
const createSubdomain = getConnPair.then(connPair => {
163+
const subdomain = domain.create()
164+
const session = createSession(connPair)
165+
DOMAIN_TO_SESSION.set(subdomain, session)
166+
return subdomain
167+
})
168+
169+
const runBefore = getConnPair.then(connPair => new Promise(
170+
(resolve, reject) => connPair.connection.query(
171+
before,
172+
err => err ? reject(err) : resolve()
173+
)
174+
))
175+
176+
const getResult = runBefore.return(
177+
createSubdomain
178+
).then(domain => {
179+
args.unshift(operation)
180+
return Promise.resolve(domain.run.apply(domain, args))
181+
})
182+
183+
const getReflectedResult = getResult.reflect()
184+
const runCommitStep = Promise.join(
185+
getReflectedResult,
186+
getConnPair.get('connection')
187+
).spread((result, connection) => {
188+
return new Promise((resolve, reject) => {
189+
connection.query(
190+
result.isFulfilled()
191+
? after.success
192+
: after.failure,
193+
err => err ? reject(err) : resolve()
194+
)
195+
})
196+
})
197+
198+
return runCommitStep.return(
199+
createSubdomain
200+
).then(markInactive(parent)).return(getResult)
201+
}
202+
203+
function getSavepointName (operation) {
204+
const id = getSavepointName.ID++
205+
const dt = new Date().toISOString().replace(/[^\d]/g, '_').slice(0, -1)
206+
const name = (operation.name || 'anon').replace(/[^\w]/g, '_')
207+
// e.g., "save_13_userToOrg_2016_01_03_08_30_00_000"
208+
return `save_${id}_${name}_${dt}`
209+
}
210+
getSavepointName.ID = 0
211+
212+
function markInactive (session) {
213+
// XXX(chrisdickinson): is this a good idea? this
214+
// means that DB requests after the operation completes
215+
// will still work, but will operate outside of the
216+
// transaction.
217+
return domain => {
218+
DOMAIN_TO_SESSION.get(domain).inactive = true
219+
DOMAIN_TO_SESSION.set(domain, session)
220+
}
221+
}
222+
223+
function _defer () {
224+
const pending = {
225+
resolve: null,
226+
reject: null,
227+
promise: null
228+
}
229+
pending.promise = new Promise((resolve, reject) => {
230+
pending.resolve = resolve
231+
pending.reject = reject
232+
})
233+
return pending
234+
}

lib/atomic-session-connpair.js

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
'use strict'
2+
3+
const RESOLVE_SYM = Symbol()
4+
const REJECT_SYM = Symbol()
5+
6+
module.exports = class AtomicSessionConnPair {
7+
constructor (session) {
8+
this.pair = session.connectionPair
9+
this.release = err => release(this, err)
10+
this.completed = new Promise((resolve, reject) => {
11+
this[RESOLVE_SYM] = resolve
12+
this[REJECT_SYM] = reject
13+
})
14+
this.onready = session.operation.then(() => this)
15+
session.operation = this.completed
16+
}
17+
18+
get connection () {
19+
return this.pair.connection
20+
}
21+
22+
close () {
23+
this[RESOLVE_SYM]()
24+
}
25+
}
26+
27+
function release (conn, err) {
28+
if (err) {
29+
return conn[REJECT_SYM](err)
30+
}
31+
}

lib/session-connpair.js

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
'use strict'
2+
3+
module.exports = class SessionConnectionPair {
4+
constructor (connPair, session) {
5+
this.pair = connPair
6+
this.session = session
7+
8+
// tightly bind "release", because we don't know
9+
// who will be calling it.
10+
this.release = err => release(this, err)
11+
}
12+
13+
get connection () {
14+
return this.pair.connection
15+
}
16+
}
17+
18+
// release: attempt to hand the connection pair to the next
19+
// in the list of waiting receivers. If there are none, release
20+
// the connection entirely. This lets us limit the concurrency
21+
// per-request, instead of globally.
22+
function release (conn, err) {
23+
if (err) {
24+
return handleError(conn, err)
25+
}
26+
const next = conn.session.pending.shift()
27+
if (next) {
28+
return next.resolve(conn)
29+
}
30+
conn.session.releasePair(conn.pair, null)
31+
}
32+
33+
// handleError: release the connection back to the pg pool
34+
// with the error notification; replay all pending connections
35+
// so they don't try to grab this one.
36+
function handleError (conn, err) {
37+
conn.session.decrementConnections()
38+
conn.session.releasePair(conn.pair, err)
39+
40+
const pending = conn.session.pending.slice()
41+
conn.session.length = 0
42+
while (pending.length) {
43+
pending.shift().resolve(conn.session.getConnection())
44+
}
45+
}

lib/tx-session-connpair.js

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
'use strict'
2+
3+
const RESOLVE_SYM = Symbol()
4+
const REJECT_SYM = Symbol()
5+
6+
module.exports = class TransactionSessionConnPair {
7+
constructor (session) {
8+
this.pair = session.connectionPair
9+
this.release = err => release(this, err)
10+
this.completed = new Promise((resolve, reject) => {
11+
this[RESOLVE_SYM] = resolve
12+
this[REJECT_SYM] = reject
13+
})
14+
this.onready = session.operation.then(() => this)
15+
session.operation = this.completed
16+
}
17+
18+
get connection () {
19+
return this.pair.connection
20+
}
21+
}
22+
23+
function release (conn, err) {
24+
return err ? conn[REJECT_SYM](err) : conn[RESOLVE_SYM]()
25+
}

package.json

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
{
2+
"name": "@npm/pg-db-session",
3+
"version": "1.0.0",
4+
"description": "domain-attached database sessions",
5+
"main": "db-session.js",
6+
"scripts": {
7+
"test": "tape test/*-test.js && standard",
8+
"test-cov": "nyc tape test/*-test.js && standard"
9+
},
10+
"repository": {
11+
"type": "git",
12+
"url": "git+ssh://[email protected]/npm/pg-db-session.git"
13+
},
14+
"keywords": [
15+
"postgres",
16+
"database",
17+
"domain",
18+
"session"
19+
],
20+
"author": "Chris Dickinson <[email protected]> (http://neversaw.us/)",
21+
"license": "MIT",
22+
"bugs": {
23+
"url": "https://github.com/npm/pg-db-session/issues"
24+
},
25+
"homepage": "https://github.com/npm/pg-db-session#readme",
26+
"devDependencies": {
27+
"faucet": "0.0.1",
28+
"nyc": "^5.3.0",
29+
"pg": "^4.4.3",
30+
"standard": "^5.4.1",
31+
"tape": "^4.4.0"
32+
},
33+
"dependencies": {
34+
"bluebird": "^3.1.1"
35+
}
36+
}

0 commit comments

Comments
 (0)