From 0b9e3d1e49e989c5f99ab3aebba2d7b2d92b6ae8 Mon Sep 17 00:00:00 2001 From: bailey Date: Mon, 4 Aug 2025 14:05:54 -0600 Subject: [PATCH 01/10] count --- src/operations/count.ts | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/operations/count.ts b/src/operations/count.ts index e3f9800d0e5..1bf0396dab8 100644 --- a/src/operations/count.ts +++ b/src/operations/count.ts @@ -1,10 +1,10 @@ +import { type Connection } from '..'; import type { Document } from '../bson'; +import { MongoDBResponse } from '../cmap/wire_protocol/responses'; import type { Collection } from '../collection'; -import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; -import { type TimeoutContext } from '../timeout'; import type { MongoDBNamespace } from '../utils'; -import { CommandOperation, type CommandOperationOptions } from './command'; +import { type CommandOperationOptions, ModernizedCommandOperation } from './command'; import { Aspect, defineAspects } from './operation'; /** @public */ @@ -22,7 +22,8 @@ export interface CountOptions extends CommandOperationOptions { } /** @internal */ -export class CountOperation extends CommandOperation { +export class CountOperation extends ModernizedCommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; override options: CountOptions; collectionName?: string; query: Document; @@ -39,11 +40,7 @@ export class CountOperation extends CommandOperation { return 'count' as const; } - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { + override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document { const options = this.options; const cmd: Document = { count: this.collectionName, @@ -66,8 +63,11 @@ export class CountOperation extends CommandOperation { cmd.maxTimeMS = options.maxTimeMS; } - const result = await super.executeCommand(server, session, cmd, timeoutContext); - return result ? result.n : 0; + return cmd; + } + + override handleOk(response: InstanceType): number { + return response.getNumber('n') ?? 0; } } From 21347e0bc989e553220a048df6b99ce581088dad Mon Sep 17 00:00:00 2001 From: bailey Date: Mon, 4 Aug 2025 14:09:12 -0600 Subject: [PATCH 02/10] estimateddocumentcount --- src/operations/estimated_document_count.ts | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/operations/estimated_document_count.ts b/src/operations/estimated_document_count.ts index 5ab5aa4c305..47e6d39c162 100644 --- a/src/operations/estimated_document_count.ts +++ b/src/operations/estimated_document_count.ts @@ -1,9 +1,9 @@ +import { type Connection } from '..'; import type { Document } from '../bson'; +import { MongoDBResponse } from '../cmap/wire_protocol/responses'; import type { Collection } from '../collection'; -import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; -import { type TimeoutContext } from '../timeout'; -import { CommandOperation, type CommandOperationOptions } from './command'; +import { type CommandOperationOptions, ModernizedCommandOperation } from './command'; import { Aspect, defineAspects } from './operation'; /** @public */ @@ -17,7 +17,8 @@ export interface EstimatedDocumentCountOptions extends CommandOperationOptions { } /** @internal */ -export class EstimatedDocumentCountOperation extends CommandOperation { +export class EstimatedDocumentCountOperation extends ModernizedCommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; override options: EstimatedDocumentCountOptions; collectionName: string; @@ -31,11 +32,7 @@ export class EstimatedDocumentCountOperation extends CommandOperation { return 'count' as const; } - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { + override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document { const cmd: Document = { count: this.collectionName }; if (typeof this.options.maxTimeMS === 'number') { @@ -48,9 +45,11 @@ export class EstimatedDocumentCountOperation extends CommandOperation { cmd.comment = this.options.comment; } - const response = await super.executeCommand(server, session, cmd, timeoutContext); + return cmd; + } - return response?.n || 0; + override handleOk(response: InstanceType): number { + return response.getNumber('n') ?? 0; } } From 0651c7f66b8f741eb2b1085ad38dbe6546caee10 Mon Sep 17 00:00:00 2001 From: bailey Date: Tue, 5 Aug 2025 15:15:07 -0600 Subject: [PATCH 03/10] insert --- src/operations/insert.ts | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/src/operations/insert.ts b/src/operations/insert.ts index bbc324e65de..b26a32466df 100644 --- a/src/operations/insert.ts +++ b/src/operations/insert.ts @@ -5,9 +5,7 @@ import { MongoDBResponse } from '../cmap/wire_protocol/responses'; import type { Collection } from '../collection'; import { MongoServerError } from '../error'; import type { InferIdType } from '../mongo_types'; -import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; -import { type TimeoutContext } from '../timeout'; import { maybeAddIdToDocuments, type MongoDBNamespace } from '../utils'; import { type CommandOperationOptions, ModernizedCommandOperation } from './command'; import { Aspect, defineAspects } from './operation'; @@ -73,24 +71,6 @@ export class InsertOneOperation extends InsertOperation { super(collection.s.namespace, [maybeAddIdToDocuments(collection, doc, options)], options); } - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - const res = await super.execute(server, session, timeoutContext); - if (res.code) throw new MongoServerError(res); - if (res.writeErrors) { - // This should be a WriteError but we can't change it now because of error hierarchy - throw new MongoServerError(res.writeErrors[0]); - } - - return { - acknowledged: this.writeConcern?.w !== 0, - insertedId: this.documents[0]._id - }; - } - override handleOk(response: InstanceType): Document { const res = super.handleOk(response); if (res.code) throw new MongoServerError(res); From 323f638493d0ee497079f6d1d6ca61f80fb4e9b5 Mon Sep 17 00:00:00 2001 From: bailey Date: Wed, 6 Aug 2025 10:11:47 -0600 Subject: [PATCH 04/10] delete --- src/collection.ts | 14 +++++++--- src/operations/delete.ts | 57 +++++++++++++++++++--------------------- 2 files changed, 38 insertions(+), 33 deletions(-) diff --git a/src/collection.ts b/src/collection.ts index 0ad56d04f22..d5f903a3514 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -430,7 +430,7 @@ export class Collection { filter, replacement, resolveOptions(this, options) - ) + ) as TODO_NODE_3286 ); } @@ -473,7 +473,11 @@ export class Collection { ): Promise { return await executeOperation( this.client, - new DeleteOneOperation(this as TODO_NODE_3286, filter, resolveOptions(this, options)) + new DeleteOneOperation( + this as TODO_NODE_3286, + filter, + resolveOptions(this, options) + ) as TODO_NODE_3286 ); } @@ -489,7 +493,11 @@ export class Collection { ): Promise { return await executeOperation( this.client, - new DeleteManyOperation(this as TODO_NODE_3286, filter, resolveOptions(this, options)) + new DeleteManyOperation( + this as TODO_NODE_3286, + filter, + resolveOptions(this, options) + ) as TODO_NODE_3286 ); } diff --git a/src/operations/delete.ts b/src/operations/delete.ts index 0e93ead36a2..e954dcb6699 100644 --- a/src/operations/delete.ts +++ b/src/operations/delete.ts @@ -1,13 +1,16 @@ import type { Document } from '../bson'; +import { type Connection } from '../cmap/connection'; +import { MongoDBResponse } from '../cmap/wire_protocol/responses'; import type { Collection } from '../collection'; import { MongoCompatibilityError, MongoServerError } from '../error'; -import { type TODO_NODE_3286 } from '../mongo_types'; -import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; -import { type TimeoutContext } from '../timeout'; import { type MongoDBNamespace } from '../utils'; import { type WriteConcernOptions } from '../write_concern'; -import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command'; +import { + type CollationOptions, + type CommandOperationOptions, + ModernizedCommandOperation +} from './command'; import { Aspect, defineAspects, type Hint } from './operation'; /** @public */ @@ -43,7 +46,8 @@ export interface DeleteStatement { } /** @internal */ -export class DeleteOperation extends CommandOperation { +export class DeleteOperation extends ModernizedCommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; override options: DeleteOptions; statements: DeleteStatement[]; @@ -66,12 +70,9 @@ export class DeleteOperation extends CommandOperation { return this.statements.every(op => (op.limit != null ? op.limit > 0 : true)); } - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - const options = this.options ?? {}; + override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document { + const options = this.options; + const ordered = typeof options.ordered === 'boolean' ? options.ordered : true; const command: Document = { delete: this.ns.collection, @@ -97,13 +98,7 @@ export class DeleteOperation extends CommandOperation { } } - const res: TODO_NODE_3286 = await super.executeCommand( - server, - session, - command, - timeoutContext - ); - return res; + return command; } } @@ -112,13 +107,14 @@ export class DeleteOneOperation extends DeleteOperation { super(collection.s.namespace, [makeDeleteStatement(filter, { ...options, limit: 1 })], options); } - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); + override handleOk( + response: InstanceType + ): DeleteResult { + const res = super.handleOk(response); + + // @ts-expect-error Explain commands have broken TS if (this.explain) return res; + if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); @@ -133,13 +129,14 @@ export class DeleteManyOperation extends DeleteOperation { super(collection.s.namespace, [makeDeleteStatement(filter, options)], options); } - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); + override handleOk( + response: InstanceType + ): DeleteResult { + const res = super.handleOk(response); + + // @ts-expect-error Explain commands have broken TS if (this.explain) return res; + if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); From 5c5358826bfd42872120a422070fb47f3e3b9692 Mon Sep 17 00:00:00 2001 From: bailey Date: Wed, 6 Aug 2025 10:55:45 -0600 Subject: [PATCH 05/10] update & replace --- src/operations/update.ts | 55 +++++++++++++++------------------------- 1 file changed, 21 insertions(+), 34 deletions(-) diff --git a/src/operations/update.ts b/src/operations/update.ts index f731e77945d..76c4142cbad 100644 --- a/src/operations/update.ts +++ b/src/operations/update.ts @@ -1,13 +1,17 @@ import type { Document } from '../bson'; +import { type Connection } from '../cmap/connection'; +import { MongoDBResponse } from '../cmap/wire_protocol/responses'; import type { Collection } from '../collection'; import { MongoCompatibilityError, MongoInvalidArgumentError, MongoServerError } from '../error'; -import type { InferIdType, TODO_NODE_3286 } from '../mongo_types'; -import type { Server } from '../sdam/server'; +import type { InferIdType } from '../mongo_types'; import type { ClientSession } from '../sessions'; import { formatSort, type Sort, type SortForCmd } from '../sort'; -import { type TimeoutContext } from '../timeout'; import { hasAtomicOperators, type MongoDBNamespace } from '../utils'; -import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command'; +import { + type CollationOptions, + type CommandOperationOptions, + ModernizedCommandOperation +} from './command'; import { Aspect, defineAspects, type Hint } from './operation'; /** @public */ @@ -67,7 +71,8 @@ export interface UpdateStatement { * @internal * UpdateOperation is used in bulk write, while UpdateOneOperation and UpdateManyOperation are only used in the collections API */ -export class UpdateOperation extends CommandOperation { +export class UpdateOperation extends ModernizedCommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; override options: UpdateOptions & { ordered?: boolean }; statements: UpdateStatement[]; @@ -95,17 +100,12 @@ export class UpdateOperation extends CommandOperation { return this.statements.every(op => op.multi == null || op.multi === false); } - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - const options = this.options ?? {}; - const ordered = typeof options.ordered === 'boolean' ? options.ordered : true; + override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document { + const options = this.options; const command: Document = { update: this.ns.collection, updates: this.statements, - ordered + ordered: options.ordered ?? true }; if (typeof options.bypassDocumentValidation === 'boolean') { @@ -122,7 +122,7 @@ export class UpdateOperation extends CommandOperation { command.comment = options.comment; } - const unacknowledgedWrite = this.writeConcern && this.writeConcern.w === 0; + const unacknowledgedWrite = this.writeConcern?.w === 0; if (unacknowledgedWrite) { if (this.statements.find((o: Document) => o.hint)) { // TODO(NODE-3541): fix error for hint with unacknowledged writes @@ -130,8 +130,7 @@ export class UpdateOperation extends CommandOperation { } } - const res = await super.executeCommand(server, session, command, timeoutContext); - return res; + return command; } } @@ -149,12 +148,8 @@ export class UpdateOneOperation extends UpdateOperation { } } - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); + override handleOk(response: InstanceType): Document { + const res = super.handleOk(response); if (this.explain != null) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); @@ -184,12 +179,8 @@ export class UpdateManyOperation extends UpdateOperation { } } - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); + override handleOk(response: InstanceType): Document { + const res = super.handleOk(response); if (this.explain != null) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); @@ -240,12 +231,8 @@ export class ReplaceOneOperation extends UpdateOperation { } } - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext); + override handleOk(response: InstanceType): Document { + const res = super.handleOk(response); if (this.explain != null) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); From 99402eeb942470fefb3d30087d8d71ecbcb6221d Mon Sep 17 00:00:00 2001 From: bailey Date: Wed, 6 Aug 2025 11:17:46 -0600 Subject: [PATCH 06/10] find and modify --- src/operations/find_and_modify.ts | 136 ++++++++++++++++++------------ 1 file changed, 80 insertions(+), 56 deletions(-) diff --git a/src/operations/find_and_modify.ts b/src/operations/find_and_modify.ts index 759cb02e72c..aaa07b136aa 100644 --- a/src/operations/find_and_modify.ts +++ b/src/operations/find_and_modify.ts @@ -1,14 +1,14 @@ +import { type Connection } from '..'; import type { Document } from '../bson'; +import { MongoDBResponse } from '../cmap/wire_protocol/responses'; import type { Collection } from '../collection'; import { MongoCompatibilityError, MongoInvalidArgumentError } from '../error'; import { ReadPreference } from '../read_preference'; -import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; import { formatSort, type Sort, type SortForCmd } from '../sort'; -import { type TimeoutContext } from '../timeout'; -import { decorateWithCollation, hasAtomicOperators, maxWireVersion } from '../utils'; +import { decorateWithCollation, hasAtomicOperators } from '../utils'; import { type WriteConcern, type WriteConcernSettings } from '../write_concern'; -import { CommandOperation, type CommandOperationOptions } from './command'; +import { type CommandOperationOptions, ModernizedCommandOperation } from './command'; import { Aspect, defineAspects } from './operation'; /** @public */ @@ -120,9 +120,9 @@ function configureFindAndModifyCmdBaseUpdateOpts( } /** @internal */ -export class FindAndModifyOperation extends CommandOperation { +export class FindAndModifyOperation extends ModernizedCommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse; override options: FindOneAndReplaceOptions | FindOneAndUpdateOptions | FindOneAndDeleteOptions; - cmdBase: FindAndModifyCmdBase; collection: Collection; query: Document; doc?: Document; @@ -133,8 +133,26 @@ export class FindAndModifyOperation extends CommandOperation { options: FindOneAndReplaceOptions | FindOneAndUpdateOptions | FindOneAndDeleteOptions ) { super(collection, options); - this.options = options ?? {}; - this.cmdBase = { + this.options = options; + // force primary read preference + this.readPreference = ReadPreference.primary; + + this.collection = collection; + this.query = query; + } + + override get commandName() { + return 'findAndModify' as const; + } + + override buildCommandDocument( + _connection: Connection, + _session?: ClientSession + ): Document & FindAndModifyCmdBase { + const options = this.options; + const command: Document & FindAndModifyCmdBase = { + findAndModify: this.collection.collectionName, + query: this.query, remove: false, new: false, upsert: false @@ -144,77 +162,51 @@ export class FindAndModifyOperation extends CommandOperation { const sort = formatSort(options.sort); if (sort) { - this.cmdBase.sort = sort; + command.sort = sort; } if (options.projection) { - this.cmdBase.fields = options.projection; + command.fields = options.projection; } if (options.maxTimeMS) { - this.cmdBase.maxTimeMS = options.maxTimeMS; + command.maxTimeMS = options.maxTimeMS; } // Decorate the findAndModify command with the write Concern if (options.writeConcern) { - this.cmdBase.writeConcern = options.writeConcern; + command.writeConcern = options.writeConcern; } if (options.let) { - this.cmdBase.let = options.let; + command.let = options.let; } // we check for undefined specifically here to allow falsy values // eslint-disable-next-line no-restricted-syntax if (options.comment !== undefined) { - this.cmdBase.comment = options.comment; + command.comment = options.comment; } - // force primary read preference - this.readPreference = ReadPreference.primary; - - this.collection = collection; - this.query = query; - } - - override get commandName() { - return 'findAndModify' as const; - } - - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - const coll = this.collection; - const query = this.query; - const options = { ...this.options, ...this.bsonOptions }; - - // Create findAndModify command object - const cmd: Document = { - findAndModify: coll.collectionName, - query: query, - ...this.cmdBase - }; - - decorateWithCollation(cmd, coll, options); + decorateWithCollation(command, this.collection, options); if (options.hint) { - // TODO: once this method becomes a CommandOperation we will have the server - // in place to check. const unacknowledgedWrite = this.writeConcern?.w === 0; - if (unacknowledgedWrite || maxWireVersion(server) < 8) { + if (unacknowledgedWrite) { throw new MongoCompatibilityError( 'The current topology does not support a hint on findAndModify commands' ); } - cmd.hint = options.hint; + command.hint = options.hint; } - // Execute the command - const result = await super.executeCommand(server, session, cmd, timeoutContext); - return options.includeResultMetadata ? result : (result.value ?? null); + return command; + } + + override handleOk(response: InstanceType): Document { + const result = super.handleOk(response); + return this.options.includeResultMetadata ? result : (result.value ?? null); } } @@ -227,12 +219,21 @@ export class FindOneAndDeleteOperation extends FindAndModifyOperation { } super(collection, filter, options); - this.cmdBase.remove = true; + } + + override buildCommandDocument( + connection: Connection, + session?: ClientSession + ): Document & FindAndModifyCmdBase { + const document = super.buildCommandDocument(connection, session); + document.remove = true; + return document; } } /** @internal */ export class FindOneAndReplaceOperation extends FindAndModifyOperation { + private replacement: Document; constructor( collection: Collection, filter: Document, @@ -252,13 +253,25 @@ export class FindOneAndReplaceOperation extends FindAndModifyOperation { } super(collection, filter, options); - this.cmdBase.update = replacement; - configureFindAndModifyCmdBaseUpdateOpts(this.cmdBase, options); + this.replacement = replacement; + } + + override buildCommandDocument( + connection: Connection, + session?: ClientSession + ): Document & FindAndModifyCmdBase { + const document = super.buildCommandDocument(connection, session); + document.update = this.replacement; + configureFindAndModifyCmdBaseUpdateOpts(document, this.options); + return document; } } /** @internal */ export class FindOneAndUpdateOperation extends FindAndModifyOperation { + override options: FindOneAndUpdateOptions; + + private update: Document; constructor( collection: Collection, filter: Document, @@ -278,12 +291,23 @@ export class FindOneAndUpdateOperation extends FindAndModifyOperation { } super(collection, filter, options); - this.cmdBase.update = update; - configureFindAndModifyCmdBaseUpdateOpts(this.cmdBase, options); + this.update = update; + this.options = options; + } - if (options.arrayFilters) { - this.cmdBase.arrayFilters = options.arrayFilters; + override buildCommandDocument( + connection: Connection, + session?: ClientSession + ): Document & FindAndModifyCmdBase { + const document = super.buildCommandDocument(connection, session); + document.update = this.update; + configureFindAndModifyCmdBaseUpdateOpts(document, this.options); + + if (this.options.arrayFilters) { + document.arrayFilters = this.options.arrayFilters; } + + return document; } } From 3c5e70db54d09a3bd57c001af5b82d7c79fa6e6a Mon Sep 17 00:00:00 2001 From: bailey Date: Wed, 6 Aug 2025 10:28:52 -0600 Subject: [PATCH 07/10] bulk write --- .../client_bulk_write/client_bulk_write.ts | 90 +++++-------------- 1 file changed, 23 insertions(+), 67 deletions(-) diff --git a/src/operations/client_bulk_write/client_bulk_write.ts b/src/operations/client_bulk_write/client_bulk_write.ts index 26d1e7bb60f..cbfc04b0bad 100644 --- a/src/operations/client_bulk_write/client_bulk_write.ts +++ b/src/operations/client_bulk_write/client_bulk_write.ts @@ -1,19 +1,19 @@ -import { MongoClientBulkWriteExecutionError, ServerType } from '../../beta'; +import { type Connection } from '../../cmap/connection'; import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses'; -import type { Server } from '../../sdam/server'; import type { ClientSession } from '../../sessions'; -import { type TimeoutContext } from '../../timeout'; import { MongoDBNamespace } from '../../utils'; -import { CommandOperation } from '../command'; +import { ModernizedCommandOperation } from '../command'; import { Aspect, defineAspects } from '../operation'; -import { type ClientBulkWriteCommandBuilder } from './command_builder'; +import { type ClientBulkWriteCommand, type ClientBulkWriteCommandBuilder } from './command_builder'; import { type ClientBulkWriteOptions } from './common'; /** * Executes a single client bulk write operation within a potential batch. * @internal */ -export class ClientBulkWriteOperation extends CommandOperation { +export class ClientBulkWriteOperation extends ModernizedCommandOperation { + override SERVER_COMMAND_RESPONSE_TYPE = ClientBulkWriteCursorResponse; + commandBuilder: ClientBulkWriteCommandBuilder; override options: ClientBulkWriteOptions; @@ -36,72 +36,28 @@ export class ClientBulkWriteOperation extends CommandOperation { - let command; + override handleOk( + response: InstanceType + ): ClientBulkWriteCursorResponse { + return response; + } - if (server.description.type === ServerType.LoadBalancer) { - if (session) { - let connection; - if (!session.pinnedConnection) { - // Checkout a connection to build the command. - connection = await server.pool.checkOut({ timeoutContext }); - // Pin the connection to the session so it get used to execute the command and we do not - // perform a double check-in/check-out. - session.pin(connection); - } else { - connection = session.pinnedConnection; - } - command = this.commandBuilder.buildBatch( - connection.hello?.maxMessageSizeBytes, - connection.hello?.maxWriteBatchSize, - connection.hello?.maxBsonObjectSize - ); - } else { - throw new MongoClientBulkWriteExecutionError( - 'Session provided to the client bulk write operation must be present.' - ); - } - } else { - // At this point we have a server and the auto connect code has already - // run in executeOperation, so the server description will be populated. - // We can use that to build the command. - if ( - !server.description.maxWriteBatchSize || - !server.description.maxMessageSizeBytes || - !server.description.maxBsonObjectSize - ) { - throw new MongoClientBulkWriteExecutionError( - 'In order to execute a client bulk write, both maxWriteBatchSize, maxMessageSizeBytes and maxBsonObjectSize must be provided by the servers hello response.' - ); - } - command = this.commandBuilder.buildBatch( - server.description.maxMessageSizeBytes, - server.description.maxWriteBatchSize, - server.description.maxBsonObjectSize - ); - } + override buildCommandDocument( + connection: Connection, + _session?: ClientSession + ): ClientBulkWriteCommand { + const command = this.commandBuilder.buildBatch( + connection.description.maxMessageSizeBytes, + connection.description.maxWriteBatchSize, + connection.description.maxBsonObjectSize + ); - // Check after the batch is built if we cannot retry it and override the option. + // Check _after_ the batch is built if we cannot retry it and override the option. if (!this.canRetryWrite) { this.options.willRetryWrite = false; } - return await super.executeCommand( - server, - session, - command, - timeoutContext, - ClientBulkWriteCursorResponse - ); + + return command; } } From 7c98fdb2918b73a918ce7b15ffe9bc8998e3edec Mon Sep 17 00:00:00 2001 From: bailey Date: Wed, 6 Aug 2025 14:23:30 -0600 Subject: [PATCH 08/10] fix connection leak --- src/sdam/server.ts | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/src/sdam/server.ts b/src/sdam/server.ts index e94e38bc5de..9e7c20af841 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -303,7 +303,30 @@ export class Server extends TypedEventEmitter { } } - const cmd = operation.buildCommand(conn, session); + let reauthPromise: Promise | null = null; + const cleanup = () => { + this.decrementOperationCount(); + if (session?.pinnedConnection !== conn) { + if (reauthPromise != null) { + // The reauth promise only exists if it hasn't thrown. + const checkBackIn = () => { + this.pool.checkIn(conn); + }; + void reauthPromise.then(checkBackIn, checkBackIn); + } else { + this.pool.checkIn(conn); + } + } + }; + + let cmd; + try { + cmd = operation.buildCommand(conn, session); + } catch (e) { + cleanup(); + throw e; + } + const options = operation.buildOptions(timeoutContext); const ns = operation.ns; @@ -325,8 +348,6 @@ export class Server extends TypedEventEmitter { options.omitMaxTimeMS = true; } - let reauthPromise: Promise | null = null; - try { try { const res = await conn.command(ns, cmd, options, operation.SERVER_COMMAND_RESPONSE_TYPE); @@ -360,18 +381,7 @@ export class Server extends TypedEventEmitter { throw operationError; } } finally { - this.decrementOperationCount(); - if (session?.pinnedConnection !== conn) { - if (reauthPromise != null) { - // The reauth promise only exists if it hasn't thrown. - const checkBackIn = () => { - this.pool.checkIn(conn); - }; - void reauthPromise.then(checkBackIn, checkBackIn); - } else { - this.pool.checkIn(conn); - } - } + cleanup(); } } From b42d7e7a7b0b909ed16b8eace861bfca70d6c4d2 Mon Sep 17 00:00:00 2001 From: bailey Date: Wed, 6 Aug 2025 14:23:42 -0600 Subject: [PATCH 09/10] update leak checker to have better description --- test/tools/runner/hooks/leak_checker.ts | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/test/tools/runner/hooks/leak_checker.ts b/test/tools/runner/hooks/leak_checker.ts index 4f53c031dab..2b35801969c 100644 --- a/test/tools/runner/hooks/leak_checker.ts +++ b/test/tools/runner/hooks/leak_checker.ts @@ -143,14 +143,14 @@ const leakCheckerAfterEach = async function () { const TRACE_SOCKETS = process.env.TRACE_SOCKETS === 'true' ? true : false; const kSocketId = Symbol('socketId'); const originalCreateConnection = net.createConnection; -let socketCounter = 0n; -const socketLeakCheckBeforeAll = function socketLeakCheckBeforeAll() { +const socketLeakCheckBeforeEach = function socketLeakCheckBeforeAll() { + const description = this.currentTest.title; + let id = 0; // @ts-expect-error: Typescript says this is readonly, but it is not at runtime net.createConnection = options => { const socket = originalCreateConnection(options); - socket[kSocketId] = socketCounter.toString().padStart(5, '0'); - socketCounter++; + socket[kSocketId] = `"${description}" (${id++})`; return socket; }; }; @@ -175,7 +175,6 @@ const socketLeakCheckAfterEach: Mocha.AsyncFunc = async function socketLeakCheck } }; -const beforeAll = TRACE_SOCKETS ? [socketLeakCheckBeforeAll] : []; -const beforeEach = [leakCheckerBeforeEach]; +const beforeEach = [leakCheckerBeforeEach, ...(TRACE_SOCKETS ? [socketLeakCheckBeforeEach] : [])]; const afterEach = [leakCheckerAfterEach, ...(TRACE_SOCKETS ? [socketLeakCheckAfterEach] : [])]; -export const mochaHooks = { beforeAll, beforeEach, afterEach }; +export const mochaHooks = { beforeEach, afterEach }; From 26c5f68b8241346ab56958266fe60b748332a02a Mon Sep 17 00:00:00 2001 From: bailey Date: Wed, 6 Aug 2025 15:34:10 -0600 Subject: [PATCH 10/10] stronger TS support at operatoins layer --- src/collection.ts | 37 ++++-------------- src/operations/delete.ts | 11 +++--- src/operations/execute_operation.ts | 13 ++++--- src/operations/update.ts | 60 ++++++++++++++++++----------- 4 files changed, 56 insertions(+), 65 deletions(-) diff --git a/src/collection.ts b/src/collection.ts index d5f903a3514..223737c32a9 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -402,12 +402,7 @@ export class Collection { ): Promise> { return await executeOperation( this.client, - new UpdateOneOperation( - this as TODO_NODE_3286, - filter, - update, - resolveOptions(this, options) - ) as TODO_NODE_3286 + new UpdateOneOperation(this.s.namespace, filter, update, resolveOptions(this, options)) ); } @@ -425,12 +420,7 @@ export class Collection { ): Promise> { return await executeOperation( this.client, - new ReplaceOneOperation( - this as TODO_NODE_3286, - filter, - replacement, - resolveOptions(this, options) - ) as TODO_NODE_3286 + new ReplaceOneOperation(this.s.namespace, filter, replacement, resolveOptions(this, options)) ); } @@ -452,12 +442,7 @@ export class Collection { ): Promise> { return await executeOperation( this.client, - new UpdateManyOperation( - this as TODO_NODE_3286, - filter, - update, - resolveOptions(this, options) - ) as TODO_NODE_3286 + new UpdateManyOperation(this.s.namespace, filter, update, resolveOptions(this, options)) ); } @@ -473,11 +458,7 @@ export class Collection { ): Promise { return await executeOperation( this.client, - new DeleteOneOperation( - this as TODO_NODE_3286, - filter, - resolveOptions(this, options) - ) as TODO_NODE_3286 + new DeleteOneOperation(this.s.namespace, filter, resolveOptions(this, options)) ); } @@ -493,11 +474,7 @@ export class Collection { ): Promise { return await executeOperation( this.client, - new DeleteManyOperation( - this as TODO_NODE_3286, - filter, - resolveOptions(this, options) - ) as TODO_NODE_3286 + new DeleteManyOperation(this.s.namespace, filter, resolveOptions(this, options)) ); } @@ -521,7 +498,7 @@ export class Collection { ...options, readPreference: ReadPreference.PRIMARY }) - ) as TODO_NODE_3286 + ) ); } @@ -589,7 +566,7 @@ export class Collection { this.client, this.s.namespace, filter, - resolveOptions(this as TODO_NODE_3286, options) + resolveOptions(this, options) ); } diff --git a/src/operations/delete.ts b/src/operations/delete.ts index e954dcb6699..aff4130e8b5 100644 --- a/src/operations/delete.ts +++ b/src/operations/delete.ts @@ -1,10 +1,9 @@ import type { Document } from '../bson'; import { type Connection } from '../cmap/connection'; import { MongoDBResponse } from '../cmap/wire_protocol/responses'; -import type { Collection } from '../collection'; import { MongoCompatibilityError, MongoServerError } from '../error'; import type { ClientSession } from '../sessions'; -import { type MongoDBNamespace } from '../utils'; +import { type MongoDBCollectionNamespace, type MongoDBNamespace } from '../utils'; import { type WriteConcernOptions } from '../write_concern'; import { type CollationOptions, @@ -103,8 +102,8 @@ export class DeleteOperation extends ModernizedCommandOperation { } export class DeleteOneOperation extends DeleteOperation { - constructor(collection: Collection, filter: Document, options: DeleteOptions) { - super(collection.s.namespace, [makeDeleteStatement(filter, { ...options, limit: 1 })], options); + constructor(ns: MongoDBCollectionNamespace, filter: Document, options: DeleteOptions) { + super(ns, [makeDeleteStatement(filter, { ...options, limit: 1 })], options); } override handleOk( @@ -125,8 +124,8 @@ export class DeleteOneOperation extends DeleteOperation { } } export class DeleteManyOperation extends DeleteOperation { - constructor(collection: Collection, filter: Document, options: DeleteOptions) { - super(collection.s.namespace, [makeDeleteStatement(filter, options)], options); + constructor(ns: MongoDBCollectionNamespace, filter: Document, options: DeleteOptions) { + super(ns, [makeDeleteStatement(filter, options)], options); } override handleOk( diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 3a5ef1fe6cc..51a276995f8 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -34,7 +34,11 @@ const MMAPv1_RETRY_WRITES_ERROR_MESSAGE = 'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.'; type ResultTypeFromOperation = - TOperation extends AbstractOperation ? K : never; + TOperation extends ModernizedOperation + ? ReturnType + : TOperation extends AbstractOperation + ? K + : never; /** * Executes the given operation with provided arguments. @@ -57,7 +61,7 @@ type ResultTypeFromOperation = * @param operation - The operation to execute */ export async function executeOperation< - T extends AbstractOperation, + T extends AbstractOperation, TResult = ResultTypeFromOperation >(client: MongoClient, operation: T, timeoutContext?: TimeoutContext | null): Promise { if (!(operation instanceof AbstractOperation)) { @@ -179,10 +183,7 @@ type RetryOptions = { * * @param operation - The operation to execute * */ -async function tryOperation< - T extends AbstractOperation, - TResult = ResultTypeFromOperation ->( +async function tryOperation>( operation: T, { topology, timeoutContext, session, readPreference }: RetryOptions ): Promise { diff --git a/src/operations/update.ts b/src/operations/update.ts index 76c4142cbad..e7c9b7c3cf1 100644 --- a/src/operations/update.ts +++ b/src/operations/update.ts @@ -1,12 +1,15 @@ import type { Document } from '../bson'; import { type Connection } from '../cmap/connection'; import { MongoDBResponse } from '../cmap/wire_protocol/responses'; -import type { Collection } from '../collection'; import { MongoCompatibilityError, MongoInvalidArgumentError, MongoServerError } from '../error'; import type { InferIdType } from '../mongo_types'; import type { ClientSession } from '../sessions'; import { formatSort, type Sort, type SortForCmd } from '../sort'; -import { hasAtomicOperators, type MongoDBNamespace } from '../utils'; +import { + hasAtomicOperators, + type MongoDBCollectionNamespace, + type MongoDBNamespace +} from '../utils'; import { type CollationOptions, type CommandOperationOptions, @@ -136,21 +139,27 @@ export class UpdateOperation extends ModernizedCommandOperation { /** @internal */ export class UpdateOneOperation extends UpdateOperation { - constructor(collection: Collection, filter: Document, update: Document, options: UpdateOptions) { - super( - collection.s.namespace, - [makeUpdateStatement(filter, update, { ...options, multi: false })], - options - ); + constructor( + ns: MongoDBCollectionNamespace, + filter: Document, + update: Document, + options: UpdateOptions + ) { + super(ns, [makeUpdateStatement(filter, update, { ...options, multi: false })], options); if (!hasAtomicOperators(update, options)) { throw new MongoInvalidArgumentError('Update document requires atomic operators'); } } - override handleOk(response: InstanceType): Document { + override handleOk( + response: InstanceType + ): UpdateResult { const res = super.handleOk(response); + + // @ts-expect-error Explain typing is broken if (this.explain != null) return res; + if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); @@ -167,20 +176,25 @@ export class UpdateOneOperation extends UpdateOperation { /** @internal */ export class UpdateManyOperation extends UpdateOperation { - constructor(collection: Collection, filter: Document, update: Document, options: UpdateOptions) { - super( - collection.s.namespace, - [makeUpdateStatement(filter, update, { ...options, multi: true })], - options - ); + constructor( + ns: MongoDBCollectionNamespace, + filter: Document, + update: Document, + options: UpdateOptions + ) { + super(ns, [makeUpdateStatement(filter, update, { ...options, multi: true })], options); if (!hasAtomicOperators(update, options)) { throw new MongoInvalidArgumentError('Update document requires atomic operators'); } } - override handleOk(response: InstanceType): Document { + override handleOk( + response: InstanceType + ): UpdateResult { const res = super.handleOk(response); + + // @ts-expect-error Explain typing is broken if (this.explain != null) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]); @@ -215,24 +229,24 @@ export interface ReplaceOptions extends CommandOperationOptions { /** @internal */ export class ReplaceOneOperation extends UpdateOperation { constructor( - collection: Collection, + ns: MongoDBCollectionNamespace, filter: Document, replacement: Document, options: ReplaceOptions ) { - super( - collection.s.namespace, - [makeUpdateStatement(filter, replacement, { ...options, multi: false })], - options - ); + super(ns, [makeUpdateStatement(filter, replacement, { ...options, multi: false })], options); if (hasAtomicOperators(replacement)) { throw new MongoInvalidArgumentError('Replacement document must not contain atomic operators'); } } - override handleOk(response: InstanceType): Document { + override handleOk( + response: InstanceType + ): UpdateResult { const res = super.handleOk(response); + + // @ts-expect-error Explain typing is broken if (this.explain != null) return res; if (res.code) throw new MongoServerError(res); if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]);