diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index ace5e905b58..2e39bf40749 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -92,7 +92,6 @@ export interface CommandOptions extends BSONSerializeOptions { session?: ClientSession; documentsReturnedIn?: string; noResponse?: boolean; - omitReadPreference?: boolean; omitMaxTimeMS?: boolean; // TODO(NODE-2802): Currently the CommandOptions take a property willRetryWrite which is a hint diff --git a/src/cursor/aggregation_cursor.ts b/src/cursor/aggregation_cursor.ts index 8b9f3259444..07201e1c304 100644 --- a/src/cursor/aggregation_cursor.ts +++ b/src/cursor/aggregation_cursor.ts @@ -2,7 +2,6 @@ import type { Document } from '../bson'; import { MongoAPIError } from '../error'; import { Explain, - ExplainableCursor, type ExplainCommandOptions, type ExplainVerbosityLike, validateExplainTimeoutOptions @@ -19,6 +18,7 @@ import { CursorTimeoutMode, type InitialCursorResponse } from './abstract_cursor'; +import { ExplainableCursor } from './explainable_cursor'; /** @public */ export interface AggregationCursorOptions extends AbstractCursorOptions, AggregateOptions {} diff --git a/src/cursor/explainable_cursor.ts b/src/cursor/explainable_cursor.ts new file mode 100644 index 00000000000..6becb0bb373 --- /dev/null +++ b/src/cursor/explainable_cursor.ts @@ -0,0 +1,51 @@ +import { type Document } from '../bson'; +import { type ExplainCommandOptions, type ExplainVerbosityLike } from '../explain'; +import { AbstractCursor } from './abstract_cursor'; + +/** + * @public + * + * A base class for any cursors that have `explain()` methods. + */ +export abstract class ExplainableCursor extends AbstractCursor { + /** Execute the explain for the cursor */ + abstract explain(): Promise; + abstract explain(verbosity: ExplainVerbosityLike | ExplainCommandOptions): Promise; + abstract explain(options: { timeoutMS?: number }): Promise; + abstract explain( + verbosity: ExplainVerbosityLike | ExplainCommandOptions, + options: { timeoutMS?: number } + ): Promise; + abstract explain( + verbosity?: ExplainVerbosityLike | ExplainCommandOptions | { timeoutMS?: number }, + options?: { timeoutMS?: number } + ): Promise; + + protected resolveExplainTimeoutOptions( + verbosity?: ExplainVerbosityLike | ExplainCommandOptions | { timeoutMS?: number }, + options?: { timeoutMS?: number } + ): { timeout?: { timeoutMS?: number }; explain?: ExplainVerbosityLike | ExplainCommandOptions } { + let explain: ExplainVerbosityLike | ExplainCommandOptions | undefined; + let timeout: { timeoutMS?: number } | undefined; + + if (verbosity == null && options == null) { + explain = undefined; + timeout = undefined; + } else if (verbosity != null && options == null) { + explain = + typeof verbosity !== 'object' + ? verbosity + : 'verbosity' in verbosity + ? verbosity + : undefined; + + timeout = typeof verbosity === 'object' && 'timeoutMS' in verbosity ? verbosity : undefined; + } else { + // @ts-expect-error TS isn't smart enough to determine that if both options are provided, the first is explain options + explain = verbosity; + timeout = options; + } + + return { timeout, explain }; + } +} diff --git a/src/cursor/find_cursor.ts b/src/cursor/find_cursor.ts index 6170ddc6fa7..83082f00243 100644 --- a/src/cursor/find_cursor.ts +++ b/src/cursor/find_cursor.ts @@ -3,7 +3,6 @@ import { CursorResponse } from '../cmap/wire_protocol/responses'; import { MongoAPIError, MongoInvalidArgumentError, MongoTailableCursorError } from '../error'; import { Explain, - ExplainableCursor, type ExplainCommandOptions, type ExplainVerbosityLike, validateExplainTimeoutOptions @@ -19,6 +18,7 @@ import type { ClientSession } from '../sessions'; import { formatSort, type Sort, type SortDirection } from '../sort'; import { emitWarningOnce, mergeOptions, type MongoDBNamespace, squashError } from '../utils'; import { type InitialCursorResponse } from './abstract_cursor'; +import { ExplainableCursor } from './explainable_cursor'; /** @public Flags allowed for cursor */ export const FLAGS = [ diff --git a/src/explain.ts b/src/explain.ts index 670bea53041..6100e1ae11a 100644 --- a/src/explain.ts +++ b/src/explain.ts @@ -1,5 +1,4 @@ import { type Document } from './bson'; -import { AbstractCursor } from './cursor/abstract_cursor'; import { MongoAPIError } from './error'; /** @public */ @@ -123,51 +122,3 @@ export function decorateWithExplain( return baseCommand; } - -/** - * @public - * - * A base class for any cursors that have `explain()` methods. - */ -export abstract class ExplainableCursor extends AbstractCursor { - /** Execute the explain for the cursor */ - abstract explain(): Promise; - abstract explain(verbosity: ExplainVerbosityLike | ExplainCommandOptions): Promise; - abstract explain(options: { timeoutMS?: number }): Promise; - abstract explain( - verbosity: ExplainVerbosityLike | ExplainCommandOptions, - options: { timeoutMS?: number } - ): Promise; - abstract explain( - verbosity?: ExplainVerbosityLike | ExplainCommandOptions | { timeoutMS?: number }, - options?: { timeoutMS?: number } - ): Promise; - - protected resolveExplainTimeoutOptions( - verbosity?: ExplainVerbosityLike | ExplainCommandOptions | { timeoutMS?: number }, - options?: { timeoutMS?: number } - ): { timeout?: { timeoutMS?: number }; explain?: ExplainVerbosityLike | ExplainCommandOptions } { - let explain: ExplainVerbosityLike | ExplainCommandOptions | undefined; - let timeout: { timeoutMS?: number } | undefined; - - if (verbosity == null && options == null) { - explain = undefined; - timeout = undefined; - } else if (verbosity != null && options == null) { - explain = - typeof verbosity !== 'object' - ? verbosity - : 'verbosity' in verbosity - ? verbosity - : undefined; - - timeout = typeof verbosity === 'object' && 'timeoutMS' in verbosity ? verbosity : undefined; - } else { - // @ts-expect-error TS isn't smart enough to determine that if both options are provided, the first is explain options - explain = verbosity; - timeout = options; - } - - return { timeout, explain }; - } -} diff --git a/src/index.ts b/src/index.ts index 9f4574c251b..33945827aea 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,7 +10,6 @@ import { ListCollectionsCursor } from './cursor/list_collections_cursor'; import { ListIndexesCursor } from './cursor/list_indexes_cursor'; import type { RunCommandCursor } from './cursor/run_command_cursor'; import { Db } from './db'; -import { ExplainableCursor } from './explain'; import { GridFSBucket } from './gridfs'; import { GridFSBucketReadStream } from './gridfs/download'; import { GridFSBucketWriteStream } from './gridfs/upload'; @@ -44,6 +43,7 @@ export { } from './bulk/common'; export { ClientEncryption } from './client-side-encryption/client_encryption'; export { ChangeStreamCursor } from './cursor/change_stream_cursor'; +export { ExplainableCursor } from './cursor/explainable_cursor'; export { MongoAPIError, MongoAWSError, @@ -98,7 +98,6 @@ export { ClientSession, Collection, Db, - ExplainableCursor, FindCursor, GridFSBucket, GridFSBucketReadStream, @@ -511,6 +510,7 @@ export type { CollationOptions, CommandOperation, CommandOperationOptions, + ModernizedCommandOperation, OperationParent } from './operations/command'; export type { CountOptions } from './operations/count'; diff --git a/src/operations/aggregate.ts b/src/operations/aggregate.ts index a11365a9e89..ce78641deca 100644 --- a/src/operations/aggregate.ts +++ b/src/operations/aggregate.ts @@ -1,14 +1,16 @@ +import { type Connection } from '..'; import type { Document } from '../bson'; import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses'; import { type CursorTimeoutMode } from '../cursor/abstract_cursor'; import { MongoInvalidArgumentError } from '../error'; import { type ExplainOptions } from '../explain'; -import type { Server } from '../sdam/server'; -import type { ClientSession } from '../sessions'; -import { type TimeoutContext } from '../timeout'; import { maxWireVersion, type MongoDBNamespace } from '../utils'; import { WriteConcern } from '../write_concern'; -import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command'; +import { + type CollationOptions, + type CommandOperationOptions, + ModernizedCommandOperation +} from './command'; import { Aspect, defineAspects, type Hint } from './operation'; /** @internal */ @@ -51,7 +53,8 @@ export interface AggregateOptions extends Omit { +export class AggregateOperation extends ModernizedCommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = CursorResponse; override options: AggregateOptions; target: string | typeof DB_AGGREGATE_COLLECTION; pipeline: Document[]; @@ -79,9 +82,7 @@ export class AggregateOperation extends CommandOperation { } } - if (this.hasWriteStage) { - this.trySecondaryWrite = true; - } else { + if (!this.hasWriteStage) { delete this.options.writeConcern; } @@ -94,6 +95,8 @@ export class AggregateOperation extends CommandOperation { if (options?.cursor != null && typeof options.cursor !== 'object') { throw new MongoInvalidArgumentError('Cursor options must be an object'); } + + this.SERVER_COMMAND_RESPONSE_TYPE = this.explain ? ExplainedCursorResponse : CursorResponse; } override get commandName() { @@ -108,13 +111,9 @@ export class AggregateOperation extends CommandOperation { this.pipeline.push(stage); } - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - const options: AggregateOptions = this.options; - const serverWireVersion = maxWireVersion(server); + override buildCommandDocument(connection: Connection): Document { + const options = this.options; + const serverWireVersion = maxWireVersion(connection); const command: Document = { aggregate: this.target, pipeline: this.pipeline }; if (this.hasWriteStage && serverWireVersion < MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT) { @@ -152,13 +151,13 @@ export class AggregateOperation extends CommandOperation { command.cursor.batchSize = options.batchSize; } - return await super.executeCommand( - server, - session, - command, - timeoutContext, - this.explain ? ExplainedCursorResponse : CursorResponse - ); + return command; + } + + override handleOk( + response: InstanceType + ): CursorResponse { + return response; } } diff --git a/src/operations/command.ts b/src/operations/command.ts index 36216fbd980..223a5a57fc1 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -11,10 +11,9 @@ import { import { ReadConcern } from '../read_concern'; import type { ReadPreference } from '../read_preference'; import type { Server, ServerCommandOptions } from '../sdam/server'; -import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection'; import type { ClientSession } from '../sessions'; import { type TimeoutContext } from '../timeout'; -import { commandSupportsReadConcern, maxWireVersion, MongoDBNamespace } from '../utils'; +import { commandSupportsReadConcern, MongoDBNamespace } from '../utils'; import { WriteConcern, type WriteConcernOptions } from '../write_concern'; import type { ReadConcernLike } from './../read_concern'; import { AbstractOperation, Aspect, ModernizedOperation, type OperationOptions } from './operation'; @@ -150,17 +149,12 @@ export abstract class CommandOperation extends AbstractOperation { session }; - const serverWireVersion = maxWireVersion(server); const inTransaction = this.session && this.session.inTransaction(); if (this.readConcern && commandSupportsReadConcern(cmd) && !inTransaction) { Object.assign(cmd, { readConcern: this.readConcern }); } - if (this.trySecondaryWrite && serverWireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) { - options.omitReadPreference = true; - } - if (this.writeConcern && this.hasAspect(Aspect.WRITE_OPERATION) && !inTransaction) { WriteConcern.apply(cmd, this.writeConcern); } @@ -241,17 +235,12 @@ export abstract class ModernizedCommandOperation extends ModernizedOperation< override buildCommand(connection: Connection, session?: ClientSession): Document { const command = this.buildCommandDocument(connection, session); - const serverWireVersion = maxWireVersion(connection); const inTransaction = this.session && this.session.inTransaction(); if (this.readConcern && commandSupportsReadConcern(command) && !inTransaction) { Object.assign(command, { readConcern: this.readConcern }); } - if (this.trySecondaryWrite && serverWireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) { - command.omitReadPreference = true; - } - if (this.writeConcern && this.hasAspect(Aspect.WRITE_OPERATION) && !inTransaction) { WriteConcern.apply(command, this.writeConcern); } diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 5306005a0d9..3a5ef1fe6cc 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -26,6 +26,7 @@ import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; import { TimeoutContext } from '../timeout'; import { abortable, supportsRetryableWrites } from '../utils'; +import { AggregateOperation } from './aggregate'; import { AbstractOperation, Aspect, ModernizedOperation } from './operation'; const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation; @@ -192,7 +193,7 @@ async function tryOperation< // server selection to potentially force monitor checks if the server is // in an unknown state. selector = sameServerSelector(operation.server?.description); - } else if (operation.trySecondaryWrite) { + } else if (operation instanceof AggregateOperation && operation.hasWriteStage) { // If operation should try to write to secondary use the custom server selector // otherwise provide the read preference. selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference); diff --git a/src/operations/operation.ts b/src/operations/operation.ts index 4171c689c1f..0804ec6486f 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -33,7 +33,6 @@ export interface OperationOptions extends BSONSerializeOptions { /** @internal Hints to `executeOperation` that this operation should not unpin on an ended transaction */ bypassPinningCheck?: boolean; - omitReadPreference?: boolean; /** @internal Hint to `executeOperation` to omit maxTimeMS */ omitMaxTimeMS?: boolean; @@ -57,7 +56,6 @@ export abstract class AbstractOperation { readPreference: ReadPreference; server!: Server; bypassPinningCheck: boolean; - trySecondaryWrite: boolean; // BSON serialization options bsonOptions?: BSONSerializeOptions; @@ -83,7 +81,6 @@ export abstract class AbstractOperation { this.options = options; this.bypassPinningCheck = !!options.bypassPinningCheck; - this.trySecondaryWrite = false; } /** Must match the first key of the command object sent to the server. diff --git a/src/sdam/server.ts b/src/sdam/server.ts index bfaa9ac93d3..1e3c483014c 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -37,6 +37,7 @@ import { } from '../error'; import type { ServerApi } from '../mongo_client'; import { type Abortable, TypedEventEmitter } from '../mongo_types'; +import { AggregateOperation } from '../operations/aggregate'; import type { GetMoreOptions } from '../operations/get_more'; import { type ModernizedOperation } from '../operations/operation'; import type { ClientSession } from '../sessions'; @@ -68,6 +69,7 @@ import type { } from './events'; import { Monitor, type MonitorOptions } from './monitor'; import { compareTopologyVersion, ServerDescription } from './server_description'; +import { MIN_SECONDARY_WRITE_WIRE_VERSION } from './server_selection'; import type { Topology } from './topology'; const stateTransition = makeStateMachine({ @@ -310,11 +312,11 @@ export class Server extends TypedEventEmitter { options.directConnection = this.topology.s.options.directConnection; - // There are cases where we need to flag the read preference not to get sent in - // the command, such as pre-5.0 servers attempting to perform an aggregate write - // with a non-primary read preference. In this case the effective read preference - // (primary) is not the same as the provided and must be removed completely. - if (options.omitReadPreference) { + const omitReadPreference = + operation instanceof AggregateOperation && + operation.hasWriteStage && + maxWireVersion(conn) < MIN_SECONDARY_WRITE_WIRE_VERSION; + if (omitReadPreference) { delete options.readPreference; } @@ -401,14 +403,6 @@ export class Server extends TypedEventEmitter { options.directConnection = this.topology.s.options.directConnection; - // There are cases where we need to flag the read preference not to get sent in - // the command, such as pre-5.0 servers attempting to perform an aggregate write - // with a non-primary read preference. In this case the effective read preference - // (primary) is not the same as the provided and must be removed completely. - if (options.omitReadPreference) { - delete options.readPreference; - } - if (this.description.iscryptd) { options.omitMaxTimeMS = true; } diff --git a/test/unit/operations/aggregate.test.js b/test/unit/operations/aggregate.test.js index 118140d95f9..363572e8362 100644 --- a/test/unit/operations/aggregate.test.js +++ b/test/unit/operations/aggregate.test.js @@ -10,16 +10,16 @@ describe('AggregateOperation', function () { context('when out is in the options', function () { const operation = new AggregateOperation(db, [], { out: 'test', dbName: db }); - it('sets trySecondaryWrite to true', function () { - expect(operation.trySecondaryWrite).to.be.true; + it('sets hasWriteStage to true', function () { + expect(operation.hasWriteStage).to.be.true; }); }); context('when $out is the last stage', function () { const operation = new AggregateOperation(db, [{ $out: 'test' }], { dbName: db }); - it('sets trySecondaryWrite to true', function () { - expect(operation.trySecondaryWrite).to.be.true; + it('sets hasWriteStage to true', function () { + expect(operation.hasWriteStage).to.be.true; }); }); @@ -28,16 +28,16 @@ describe('AggregateOperation', function () { dbName: db }); - it('sets trySecondaryWrite to false', function () { - expect(operation.trySecondaryWrite).to.be.false; + it('sets hasWriteStage to false', function () { + expect(operation.hasWriteStage).to.be.false; }); }); context('when $merge is the last stage', function () { const operation = new AggregateOperation(db, [{ $merge: { into: 'test' } }], { dbName: db }); - it('sets trySecondaryWrite to true', function () { - expect(operation.trySecondaryWrite).to.be.true; + it('sets hasWriteStage to true', function () { + expect(operation.hasWriteStage).to.be.true; }); }); @@ -48,24 +48,24 @@ describe('AggregateOperation', function () { { dbName: db } ); - it('sets trySecondaryWrite to false', function () { - expect(operation.trySecondaryWrite).to.be.false; + it('sets hasWriteStage to false', function () { + expect(operation.hasWriteStage).to.be.false; }); }); context('when no writable stages in empty pipeline', function () { const operation = new AggregateOperation(db, [], { dbName: db }); - it('sets trySecondaryWrite to false', function () { - expect(operation.trySecondaryWrite).to.be.false; + it('sets hasWriteStage to false', function () { + expect(operation.hasWriteStage).to.be.false; }); }); context('when no writable stages', function () { const operation = new AggregateOperation(db, [{ $project: { name: 1 } }], { dbName: db }); - it('sets trySecondaryWrite to false', function () { - expect(operation.trySecondaryWrite).to.be.false; + it('sets hasWriteStage to false', function () { + expect(operation.hasWriteStage).to.be.false; }); }); });