Skip to content

Commit 7f4816f

Browse files
committed
fix mongo streaming
1 parent 0080b9d commit 7f4816f

File tree

2 files changed

+54
-54
lines changed

2 files changed

+54
-54
lines changed

src/adapters/datasources/datasource-mongodb.js

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ const processQuery = qpm({
1414
converters: {
1515
nullstring: val => {
1616
return { $type: 10 }
17-
}, // reference BSON datatypes https://www.mongodb.com/docs/manual/reference/bson-types/
18-
},
17+
} // reference BSON datatypes https://www.mongodb.com/docs/manual/reference/bson-types/
18+
}
1919
})
2020

2121
const url = process.env.MONGODB_URL || 'mongodb://localhost:27017'
@@ -136,16 +136,16 @@ export class DataSourceMongoDb extends DataSource {
136136
}
137137
}
138138

139-
async collection() {
139+
async collection () {
140140
return (await this.connection()).db(this.namespace).collection(this.name)
141141
}
142142

143-
async createIndexes(client) {
143+
async createIndexes (client) {
144144
const indexOperations = this.options.connOpts.indexes.map(index => {
145145
return {
146146
name: index.fields.join('_'),
147147
key: index.fields.reduce((a, v) => ({ ...a, [v]: 1 }), {}),
148-
...index.options,
148+
...index.options
149149
}
150150
})
151151

@@ -278,7 +278,7 @@ export class DataSourceMongoDb extends DataSource {
278278
* @returns
279279
*/
280280

281-
async mongoCount({ filter = {} } = {}) {
281+
async mongoCount ({ filter = {} } = {}) {
282282
return filter == {}
283283
? await this.countDb()
284284
: (await this.collection()).count(filter)
@@ -295,21 +295,20 @@ export class DataSourceMongoDb extends DataSource {
295295
* }} param0
296296
* @returns
297297
*/
298-
streamList({ writable, serialize, transform, options }) {
298+
streamList ({ writable, serialize, transform, options }) {
299299
try {
300-
301300
const serializer = new Transform({
302301
writableObjectMode: true,
303302

304303
// start of array
305-
construct(callback) {
304+
construct (callback) {
306305
this.first = true
307306
this.push('[')
308307
callback()
309308
},
310309

311310
// each chunk is a record
312-
transform(chunk, _encoding, next) {
311+
transform (chunk, _encoding, next) {
313312
// comma-separate
314313
if (this.first) this.first = false
315314
else this.push(',')
@@ -320,32 +319,32 @@ export class DataSourceMongoDb extends DataSource {
320319
},
321320

322321
// end of array
323-
flush(callback) {
322+
flush (callback) {
324323
this.push(']')
325324
callback()
326-
},
325+
}
327326
})
328327

329328
const paginate = new Transform({
330329
writableObjectMode: true,
331330

332331
// start of array
333-
construct(callback) {
332+
construct (callback) {
334333
this.push('{ ')
335334
callback()
336335
},
337336

338337
// first chunk is the pagination data, rest is result
339-
transform(chunk, _encoding, next) {
338+
transform (chunk, _encoding, next) {
340339
this.push(chunk)
341340
next()
342341
},
343342

344343
// end of array
345-
flush(callback) {
344+
flush (callback) {
346345
this.push('}')
347346
callback()
348-
},
347+
}
349348
})
350349

351350
return new Promise(async (resolve, reject) => {
@@ -354,7 +353,7 @@ export class DataSourceMongoDb extends DataSource {
354353

355354
// optionally transform db stream then pipe to output
356355
pipeArgs.push(readable)
357-
356+
358357
if (options.page) {
359358
const count = ~~(await this.mongoCount(options.filter))
360359
pipeArgs.push(paginate)
@@ -366,19 +365,18 @@ export class DataSourceMongoDb extends DataSource {
366365
() => console.log('paginated query', options)
367366
)
368367
}
369-
368+
370369
return pipeArgs
371-
372370
})
373371
} catch (error) {
374372
console.error({
375373
fn: this.streamList.name,
376-
error,
374+
error
377375
})
378376
}
379377
}
380378

381-
processOptions({ options = {}, query = {} }) {
379+
processOptions ({ options = {}, query = {} }) {
382380
return { ...processQuery(query), ...options } // options must overwite the query not otherwise
383381
}
384382

@@ -389,13 +387,14 @@ export class DataSourceMongoDb extends DataSource {
389387
*/
390388
async list (param) {
391389
try {
390+
let result
392391
const options = this.processOptions(param)
393-
if (0 < ~~query.__page) {
392+
if (0 < ~~options.__page) {
394393
// qpm > processOptions weeds out __page - add it back properly as an integer
395394
options.page = ~~query.__page
396395
options.skip = (options.page - 1) * options.limit || 0
397396
}
398-
if (query.__aggregate) {
397+
if (options.__aggregate) {
399398
// qpm > processOptions weeds out __aggregate - add it back properly as parsed json
400399
try {
401400
const aggregateQuery = JSON.parse(query.__aggregate)
@@ -406,7 +405,7 @@ export class DataSourceMongoDb extends DataSource {
406405
}
407406
console.log({ options })
408407

409-
if (streamRequested && !query.__json)
408+
if (options.streamRequested && !query.__json)
410409
this.streamList({ writable, serialize, transform, options })
411410
else {
412411
const data = (await this.mongoFind(options)).toArray()
@@ -415,7 +414,7 @@ export class DataSourceMongoDb extends DataSource {
415414
...options,
416415
data,
417416
count,
418-
total: Math.ceil(count / options.limit),
417+
total: Math.ceil(count / options.limit)
419418
}
420419
}
421420

src/domain/datasource-factory.js

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,15 @@ const DefaultDataSource =
3838
*/
3939
const DsCoreExtensions = superclass =>
4040
class extends superclass {
41-
constructor(map, name, namespace, options) {
41+
constructor (map, name, namespace, options) {
4242
super(map, name, namespace, options)
4343
}
4444

45-
set factory(value) {
45+
set factory (value) {
4646
this[FACTORY] = value
4747
}
4848

49-
get factory() {
49+
get factory () {
5050
return this[FACTORY]
5151
}
5252

@@ -61,7 +61,7 @@ const DsCoreExtensions = superclass =>
6161
* @param {string} id
6262
* @param {Model} data
6363
*/
64-
async save(id, data) {
64+
async save (id, data) {
6565
try {
6666
this.saveSync(id, data)
6767
await super.save(id, JSON.parse(JSON.stringify(data)))
@@ -78,7 +78,7 @@ const DsCoreExtensions = superclass =>
7878
* @param {string} id
7979
* @returns {Promise<Model>|undefined}
8080
*/
81-
async find(id) {
81+
async find (id) {
8282
try {
8383
const cached = this.findSync(id)
8484
if (cached) return cached
@@ -98,33 +98,33 @@ const DsCoreExtensions = superclass =>
9898
}
9999
}
100100

101-
hydrate() {
101+
hydrate () {
102102
const ctx = this
103103

104104
return new Transform({
105105
objectMode: true,
106106

107-
transform(chunk, encoding, next) {
107+
transform (chunk, encoding, next) {
108108
this.push(ModelFactory.loadModel(broker, ctx, chunk, ctx.name))
109109
next()
110-
},
110+
}
111111
})
112112
}
113113

114-
serialize() {
114+
serialize () {
115115
let first = true
116116

117117
return new Transform({
118118
objectMode: true,
119119

120120
// start of array
121-
construct(callback) {
121+
construct (callback) {
122122
this.push('[')
123123
callback()
124124
},
125125

126126
// each chunk is a record
127-
transform(chunk, encoding, next) {
127+
transform (chunk, encoding, next) {
128128
// comma-separate
129129
if (first) first = false
130130
else this.push(',')
@@ -135,10 +135,10 @@ const DsCoreExtensions = superclass =>
135135
},
136136

137137
// end of array
138-
flush(callback) {
138+
flush (callback) {
139139
this.push(']')
140140
callback()
141-
},
141+
}
142142
})
143143
}
144144

@@ -148,7 +148,7 @@ const DsCoreExtensions = superclass =>
148148
* @param {import('./datasource').listOptions} options
149149
* @returns {Array<Readable|Transform>}
150150
*/
151-
stream(list, options) {
151+
stream (list, options) {
152152
return new Promise((resolve, reject) => {
153153
options.writable.on('error', reject)
154154
options.writable.on('end', resolve)
@@ -173,14 +173,15 @@ const DsCoreExtensions = superclass =>
173173
* @override
174174
* @param {import('../../domain/datasource').listOptions} param
175175
*/
176-
async list(options) {
176+
async list (options) {
177177
try {
178178
if (options?.query?.__count) return this.count()
179179
if (options?.query?.__cached) return this.listSync(options.query)
180180

181181
const opts = { ...options, streamRequested: options?.writable }
182182
const list = [await super.list(opts)].flat()
183183

184+
console.debug({ list })
184185
if (list.length < 1) return []
185186

186187
if (list[0] instanceof Readable || list[0] instanceof Transform)
@@ -202,7 +203,7 @@ const DsCoreExtensions = superclass =>
202203
* @param {*} id
203204
* @returns
204205
*/
205-
async delete(id) {
206+
async delete (id) {
206207
try {
207208
await super.delete(id)
208209
// only if super succeeds
@@ -238,11 +239,11 @@ const DataSourceFactory = (() => {
238239
* @param {*} name
239240
* @returns
240241
*/
241-
function hasDataSource(name) {
242+
function hasDataSource (name) {
242243
return dataSources.has(name)
243244
}
244245

245-
function listDataSources() {
246+
function listDataSources () {
246247
return [...dataSources.keys()]
247248
}
248249

@@ -267,7 +268,7 @@ const DataSourceFactory = (() => {
267268
* @param {dsOpts} options
268269
* @returns {typeof DataSource}
269270
*/
270-
function createDataSourceClass(spec, options) {
271+
function createDataSourceClass (spec, options) {
271272
const { memoryOnly, ephemeral, adapterName } = options
272273

273274
if (memoryOnly || ephemeral) return dsClasses['DataSourceMemory']
@@ -295,7 +296,7 @@ const DataSourceFactory = (() => {
295296
* @param {dsOpts} options
296297
* @returns {typeof DataSource}
297298
*/
298-
function extendDataSourceClass(DsClass, options = {}) {
299+
function extendDataSourceClass (DsClass, options = {}) {
299300
const mixins = [extendClass].concat(options.mixins || [])
300301
return compose(...mixins)(DsClass)
301302
}
@@ -306,7 +307,7 @@ const DataSourceFactory = (() => {
306307
* @param {dsOpts} [options]
307308
* @returns {DataSource}
308309
*/
309-
function createDataSource(name, namespace, options) {
310+
function createDataSource (name, namespace, options) {
310311
const spec = ModelFactory.getModelSpec(name)
311312
const dsMap = options.dsMap || new Map()
312313

@@ -332,7 +333,7 @@ const DataSourceFactory = (() => {
332333
* @param {dsOpts} options
333334
* @returns {import('./datasource').default}
334335
*/
335-
function getDataSource(name, namespace = null, options = {}) {
336+
function getDataSource (name, namespace = null, options = {}) {
336337
if (!dataSources) dataSources = new Map()
337338
if (!namespace) return dataSources.get(name)
338339
if (dataSources.has(name)) return dataSources.get(name)
@@ -347,7 +348,7 @@ const DataSourceFactory = (() => {
347348
* @param {dsOpts} [options]
348349
* @returns
349350
*/
350-
function getSharedDataSource(name, namespace = null, options = {}) {
351+
function getSharedDataSource (name, namespace = null, options = {}) {
351352
if (!dataSources) dataSources = new Map()
352353
if (!namespace) return dataSources.get(name)
353354
if (dataSources.has(name)) return dataSources.get(name)
@@ -362,20 +363,20 @@ const DataSourceFactory = (() => {
362363
* @param {string} name
363364
* @returns {ProxyHandler<DataSource>}
364365
*/
365-
function getRestrictedDataSource(name, namespace, options) {
366+
function getRestrictedDataSource (name, namespace, options) {
366367
return new Proxy(getDataSource(name, namespace, options), {
367-
get(target, key) {
368+
get (target, key) {
368369
if (key === 'factory') {
369370
throw new Error('unauthorized')
370371
}
371372
},
372-
ownKeys(target) {
373+
ownKeys (target) {
373374
return []
374-
},
375+
}
375376
})
376377
}
377378

378-
function close() {
379+
function close () {
379380
dataSources.forEach(ds => ds.close())
380381
}
381382

@@ -390,7 +391,7 @@ const DataSourceFactory = (() => {
390391
getRestrictedDataSource,
391392
hasDataSource,
392393
listDataSources,
393-
close,
394+
close
394395
})
395396
})()
396397

0 commit comments

Comments
 (0)