@@ -4,7 +4,6 @@ const DOMAIN_TO_SESSION = new WeakMap()
4
4
const Promise = require ( 'bluebird' )
5
5
const domain = require ( 'domain' )
6
6
7
- const AtomicSessionConnectionPair = require ( './lib/atomic-session-connpair.js' )
8
7
const TxSessionConnectionPair = require ( './lib/tx-session-connpair.js' )
9
8
const SessionConnectionPair = require ( './lib/session-connpair.js' )
10
9
@@ -43,17 +42,14 @@ const api = module.exports = {
43
42
} ,
44
43
45
44
getConnection ( ) {
46
- return DOMAIN_TO_SESSION . get ( process . domain ) . getConnection ( )
45
+ return api . session . getConnection ( )
47
46
} ,
48
47
49
48
get session ( ) {
50
49
var current = DOMAIN_TO_SESSION . get ( process . domain )
51
- if ( ! current || ! process . domain ) {
50
+ if ( ! current || current . inactive || ! process . domain ) {
52
51
throw new NoSessionAvailable ( )
53
52
}
54
- while ( current . inactive && current . parent ) {
55
- current = current . parent
56
- }
57
53
return current
58
54
} ,
59
55
@@ -93,15 +89,20 @@ class Session {
93
89
94
90
transaction ( operation , args ) {
95
91
const getConnPair = this . getConnection ( )
96
- const getResult = Session$RunWrapped ( this , connPair => {
97
- return new TransactionSession ( this , connPair )
92
+ const getResult = Session$RunWrapped ( connPair => {
93
+ return new TransactionSession ( connPair )
98
94
} , getConnPair , `BEGIN` , {
99
95
success : `COMMIT` ,
100
96
failure : `ROLLBACK`
101
97
} , operation , args )
102
- const releasePair = getResult . return ( getConnPair ) . then (
103
- pair => pair . release ( )
104
- )
98
+
99
+ const releasePair = getConnPair . then ( pair => {
100
+ return getResult . reflect ( ) . then ( result => {
101
+ return result . isFulfilled ( )
102
+ ? pair . release ( )
103
+ : pair . release ( result . reason ( ) )
104
+ } )
105
+ } )
105
106
106
107
return releasePair . return ( getResult )
107
108
}
@@ -119,97 +120,107 @@ class Session {
119
120
}
120
121
121
122
class TransactionSession {
122
- constructor ( parent , connPair ) {
123
- this . parent = parent
123
+ constructor ( connPair ) {
124
124
this . connectionPair = connPair
125
125
this . inactive = false
126
126
this . operation = Promise . resolve ( true )
127
127
}
128
128
129
129
getConnection ( ) {
130
130
if ( this . inactive ) {
131
- return this . parent . getConnection ( )
131
+ return new Promise ( ( _ , reject ) => {
132
+ reject ( new NoSessionAvailable ( ) )
133
+ } )
132
134
}
133
- // XXX (chrisdickinson): creating a TxConnPair implicitly
135
+ // NB (chrisdickinson): creating a TxConnPair implicitly
134
136
// swaps out "this.operation", creating a linked list of
135
137
// promises.
136
138
return new TxSessionConnectionPair ( this ) . onready
137
139
}
138
140
139
141
transaction ( operation , args ) {
140
142
if ( this . inactive ) {
141
- return this . parent . transaction ( operation , args )
143
+ return new Promise ( ( _ , reject ) => {
144
+ reject ( new NoSessionAvailable ( ) )
145
+ } )
142
146
}
143
147
return operation . apply ( null , args )
144
148
}
145
149
146
150
atomic ( operation , args ) {
147
- const atomicConnPair = new AtomicSessionConnectionPair ( this )
151
+ const atomicConnPair = this . getConnection ( )
148
152
const savepointName = getSavepointName ( operation )
149
- const getResult = Session$RunWrapped ( this , connPair => {
150
- return new AtomicSession ( this , connPair , savepointName )
151
- } , atomicConnPair . onready , `SAVEPOINT ${ savepointName } ` , {
153
+ const getResult = Session$RunWrapped ( connPair => {
154
+ return new AtomicSession ( connPair , savepointName )
155
+ } , atomicConnPair , `SAVEPOINT ${ savepointName } ` , {
152
156
success : `RELEASE SAVEPOINT ${ savepointName } ` ,
153
157
failure : `ROLLBACK TO SAVEPOINT ${ savepointName } `
154
158
} , operation , args )
155
159
156
- return getResult . then ( ( ) => {
157
- setImmediate ( ( ) => {
158
- atomicConnPair . close ( )
160
+ const releasePair = atomicConnPair . then ( pair => {
161
+ return getResult . reflect ( ) . then ( result => {
162
+ return result . isFulfilled ( )
163
+ ? pair . release ( )
164
+ : pair . release ( result . reason ( ) )
159
165
} )
160
- } ) . return ( getResult )
166
+ } )
167
+
168
+ return releasePair . return ( getResult )
161
169
}
162
170
}
163
171
164
172
class AtomicSession extends TransactionSession {
165
- constructor ( parent , connection , name ) {
166
- super ( parent , connection )
173
+ constructor ( connection , name ) {
174
+ super ( connection )
167
175
this . name = name
168
176
}
169
177
}
170
178
171
- function Session$RunWrapped ( parent ,
172
- createSession ,
173
- getConnPair , before , after , operation , args ) {
174
- const createSubdomain = getConnPair . then ( connPair => {
179
+ function Session$RunWrapped ( createSession ,
180
+ getConnPair ,
181
+ before ,
182
+ after ,
183
+ operation ,
184
+ args ) {
185
+ return getConnPair . then ( pair => {
175
186
const subdomain = domain . create ( )
176
- const session = createSession ( connPair )
187
+ const session = createSession ( pair )
177
188
DOMAIN_TO_SESSION . set ( subdomain , session )
178
- return subdomain
179
- } )
180
189
181
- const runBefore = getConnPair . then ( connPair => new Promise (
182
- ( resolve , reject ) => connPair . connection . query (
183
- before ,
184
- err => err ? reject ( err ) : resolve ( )
185
- )
186
- ) )
190
+ const runBefore = new Promise ( ( resolve , reject ) => {
191
+ return pair . connection . query (
192
+ before ,
193
+ err => err ? reject ( err ) : resolve ( )
194
+ )
195
+ } )
187
196
188
- const getResult = runBefore . return (
189
- createSubdomain
190
- ) . then ( domain => {
191
- args . unshift ( operation )
192
- return Promise . resolve ( domain . run . apply ( domain , args ) )
193
- } )
197
+ return runBefore . then ( ( ) => {
198
+ const opArgs = args . slice ( )
199
+ opArgs . unshift ( operation )
200
+ const getResult = Promise . resolve (
201
+ subdomain . run . apply ( subdomain , opArgs )
202
+ )
194
203
195
- const getReflectedResult = getResult . reflect ( )
196
- const runCommitStep = Promise . join (
197
- getReflectedResult ,
198
- getConnPair . get ( 'connection' )
199
- ) . spread ( ( result , connection ) => {
200
- return new Promise ( ( resolve , reject ) => {
201
- connection . query (
202
- result . isFulfilled ( )
203
- ? after . success
204
- : after . failure ,
205
- err => err ? reject ( err ) : resolve ( )
204
+ const waitOperation = Promise . join (
205
+ getResult ,
206
+ getResult . then ( ( ) => session . operation )
206
207
)
208
+ . finally ( markInactive ( subdomain ) )
209
+ . return ( getResult . reflect ( ) )
210
+
211
+ const runCommitStep = waitOperation . then ( result => {
212
+ return new Promise ( ( resolve , reject ) => {
213
+ return pair . connection . query (
214
+ result . isFulfilled ( )
215
+ ? after . success
216
+ : after . failure ,
217
+ err => err ? reject ( err ) : resolve ( )
218
+ )
219
+ } )
220
+ } )
221
+ return runCommitStep . return ( getResult )
207
222
} )
208
223
} )
209
-
210
- return runCommitStep . return (
211
- createSubdomain
212
- ) . then ( markInactive ( parent ) ) . return ( getResult )
213
224
}
214
225
215
226
function getSavepointName ( operation ) {
@@ -221,14 +232,11 @@ function getSavepointName (operation) {
221
232
}
222
233
getSavepointName . ID = 0
223
234
224
- function markInactive ( session ) {
225
- return domain => {
226
- domain . exit ( )
227
- DOMAIN_TO_SESSION . get ( domain ) . inactive = true
228
-
229
- // if, somehow, we get a reference to this domain again, point
230
- // it at the parent session.
231
- DOMAIN_TO_SESSION . set ( domain , session )
235
+ function markInactive ( subdomain ) {
236
+ return ( ) => {
237
+ subdomain . exit ( )
238
+ DOMAIN_TO_SESSION . get ( subdomain ) . inactive = true
239
+ DOMAIN_TO_SESSION . set ( subdomain , null )
232
240
}
233
241
}
234
242
0 commit comments