Skip to content

refactor(NODE-7084): refactor CUD ops to use ModernizedOperation #4610

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 7 additions & 22 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,12 +402,7 @@ export class Collection<TSchema extends Document = Document> {
): Promise<UpdateResult<TSchema>> {
return await executeOperation(
this.client,
new UpdateOneOperation(
this as TODO_NODE_3286,
filter,
update,
resolveOptions(this, options)
) as TODO_NODE_3286
new UpdateOneOperation(this.s.namespace, filter, update, resolveOptions(this, options))
);
}

Expand All @@ -425,12 +420,7 @@ export class Collection<TSchema extends Document = Document> {
): Promise<UpdateResult<TSchema>> {
return await executeOperation(
this.client,
new ReplaceOneOperation(
this as TODO_NODE_3286,
filter,
replacement,
resolveOptions(this, options)
)
new ReplaceOneOperation(this.s.namespace, filter, replacement, resolveOptions(this, options))
);
}

Expand All @@ -452,12 +442,7 @@ export class Collection<TSchema extends Document = Document> {
): Promise<UpdateResult<TSchema>> {
return await executeOperation(
this.client,
new UpdateManyOperation(
this as TODO_NODE_3286,
filter,
update,
resolveOptions(this, options)
) as TODO_NODE_3286
new UpdateManyOperation(this.s.namespace, filter, update, resolveOptions(this, options))
);
}

Expand All @@ -473,7 +458,7 @@ export class Collection<TSchema extends Document = Document> {
): Promise<DeleteResult> {
return await executeOperation(
this.client,
new DeleteOneOperation(this as TODO_NODE_3286, filter, resolveOptions(this, options))
new DeleteOneOperation(this.s.namespace, filter, resolveOptions(this, options))
);
}

Expand All @@ -489,7 +474,7 @@ export class Collection<TSchema extends Document = Document> {
): Promise<DeleteResult> {
return await executeOperation(
this.client,
new DeleteManyOperation(this as TODO_NODE_3286, filter, resolveOptions(this, options))
new DeleteManyOperation(this.s.namespace, filter, resolveOptions(this, options))
);
}

Expand All @@ -513,7 +498,7 @@ export class Collection<TSchema extends Document = Document> {
...options,
readPreference: ReadPreference.PRIMARY
})
) as TODO_NODE_3286
)
);
}

Expand Down Expand Up @@ -581,7 +566,7 @@ export class Collection<TSchema extends Document = Document> {
this.client,
this.s.namespace,
filter,
resolveOptions(this as TODO_NODE_3286, options)
resolveOptions(this, options)
);
}

Expand Down
90 changes: 23 additions & 67 deletions src/operations/client_bulk_write/client_bulk_write.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import { MongoClientBulkWriteExecutionError, ServerType } from '../../beta';
import { type Connection } from '../../cmap/connection';
import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
import type { Server } from '../../sdam/server';
import type { ClientSession } from '../../sessions';
import { type TimeoutContext } from '../../timeout';
import { MongoDBNamespace } from '../../utils';
import { CommandOperation } from '../command';
import { ModernizedCommandOperation } from '../command';
import { Aspect, defineAspects } from '../operation';
import { type ClientBulkWriteCommandBuilder } from './command_builder';
import { type ClientBulkWriteCommand, type ClientBulkWriteCommandBuilder } from './command_builder';
import { type ClientBulkWriteOptions } from './common';

/**
* Executes a single client bulk write operation within a potential batch.
* @internal
*/
export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
export class ClientBulkWriteOperation extends ModernizedCommandOperation<ClientBulkWriteCursorResponse> {
override SERVER_COMMAND_RESPONSE_TYPE = ClientBulkWriteCursorResponse;

commandBuilder: ClientBulkWriteCommandBuilder;
override options: ClientBulkWriteOptions;

Expand All @@ -36,72 +36,28 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
return this.commandBuilder.isBatchRetryable;
}

/**
* Execute the command. Superclass will handle write concern, etc.
* @param server - The server.
* @param session - The session.
* @returns The response.
*/
override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<ClientBulkWriteCursorResponse> {
let command;
override handleOk(
response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>
): ClientBulkWriteCursorResponse {
return response;
}

if (server.description.type === ServerType.LoadBalancer) {
if (session) {
let connection;
if (!session.pinnedConnection) {
// Checkout a connection to build the command.
connection = await server.pool.checkOut({ timeoutContext });
// Pin the connection to the session so it get used to execute the command and we do not
// perform a double check-in/check-out.
session.pin(connection);
} else {
connection = session.pinnedConnection;
}
command = this.commandBuilder.buildBatch(
connection.hello?.maxMessageSizeBytes,
connection.hello?.maxWriteBatchSize,
connection.hello?.maxBsonObjectSize
);
} else {
throw new MongoClientBulkWriteExecutionError(
'Session provided to the client bulk write operation must be present.'
);
}
} else {
// At this point we have a server and the auto connect code has already
// run in executeOperation, so the server description will be populated.
// We can use that to build the command.
if (
!server.description.maxWriteBatchSize ||
!server.description.maxMessageSizeBytes ||
!server.description.maxBsonObjectSize
) {
throw new MongoClientBulkWriteExecutionError(
'In order to execute a client bulk write, both maxWriteBatchSize, maxMessageSizeBytes and maxBsonObjectSize must be provided by the servers hello response.'
);
}
command = this.commandBuilder.buildBatch(
server.description.maxMessageSizeBytes,
server.description.maxWriteBatchSize,
server.description.maxBsonObjectSize
);
}
override buildCommandDocument(
connection: Connection,
_session?: ClientSession
): ClientBulkWriteCommand {
const command = this.commandBuilder.buildBatch(
connection.description.maxMessageSizeBytes,
connection.description.maxWriteBatchSize,
connection.description.maxBsonObjectSize
);

// Check after the batch is built if we cannot retry it and override the option.
// Check _after_ the batch is built if we cannot retry it and override the option.
if (!this.canRetryWrite) {
this.options.willRetryWrite = false;
}
return await super.executeCommand(
server,
session,
command,
timeoutContext,
ClientBulkWriteCursorResponse
);

return command;
}
}

