diff --git a/package-lock.json b/package-lock.json
index c4c595ceaad..d36a1bf914f 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -44189,6 +44189,7 @@
         "compass-preferences-model": "^2.35.0",
         "hadron-app-registry": "^9.4.8",
         "lodash": "^4.17.21",
+        "mongodb": "^6.14.1",
         "mongodb-ns": "^2.4.2",
         "mongodb-schema": "^12.5.2",
         "react": "^17.0.2",
@@ -56617,6 +56618,7 @@
         "hadron-app-registry": "^9.4.8",
         "lodash": "^4.17.21",
         "mocha": "^10.2.0",
+        "mongodb": "^6.14.1",
         "mongodb-ns": "^2.4.2",
         "mongodb-schema": "^12.5.2",
         "nyc": "^15.1.0",
diff --git a/packages/compass-data-modeling/package.json b/packages/compass-data-modeling/package.json
index cdd416f5c35..4df58a223af 100644
--- a/packages/compass-data-modeling/package.json
+++ b/packages/compass-data-modeling/package.json
@@ -66,6 +66,7 @@
     "compass-preferences-model": "^2.35.0",
     "hadron-app-registry": "^9.4.8",
     "lodash": "^4.17.21",
+    "mongodb": "^6.14.1",
     "mongodb-ns": "^2.4.2",
     "mongodb-schema": "^12.5.2",
     "react": "^17.0.2",
diff --git a/packages/compass-data-modeling/src/store/analysis-process.ts b/packages/compass-data-modeling/src/store/analysis-process.ts
index e8ffc9da387..dfd9007712c 100644
--- a/packages/compass-data-modeling/src/store/analysis-process.ts
+++ b/packages/compass-data-modeling/src/store/analysis-process.ts
@@ -3,6 +3,8 @@ import { isAction } from './util';
 import type { DataModelingThunkAction } from './reducer';
 import { analyzeDocuments } from 'mongodb-schema';
 import { getCurrentDiagramFromState } from './diagram';
+import type { Document } from 'bson';
+import type { AggregationCursor } from 'mongodb';
 export type AnalysisProcessState = {
   currentAnalysisOptions:
     |
@@ -155,32 +157,32 @@ export function startAnalysis(
     try {
       const dataService =
         services.connections.getDataServiceForConnection(connectionId);
-      const samples = await Promise.all(
+      const schema = await Promise.all(
         namespaces.map(async (ns) => {
-          // TODO
-          const sample = await dataService.sample(
+          const sample: AggregationCursor<Document> = dataService.sampleCursor(
             ns,
             { size: 100 },
-            undefined,
             {
-              abortSignal: cancelController.signal,
+              signal: cancelController.signal,
+              promoteValues: false,
+            },
+            {
+              fallbackReadPreference: 'secondaryPreferred',
             }
           );
+
+          const accessor = await analyzeDocuments(sample, {
+            signal: cancelController.signal,
+          });
+
+          // TODO(COMPASS-9314): Update how we show analysis progress.
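The hunk above switches the data modeling analysis from a one-shot sample() call to a streaming cursor: analyzeDocuments from mongodb-schema accepts any async iterable of documents, so the AggregationCursor returned by sampleCursor can be fed to it directly instead of buffering the whole sample first. A minimal sketch of the consumption pattern (the cursor argument stands in for whatever sampleCursor returns; error handling and progress dispatches omitted):

    import type { Document } from 'bson';
    import { analyzeDocuments } from 'mongodb-schema';

    // Sketch: stream a server-side $sample straight into the schema analyzer.
    // `cursor` is any async iterable of documents, e.g. the AggregationCursor
    // returned by sampleCursor(ns, { size: 100 }, { signal, promoteValues: false }).
    async function analyzeSample(cursor: AsyncIterable<Document>, signal: AbortSignal) {
      // analyzeDocuments pulls documents from the iterable lazily, so sampling
      // and analysis are interleaved rather than run as two separate passes.
      const accessor = await analyzeDocuments(cursor, { signal });
      // The accessor can then emit the MongoDB $jsonSchema flavor used above.
      return accessor.getMongoDBJsonSchema({ signal });
    }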
           dispatch({
             type: AnalysisProcessActionTypes.NAMESPACE_SAMPLE_FETCHED,
             namespace: ns,
           });
-          return { ns, sample };
-        })
-      );
-      const schema = await Promise.all(
-        samples.map(async ({ ns, sample }) => {
-          const schema = await analyzeDocuments(sample, {
+
+          const schema = await accessor.getMongoDBJsonSchema({
             signal: cancelController.signal,
-          }).then((accessor) => {
-            return accessor.getMongoDBJsonSchema({
-              signal: cancelController.signal,
-            });
           });
           dispatch({
             type: AnalysisProcessActionTypes.NAMESPACE_SCHEMA_ANALYZED,
diff --git a/packages/compass-schema-validation/src/index.ts b/packages/compass-schema-validation/src/index.ts
index f1456e3f0df..6dc65f41c70 100644
--- a/packages/compass-schema-validation/src/index.ts
+++ b/packages/compass-schema-validation/src/index.ts
@@ -13,6 +13,7 @@ import { createLoggerLocator } from '@mongodb-js/compass-logging/provider';
 import { telemetryLocator } from '@mongodb-js/compass-telemetry/provider';
 import { SchemaValidationTabTitle } from './plugin-title';
 import { workspacesServiceLocator } from '@mongodb-js/compass-workspaces/provider';
+import type { RequiredDataServiceProps } from './modules';
 
 const CompassSchemaValidationHadronPlugin = registerHadronPlugin(
   {
@@ -23,13 +24,8 @@ const CompassSchemaValidationHadronPlugin = registerHadronPlugin(
     activate: onActivated,
   },
   {
-    dataService: dataServiceLocator as DataServiceLocator<
-      | 'aggregate'
-      | 'collectionInfo'
-      | 'updateCollection'
-      | 'sample'
-      | 'isCancelError'
-    >,
+    dataService:
+      dataServiceLocator as DataServiceLocator<RequiredDataServiceProps>,
     connectionInfoRef: connectionInfoRefLocator,
     instance: mongoDBInstanceLocator,
     preferences: preferencesLocator,
diff --git a/packages/compass-schema-validation/src/modules/index.ts b/packages/compass-schema-validation/src/modules/index.ts
index 09fb5cf926e..59901d97389 100644
--- a/packages/compass-schema-validation/src/modules/index.ts
+++ b/packages/compass-schema-validation/src/modules/index.ts
@@ -67,14 +67,13 @@ export type RootAction =
   | EditModeAction
   | ResetAction;
 
-export type DataService = Pick<
-  OriginalDataService,
+export type RequiredDataServiceProps =
   | 'aggregate'
   | 'collectionInfo'
   | 'updateCollection'
-  | 'sample'
-  | 'isCancelError'
->;
+  | 'sampleCursor'
+  | 'isCancelError';
+export type DataService = Pick<OriginalDataService, RequiredDataServiceProps>;
 
 export type SchemaValidationExtraArgs = {
   dataService: DataService;
diff --git a/packages/compass-schema-validation/src/stores/store.spec.ts b/packages/compass-schema-validation/src/stores/store.spec.ts
index 785b26469c5..21510c990cb 100644
--- a/packages/compass-schema-validation/src/stores/store.spec.ts
+++ b/packages/compass-schema-validation/src/stores/store.spec.ts
@@ -42,7 +42,13 @@ const fakeDataService = {
       /* never resolves */
     }),
   isCancelError: () => false,
-  sample: () => [{ prop1: 'abc' }],
+  sampleCursor: () =>
+    ({
+      async *[Symbol.asyncIterator]() {
+        await new Promise((resolve) => setTimeout(resolve, 0));
+        yield* [{ prop1: 'abc' }];
+      },
+    } as any),
 } as any;
 
 const fakeWorkspaces = {
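The spec change above replaces the array-resolving sample stub with an object exposing Symbol.asyncIterator, which is the only surface the schema analyzer needs from a cursor. The same shape is reusable as a small helper (this mirrors the createMockCursor helper added to compass-schema's store.spec.ts later in this diff):

    import type { Document } from 'bson';

    // Test helper: the smallest stand-in for an AggregationCursor when the code
    // under test only iterates it with `for await`. Yielding after setTimeout(0)
    // keeps iteration asynchronous, like a real cursor doing network round trips.
    function createMockCursor(docs: Document[]): AsyncIterable<Document> {
      return {
        async *[Symbol.asyncIterator]() {
          await new Promise((resolve) => setTimeout(resolve, 0));
          yield* docs;
        },
      };
    }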
diff --git a/packages/compass-schema/src/index.ts b/packages/compass-schema/src/index.ts
index b546ea37cd3..8ecd87046b2 100644
--- a/packages/compass-schema/src/index.ts
+++ b/packages/compass-schema/src/index.ts
@@ -8,6 +8,7 @@ import {
 import CompassSchema from './components/compass-schema';
 import { registerHadronPlugin } from 'hadron-app-registry';
 import { activateSchemaPlugin } from './stores/store';
+import type { RequiredDataServiceProps } from './stores/store';
 import { createLoggerLocator } from '@mongodb-js/compass-logging/provider';
 import { telemetryLocator } from '@mongodb-js/compass-telemetry/provider';
 import { preferencesLocator } from 'compass-preferences-model/provider';
@@ -31,9 +32,8 @@ const CompassSchemaHadronPlugin = registerHadronPlugin(
     activate: activateSchemaPlugin,
   },
   {
-    dataService: dataServiceLocator as DataServiceLocator<
-      'sample' | 'isCancelError'
-    >,
+    dataService:
+      dataServiceLocator as DataServiceLocator<RequiredDataServiceProps>,
     logger: createLoggerLocator('COMPASS-SCHEMA-UI'),
     track: telemetryLocator,
     preferences: preferencesLocator,
diff --git a/packages/compass-schema/src/modules/schema-analysis.spec.ts b/packages/compass-schema/src/modules/schema-analysis.spec.ts
index d2acd47f829..5b30653a2ac 100644
--- a/packages/compass-schema/src/modules/schema-analysis.spec.ts
+++ b/packages/compass-schema/src/modules/schema-analysis.spec.ts
@@ -5,13 +5,13 @@ import mongoDBSchemaAnalyzeSchema from 'mongodb-schema';
 import type { Schema } from 'mongodb-schema';
 import { createNoopLogger } from '@mongodb-js/compass-logging/provider';
 import { isInternalFieldPath } from 'hadron-document';
-
-import { analyzeSchema, calculateSchemaMetadata } from './schema-analysis';
 import {
   createSandboxFromDefaultPreferences,
   type PreferencesAccess,
 } from 'compass-preferences-model';
+import { analyzeSchema, calculateSchemaMetadata } from './schema-analysis';
+
 const testDocs = [
   {
     someFields: {
@@ -81,13 +81,18 @@ describe('schema-analysis', function () {
   describe('#getResult', function () {
     it('returns the schema', async function () {
+      const docs = [
+        { x: 1 },
+        { y: 2, __safeContent__: [bson.Binary.createFromBase64('aaaa')] },
+      ];
       const dataService = {
-        sample: () =>
-          Promise.resolve([
-            { x: 1 },
-            { y: 2, __safeContent__: [bson.Binary.createFromBase64('aaaa')] },
-          ]),
-        isCancelError: () => false,
+        sampleCursor: () =>
+          ({
+            async *[Symbol.asyncIterator]() {
+              await new Promise((resolve) => setTimeout(resolve, 0));
+              yield* docs;
+            },
+          } as any),
       };
       const abortController = new AbortController();
       const abortSignal = abortController.signal;
@@ -179,10 +184,17 @@ describe('schema-analysis', function () {
   it('adds promoteValues: false so the analyzer can report more accurate types', async function () {
     const dataService = {
-      sample: () => Promise.resolve([]),
-      isCancelError: () => false,
+      sampleCursor: () =>
+        ({
+          async *[Symbol.asyncIterator]() {
+            await new Promise((resolve) => setTimeout(resolve, 0));
+            yield {
+              a: 123,
+            };
+          },
+        } as any),
     };
-    const sampleSpy = sinon.spy(dataService, 'sample');
+    const sampleSpy = sinon.spy(dataService, 'sampleCursor');
     const abortController = new AbortController();
     const abortSignal = abortController.signal;
@@ -199,19 +211,31 @@ describe('schema-analysis', function () {
     expect(sampleSpy).to.have.been.calledWith(
       'db.coll',
       {},
-      { promoteValues: false }
+      { signal: abortSignal, promoteValues: false },
+      { fallbackReadPreference: 'secondaryPreferred' }
     );
   });
 
   it('returns undefined if is cancelled', async function () {
-    const dataService = {
-      sample: () => Promise.reject(new Error('test error')),
-      isCancelError: () => true,
-    };
-
     const abortController = new AbortController();
     const abortSignal = abortController.signal;
 
+    const dataService = {
+      sampleCursor: () =>
+        ({
+          async *[Symbol.asyncIterator]() {
+            await new Promise((resolve) => setTimeout(resolve, 0));
+            yield {
+              a: 123,
+            };
+            abortController.abort();
+            yield {
+              a: 345,
+            };
+          },
+        } as any),
+    };
+
     const result = await analyzeSchema(
       dataService,
       abortSignal,
@@ -228,13 +252,19 @@
   it('throws if sample throws', async function () {
     const error: Error & {
       code?: any;
-    } = new Error('should have been thrown');
+    } = new Error('pineapple');
     error.name = 'MongoError';
     error.code = new bson.Int32(1000);
     const dataService = {
-      sample: () => Promise.reject(error),
-      isCancelError: () => false,
+      sampleCursor: () =>
+        ({
+          async *[Symbol.asyncIterator]() {
+            await new Promise((resolve) => setTimeout(resolve, 0));
+            yield {};
+            throw error;
+          },
+        } as any),
     };
 
     const abortController = new AbortController();
@@ -251,7 +281,7 @@ describe('schema-analysis', function () {
         preferences
       );
     } catch (err: any) {
-      expect(err.message).to.equal('should have been thrown');
+      expect(err.message).to.equal('pineapple');
       expect(err.code).to.equal(1000);
       return;
     }
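The promoteValues: false assertion above is what lets the analyzer tell BSON numeric types apart: with value promotion disabled, the driver returns BSON wrapper objects instead of plain JavaScript numbers. A rough illustration of the difference, assuming the bson package:

    import { Int32, Double } from 'bson';

    // With promoteValues: true (the driver default) both fields arrive as the
    // plain JS numbers 42 and 0.5, so a schema analyzer can only report "Number".
    const promoted = { count: 42, ratio: 0.5 };

    // With promoteValues: false the wrappers are preserved, so the analyzer can
    // report Int32 vs. Double (and likewise Long, Decimal128, and so on).
    const unpromoted = { count: new Int32(42), ratio: new Double(0.5) };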
diff --git a/packages/compass-schema/src/modules/schema-analysis.ts b/packages/compass-schema/src/modules/schema-analysis.ts
index 6a1c3bc5012..9696591fbb3 100644
--- a/packages/compass-schema/src/modules/schema-analysis.ts
+++ b/packages/compass-schema/src/modules/schema-analysis.ts
@@ -50,15 +50,15 @@ export const analyzeSchema = async (
       ns,
     });
 
-    const docs = await dataService.sample(
+    const sampleCursor = dataService.sampleCursor(
       ns,
       query,
       {
         ...aggregateOptions,
         promoteValues: false,
+        signal: abortSignal,
       },
       {
-        abortSignal,
         fallbackReadPreference: 'secondaryPreferred',
       }
     );
@@ -72,7 +72,10 @@ export const analyzeSchema = async (
       : {
           signal: abortSignal,
         };
-    const schemaAccessor = await analyzeDocuments(docs, schemaParseOptions);
+    const schemaAccessor = await analyzeDocuments(
+      sampleCursor,
+      schemaParseOptions
+    );
     log.info(mongoLogId(1001000090), 'Schema', 'Schema analysis completed', {
       ns,
     });
@@ -81,8 +84,14 @@ export const analyzeSchema = async (
     log.error(mongoLogId(1001000091), 'Schema', 'Schema analysis failed', {
       ns,
       error: err.message,
+      aborted: abortSignal.aborted,
+      ...(abortSignal.aborted
+        ? { abortReason: abortSignal.reason?.message ?? abortSignal.reason }
+        : {}),
     });
-    if (dataService.isCancelError(err)) {
+
+    if (abortSignal.aborted) {
+      // The operation was aborted, so we don't throw an error.
       debug('caught background operation terminated error', err);
       return;
     }
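In schema-analysis.ts the cancellation check moves from dataService.isCancelError(err) to abortSignal.aborted: once the cursor is killed through the driver's signal option, the error surfaced mid-iteration is not necessarily the data-service cancellation error, while the signal itself is always authoritative. A sketch of the same pattern in isolation:

    // Sketch: distinguish a deliberate abort from a genuine failure while
    // draining an abortable cursor.
    async function collect<T>(
      cursor: AsyncIterable<T>,
      signal: AbortSignal
    ): Promise<T[] | undefined> {
      const docs: T[] = [];
      try {
        for await (const doc of cursor) {
          docs.push(doc);
        }
        return docs;
      } catch (err) {
        if (signal.aborted) {
          // Cancelled on purpose: swallow the error and report "no result".
          return undefined;
        }
        throw err; // Real failure: let the caller handle it.
      }
    }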
diff --git a/packages/compass-schema/src/stores/store.ts b/packages/compass-schema/src/stores/store.ts
index 09d8054af0a..21d5733ca16 100644
--- a/packages/compass-schema/src/stores/store.ts
+++ b/packages/compass-schema/src/stores/store.ts
@@ -30,7 +30,8 @@ import {
 } from './schema-export-reducer';
 import type { InternalLayer } from '../modules/geo';
 
-export type DataService = Pick;
+export type RequiredDataServiceProps = 'sampleCursor';
+export type DataService = Pick;
 export type SchemaPluginServices = {
   dataService: DataService;
   connectionInfoRef: ConnectionInfoRef;
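Both compass-schema and compass-schema-validation now follow the same typing pattern: the union of required data-service method names is exported once and reused for the store's DataService type and the DataServiceLocator cast, so widening the plugin's dependency is a single-line change. A generic sketch of the pattern (the FullDataService shape below is illustrative, not the real data-service interface):

    // Declare the minimal data-service surface a plugin needs in one place...
    type FullDataService = {
      sampleCursor: (...args: unknown[]) => unknown;
      aggregate: (...args: unknown[]) => unknown;
      // ...plus the rest of the real interface.
    };

    export type RequiredDataServiceProps = 'sampleCursor';
    export type DataService = Pick<FullDataService, RequiredDataServiceProps>;
    // ...and reuse RequiredDataServiceProps for the locator cast, so the store
    // typing and the locator can never drift apart.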
diff --git a/packages/data-service/src/data-service.spec.ts b/packages/data-service/src/data-service.spec.ts
index 893f430e750..bdde5c9cd60 100644
--- a/packages/data-service/src/data-service.spec.ts
+++ b/packages/data-service/src/data-service.spec.ts
@@ -1,11 +1,10 @@
 import assert from 'assert';
-import { ObjectId } from 'bson';
+import { Int32, ObjectId, UUID } from 'bson';
 import chai from 'chai';
 import chaiAsPromised from 'chai-as-promised';
 import type { Sort } from 'mongodb';
-import { Collection, MongoServerError } from 'mongodb';
-import { MongoClient } from 'mongodb';
-import { Int32, UUID } from 'bson';
+import { Collection, MongoClient, MongoServerError } from 'mongodb';
+import type { Document } from 'bson';
 import sinon from 'sinon';
 import ConnectionStringUrl from 'mongodb-connection-string-url';
 import type { DataService } from './data-service';
@@ -957,6 +956,159 @@ describe('DataService', function () {
     });
   });
 
+  describe('#sampleCursor', function () {
+    it('returns a list of sampled documents', async function () {
+      const docs = await dataService.sampleCursor(testNamespace).toArray();
+      expect(docs.length).to.equal(2);
+    });
+
+    it('allows to pass a query', async function () {
+      const docs = await dataService
+        .sampleCursor(testNamespace, {
+          query: { a: 1 },
+        })
+        .toArray();
+      expect(docs.length).to.equal(1);
+      expect(docs[0]).to.haveOwnProperty('_id');
+      expect(docs[0].a).to.equal(1);
+    });
+
+    it('allows to pass a projection', async function () {
+      const docs = await dataService
+        .sampleCursor(testNamespace, {
+          fields: {
+            a: 1,
+            _id: 0,
+          },
+        })
+        .toArray();
+
+      expect(docs).to.deep.include.members([{ a: 1 }, { a: 2 }]);
+    });
+
+    it('allows to set a sample size', async function () {
+      const docsCursor = dataService.sampleCursor(testNamespace, {
+        size: 2,
+      });
+      const docs: Document[] = [];
+      expect(await docsCursor.hasNext()).to.equal(true);
+      docs.push((await docsCursor.next()) as Document);
+      expect(await docsCursor.hasNext()).to.equal(true);
+      docs.push((await docsCursor.next()) as Document);
+      expect(await docsCursor.hasNext()).to.equal(false);
+
+      expect(docs.length).to.equal(2);
+    });
+
+    it('always sets default sample size and allowDiskUse: true', async function () {
+      sandbox.spy(dataService, 'aggregateCursor');
+      const cursor = dataService.sampleCursor('db.coll');
+
+      // eslint-disable-next-line @typescript-eslint/unbound-method
+      expect(dataService.aggregateCursor).to.have.been.calledWith(
+        'db.coll',
+        [{ $sample: { size: 1000 } }],
+        { allowDiskUse: true }
+      );
+      await cursor.close();
+    });
+
+    it('allows to pass down aggregation options to the driver', async function () {
+      sandbox.spy(dataService, 'aggregateCursor');
+      const cursor = dataService.sampleCursor(
+        'db.coll',
+        {},
+        {
+          maxTimeMS: 123,
+          session: undefined,
+          bsonRegExp: true,
+        }
+      );
+      await cursor.close();
+
+      // eslint-disable-next-line @typescript-eslint/unbound-method
+      expect(dataService.aggregateCursor).to.have.been.calledWith(
+        'db.coll',
+        [{ $sample: { size: 1000 } }],
+        {
+          allowDiskUse: true,
+          maxTimeMS: 123,
+          session: undefined,
+          bsonRegExp: true,
+        }
+      );
+    });
+
+    it('allows to override allowDiskUse', async function () {
+      sandbox.spy(dataService, 'aggregateCursor');
+      const cursor = dataService.sampleCursor(
+        'db.coll',
+        {},
+        {
+          allowDiskUse: false,
+        }
+      );
+
+      // eslint-disable-next-line @typescript-eslint/unbound-method
+      expect(dataService.aggregateCursor).to.have.been.calledWith(
+        'db.coll',
+        [{ $sample: { size: 1000 } }],
+        { allowDiskUse: false }
+      );
+      await cursor.close();
+    });
+
+    it('allows to pass fallbackReadPreference and sets the read preference when unset', async function () {
+      sandbox.spy(dataService, 'aggregateCursor');
+      const cursor = dataService.sampleCursor(
+        'db.coll',
+        {},
+        {},
+        {
+          fallbackReadPreference: 'secondaryPreferred',
+        }
+      );
+
+      // eslint-disable-next-line @typescript-eslint/unbound-method
+      expect(dataService.aggregateCursor).to.have.been.calledWith(
+        'db.coll',
+        [{ $sample: { size: 1000 } }],
+        { allowDiskUse: true, readPreference: 'secondaryPreferred' }
+      );
+      await cursor.close();
+    });
+
+    it('allows to pass fallbackReadPreference and does not set the read preference when it is already set', async function () {
+      sandbox.spy(dataService, 'aggregateCursor');
+      const connectionStringReplacement = new ConnectionStringUrl(
+        cluster().connectionString
+      );
+      connectionStringReplacement.searchParams.set(
+        'readPreference',
+        'primary'
+      );
+      sandbox.replace(dataService as any, '_connectionOptions', {
+        connectionString: connectionStringReplacement.toString(),
+      });
+      const cursor = dataService.sampleCursor(
+        'db.coll',
+        {},
+        {},
+        {
+          fallbackReadPreference: 'secondaryPreferred',
+        }
+      );
+      await cursor.close();
+
+      // eslint-disable-next-line @typescript-eslint/unbound-method
+      expect(dataService.aggregateCursor).to.have.been.calledWith(
+        'db.coll',
+        [{ $sample: { size: 1000 } }],
+        { allowDiskUse: true }
+      );
+    });
+  });
+
   describe('#sample', function () {
     it('returns a list of sampled documents', async function () {
       const docs = await dataService.sample(testNamespace);
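Unlike sample(), which resolved to an array, sampleCursor() hands the caller an open AggregationCursor, so anything that does not iterate it to completion should close it; the specs above do this with await cursor.close(). A sketch of safe partial consumption:

    import type { AggregationCursor, Document } from 'mongodb';

    // Sketch: take at most `n` documents from a sample cursor and always
    // release the server-side cursor, even if iteration stops early or throws.
    async function firstN(
      cursor: AggregationCursor<Document>,
      n: number
    ): Promise<Document[]> {
      const docs: Document[] = [];
      try {
        while (docs.length < n && (await cursor.hasNext())) {
          docs.push((await cursor.next()) as Document);
        }
        return docs;
      } finally {
        await cursor.close();
      }
    }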
diff --git a/packages/data-service/src/data-service.ts b/packages/data-service/src/data-service.ts
index f389b72bdbf..2d2b3a12a0e 100644
--- a/packages/data-service/src/data-service.ts
+++ b/packages/data-service/src/data-service.ts
@@ -5,6 +5,7 @@ import type {
 import { EventEmitter } from 'events';
 import { ExplainVerbosity, ClientEncryption } from 'mongodb';
 import type {
+  Abortable,
   AggregateOptions,
   AggregationCursor,
   AnyBulkWriteOperation,
@@ -169,6 +170,12 @@ export type ExplainExecuteOptions = ExecutionOptions & {
   explainVerbosity?: keyof typeof ExplainVerbosity;
 };
 
+export type SampleOptions = {
+  size?: number;
+  query?: Filter<Document>;
+  fields?: Document;
+};
+
 export interface DataServiceEventMap {
   topologyDescriptionChanged: (evt: TopologyDescriptionChangedEvent) => void;
   serverHeartbeatFailed: (evt: ServerHeartbeatFailedEvent) => void;
@@ -550,7 +557,7 @@ export interface DataService {
   aggregateCursor(
     ns: string,
     pipeline: Document[],
-    options?: AggregateOptions
+    options?: AggregateOptions & Abortable
   ): AggregationCursor;
 
   explainAggregate(
@@ -681,6 +688,22 @@ export interface DataService {
     executionOptions?: ExecutionOptions
   ): Promise;
 
+  /**
+   * Returns a cursor to a sample on the collection.
+   *
+   * @param ns - The namespace to sample.
+   * @param args - The sampling options.
+   * @param options - Driver options (i.e. maxTimeMS, session, batchSize ...)
+   */
+  sampleCursor(
+    ns: string,
+    args?: SampleOptions,
+    options?: AggregateOptions & Abortable,
+    executionOptions?: ExecutionOptions & {
+      fallbackReadPreference?: ReadPreferenceMode;
+    }
+  ): AggregationCursor;
+
   /**
    * Sample documents from the collection.
    *
@@ -690,7 +713,7 @@ export interface DataService {
    */
   sample(
     ns: string,
-    args?: { query?: Filter<Document>; size?: number; fields?: Document },
+    args?: SampleOptions,
     options?: AggregateOptions,
     executionOptions?: ExecutionOptions & {
       fallbackReadPreference?: ReadPreferenceMode;
     }
   ): Promise<Document[]>;
@@ -1837,7 +1860,7 @@ class DataServiceImpl extends WithLogContext implements DataService {
   aggregateCursor(
     ns: string,
     pipeline: Document[],
-    options: AggregateOptions = {}
+    options: AggregateOptions & Abortable = {}
   ): AggregationCursor {
     return this._collection(ns, 'CRUD').aggregate(pipeline, options);
   }
@@ -2250,18 +2273,7 @@ class DataServiceImpl extends WithLogContext implements DataService {
     return await db.createCollection(name, createCollectionOptions);
   }
 
-  sample(
-    ns: string,
-    {
-      query,
-      size,
-      fields,
-    }: { query?: Filter<Document>; size?: number; fields?: Document } = {},
-    options: AggregateOptions = {},
-    executionOptions?: ExecutionOptions & {
-      fallbackReadPreference?: ReadPreferenceMode;
-    }
-  ): Promise<Document[]> {
+  private _buildSamplingPipeline({ query, size, fields }: SampleOptions) {
     const pipeline = [];
     if (query && Object.keys(query).length > 0) {
       pipeline.push({
@@ -2282,6 +2294,44 @@ class DataServiceImpl extends WithLogContext implements DataService {
       });
     }
 
+    return pipeline;
+  }
+
+  sampleCursor(
+    ns: string,
+    samplingOptions: SampleOptions = {},
+    options: AggregateOptions & Abortable = {},
+    executionOptions?: ExecutionOptions & {
+      fallbackReadPreference?: ReadPreferenceMode;
+    }
+  ): AggregationCursor {
+    const pipeline = this._buildSamplingPipeline(samplingOptions);
+
+    return this.aggregateCursor(ns, pipeline, {
+      allowDiskUse: true,
+      // When the read preference isn't set in the connection string explicitly,
+      // then we allow consumers to default to a read preference, for instance
+      // secondaryPreferred to avoid using the primary for analyzing documents.
+      ...(executionOptions?.fallbackReadPreference &&
+      !isReadPreferenceSet(this._connectionOptions.connectionString)
+        ? {
+            readPreference: executionOptions?.fallbackReadPreference,
+          }
+        : {}),
+      ...options,
+    });
+  }
+
+  sample(
+    ns: string,
+    samplingOptions: SampleOptions = {},
+    options: AggregateOptions = {},
+    executionOptions?: ExecutionOptions & {
+      fallbackReadPreference?: ReadPreferenceMode;
+    }
+  ): Promise<Document[]> {
+    const pipeline = this._buildSamplingPipeline(samplingOptions);
+
     return this.aggregate(
       ns,
       pipeline,