@@ -21,11 +21,12 @@ const processQuery = qpm({
21
21
const url = process . env . MONGODB_URL || 'mongodb://localhost:27017'
22
22
23
23
const dsOptions = configRoot . adapters . datasources . DataSourceMongoDb . options || {
24
- runOffline : false ,
25
- numConns : 2 ,
24
+ runOffline : true ,
25
+ numConns : 2
26
26
}
27
27
28
28
const cacheSize = configRoot . adapters . cacheSize || 3000
29
+ let connPool
29
30
30
31
/**
31
32
* @type {Map<string,MongoClient> }
@@ -43,7 +44,7 @@ const mongoOpts = {
43
44
* even when the database is offline.
44
45
*/
45
46
export class DataSourceMongoDb extends DataSource {
46
- constructor ( map , name , namespace , options = { } ) {
47
+ constructor ( map , name , namespace , options = { } ) {
47
48
super ( map , name , namespace , options )
48
49
this . cacheSize = cacheSize
49
50
this . mongoOpts = mongoOpts
@@ -52,47 +53,56 @@ export class DataSourceMongoDb extends DataSource {
52
53
this . url = url
53
54
}
54
55
55
- connect ( client ) {
56
- return async function ( ) {
57
- let timeout = false
58
- const timerId = setTimeout ( ( ) => {
59
- timeout = true
60
- } , 500 )
61
- await client . connect ( )
62
- clearTimeout ( timerId )
63
- if ( timeout ) throw new Error ( 'mongo conn timeout' )
64
- }
65
- }
66
-
67
- async connectionPool ( ) {
56
+ /**
57
+ *
58
+ * @returns {Promise<import('mongodb').Db> }
59
+ */
60
+ async connectionPool ( ) {
68
61
return new Promise ( ( resolve , reject ) => {
69
62
if ( this . db ) return resolve ( this . db )
70
63
MongoClient . connect (
71
64
this . url ,
72
65
{
73
66
...this . mongoOpts ,
74
67
poolSize : dsOptions . numConns || 2 ,
68
+ connectTimeoutMS : 500
75
69
} ,
76
- ( err , database ) => {
70
+ ( err , db ) => {
77
71
if ( err ) return reject ( err )
78
- resolve ( ( this . db = database . db ( this . namespace ) ) )
72
+ this . db = db ( this . namespace )
73
+ resolve ( this . db )
79
74
}
80
75
)
81
76
} )
82
77
}
83
78
84
- async connection ( ) {
79
+ connect ( client ) {
80
+ return async function ( ) {
81
+ let timeout = false
82
+ const timerId = setTimeout ( ( ) => {
83
+ timeout = true
84
+ } , 500 )
85
+ await client . connect ( )
86
+ clearTimeout ( timerId )
87
+ if ( timeout ) throw new Error ( 'mongo conn timeout' )
88
+ }
89
+ }
90
+
91
+ async connection ( ) {
85
92
try {
86
93
while ( connections . length < ( dsOptions . numConns || 1 ) ) {
87
- const client = new MongoClient ( this . url , this . mongoOpts )
94
+ const client = new MongoClient ( this . url , {
95
+ ...this . mongoOpts ,
96
+ connectTimeoutMS : 500
97
+ } )
88
98
const thresholds = {
89
99
default : {
90
100
errorRate : 1 ,
91
101
callVolume : 1 ,
92
102
intervalMs : 10000 ,
93
103
testDelay : 300000 ,
94
- // fallbackFn: () => client.emit('connectionClosed ')
95
- } ,
104
+ fallbackFn : ( ) => console . log ( 'circuit open ')
105
+ }
96
106
}
97
107
const breaker = CircuitBreaker (
98
108
'mongodb.connect' ,
@@ -145,7 +155,7 @@ export class DataSourceMongoDb extends DataSource {
145
155
. createIndexes ( indexOperations )
146
156
}
147
157
148
- async find ( id ) {
158
+ async find ( id ) {
149
159
try {
150
160
return ( await this . collection ( ) ) . findOne ( { _id : id } )
151
161
} catch ( error ) {
@@ -163,7 +173,7 @@ export class DataSourceMongoDb extends DataSource {
163
173
* @param {* } id
164
174
* @param {* } data
165
175
*/
166
- async save ( id , data ) {
176
+ async save ( id , data ) {
167
177
try {
168
178
await (
169
179
await this . collection ( )
@@ -186,19 +196,19 @@ export class DataSourceMongoDb extends DataSource {
186
196
* @param {number } highWaterMark num of docs per batch write
187
197
* @returns
188
198
*/
189
- createWriteStream ( filter = { } , highWaterMark = HIGHWATERMARK ) {
199
+ createWriteStream ( filter = { } , highWaterMark = HIGHWATERMARK ) {
190
200
try {
191
201
let objects = [ ]
192
202
const ctx = this
193
203
194
- async function upsert ( ) {
204
+ async function upsert ( ) {
195
205
const operations = objects . map ( obj => {
196
206
return {
197
207
replaceOne : {
198
208
filter : { ...filter , _id : obj . id } ,
199
209
replacement : { ...obj , _id : obj . id } ,
200
- upsert : true ,
201
- } ,
210
+ upsert : true
211
+ }
202
212
}
203
213
} )
204
214
@@ -217,17 +227,17 @@ export class DataSourceMongoDb extends DataSource {
217
227
const writable = new Writable ( {
218
228
objectMode : true ,
219
229
220
- async write ( chunk , _encoding , next ) {
230
+ async write ( chunk , _encoding , next ) {
221
231
objects . push ( chunk )
222
232
// if true time to flush buffer and write to db
223
233
if ( objects . length >= highWaterMark ) await upsert ( )
224
234
next ( )
225
235
} ,
226
236
227
- end ( chunk , _ , done ) {
237
+ end ( chunk , _ , done ) {
228
238
objects . push ( chunk )
229
239
done ( )
230
- } ,
240
+ }
231
241
} )
232
242
233
243
writable . on ( 'finish' , async ( ) => await upsert ( ) )
@@ -247,7 +257,7 @@ export class DataSourceMongoDb extends DataSource {
247
257
*
248
258
* @returns {Promise<import('mongodb').AbstractCursor> }
249
259
*/
250
- async mongoFind ( { filter, sort, limit, aggregate, skip } = { } ) {
260
+ async mongoFind ( { filter, sort, limit, aggregate, skip } = { } ) {
251
261
console . log ( { fn : this . mongoFind . name , filter } )
252
262
let cursor = aggregate
253
263
? ( await this . collection ( ) ) . aggregate ( aggregate )
@@ -287,7 +297,6 @@ export class DataSourceMongoDb extends DataSource {
287
297
*/
288
298
streamList ( { writable, serialize, transform, options } ) {
289
299
try {
290
- let pipeArgs = [ ]
291
300
292
301
const serializer = new Transform ( {
293
302
writableObjectMode : true ,
@@ -340,15 +349,12 @@ export class DataSourceMongoDb extends DataSource {
340
349
} )
341
350
342
351
return new Promise ( async ( resolve , reject ) => {
352
+ const pipeArgs = [ ]
343
353
const readable = ( await this . mongoFind ( options ) ) . stream ( )
344
354
345
- readable . on ( 'error' , reject )
346
- readable . on ( 'end' , resolve )
347
-
348
355
// optionally transform db stream then pipe to output
349
356
pipeArgs . push ( readable )
350
- if ( transform ) pipeArgs . push ( transform )
351
- if ( serialize ) pipeArgs . push ( serializer )
357
+
352
358
if ( options . page ) {
353
359
const count = ~ ~ ( await this . mongoCount ( options . filter ) )
354
360
pipeArgs . push ( paginate )
@@ -360,10 +366,9 @@ export class DataSourceMongoDb extends DataSource {
360
366
( ) => console . log ( 'paginated query' , options )
361
367
)
362
368
}
363
- pipeArgs . push ( writable )
364
-
365
- // perform pipe operations
366
- pipeArgs . reduce ( ( prevVal , currVal ) => prevVal . pipe ( currVal ) )
369
+
370
+ return pipeArgs
371
+
367
372
} )
368
373
} catch ( error ) {
369
374
console . error ( {
@@ -382,7 +387,7 @@ export class DataSourceMongoDb extends DataSource {
382
387
* @override
383
388
* @param {import('../../domain/datasource').listOptions } param
384
389
*/
385
- async list ( param ) {
390
+ async list ( param ) {
386
391
try {
387
392
const options = this . processOptions ( param )
388
393
if ( 0 < ~ ~ query . __page ) {
@@ -401,7 +406,7 @@ export class DataSourceMongoDb extends DataSource {
401
406
}
402
407
console . log ( { options } )
403
408
404
- if ( writable && ! query . __json )
409
+ if ( streamRequested && ! query . __json )
405
410
this . streamList ( { writable, serialize, transform, options } )
406
411
else {
407
412
const data = ( await this . mongoFind ( options ) ) . toArray ( )
@@ -424,19 +429,19 @@ export class DataSourceMongoDb extends DataSource {
424
429
*
425
430
* @override
426
431
*/
427
- async count ( ) {
432
+ async count ( ) {
428
433
return {
429
434
total : await this . countDb ( ) ,
430
435
cached : this . getCacheSize ( ) ,
431
- bytes : this . getCacheSizeBytes ( ) ,
436
+ bytes : this . getCacheSizeBytes ( )
432
437
}
433
438
}
434
439
435
440
/**
436
441
* @override
437
442
* @returns
438
443
*/
439
- async countDb ( ) {
444
+ async countDb ( ) {
440
445
return ( await this . collection ( ) ) . countDocuments ( )
441
446
}
442
447
@@ -447,7 +452,7 @@ export class DataSourceMongoDb extends DataSource {
447
452
* @override
448
453
* @param {* } id
449
454
*/
450
- async delete ( id ) {
455
+ async delete ( id ) {
451
456
try {
452
457
await ( await this . collection ( ) ) . deleteOne ( { _id : id } )
453
458
} catch ( error ) {
0 commit comments