Expand Down
22 changes: 11 additions & 11 deletions src/operations/count.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { type Connection } from '..';
import type { Document } from '../bson';
import { MongoDBResponse } from '../cmap/wire_protocol/responses';
import type { Collection } from '../collection';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import type { MongoDBNamespace } from '../utils';
import { CommandOperation, type CommandOperationOptions } from './command';
import { type CommandOperationOptions, ModernizedCommandOperation } from './command';
import { Aspect, defineAspects } from './operation';

/** @public */
Expand All @@ -22,7 +22,8 @@ export interface CountOptions extends CommandOperationOptions {
}

/** @internal */
export class CountOperation extends CommandOperation<number> {
export class CountOperation extends ModernizedCommandOperation<number> {
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;
override options: CountOptions;
collectionName?: string;
query: Document;
Expand All @@ -39,11 +40,7 @@ export class CountOperation extends CommandOperation<number> {
return 'count' as const;
}

override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<number> {
override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document {
const options = this.options;
const cmd: Document = {
count: this.collectionName,
Expand All @@ -66,8 +63,11 @@ export class CountOperation extends CommandOperation<number> {
cmd.maxTimeMS = options.maxTimeMS;
}

const result = await super.executeCommand(server, session, cmd, timeoutContext);
return result ? result.n : 0;
return cmd;
}

override handleOk(response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>): number {
return response.getNumber('n') ?? 0;
}
}

Expand Down
68 changes: 32 additions & 36 deletions src/operations/delete.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import type { Document } from '../bson';
import type { Collection } from '../collection';
import { type Connection } from '../cmap/connection';
import { MongoDBResponse } from '../cmap/wire_protocol/responses';
import { MongoCompatibilityError, MongoServerError } from '../error';
import { type TODO_NODE_3286 } from '../mongo_types';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { type MongoDBNamespace } from '../utils';
import { type MongoDBCollectionNamespace, type MongoDBNamespace } from '../utils';
import { type WriteConcernOptions } from '../write_concern';
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
import {
type CollationOptions,
type CommandOperationOptions,
ModernizedCommandOperation
} from './command';
import { Aspect, defineAspects, type Hint } from './operation';

/** @public */
Expand Down Expand Up @@ -43,7 +45,8 @@ export interface DeleteStatement {
}

/** @internal */
export class DeleteOperation extends CommandOperation<DeleteResult> {
export class DeleteOperation extends ModernizedCommandOperation<Document> {
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;
override options: DeleteOptions;
statements: DeleteStatement[];

Expand All @@ -66,12 +69,9 @@ export class DeleteOperation extends CommandOperation<DeleteResult> {
return this.statements.every(op => (op.limit != null ? op.limit > 0 : true));
}

override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<DeleteResult> {
const options = this.options ?? {};
override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document {
const options = this.options;

const ordered = typeof options.ordered === 'boolean' ? options.ordered : true;
const command: Document = {
delete: this.ns.collection,
Expand All @@ -97,28 +97,23 @@ export class DeleteOperation extends CommandOperation<DeleteResult> {
}
}

const res: TODO_NODE_3286 = await super.executeCommand(
server,
session,
command,
timeoutContext
);
return res;
return command;
}
}

export class DeleteOneOperation extends DeleteOperation {
constructor(collection: Collection, filter: Document, options: DeleteOptions) {
super(collection.s.namespace, [makeDeleteStatement(filter, { ...options, limit: 1 })], options);
constructor(ns: MongoDBCollectionNamespace, filter: Document, options: DeleteOptions) {
super(ns, [makeDeleteStatement(filter, { ...options, limit: 1 })], options);
}

override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<DeleteResult> {
const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext);
override handleOk(
response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>
): DeleteResult {
const res = super.handleOk(response);

// @ts-expect-error Explain commands have broken TS
if (this.explain) return res;

if (res.code) throw new MongoServerError(res);
if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]);

Expand All @@ -129,17 +124,18 @@ export class DeleteOneOperation extends DeleteOperation {
}
}
export class DeleteManyOperation extends DeleteOperation {
constructor(collection: Collection, filter: Document, options: DeleteOptions) {
super(collection.s.namespace, [makeDeleteStatement(filter, options)], options);
constructor(ns: MongoDBCollectionNamespace, filter: Document, options: DeleteOptions) {
super(ns, [makeDeleteStatement(filter, options)], options);
}

override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<DeleteResult> {
const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext);
override handleOk(
response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>
): DeleteResult {
const res = super.handleOk(response);

// @ts-expect-error Explain commands have broken TS
if (this.explain) return res;

if (res.code) throw new MongoServerError(res);
if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]);

Expand Down
Loading