Skip to content

Commit f51ae67

Browse files
committed
fix aggregate mongo function
1 parent fdd1a34 commit f51ae67

File tree

2 files changed

+68
-91
lines changed

2 files changed

+68
-91
lines changed

src/adapters/datasources/datasource-mongodb.js

Lines changed: 35 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@ export class DataSourceMongoDb extends DataSource {
259259
*/
260260
async mongoFind ({ filter, sort, limit, aggregate, skip } = {}) {
261261
console.log({ fn: this.mongoFind.name, filter })
262+
console.log({ aggregate })
263+
262264
let cursor = aggregate
263265
? (await this.collection()).aggregate(aggregate)
264266
: (await this.collection()).find(filter)
@@ -294,36 +296,10 @@ export class DataSourceMongoDb extends DataSource {
294296
* }} param0
295297
* @returns
296298
*/
297-
streamList ({ serialize, options }) {
298-
try {
299-
const serializer = new Transform({
300-
writableObjectMode: true,
301-
302-
// start of array
303-
construct (callback) {
304-
this.first = true
305-
this.push('[')
306-
callback()
307-
},
308-
309-
// each chunk is a record
310-
transform (chunk, _encoding, next) {
311-
// comma-separate
312-
if (this.first) this.first = false
313-
else this.push(',')
314-
315-
// serialize record
316-
this.push(JSON.stringify(chunk))
317-
next()
318-
},
319-
320-
// end of array
321-
flush (callback) {
322-
this.push(']')
323-
callback()
324-
}
325-
})
299+
async streamList (options) {
300+
const pipeArgs = []
326301

302+
try {
327303
const paginate = new Transform({
328304
writableObjectMode: true,
329305

@@ -346,29 +322,25 @@ export class DataSourceMongoDb extends DataSource {
346322
}
347323
})
348324

349-
return new Promise(async (resolve, reject) => {
350-
const pipeArgs = []
351-
const readable = (await this.mongoFind(options)).stream()
352-
353-
// optionally transform db stream then pipe to output
354-
pipeArgs.push(readable)
325+
const readable = (await this.mongoFind(options)).stream()
326+
// optionally transform db stream then pipe to output
327+
pipeArgs.push(readable)
355328

356-
if (options.page) {
357-
const count = ~~(await this.mongoCount(options.filter))
329+
if (options.page) {
330+
const count = ~~(await this.mongoCount(options.filter))
358331

359-
paginate._transform(
360-
`"count":${count}, "total":${Math.ceil(
361-
count / options.limit
362-
)}, ${JSON.stringify(options).slice(1, -1)}, "data":`,
363-
'utf8',
364-
() => console.log('paginated query', options)
365-
)
332+
paginate._transform(
333+
`"count":${count}, "total":${Math.ceil(
334+
count / options.limit
335+
)}, ${JSON.stringify(options).slice(1, -1)}, "data":`,
336+
'utf8',
337+
() => console.log('paginated query', options)
338+
)
366339

367-
pipeArgs.push(paginate)
368-
}
340+
pipeArgs.push(paginate)
341+
}
369342

370-
return pipeArgs
371-
})
343+
return pipeArgs
372344
} catch (error) {
373345
console.error({
374346
fn: this.streamList.name,
@@ -378,6 +350,7 @@ export class DataSourceMongoDb extends DataSource {
378350
}
379351

380352
processOptions ({ options = {}, query = {} }) {
353+
console.log(processQuery(query))
381354
return { ...processQuery(query), ...options } // options must overwite the query not otherwise
382355
}
383356

@@ -388,34 +361,35 @@ export class DataSourceMongoDb extends DataSource {
388361
*/
389362
async list (param) {
390363
try {
364+
console.log({ param })
391365
let result
392366
const options = this.processOptions(param)
367+
393368
if (0 < ~~options.__page) {
394369
// qpm > processOptions weeds out __page - add it back properly as an integer
395370
options.page = ~~options.__page
396371
options.skip = (options.page - 1) * options.limit || 0
397372
}
398-
if (options.__aggregate) {
373+
374+
if (param.query.__aggregate) {
399375
// qpm > processOptions weeds out __aggregate - add it back properly as parsed json
400376
try {
401-
const aggregateQuery = JSON.parse(query.__aggregate)
402-
options.aggregate = aggregateQuery
377+
options.aggregate = JSON.parse(param.query.__aggregate)
403378
} catch (e) {
404379
console.error(e, 'invalid Aggregate')
405380
}
406381
}
407382
console.log({ options })
408383

409-
if (options.streamRequested) this.streamList({ serialize, options })
410-
else {
411-
const data = (await this.mongoFind(options)).toArray()
412-
const count = data?.length
413-
result = {
414-
...options,
415-
data,
416-
count,
417-
total: Math.ceil(count / options.limit)
418-
}
384+
if (options.streamRequested) return this.streamList(options)
385+
386+
const data = (await this.mongoFind(options)).toArray()
387+
const count = data?.length
388+
result = {
389+
...options,
390+
data,
391+
count,
392+
total: Math.ceil(count / options.limit)
419393
}
420394

421395
return options?.page ? result : result?.data

src/domain/datasource-factory.js

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,15 @@ const DefaultDataSource =
3838
*/
3939
const DsCoreExtensions = superclass =>
4040
class extends superclass {
41-
constructor(map, name, namespace, options) {
41+
constructor (map, name, namespace, options) {
4242
super(map, name, namespace, options)
4343
}
4444

45-
set factory(value) {
45+
set factory (value) {
4646
this[FACTORY] = value
4747
}
4848

49-
get factory() {
49+
get factory () {
5050
return this[FACTORY]
5151
}
5252

@@ -61,7 +61,7 @@ const DsCoreExtensions = superclass =>
6161
* @param {string} id
6262
* @param {Model} data
6363
*/
64-
async save(id, data) {
64+
async save (id, data) {
6565
try {
6666
this.saveSync(id, data)
6767
await super.save(id, JSON.parse(JSON.stringify(data)))
@@ -78,7 +78,7 @@ const DsCoreExtensions = superclass =>
7878
* @param {string} id
7979
* @returns {Promise<Model>|undefined}
8080
*/
81-
async find(id) {
81+
async find (id) {
8282
try {
8383
const cached = this.findSync(id)
8484
if (cached) return cached
@@ -98,33 +98,33 @@ const DsCoreExtensions = superclass =>
9898
}
9999
}
100100

101-
hydrate() {
101+
hydrate () {
102102
const ctx = this
103103

104104
return new Transform({
105105
objectMode: true,
106106

107-
transform(chunk, encoding, next) {
107+
transform (chunk, encoding, next) {
108108
this.push(ModelFactory.loadModel(broker, ctx, chunk, ctx.name))
109109
next()
110-
},
110+
}
111111
})
112112
}
113113

114-
serialize() {
114+
serialize () {
115115
let first = true
116116

117117
return new Transform({
118118
objectMode: true,
119119

120120
// start of array
121-
construct(callback) {
121+
construct (callback) {
122122
this.push('[')
123123
callback()
124124
},
125125

126126
// each chunk is a record
127-
transform(chunk, encoding, next) {
127+
transform (chunk, encoding, next) {
128128
// comma-separate
129129
if (first) first = false
130130
else this.push(',')
@@ -135,10 +135,10 @@ const DsCoreExtensions = superclass =>
135135
},
136136

137137
// end of array
138-
flush(callback) {
138+
flush (callback) {
139139
this.push(']')
140140
callback()
141-
},
141+
}
142142
})
143143
}
144144

@@ -148,7 +148,7 @@ const DsCoreExtensions = superclass =>
148148
* @param {import('./datasource').listOptions} options
149149
* @returns {Array<Readable|Transform>}
150150
*/
151-
stream(list, options) {
151+
stream (list, options) {
152152
return new Promise((resolve, reject) => {
153153
options.writable.on('error', reject)
154154
options.writable.on('end', resolve)
@@ -173,12 +173,15 @@ const DsCoreExtensions = superclass =>
173173
* @override
174174
* @param {import('../../domain/datasource').listOptions} param
175175
*/
176-
async list(options) {
176+
async list (options) {
177177
try {
178178
if (options?.query?.__count) return this.count()
179179
if (options?.query?.__cached) return this.listSync(options.query)
180180

181-
const opts = { ...options, streamRequested: options?.writable }
181+
const opts = {
182+
...options,
183+
streamRequested: options?.writable ? true : false
184+
}
182185
const list = [await super.list(opts)].flat()
183186

184187
if (list.length < 1) return []
@@ -202,7 +205,7 @@ const DsCoreExtensions = superclass =>
202205
* @param {*} id
203206
* @returns
204207
*/
205-
async delete(id) {
208+
async delete (id) {
206209
try {
207210
await super.delete(id)
208211
// only if super succeeds
@@ -238,11 +241,11 @@ const DataSourceFactory = (() => {
238241
* @param {*} name
239242
* @returns
240243
*/
241-
function hasDataSource(name) {
244+
function hasDataSource (name) {
242245
return dataSources.has(name)
243246
}
244247

245-
function listDataSources() {
248+
function listDataSources () {
246249
return [...dataSources.keys()]
247250
}
248251

@@ -267,7 +270,7 @@ const DataSourceFactory = (() => {
267270
* @param {dsOpts} options
268271
* @returns {typeof DataSource}
269272
*/
270-
function createDataSourceClass(spec, options) {
273+
function createDataSourceClass (spec, options) {
271274
const { memoryOnly, ephemeral, adapterName } = options
272275

273276
if (memoryOnly || ephemeral) return dsClasses['DataSourceMemory']
@@ -295,7 +298,7 @@ const DataSourceFactory = (() => {
295298
* @param {dsOpts} options
296299
* @returns {typeof DataSource}
297300
*/
298-
function extendDataSourceClass(DsClass, options = {}) {
301+
function extendDataSourceClass (DsClass, options = {}) {
299302
const mixins = [extendClass].concat(options.mixins || [])
300303
return compose(...mixins)(DsClass)
301304
}
@@ -306,7 +309,7 @@ const DataSourceFactory = (() => {
306309
* @param {dsOpts} [options]
307310
* @returns {DataSource}
308311
*/
309-
function createDataSource(name, namespace, options) {
312+
function createDataSource (name, namespace, options) {
310313
const spec = ModelFactory.getModelSpec(name)
311314
const dsMap = options.dsMap || new Map()
312315

@@ -332,7 +335,7 @@ const DataSourceFactory = (() => {
332335
* @param {dsOpts} options
333336
* @returns {import('./datasource').default}
334337
*/
335-
function getDataSource(name, namespace = null, options = {}) {
338+
function getDataSource (name, namespace = null, options = {}) {
336339
if (!dataSources) dataSources = new Map()
337340
if (!namespace) return dataSources.get(name)
338341
if (dataSources.has(name)) return dataSources.get(name)
@@ -347,7 +350,7 @@ const DataSourceFactory = (() => {
347350
* @param {dsOpts} [options]
348351
* @returns
349352
*/
350-
function getSharedDataSource(name, namespace = null, options = {}) {
353+
function getSharedDataSource (name, namespace = null, options = {}) {
351354
if (!dataSources) dataSources = new Map()
352355
if (!namespace) return dataSources.get(name)
353356
if (dataSources.has(name)) return dataSources.get(name)
@@ -362,20 +365,20 @@ const DataSourceFactory = (() => {
362365
* @param {string} name
363366
* @returns {ProxyHandler<DataSource>}
364367
*/
365-
function getRestrictedDataSource(name, namespace, options) {
368+
function getRestrictedDataSource (name, namespace, options) {
366369
return new Proxy(getDataSource(name, namespace, options), {
367-
get(target, key) {
370+
get (target, key) {
368371
if (key === 'factory') {
369372
throw new Error('unauthorized')
370373
}
371374
},
372-
ownKeys(target) {
375+
ownKeys (target) {
373376
return []
374-
},
377+
}
375378
})
376379
}
377380

378-
function close() {
381+
function close () {
379382
dataSources.forEach(ds => ds.close())
380383
}
381384

@@ -390,7 +393,7 @@ const DataSourceFactory = (() => {
390393
getRestrictedDataSource,
391394
hasDataSource,
392395
listDataSources,
393-
close,
396+
close
394397
})
395398
})()
396399

0 commit comments

Comments
 (0)