1
- import type { AggregationCursor , Document , FindCursor } from 'mongodb' ;
2
- import { Readable , PassThrough } from 'stream' ;
3
- import { pipeline } from 'stream/promises' ;
4
-
5
- import stream from './stream' ;
6
- import type { ParseStreamOptions } from './stream' ;
7
1
import { SchemaAnalyzer } from './schema-analyzer' ;
8
2
import type {
9
3
ArraySchemaType ,
@@ -24,72 +18,55 @@ import type {
24
18
} from './schema-analyzer' ;
25
19
import * as schemaStats from './stats' ;
26
20
27
- type MongoDBCursor = AggregationCursor | FindCursor ;
21
+ type AnyIterable < T = any > = Iterable < T > | AsyncIterable < T > ;
28
22
29
- function getStreamSource (
30
- source : Document [ ] | MongoDBCursor | Readable
31
- ) : Readable {
32
- let streamSource : Readable ;
33
- if ( 'stream' in source ) {
34
- // MongoDB Cursor.
35
- streamSource = source . stream ( ) ;
36
- } else if ( 'pipe' in source ) {
37
- // Document stream.
38
- streamSource = source ;
39
- } else if ( Array . isArray ( source ) ) {
40
- // Array of documents.
41
- streamSource = Readable . from ( source ) ;
42
- } else {
23
+ function verifyStreamSource (
24
+ source : AnyIterable
25
+ ) : AnyIterable {
26
+ if ( ! ( Symbol . iterator in source ) && ! ( Symbol . asyncIterator in source ) ) {
43
27
throw new Error (
44
28
'Unknown input type for `docs`. Must be an array, ' +
45
29
'stream or MongoDB Cursor.'
46
30
) ;
47
31
}
48
32
49
- return streamSource ;
33
+ return source ;
50
34
}
51
35
52
- async function schemaStream (
53
- source : Document [ ] | MongoDBCursor | Readable ,
54
- options ?: ParseStreamOptions
55
- ) {
56
- const streamSource = getStreamSource ( source ) ;
57
-
58
- const dest = new PassThrough ( { objectMode : true } ) ;
59
- await pipeline ( streamSource , stream ( options ) , dest ) ;
60
- for await ( const result of dest ) {
61
- return result ;
36
+ async function getCompletedSchemaAnalyzer (
37
+ source : AnyIterable ,
38
+ options ?: SchemaParseOptions
39
+ ) : Promise < SchemaAnalyzer > {
40
+ const analyzer = new SchemaAnalyzer ( options ) ;
41
+ for await ( const doc of verifyStreamSource ( source ) ) {
42
+ analyzer . analyzeDoc ( doc ) ;
62
43
}
63
- throw new Error ( 'unreachable' ) ; // `dest` always emits exactly one doc.
44
+ return analyzer ;
64
45
}
65
46
66
47
/**
67
48
* Convenience shortcut for parsing schemas. Accepts an array, stream or
68
49
* MongoDB cursor object to parse documents` from.
69
50
*/
70
51
async function parseSchema (
71
- source : Document [ ] | MongoDBCursor | Readable ,
52
+ source : AnyIterable ,
72
53
options ?: SchemaParseOptions
73
54
) : Promise < Schema > {
74
- return await schemaStream ( source , options ) ;
55
+ return ( await getCompletedSchemaAnalyzer ( source , options ) ) . getResult ( ) ;
75
56
}
76
57
77
58
// Convenience shortcut for getting schema paths.
78
59
async function getSchemaPaths (
79
- source : Document [ ] | MongoDBCursor | Readable
60
+ source : AnyIterable
80
61
) : Promise < string [ ] [ ] > {
81
- return await schemaStream ( source , {
82
- schemaPaths : true
83
- } ) ;
62
+ return ( await getCompletedSchemaAnalyzer ( source ) ) . getSchemaPaths ( ) ;
84
63
}
85
64
86
65
// Convenience shortcut for getting the simplified schema.
87
66
async function getSimplifiedSchema (
88
- source : Document [ ] | MongoDBCursor | Readable
67
+ source : AnyIterable
89
68
) : Promise < SimplifiedSchema > {
90
- return await schemaStream ( source , {
91
- simplifiedSchema : true
92
- } ) ;
69
+ return ( await getCompletedSchemaAnalyzer ( source ) ) . getSimplifiedSchema ( ) ;
93
70
}
94
71
95
72
export default parseSchema ;
@@ -113,8 +90,6 @@ export type {
113
90
} ;
114
91
115
92
export {
116
- stream ,
117
- getStreamSource ,
118
93
parseSchema ,
119
94
getSchemaPaths ,
120
95
getSimplifiedSchema ,
0 commit comments