Skip to content

refactor(NODE-7089): make AggregateOperation subclass ModernizedOperation #4608

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ export interface CommandOptions extends BSONSerializeOptions {
session?: ClientSession;
documentsReturnedIn?: string;
noResponse?: boolean;
omitReadPreference?: boolean;
omitMaxTimeMS?: boolean;

// TODO(NODE-2802): Currently the CommandOptions take a property willRetryWrite which is a hint
Expand Down
2 changes: 1 addition & 1 deletion src/cursor/aggregation_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import type { Document } from '../bson';
import { MongoAPIError } from '../error';
import {
Explain,
ExplainableCursor,
type ExplainCommandOptions,
type ExplainVerbosityLike,
validateExplainTimeoutOptions
Expand All @@ -19,6 +18,7 @@ import {
CursorTimeoutMode,
type InitialCursorResponse
} from './abstract_cursor';
import { ExplainableCursor } from './explainable_cursor';

/** @public */
export interface AggregationCursorOptions extends AbstractCursorOptions, AggregateOptions {}
Expand Down
51 changes: 51 additions & 0 deletions src/cursor/explainable_cursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { type Document } from '../bson';
import { type ExplainCommandOptions, type ExplainVerbosityLike } from '../explain';
import { AbstractCursor } from './abstract_cursor';

/**
* @public
*
* A base class for any cursors that have `explain()` methods.
*/
export abstract class ExplainableCursor<TSchema> extends AbstractCursor<TSchema> {
/** Execute the explain for the cursor */
abstract explain(): Promise<Document>;
abstract explain(verbosity: ExplainVerbosityLike | ExplainCommandOptions): Promise<Document>;
abstract explain(options: { timeoutMS?: number }): Promise<Document>;
abstract explain(
verbosity: ExplainVerbosityLike | ExplainCommandOptions,
options: { timeoutMS?: number }
): Promise<Document>;
abstract explain(
verbosity?: ExplainVerbosityLike | ExplainCommandOptions | { timeoutMS?: number },
options?: { timeoutMS?: number }
): Promise<Document>;

protected resolveExplainTimeoutOptions(
verbosity?: ExplainVerbosityLike | ExplainCommandOptions | { timeoutMS?: number },
options?: { timeoutMS?: number }
): { timeout?: { timeoutMS?: number }; explain?: ExplainVerbosityLike | ExplainCommandOptions } {
let explain: ExplainVerbosityLike | ExplainCommandOptions | undefined;
let timeout: { timeoutMS?: number } | undefined;

if (verbosity == null && options == null) {
explain = undefined;
timeout = undefined;
} else if (verbosity != null && options == null) {
explain =
typeof verbosity !== 'object'
? verbosity
: 'verbosity' in verbosity
? verbosity
: undefined;

timeout = typeof verbosity === 'object' && 'timeoutMS' in verbosity ? verbosity : undefined;
} else {
// @ts-expect-error TS isn't smart enough to determine that if both options are provided, the first is explain options
explain = verbosity;
timeout = options;
}

return { timeout, explain };
}
}
2 changes: 1 addition & 1 deletion src/cursor/find_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { CursorResponse } from '../cmap/wire_protocol/responses';
import { MongoAPIError, MongoInvalidArgumentError, MongoTailableCursorError } from '../error';
import {
Explain,
ExplainableCursor,
type ExplainCommandOptions,
type ExplainVerbosityLike,
validateExplainTimeoutOptions
Expand All @@ -19,6 +18,7 @@ import type { ClientSession } from '../sessions';
import { formatSort, type Sort, type SortDirection } from '../sort';
import { emitWarningOnce, mergeOptions, type MongoDBNamespace, squashError } from '../utils';
import { type InitialCursorResponse } from './abstract_cursor';
import { ExplainableCursor } from './explainable_cursor';

/** @public Flags allowed for cursor */
export const FLAGS = [
Expand Down
49 changes: 0 additions & 49 deletions src/explain.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { type Document } from './bson';
import { AbstractCursor } from './cursor/abstract_cursor';
import { MongoAPIError } from './error';

/** @public */
Expand Down Expand Up @@ -123,51 +122,3 @@ export function decorateWithExplain(

return baseCommand;
}

/**
* @public
*
* A base class for any cursors that have `explain()` methods.
*/
export abstract class ExplainableCursor<TSchema> extends AbstractCursor<TSchema> {
/** Execute the explain for the cursor */
abstract explain(): Promise<Document>;
abstract explain(verbosity: ExplainVerbosityLike | ExplainCommandOptions): Promise<Document>;
abstract explain(options: { timeoutMS?: number }): Promise<Document>;
abstract explain(
verbosity: ExplainVerbosityLike | ExplainCommandOptions,
options: { timeoutMS?: number }
): Promise<Document>;
abstract explain(
verbosity?: ExplainVerbosityLike | ExplainCommandOptions | { timeoutMS?: number },
options?: { timeoutMS?: number }
): Promise<Document>;

protected resolveExplainTimeoutOptions(
verbosity?: ExplainVerbosityLike | ExplainCommandOptions | { timeoutMS?: number },
options?: { timeoutMS?: number }
): { timeout?: { timeoutMS?: number }; explain?: ExplainVerbosityLike | ExplainCommandOptions } {
let explain: ExplainVerbosityLike | ExplainCommandOptions | undefined;
let timeout: { timeoutMS?: number } | undefined;

if (verbosity == null && options == null) {
explain = undefined;
timeout = undefined;
} else if (verbosity != null && options == null) {
explain =
typeof verbosity !== 'object'
? verbosity
: 'verbosity' in verbosity
? verbosity
: undefined;

timeout = typeof verbosity === 'object' && 'timeoutMS' in verbosity ? verbosity : undefined;
} else {
// @ts-expect-error TS isn't smart enough to determine that if both options are provided, the first is explain options
explain = verbosity;
timeout = options;
}

return { timeout, explain };
}
}
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { ListCollectionsCursor } from './cursor/list_collections_cursor';
import { ListIndexesCursor } from './cursor/list_indexes_cursor';
import type { RunCommandCursor } from './cursor/run_command_cursor';
import { Db } from './db';
import { ExplainableCursor } from './explain';
import { GridFSBucket } from './gridfs';
import { GridFSBucketReadStream } from './gridfs/download';
import { GridFSBucketWriteStream } from './gridfs/upload';
Expand Down Expand Up @@ -44,6 +43,7 @@ export {
} from './bulk/common';
export { ClientEncryption } from './client-side-encryption/client_encryption';
export { ChangeStreamCursor } from './cursor/change_stream_cursor';
export { ExplainableCursor } from './cursor/explainable_cursor';
export {
MongoAPIError,
MongoAWSError,
Expand Down Expand Up @@ -98,7 +98,6 @@ export {
ClientSession,
Collection,
Db,
ExplainableCursor,
FindCursor,
GridFSBucket,
GridFSBucketReadStream,
Expand Down Expand Up @@ -511,6 +510,7 @@ export type {
CollationOptions,
CommandOperation,
CommandOperationOptions,
ModernizedCommandOperation,
OperationParent
} from './operations/command';
export type { CountOptions } from './operations/count';
Expand Down
43 changes: 21 additions & 22 deletions src/operations/aggregate.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { type Connection } from '..';
import type { Document } from '../bson';
import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses';
import { type CursorTimeoutMode } from '../cursor/abstract_cursor';
import { MongoInvalidArgumentError } from '../error';
import { type ExplainOptions } from '../explain';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { maxWireVersion, type MongoDBNamespace } from '../utils';
import { WriteConcern } from '../write_concern';
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
import {
type CollationOptions,
type CommandOperationOptions,
ModernizedCommandOperation
} from './command';
import { Aspect, defineAspects, type Hint } from './operation';

/** @internal */
Expand Down Expand Up @@ -51,7 +53,8 @@ export interface AggregateOptions extends Omit<CommandOperationOptions, 'explain
}

/** @internal */
export class AggregateOperation extends CommandOperation<CursorResponse> {
export class AggregateOperation extends ModernizedCommandOperation<CursorResponse> {
override SERVER_COMMAND_RESPONSE_TYPE = CursorResponse;
override options: AggregateOptions;
target: string | typeof DB_AGGREGATE_COLLECTION;
pipeline: Document[];
Expand Down Expand Up @@ -79,9 +82,7 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
}
}

if (this.hasWriteStage) {
this.trySecondaryWrite = true;
} else {
if (!this.hasWriteStage) {
delete this.options.writeConcern;
}

Expand All @@ -94,6 +95,8 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
if (options?.cursor != null && typeof options.cursor !== 'object') {
throw new MongoInvalidArgumentError('Cursor options must be an object');
}

this.SERVER_COMMAND_RESPONSE_TYPE = this.explain ? ExplainedCursorResponse : CursorResponse;
}

override get commandName() {
Expand All @@ -108,13 +111,9 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
this.pipeline.push(stage);
}

override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<CursorResponse> {
const options: AggregateOptions = this.options;
const serverWireVersion = maxWireVersion(server);
override buildCommandDocument(connection: Connection): Document {
const options = this.options;
const serverWireVersion = maxWireVersion(connection);
const command: Document = { aggregate: this.target, pipeline: this.pipeline };

if (this.hasWriteStage && serverWireVersion < MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT) {
Expand Down Expand Up @@ -152,13 +151,13 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
command.cursor.batchSize = options.batchSize;
}

return await super.executeCommand(
server,
session,
command,
timeoutContext,
this.explain ? ExplainedCursorResponse : CursorResponse
);
return command;
}

override handleOk(
response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>
): CursorResponse {
return response;
}
}

Expand Down
13 changes: 1 addition & 12 deletions src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ import {
import { ReadConcern } from '../read_concern';
import type { ReadPreference } from '../read_preference';
import type { Server, ServerCommandOptions } from '../sdam/server';
import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { commandSupportsReadConcern, maxWireVersion, MongoDBNamespace } from '../utils';
import { commandSupportsReadConcern, MongoDBNamespace } from '../utils';
import { WriteConcern, type WriteConcernOptions } from '../write_concern';
import type { ReadConcernLike } from './../read_concern';
import { AbstractOperation, Aspect, ModernizedOperation, type OperationOptions } from './operation';
Expand Down Expand Up @@ -150,17 +149,12 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
session
};

const serverWireVersion = maxWireVersion(server);
const inTransaction = this.session && this.session.inTransaction();

if (this.readConcern && commandSupportsReadConcern(cmd) && !inTransaction) {
Object.assign(cmd, { readConcern: this.readConcern });
}

if (this.trySecondaryWrite && serverWireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) {
options.omitReadPreference = true;
}

if (this.writeConcern && this.hasAspect(Aspect.WRITE_OPERATION) && !inTransaction) {
WriteConcern.apply(cmd, this.writeConcern);
}
Expand Down Expand Up @@ -241,17 +235,12 @@ export abstract class ModernizedCommandOperation<T> extends ModernizedOperation<
override buildCommand(connection: Connection, session?: ClientSession): Document {
const command = this.buildCommandDocument(connection, session);

const serverWireVersion = maxWireVersion(connection);
const inTransaction = this.session && this.session.inTransaction();

if (this.readConcern && commandSupportsReadConcern(command) && !inTransaction) {
Object.assign(command, { readConcern: this.readConcern });
}

if (this.trySecondaryWrite && serverWireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) {
command.omitReadPreference = true;
}

if (this.writeConcern && this.hasAspect(Aspect.WRITE_OPERATION) && !inTransaction) {
WriteConcern.apply(command, this.writeConcern);
}
Expand Down
3 changes: 2 additions & 1 deletion src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { TimeoutContext } from '../timeout';
import { abortable, supportsRetryableWrites } from '../utils';
import { AggregateOperation } from './aggregate';
import { AbstractOperation, Aspect, ModernizedOperation } from './operation';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
Expand Down Expand Up @@ -192,7 +193,7 @@ async function tryOperation<
// server selection to potentially force monitor checks if the server is
// in an unknown state.
selector = sameServerSelector(operation.server?.description);
} else if (operation.trySecondaryWrite) {
} else if (operation instanceof AggregateOperation && operation.hasWriteStage) {
// If operation should try to write to secondary use the custom server selector
// otherwise provide the read preference.
selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference);
Expand Down
3 changes: 0 additions & 3 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ export interface OperationOptions extends BSONSerializeOptions {

/** @internal Hints to `executeOperation` that this operation should not unpin on an ended transaction */
bypassPinningCheck?: boolean;
omitReadPreference?: boolean;

/** @internal Hint to `executeOperation` to omit maxTimeMS */
omitMaxTimeMS?: boolean;
Expand All @@ -57,7 +56,6 @@ export abstract class AbstractOperation<TResult = any> {
readPreference: ReadPreference;
server!: Server;
bypassPinningCheck: boolean;
trySecondaryWrite: boolean;

// BSON serialization options
bsonOptions?: BSONSerializeOptions;
Expand All @@ -83,7 +81,6 @@ export abstract class AbstractOperation<TResult = any> {

this.options = options;
this.bypassPinningCheck = !!options.bypassPinningCheck;
this.trySecondaryWrite = false;
}

/** Must match the first key of the command object sent to the server.
Expand Down
Loading