diff --git a/src/schema-analyzer.ts b/src/schema-analyzer.ts index 631cefa..7d0d5d7 100644 --- a/src/schema-analyzer.ts +++ b/src/schema-analyzer.ts @@ -18,6 +18,7 @@ import { import semanticTypes from './semantic-types'; import { AnyIterable } from './types'; +import { allowAbort, ALLOW_ABORT_INTERVAL_COUNT } from './util'; type TypeCastMap = { Array: unknown[]; @@ -484,6 +485,10 @@ export class SchemaAnalyzer { fields: [] }; + // Incremented once for every field and every type that gets analyzed. + // Used to check only occasionally whether the analysis should be aborted. + fieldAndTypeAnalysisCounter = 0; + constructor(options?: SchemaParseOptions) { // Set default options. this.options = { ...defaultSchemaParseOptions, ...options }; @@ -512,6 +517,13 @@ export class SchemaAnalyzer { } } + async allowAbortDuringAnalysis(): Promise<void> { + // Every ALLOW_ABORT_INTERVAL_COUNT calls, await allowAbort() so that callers awaiting this method really wait for it. + if (this.fieldAndTypeAnalysisCounter++ % ALLOW_ABORT_INTERVAL_COUNT === 0) { + await allowAbort(); + } + } + increaseFieldCount() { if (!this.options.distinctFieldsAbortThreshold) return; this.fieldsCount++; @@ -531,14 +543,15 @@ export class SchemaAnalyzer { return returnValue; } - analyzeDoc(doc: Document) { + async analyzeDoc(doc: Document) { this.finalized = false; /** * Takes a field value, determines the correct type, handles recursion into * nested arrays and documents, and passes the value down to `addToValue`. * Note: This mutates the `schema` argument. */ - const addToType = (path: string[], value: BSONValue, schema: SchemaAnalysisFieldTypes) => { + const addToType = async(path: string[], value: BSONValue, schema: SchemaAnalysisFieldTypes) => { + await this.allowAbortDuringAnalysis(); const bsonType = getBSONType(value); // If semantic type detection is enabled, the type is the semantic type // or the original bson type if no semantic type was detected. If disabled, @@ -560,13 +573,16 @@ export class SchemaAnalyzer { type.types = type.types ?? Object.create(null); type.lengths = type.lengths ??
[]; type.lengths.push((value as BSONValue[]).length); - (value as BSONValue[]).forEach((v: BSONValue) => addToType(path, v, type.types)); + for (const v of (value as BSONValue[])) { + await addToType(path, v, type.types); + } } else if (isDocumentType(type)) { // Recurse into nested documents by calling `addToField` for all sub-fields. type.fields = type.fields ?? Object.create(null); - Object.entries(value as Document).forEach( - ([fieldName, v]) => addToField(fieldName, [...path, fieldName], v, type.fields) - ); + + for (const [fieldName, v] of Object.entries(value as Document)) { + await addToField(fieldName, [...path, fieldName], v, type.fields); + } } else if (this.options.storeValues && !isNullType(type)) { // When the `storeValues` option is enabled, store some example values. if (!type.values) { @@ -584,7 +600,8 @@ export class SchemaAnalyzer { * Handles a field from a document. Passes the value to `addToType`. * Note: This mutates the `schema` argument. */ - const addToField = (fieldName: string, path: string[], value: BSONValue, schema: SchemaAnalysisFieldsMap) => { + const addToField = async(fieldName: string, path: string[], value: BSONValue, schema: SchemaAnalysisFieldsMap) => { + await this.allowAbortDuringAnalysis(); if (!schema[fieldName]) { schema[fieldName] = { name: fieldName, @@ -597,11 +614,11 @@ export class SchemaAnalyzer { const field = schema[fieldName]; field.count++; - addToType(path, value, field.types); + await addToType(path, value, field.types); }; for (const key of Object.keys(doc)) { - addToField(key, [key], doc[key], this.schemaAnalysisRoot.fields); + await addToField(key, [key], doc[key], this.schemaAnalysisRoot.fields); } this.schemaAnalysisRoot.count += 1; } @@ -652,7 +669,7 @@ export async function getCompletedSchemaAnalyzer( const analyzer = new SchemaAnalyzer(options); for await (const doc of verifyStreamSource(source)) { if (options?.signal?.aborted) throw options.signal.reason; - analyzer.analyzeDoc(doc); + await 
analyzer.analyzeDoc(doc); } return analyzer; } diff --git a/src/schema-converters/internalToExpanded.ts b/src/schema-converters/internalToExpanded.ts index fbe8217..0124e8c 100644 --- a/src/schema-converters/internalToExpanded.ts +++ b/src/schema-converters/internalToExpanded.ts @@ -2,7 +2,7 @@ import { ArraySchemaType, DocumentSchemaType, Schema as InternalSchema, SchemaTy import { type ExpandedJSONSchema } from '../types'; import { InternalTypeToStandardTypeMap, RELAXED_EJSON_DEFINITIONS } from './internalToStandard'; import { InternalTypeToBsonTypeMap } from './internalToMongoDB'; -import { allowAbort } from './util'; +import { allowAbort } from '../util'; const createConvertInternalToExpanded = function() { const usedDefinitions = new Set(); diff --git a/src/schema-converters/internalToMongoDB.ts b/src/schema-converters/internalToMongoDB.ts index e85f9ca..c40447f 100644 --- a/src/schema-converters/internalToMongoDB.ts +++ b/src/schema-converters/internalToMongoDB.ts @@ -3,7 +3,7 @@ */ import { ArraySchemaType, DocumentSchemaType, Schema as InternalSchema, SchemaType } from '../schema-analyzer'; import { MongoDBJSONSchema } from '../types'; -import { allowAbort } from './util'; +import { allowAbort } from '../util'; export const InternalTypeToBsonTypeMap: Record< SchemaType['name'] | 'Double' | 'BSONSymbol', diff --git a/src/schema-converters/internalToStandard.ts b/src/schema-converters/internalToStandard.ts index a1ec463..923604c 100644 --- a/src/schema-converters/internalToStandard.ts +++ b/src/schema-converters/internalToStandard.ts @@ -1,7 +1,7 @@ import { JSONSchema4TypeName } from 'json-schema'; import { ArraySchemaType, DocumentSchemaType, Schema as InternalSchema, SchemaType } from '../schema-analyzer'; import { StandardJSONSchema } from '../types'; -import { allowAbort } from './util'; +import { allowAbort } from '../util'; type StandardTypeDefinition = { type: JSONSchema4TypeName, $ref?: never; } | { $ref: string, type?: never }; diff --git 
a/src/schema-converters/util.ts b/src/util.ts similarity index 83% rename from src/schema-converters/util.ts rename to src/util.ts index 941ceee..f1521de 100644 --- a/src/schema-converters/util.ts +++ b/src/util.ts @@ -1,3 +1,5 @@ +export const ALLOW_ABORT_INTERVAL_COUNT = 1000; + export async function allowAbort(signal?: AbortSignal) { return new Promise((resolve, reject) => setTimeout(() => {