@@ -14,15 +14,15 @@ const processQuery = qpm({
14
14
converters : {
15
15
nullstring : val => {
16
16
return { $type : 10 }
17
- } // reference BSON datatypes https://www.mongodb.com/docs/manual/reference/bson-types/
18
- }
17
+ } , // reference BSON datatypes https://www.mongodb.com/docs/manual/reference/bson-types/
18
+ } ,
19
19
} )
20
20
21
21
const url = process . env . MONGODB_URL || 'mongodb://localhost:27017'
22
22
23
23
const dsOptions = configRoot . adapters . datasources . DataSourceMongoDb . options || {
24
24
runOffline : true ,
25
- numConns : 2
25
+ numConns : 2 ,
26
26
}
27
27
28
28
const cacheSize = configRoot . adapters . cacheSize || 3000
@@ -44,7 +44,7 @@ const mongoOpts = {
44
44
* even when the database is offline.
45
45
*/
46
46
export class DataSourceMongoDb extends DataSource {
47
- constructor ( map , name , namespace , options = { } ) {
47
+ constructor ( map , name , namespace , options = { } ) {
48
48
super ( map , name , namespace , options )
49
49
this . cacheSize = cacheSize
50
50
this . mongoOpts = mongoOpts
@@ -57,15 +57,15 @@ export class DataSourceMongoDb extends DataSource {
57
57
*
58
58
* @returns {Promise<import('mongodb').Db> }
59
59
*/
60
- async connectionPool ( ) {
60
+ async connectionPool ( ) {
61
61
return new Promise ( ( resolve , reject ) => {
62
62
if ( this . db ) return resolve ( this . db )
63
63
MongoClient . connect (
64
64
this . url ,
65
65
{
66
66
...this . mongoOpts ,
67
67
poolSize : dsOptions . numConns || 2 ,
68
- connectTimeoutMS : 500
68
+ connectTimeoutMS : 500 ,
69
69
} ,
70
70
( err , db ) => {
71
71
if ( err ) return reject ( err )
@@ -76,7 +76,7 @@ export class DataSourceMongoDb extends DataSource {
76
76
} )
77
77
}
78
78
79
- connect ( client ) {
79
+ connect ( client ) {
80
80
return async function ( ) {
81
81
let timeout = false
82
82
const timerId = setTimeout ( ( ) => {
@@ -88,21 +88,21 @@ export class DataSourceMongoDb extends DataSource {
88
88
}
89
89
}
90
90
91
- async connection ( ) {
91
+ async connection ( ) {
92
92
try {
93
93
while ( connections . length < ( dsOptions . numConns || 1 ) ) {
94
94
const client = new MongoClient ( this . url , {
95
95
...this . mongoOpts ,
96
- connectTimeoutMS : 500
96
+ connectTimeoutMS : 500 ,
97
97
} )
98
98
const thresholds = {
99
99
default : {
100
100
errorRate : 1 ,
101
101
callVolume : 1 ,
102
102
intervalMs : 10000 ,
103
103
testDelay : 300000 ,
104
- fallbackFn : ( ) => console . log ( 'circuit open' )
105
- }
104
+ fallbackFn : ( ) => console . log ( 'circuit open' ) ,
105
+ } ,
106
106
}
107
107
const breaker = CircuitBreaker (
108
108
'mongodb.connect' ,
@@ -136,16 +136,16 @@ export class DataSourceMongoDb extends DataSource {
136
136
}
137
137
}
138
138
139
- async collection ( ) {
139
+ async collection ( ) {
140
140
return ( await this . connection ( ) ) . db ( this . namespace ) . collection ( this . name )
141
141
}
142
142
143
- async createIndexes ( client ) {
143
+ async createIndexes ( client ) {
144
144
const indexOperations = this . options . connOpts . indexes . map ( index => {
145
145
return {
146
146
name : index . fields . join ( '_' ) ,
147
147
key : index . fields . reduce ( ( a , v ) => ( { ...a , [ v ] : 1 } ) , { } ) ,
148
- ...index . options
148
+ ...index . options ,
149
149
}
150
150
} )
151
151
@@ -155,7 +155,7 @@ export class DataSourceMongoDb extends DataSource {
155
155
. createIndexes ( indexOperations )
156
156
}
157
157
158
- async find ( id ) {
158
+ async find ( id ) {
159
159
try {
160
160
return ( await this . collection ( ) ) . findOne ( { _id : id } )
161
161
} catch ( error ) {
@@ -173,7 +173,7 @@ export class DataSourceMongoDb extends DataSource {
173
173
* @param {* } id
174
174
* @param {* } data
175
175
*/
176
- async save ( id , data ) {
176
+ async save ( id , data ) {
177
177
try {
178
178
await (
179
179
await this . collection ( )
@@ -196,19 +196,19 @@ export class DataSourceMongoDb extends DataSource {
196
196
* @param {number } highWaterMark num of docs per batch write
197
197
* @returns
198
198
*/
199
- createWriteStream ( filter = { } , highWaterMark = HIGHWATERMARK ) {
199
+ createWriteStream ( filter = { } , highWaterMark = HIGHWATERMARK ) {
200
200
try {
201
201
let objects = [ ]
202
202
const ctx = this
203
203
204
- async function upsert ( ) {
204
+ async function upsert ( ) {
205
205
const operations = objects . map ( obj => {
206
206
return {
207
207
replaceOne : {
208
208
filter : { ...filter , _id : obj . id } ,
209
209
replacement : { ...obj , _id : obj . id } ,
210
- upsert : true
211
- }
210
+ upsert : true ,
211
+ } ,
212
212
}
213
213
} )
214
214
@@ -227,17 +227,17 @@ export class DataSourceMongoDb extends DataSource {
227
227
const writable = new Writable ( {
228
228
objectMode : true ,
229
229
230
- async write ( chunk , _encoding , next ) {
230
+ async write ( chunk , _encoding , next ) {
231
231
objects . push ( chunk )
232
232
// if true time to flush buffer and write to db
233
233
if ( objects . length >= highWaterMark ) await upsert ( )
234
234
next ( )
235
235
} ,
236
236
237
- end ( chunk , _ , done ) {
237
+ end ( chunk , _ , done ) {
238
238
objects . push ( chunk )
239
239
done ( )
240
- }
240
+ } ,
241
241
} )
242
242
243
243
writable . on ( 'finish' , async ( ) => await upsert ( ) )
@@ -257,7 +257,7 @@ export class DataSourceMongoDb extends DataSource {
257
257
*
258
258
* @returns {Promise<import('mongodb').AbstractCursor> }
259
259
*/
260
- async mongoFind ( { filter, sort, limit, aggregate, skip } = { } ) {
260
+ async mongoFind ( { filter, sort, limit, aggregate, skip } = { } ) {
261
261
console . log ( { fn : this . mongoFind . name , filter } )
262
262
let cursor = aggregate
263
263
? ( await this . collection ( ) ) . aggregate ( aggregate )
@@ -278,7 +278,7 @@ export class DataSourceMongoDb extends DataSource {
278
278
* @returns
279
279
*/
280
280
281
- async mongoCount ( { filter = { } } = { } ) {
281
+ async mongoCount ( { filter = { } } = { } ) {
282
282
return filter == { }
283
283
? await this . countDb ( )
284
284
: ( await this . collection ( ) ) . count ( filter )
@@ -295,20 +295,20 @@ export class DataSourceMongoDb extends DataSource {
295
295
* }} param0
296
296
* @returns
297
297
*/
298
- streamList ( { writable, serialize, transform, options } ) {
298
+ streamList ( { writable, serialize, transform, options } ) {
299
299
try {
300
300
const serializer = new Transform ( {
301
301
writableObjectMode : true ,
302
302
303
303
// start of array
304
- construct ( callback ) {
304
+ construct ( callback ) {
305
305
this . first = true
306
306
this . push ( '[' )
307
307
callback ( )
308
308
} ,
309
309
310
310
// each chunk is a record
311
- transform ( chunk , _encoding , next ) {
311
+ transform ( chunk , _encoding , next ) {
312
312
// comma-separate
313
313
if ( this . first ) this . first = false
314
314
else this . push ( ',' )
@@ -319,32 +319,32 @@ export class DataSourceMongoDb extends DataSource {
319
319
} ,
320
320
321
321
// end of array
322
- flush ( callback ) {
322
+ flush ( callback ) {
323
323
this . push ( ']' )
324
324
callback ( )
325
- }
325
+ } ,
326
326
} )
327
327
328
328
const paginate = new Transform ( {
329
329
writableObjectMode : true ,
330
330
331
331
// start of array
332
- construct ( callback ) {
332
+ construct ( callback ) {
333
333
this . push ( '{ ' )
334
334
callback ( )
335
335
} ,
336
336
337
337
// first chunk is the pagination data, rest is result
338
- transform ( chunk , _encoding , next ) {
338
+ transform ( chunk , _encoding , next ) {
339
339
this . push ( chunk )
340
340
next ( )
341
341
} ,
342
342
343
343
// end of array
344
- flush ( callback ) {
344
+ flush ( callback ) {
345
345
this . push ( '}' )
346
346
callback ( )
347
- }
347
+ } ,
348
348
} )
349
349
350
350
return new Promise ( async ( resolve , reject ) => {
@@ -371,12 +371,12 @@ export class DataSourceMongoDb extends DataSource {
371
371
} catch ( error ) {
372
372
console . error ( {
373
373
fn : this . streamList . name ,
374
- error
374
+ error,
375
375
} )
376
376
}
377
377
}
378
378
379
- processOptions ( { options = { } , query = { } } ) {
379
+ processOptions ( { options = { } , query = { } } ) {
380
380
return { ...processQuery ( query ) , ...options } // options must overwite the query not otherwise
381
381
}
382
382
@@ -385,7 +385,7 @@ export class DataSourceMongoDb extends DataSource {
385
385
* @override
386
386
* @param {import('../../domain/datasource').listOptions } param
387
387
*/
388
- async list ( param ) {
388
+ async list ( param ) {
389
389
try {
390
390
let result
391
391
const options = this . processOptions ( param )
@@ -414,7 +414,7 @@ export class DataSourceMongoDb extends DataSource {
414
414
...options ,
415
415
data,
416
416
count,
417
- total : Math . ceil ( count / options . limit )
417
+ total : Math . ceil ( count / options . limit ) ,
418
418
}
419
419
}
420
420
@@ -428,19 +428,19 @@ export class DataSourceMongoDb extends DataSource {
428
428
*
429
429
* @override
430
430
*/
431
- async count ( ) {
431
+ async count ( ) {
432
432
return {
433
433
total : await this . countDb ( ) ,
434
434
cached : this . getCacheSize ( ) ,
435
- bytes : this . getCacheSizeBytes ( )
435
+ bytes : this . getCacheSizeBytes ( ) ,
436
436
}
437
437
}
438
438
439
439
/**
440
440
* @override
441
441
* @returns
442
442
*/
443
- async countDb ( ) {
443
+ async countDb ( ) {
444
444
return ( await this . collection ( ) ) . countDocuments ( )
445
445
}
446
446
@@ -451,7 +451,7 @@ export class DataSourceMongoDb extends DataSource {
451
451
* @override
452
452
* @param {* } id
453
453
*/
454
- async delete ( id ) {
454
+ async delete ( id ) {
455
455
try {
456
456
await ( await this . collection ( ) ) . deleteOne ( { _id : id } )
457
457
} catch ( error ) {
0 commit comments