Skip to content

Commit fdd1a34

Browse files
committed
refactor
1 parent c75306e commit fdd1a34

File tree

1 file changed

+47
-47
lines changed

1 file changed

+47
-47
lines changed

src/adapters/datasources/datasource-mongodb.js

Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@ const processQuery = qpm({
1414
converters: {
1515
nullstring: val => {
1616
return { $type: 10 }
17-
}, // reference BSON datatypes https://www.mongodb.com/docs/manual/reference/bson-types/
18-
},
17+
} // reference BSON datatypes https://www.mongodb.com/docs/manual/reference/bson-types/
18+
}
1919
})
2020

2121
const url = process.env.MONGODB_URL || 'mongodb://localhost:27017'
2222

2323
const dsOptions = configRoot.adapters.datasources.DataSourceMongoDb.options || {
2424
runOffline: true,
25-
numConns: 2,
25+
numConns: 2
2626
}
2727

2828
const cacheSize = configRoot.adapters.cacheSize || 3000
@@ -44,7 +44,7 @@ const mongoOpts = {
4444
* even when the database is offline.
4545
*/
4646
export class DataSourceMongoDb extends DataSource {
47-
constructor(map, name, namespace, options = {}) {
47+
constructor (map, name, namespace, options = {}) {
4848
super(map, name, namespace, options)
4949
this.cacheSize = cacheSize
5050
this.mongoOpts = mongoOpts
@@ -57,15 +57,15 @@ export class DataSourceMongoDb extends DataSource {
5757
*
5858
* @returns {Promise<import('mongodb').Db>}
5959
*/
60-
async connectionPool() {
60+
async connectionPool () {
6161
return new Promise((resolve, reject) => {
6262
if (this.db) return resolve(this.db)
6363
MongoClient.connect(
6464
this.url,
6565
{
6666
...this.mongoOpts,
6767
poolSize: dsOptions.numConns || 2,
68-
connectTimeoutMS: 500,
68+
connectTimeoutMS: 500
6969
},
7070
(err, db) => {
7171
if (err) return reject(err)
@@ -76,7 +76,7 @@ export class DataSourceMongoDb extends DataSource {
7676
})
7777
}
7878

79-
connect(client) {
79+
connect (client) {
8080
return async function () {
8181
let timeout = false
8282
const timerId = setTimeout(() => {
@@ -88,21 +88,21 @@ export class DataSourceMongoDb extends DataSource {
8888
}
8989
}
9090

91-
async connection() {
91+
async connection () {
9292
try {
9393
while (connections.length < (dsOptions.numConns || 1)) {
9494
const client = new MongoClient(this.url, {
9595
...this.mongoOpts,
96-
connectTimeoutMS: 500,
96+
connectTimeoutMS: 500
9797
})
9898
const thresholds = {
9999
default: {
100100
errorRate: 1,
101101
callVolume: 1,
102102
intervalMs: 10000,
103103
testDelay: 300000,
104-
fallbackFn: () => console.log('circuit open'),
105-
},
104+
fallbackFn: () => console.log('circuit open')
105+
}
106106
}
107107
const breaker = CircuitBreaker(
108108
'mongodb.connect',
@@ -136,16 +136,16 @@ export class DataSourceMongoDb extends DataSource {
136136
}
137137
}
138138

139-
async collection() {
139+
async collection () {
140140
return (await this.connection()).db(this.namespace).collection(this.name)
141141
}
142142

143-
async createIndexes(client) {
143+
async createIndexes (client) {
144144
const indexOperations = this.options.connOpts.indexes.map(index => {
145145
return {
146146
name: index.fields.join('_'),
147147
key: index.fields.reduce((a, v) => ({ ...a, [v]: 1 }), {}),
148-
...index.options,
148+
...index.options
149149
}
150150
})
151151

@@ -155,7 +155,7 @@ export class DataSourceMongoDb extends DataSource {
155155
.createIndexes(indexOperations)
156156
}
157157

158-
async find(id) {
158+
async find (id) {
159159
try {
160160
return (await this.collection()).findOne({ _id: id })
161161
} catch (error) {
@@ -173,7 +173,7 @@ export class DataSourceMongoDb extends DataSource {
173173
* @param {*} id
174174
* @param {*} data
175175
*/
176-
async save(id, data) {
176+
async save (id, data) {
177177
try {
178178
await (
179179
await this.collection()
@@ -196,19 +196,19 @@ export class DataSourceMongoDb extends DataSource {
196196
* @param {number} highWaterMark num of docs per batch write
197197
* @returns
198198
*/
199-
createWriteStream(filter = {}, highWaterMark = HIGHWATERMARK) {
199+
createWriteStream (filter = {}, highWaterMark = HIGHWATERMARK) {
200200
try {
201201
let objects = []
202202
const ctx = this
203203

204-
async function upsert() {
204+
async function upsert () {
205205
const operations = objects.map(obj => {
206206
return {
207207
replaceOne: {
208208
filter: { ...filter, _id: obj.id },
209209
replacement: { ...obj, _id: obj.id },
210-
upsert: true,
211-
},
210+
upsert: true
211+
}
212212
}
213213
})
214214

@@ -227,17 +227,17 @@ export class DataSourceMongoDb extends DataSource {
227227
const writable = new Writable({
228228
objectMode: true,
229229

230-
async write(chunk, _encoding, next) {
230+
async write (chunk, _encoding, next) {
231231
objects.push(chunk)
232232
// if true time to flush buffer and write to db
233233
if (objects.length >= highWaterMark) await upsert()
234234
next()
235235
},
236236

237-
end(chunk, _, done) {
237+
end (chunk, _, done) {
238238
objects.push(chunk)
239239
done()
240-
},
240+
}
241241
})
242242

243243
writable.on('finish', async () => await upsert())
@@ -257,7 +257,7 @@ export class DataSourceMongoDb extends DataSource {
257257
*
258258
* @returns {Promise<import('mongodb').AbstractCursor>}
259259
*/
260-
async mongoFind({ filter, sort, limit, aggregate, skip } = {}) {
260+
async mongoFind ({ filter, sort, limit, aggregate, skip } = {}) {
261261
console.log({ fn: this.mongoFind.name, filter })
262262
let cursor = aggregate
263263
? (await this.collection()).aggregate(aggregate)
@@ -278,7 +278,7 @@ export class DataSourceMongoDb extends DataSource {
278278
* @returns
279279
*/
280280

281-
async mongoCount({ filter = {} } = {}) {
281+
async mongoCount ({ filter = {} } = {}) {
282282
return filter == {}
283283
? await this.countDb()
284284
: (await this.collection()).count(filter)
@@ -290,25 +290,24 @@ export class DataSourceMongoDb extends DataSource {
290290
*
291291
* @param {{
292292
* filter:*
293-
* transform:Transform
294293
* serialize:boolean
295294
* }} param0
296295
* @returns
297296
*/
298-
streamList({ writable, serialize, transform, options }) {
297+
streamList ({ serialize, options }) {
299298
try {
300299
const serializer = new Transform({
301300
writableObjectMode: true,
302301

303302
// start of array
304-
construct(callback) {
303+
construct (callback) {
305304
this.first = true
306305
this.push('[')
307306
callback()
308307
},
309308

310309
// each chunk is a record
311-
transform(chunk, _encoding, next) {
310+
transform (chunk, _encoding, next) {
312311
// comma-separate
313312
if (this.first) this.first = false
314313
else this.push(',')
@@ -319,32 +318,32 @@ export class DataSourceMongoDb extends DataSource {
319318
},
320319

321320
// end of array
322-
flush(callback) {
321+
flush (callback) {
323322
this.push(']')
324323
callback()
325-
},
324+
}
326325
})
327326

328327
const paginate = new Transform({
329328
writableObjectMode: true,
330329

331330
// start of array
332-
construct(callback) {
331+
construct (callback) {
333332
this.push('{ ')
334333
callback()
335334
},
336335

337336
// first chunk is the pagination data, rest is result
338-
transform(chunk, _encoding, next) {
337+
transform (chunk, _encoding, next) {
339338
this.push(chunk)
340339
next()
341340
},
342341

343342
// end of array
344-
flush(callback) {
343+
flush (callback) {
345344
this.push('}')
346345
callback()
347-
},
346+
}
348347
})
349348

350349
return new Promise(async (resolve, reject) => {
@@ -356,27 +355,29 @@ export class DataSourceMongoDb extends DataSource {
356355

357356
if (options.page) {
358357
const count = ~~(await this.mongoCount(options.filter))
359-
pipeArgs.push(paginate)
358+
360359
paginate._transform(
361360
`"count":${count}, "total":${Math.ceil(
362361
count / options.limit
363362
)}, ${JSON.stringify(options).slice(1, -1)}, "data":`,
364363
'utf8',
365364
() => console.log('paginated query', options)
366365
)
366+
367+
pipeArgs.push(paginate)
367368
}
368369

369370
return pipeArgs
370371
})
371372
} catch (error) {
372373
console.error({
373374
fn: this.streamList.name,
374-
error,
375+
error
375376
})
376377
}
377378
}
378379

379-
processOptions({ options = {}, query = {} }) {
380+
processOptions ({ options = {}, query = {} }) {
380381
return { ...processQuery(query), ...options } // options must overwite the query not otherwise
381382
}
382383

@@ -385,13 +386,13 @@ export class DataSourceMongoDb extends DataSource {
385386
* @override
386387
* @param {import('../../domain/datasource').listOptions} param
387388
*/
388-
async list(param) {
389+
async list (param) {
389390
try {
390391
let result
391392
const options = this.processOptions(param)
392393
if (0 < ~~options.__page) {
393394
// qpm > processOptions weeds out __page - add it back properly as an integer
394-
options.page = ~~query.__page
395+
options.page = ~~options.__page
395396
options.skip = (options.page - 1) * options.limit || 0
396397
}
397398
if (options.__aggregate) {
@@ -405,16 +406,15 @@ export class DataSourceMongoDb extends DataSource {
405406
}
406407
console.log({ options })
407408

408-
if (options.streamRequested && !query.__json)
409-
this.streamList({ writable, serialize, transform, options })
409+
if (options.streamRequested) this.streamList({ serialize, options })
410410
else {
411411
const data = (await this.mongoFind(options)).toArray()
412412
const count = data?.length
413413
result = {
414414
...options,
415415
data,
416416
count,
417-
total: Math.ceil(count / options.limit),
417+
total: Math.ceil(count / options.limit)
418418
}
419419
}
420420

@@ -428,19 +428,19 @@ export class DataSourceMongoDb extends DataSource {
428428
*
429429
* @override
430430
*/
431-
async count() {
431+
async count () {
432432
return {
433433
total: await this.countDb(),
434434
cached: this.getCacheSize(),
435-
bytes: this.getCacheSizeBytes(),
435+
bytes: this.getCacheSizeBytes()
436436
}
437437
}
438438

439439
/**
440440
* @override
441441
* @returns
442442
*/
443-
async countDb() {
443+
async countDb () {
444444
return (await this.collection()).countDocuments()
445445
}
446446

@@ -451,7 +451,7 @@ export class DataSourceMongoDb extends DataSource {
451451
* @override
452452
* @param {*} id
453453
*/
454-
async delete(id) {
454+
async delete (id) {
455455
try {
456456
await (await this.collection()).deleteOne({ _id: id })
457457
} catch (error) {

0 commit comments

Comments
 (0)