Skip to content

Commit b811cc2

Browse files
authored
Merge pull request #263 from module-federation/release-0-1-0
Release 0 1 0
2 parents 54f7073 + 98b76cb commit b811cc2

File tree

3 files changed

+287
-282
lines changed

3 files changed

+287
-282
lines changed

src/adapters/datasources/datasource-mongodb.js

Lines changed: 75 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@ const processQuery = qpm({
1414
converters: {
1515
nullstring: val => {
1616
return { $type: 10 }
17-
}, // reference BSON datatypes https://www.mongodb.com/docs/manual/reference/bson-types/
18-
},
17+
} // reference BSON datatypes https://www.mongodb.com/docs/manual/reference/bson-types/
18+
}
1919
})
2020

2121
const url = process.env.MONGODB_URL || 'mongodb://localhost:27017'
2222

2323
const dsOptions = configRoot.adapters.datasources.DataSourceMongoDb.options || {
2424
runOffline: true,
25-
numConns: 2,
25+
numConns: 2
2626
}
2727

2828
const cacheSize = configRoot.adapters.cacheSize || 3000
@@ -44,7 +44,7 @@ const mongoOpts = {
4444
* even when the database is offline.
4545
*/
4646
export class DataSourceMongoDb extends DataSource {
47-
constructor(map, name, namespace, options = {}) {
47+
constructor (map, name, namespace, options = {}) {
4848
super(map, name, namespace, options)
4949
this.cacheSize = cacheSize
5050
this.mongoOpts = mongoOpts
@@ -57,15 +57,15 @@ export class DataSourceMongoDb extends DataSource {
5757
*
5858
* @returns {Promise<import('mongodb').Db>}
5959
*/
60-
async connectionPool() {
60+
async connectionPool () {
6161
return new Promise((resolve, reject) => {
6262
if (this.db) return resolve(this.db)
6363
MongoClient.connect(
6464
this.url,
6565
{
6666
...this.mongoOpts,
6767
poolSize: dsOptions.numConns || 2,
68-
connectTimeoutMS: 500,
68+
connectTimeoutMS: 500
6969
},
7070
(err, db) => {
7171
if (err) return reject(err)
@@ -76,7 +76,7 @@ export class DataSourceMongoDb extends DataSource {
7676
})
7777
}
7878

79-
connect(client) {
79+
connect (client) {
8080
return async function () {
8181
let timeout = false
8282
const timerId = setTimeout(() => {
@@ -88,21 +88,21 @@ export class DataSourceMongoDb extends DataSource {
8888
}
8989
}
9090

91-
async connection() {
91+
async connection () {
9292
try {
9393
while (connections.length < (dsOptions.numConns || 1)) {
9494
const client = new MongoClient(this.url, {
9595
...this.mongoOpts,
96-
connectTimeoutMS: 500,
96+
connectTimeoutMS: 500
9797
})
9898
const thresholds = {
9999
default: {
100100
errorRate: 1,
101101
callVolume: 1,
102102
intervalMs: 10000,
103103
testDelay: 300000,
104-
fallbackFn: () => console.log('circuit open'),
105-
},
104+
fallbackFn: () => console.log('circuit open')
105+
}
106106
}
107107
const breaker = CircuitBreaker(
108108
'mongodb.connect',
@@ -136,16 +136,16 @@ export class DataSourceMongoDb extends DataSource {
136136
}
137137
}
138138

139-
async collection() {
139+
async collection () {
140140
return (await this.connection()).db(this.namespace).collection(this.name)
141141
}
142142

143-
async createIndexes(client) {
143+
async createIndexes (client) {
144144
const indexOperations = this.options.connOpts.indexes.map(index => {
145145
return {
146146
name: index.fields.join('_'),
147147
key: index.fields.reduce((a, v) => ({ ...a, [v]: 1 }), {}),
148-
...index.options,
148+
...index.options
149149
}
150150
})
151151

@@ -155,7 +155,7 @@ export class DataSourceMongoDb extends DataSource {
155155
.createIndexes(indexOperations)
156156
}
157157

158-
async find(id) {
158+
async find (id) {
159159
try {
160160
return (await this.collection()).findOne({ _id: id })
161161
} catch (error) {
@@ -173,7 +173,7 @@ export class DataSourceMongoDb extends DataSource {
173173
* @param {*} id
174174
* @param {*} data
175175
*/
176-
async save(id, data) {
176+
async save (id, data) {
177177
try {
178178
await (
179179
await this.collection()
@@ -196,19 +196,19 @@ export class DataSourceMongoDb extends DataSource {
196196
* @param {number} highWaterMark num of docs per batch write
197197
* @returns
198198
*/
199-
createWriteStream(filter = {}, highWaterMark = HIGHWATERMARK) {
199+
createWriteStream (filter = {}, highWaterMark = HIGHWATERMARK) {
200200
try {
201201
let objects = []
202202
const ctx = this
203203

204-
async function upsert() {
204+
async function upsert () {
205205
const operations = objects.map(obj => {
206206
return {
207207
replaceOne: {
208208
filter: { ...filter, _id: obj.id },
209209
replacement: { ...obj, _id: obj.id },
210-
upsert: true,
211-
},
210+
upsert: true
211+
}
212212
}
213213
})
214214

@@ -227,17 +227,17 @@ export class DataSourceMongoDb extends DataSource {
227227
const writable = new Writable({
228228
objectMode: true,
229229

230-
async write(chunk, _encoding, next) {
230+
async write (chunk, _encoding, next) {
231231
objects.push(chunk)
232232
// if true time to flush buffer and write to db
233233
if (objects.length >= highWaterMark) await upsert()
234234
next()
235235
},
236236

237-
end(chunk, _, done) {
237+
end (chunk, _, done) {
238238
objects.push(chunk)
239239
done()
240-
},
240+
}
241241
})
242242

243243
writable.on('finish', async () => await upsert())
@@ -257,8 +257,10 @@ export class DataSourceMongoDb extends DataSource {
257257
*
258258
* @returns {Promise<import('mongodb').AbstractCursor>}
259259
*/
260-
async mongoFind({ filter, sort, limit, aggregate, skip } = {}) {
260+
async mongoFind ({ filter, sort, limit, aggregate, skip } = {}) {
261261
console.log({ fn: this.mongoFind.name, filter })
262+
console.log({ aggregate })
263+
262264
let cursor = aggregate
263265
? (await this.collection()).aggregate(aggregate)
264266
: (await this.collection()).find(filter)
@@ -278,7 +280,7 @@ export class DataSourceMongoDb extends DataSource {
278280
* @returns
279281
*/
280282

281-
async mongoCount({ filter = {} } = {}) {
283+
async mongoCount ({ filter = {} } = {}) {
282284
return filter == {}
283285
? await this.countDb()
284286
: (await this.collection()).count(filter)
@@ -290,93 +292,65 @@ export class DataSourceMongoDb extends DataSource {
290292
*
291293
* @param {{
292294
* filter:*
293-
* transform:Transform
294295
* serialize:boolean
295296
* }} param0
296297
* @returns
297298
*/
298-
streamList({ writable, serialize, transform, options }) {
299-
try {
300-
const serializer = new Transform({
301-
writableObjectMode: true,
302-
303-
// start of array
304-
construct(callback) {
305-
this.first = true
306-
this.push('[')
307-
callback()
308-
},
309-
310-
// each chunk is a record
311-
transform(chunk, _encoding, next) {
312-
// comma-separate
313-
if (this.first) this.first = false
314-
else this.push(',')
315-
316-
// serialize record
317-
this.push(JSON.stringify(chunk))
318-
next()
319-
},
320-
321-
// end of array
322-
flush(callback) {
323-
this.push(']')
324-
callback()
325-
},
326-
})
299+
async streamList (options) {
300+
const pipeArgs = []
327301

302+
try {
328303
const paginate = new Transform({
329304
writableObjectMode: true,
330305

331306
// start of array
332-
construct(callback) {
307+
construct (callback) {
333308
this.push('{ ')
334309
callback()
335310
},
336311

337312
// first chunk is the pagination data, rest is result
338-
transform(chunk, _encoding, next) {
313+
transform (chunk, _encoding, next) {
339314
this.push(chunk)
340315
next()
341316
},
342317

343318
// end of array
344-
flush(callback) {
319+
flush (callback) {
345320
this.push('}')
346321
callback()
347-
},
322+
}
348323
})
349324

350-
return new Promise(async (resolve, reject) => {
351-
const pipeArgs = []
352-
const readable = (await this.mongoFind(options)).stream()
353-
354-
// optionally transform db stream then pipe to output
355-
pipeArgs.push(readable)
356-
357-
if (options.page) {
358-
const count = ~~(await this.mongoCount(options.filter))
359-
pipeArgs.push(paginate)
360-
paginate._transform(
361-
`"count":${count}, "total":${Math.ceil(
362-
count / options.limit
363-
)}, ${JSON.stringify(options).slice(1, -1)}, "data":`,
364-
'utf8',
365-
() => console.log('paginated query', options)
366-
)
367-
}
325+
const readable = (await this.mongoFind(options)).stream()
326+
// optionally transform db stream then pipe to output
327+
pipeArgs.push(readable)
368328

369-
return pipeArgs
370-
})
329+
if (options.page) {
330+
const count = ~~(await this.mongoCount(options.filter))
331+
332+
paginate._transform(
333+
`"count":${count}, "total":${Math.ceil(
334+
count / options.limit
335+
)}, ${JSON.stringify(options).slice(1, -1)}, "data":`,
336+
'utf8',
337+
() => console.log('paginated query', options)
338+
)
339+
340+
pipeArgs.push(paginate)
341+
}
342+
343+
return pipeArgs
371344
} catch (error) {
372345
console.error({
373346
fn: this.streamList.name,
374-
error,
347+
error
375348
})
376349
}
377350
}
378351

379-
processOptions({ options = {}, query = {} }) {
352+
processOptions ({ options = {}, query = {} }) {
353+
console.log(processQuery(query))
380354
return { ...processQuery(query), ...options } // options must overwite the query not otherwise
381355
}
382356

@@ -385,37 +359,37 @@ export class DataSourceMongoDb extends DataSource {
385359
* @override
386360
* @param {import('../../domain/datasource').listOptions} param
387361
*/
388-
async list(param) {
362+
async list (param) {
389363
try {
364+
console.log({ param })
390365
let result
391366
const options = this.processOptions(param)
367+
392368
if (0 < ~~options.__page) {
393369
// qpm > processOptions weeds out __page - add it back properly as an integer
394-
options.page = ~~query.__page
370+
options.page = ~~options.__page
395371
options.skip = (options.page - 1) * options.limit || 0
396372
}
397-
if (options.__aggregate) {
373+
374+
if (param.query.__aggregate) {
398375
// qpm > processOptions weeds out __aggregate - add it back properly as parsed json
399376
try {
400-
const aggregateQuery = JSON.parse(query.__aggregate)
401-
options.aggregate = aggregateQuery
377+
options.aggregate = JSON.parse(param.query.__aggregate)
402378
} catch (e) {
403379
console.error(e, 'invalid Aggregate')
404380
}
405381
}
406382
console.log({ options })
407383

408-
if (options.streamRequested && !query.__json)
409-
this.streamList({ writable, serialize, transform, options })
410-
else {
411-
const data = (await this.mongoFind(options)).toArray()
412-
const count = data?.length
413-
result = {
414-
...options,
415-
data,
416-
count,
417-
total: Math.ceil(count / options.limit),
418-
}
384+
if (options.streamRequested) return this.streamList(options)
385+
386+
const data = (await this.mongoFind(options)).toArray()
387+
const count = data?.length
388+
result = {
389+
...options,
390+
data,
391+
count,
392+
total: Math.ceil(count / options.limit)
419393
}
420394

421395
return options?.page ? result : result?.data
@@ -428,19 +402,19 @@ export class DataSourceMongoDb extends DataSource {
428402
*
429403
* @override
430404
*/
431-
async count() {
405+
async count () {
432406
return {
433407
total: await this.countDb(),
434408
cached: this.getCacheSize(),
435-
bytes: this.getCacheSizeBytes(),
409+
bytes: this.getCacheSizeBytes()
436410
}
437411
}
438412

439413
/**
440414
* @override
441415
* @returns
442416
*/
443-
async countDb() {
417+
async countDb () {
444418
return (await this.collection()).countDocuments()
445419
}
446420

@@ -451,7 +425,7 @@ export class DataSourceMongoDb extends DataSource {
451425
* @override
452426
* @param {*} id
453427
*/
454-
async delete(id) {
428+
async delete (id) {
455429
try {
456430
await (await this.collection()).deleteOne({ _id: id })
457431
} catch (error) {

0 commit comments

Comments
 (0)