Skip to content

Commit 278de0c

Browse files
committed
move streaming logic into DataSourceFactory
1 parent c826431 commit 278de0c

File tree

3 files changed

+69
-76
lines changed

3 files changed

+69
-76
lines changed

src/adapters/controllers/get-models.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
'use strict'
2+
import { Readable, Transform } from 'stream'
23
import getContent from './get-content'
34

45
/**
@@ -16,11 +17,14 @@ export default function getModelsFactory (listModels) {
1617
writable: httpRequest.res
1718
})
1819

19-
if (!models) {
20+
if (
21+
models?.length > 0 &&
22+
(models[0] instanceof Readable || models[0] instanceof Transform)
23+
) {
2024
httpRequest.stream = true
2125
return
2226
}
23-
27+
2428
const { content, contentType } = getContent(httpRequest, models)
2529

2630
return {

src/adapters/datasources/datasource-mongodb.js

Lines changed: 3 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ export class DataSourceMongoDb extends DataSource {
6969
}
7070
}
7171
const breaker = CircuitBreaker(
72-
'mongo.conn',
72+
'mongodb.connect',
7373
this.connect(client),
7474
thresholds
7575
)
@@ -215,55 +215,6 @@ export class DataSourceMongoDb extends DataSource {
215215
* }} param0
216216
* @returns
217217
*/
218-
streamList ({ writable, serialize, transform, options }) {
219-
try {
220-
let first = true
221-
222-
const serializer = new Transform({
223-
writableObjectMode: true,
224-
225-
// start of array
226-
construct (callback) {
227-
this.push('[')
228-
callback()
229-
},
230-
231-
// each chunk is a record
232-
transform (chunk, _encoding, next) {
233-
// comma-separate
234-
if (first) first = false
235-
else this.push(',')
236-
237-
// serialize record
238-
this.push(JSON.stringify(chunk))
239-
next()
240-
},
241-
242-
// end of array
243-
flush (callback) {
244-
this.push(']')
245-
callback()
246-
}
247-
})
248-
249-
return new Promise(async (resolve, reject) => {
250-
const readable = (await this.mongoFind(options)).stream()
251-
252-
readable.on('error', reject)
253-
readable.on('end', resolve)
254-
255-
// optionally transform db stream then pipe to output
256-
if (transform && serialize)
257-
readable
258-
.pipe(transform)
259-
.pipe(serializer)
260-
.pipe(writable)
261-
else if (transform) readable.pipe(transform).pipe(writable)
262-
else if (serialize) readable.pipe(serializer).pipe(writable)
263-
else readable.pipe(writable)
264-
})
265-
} catch (error) {}
266-
}
267218

268219
processOptions (param) {
269220
const { options = {}, query = {} } = param
@@ -294,22 +245,14 @@ export class DataSourceMongoDb extends DataSource {
294245
* - `writable` writable stream for output
295246
*/
296247
async list (param = {}) {
297-
const {
298-
writable = null,
299-
transform = null,
300-
serialize = false,
301-
query = {}
302-
} = param
248+
const { writable = null, transform = null, serialize = false } = param
303249

304250
try {
305-
if (query.__cached) return super.listSync(query)
306-
if (query.__count) return this.count()
307-
308251
const options = this.processOptions(param)
309252
console.log({ options })
310253

311254
if (writable) {
312-
return this.streamList({ writable, serialize, transform, options })
255+
return (await this.mongoFind(options)).stream()
313256
}
314257

315258
return (await this.mongoFind(options)).toArray()

src/domain/datasource-factory.js

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict'
22

3-
import { Transform } from 'stream'
3+
import { Readable, Transform } from 'node:stream'
44
import { isMainThread } from 'worker_threads'
55
/** @typedef {import('.').Model} Model */
66

@@ -94,39 +94,85 @@ const DsCoreExtensions = superclass =>
9494
}
9595
}
9696

97-
transform () {
97+
hydrate () {
9898
const ctx = this
9999

100100
return new Transform({
101101
objectMode: true,
102102

103-
transform (chunk, _encoding, next) {
103+
transform (chunk, encoding, next) {
104104
this.push(ModelFactory.loadModel(broker, ctx, chunk, ctx.name))
105105
next()
106106
}
107107
})
108108
}
109109

110+
serialize () {
111+
let first = true
112+
113+
return new Transform({
114+
objectMode: true,
115+
116+
// start of array
117+
construct (callback) {
118+
this.push('[')
119+
callback()
120+
},
121+
122+
// each chunk is a record
123+
transform (chunk, encoding, next) {
124+
// comma-separate
125+
if (first) first = false
126+
else this.push(',')
127+
128+
// serialize record
129+
this.push(JSON.stringify(chunk))
130+
next()
131+
},
132+
133+
// end of array
134+
flush (callback) {
135+
this.push(']')
136+
callback()
137+
}
138+
})
139+
}
140+
141+
stream (list, options) {
142+
return new Promise((resolve, reject) => {
143+
options.writable.on('error', reject)
144+
options.writable.on('end', resolve)
145+
146+
if (!isMainThread) list.push(this.hydrate())
147+
if (options.transform) list.concat(options.transform)
148+
if (options.serialize) list.push(this.serialize())
149+
150+
return list.reduce((a, b) => a.pipe(b)).pipe(options.writable)
151+
})
152+
}
153+
110154
/**
111155
* @override
112156
* @param {*} options
113157
* @returns
114158
*/
115159
async list (options) {
116160
try {
117-
if (options?.writable)
118-
return isMainThread
119-
? super.list(options)
120-
: super.list({ ...options, transform: this.transform() })
161+
if (options?.query.__count) return this.count()
162+
if (options?.query.__cached) return this.listSync(options.query)
121163

122-
const arr = await super.list(options)
164+
const list = [await super.list(options)].flat(1)
123165

124-
if (Array.isArray(arr))
125-
return isMainThread
126-
? arr
127-
: arr.map(model =>
128-
ModelFactory.loadModel(broker, this, model, this.name)
129-
)
166+
if (list.length < 1) return []
167+
168+
if (list[0] instanceof Readable || list[0] instanceof Transform)
169+
return this.stream(list, options)
170+
171+
return isMainThread
172+
? list
173+
: list.map(model =>
174+
ModelFactory.loadModel(broker, this, model, this.name)
175+
)
130176
} catch (error) {
131177
console.error({ fn: this.list.name, error })
132178
throw error

0 commit comments

Comments
 (0)