diff --git a/src/bulk/common.ts b/src/bulk/common.ts index 50283c94f1..4b1578903e 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -14,13 +14,11 @@ import type { CollationOptions, CommandOperationOptions } from '../operations/co import { DeleteOperation, type DeleteStatement, makeDeleteStatement } from '../operations/delete'; import { executeOperation } from '../operations/execute_operation'; import { InsertOperation } from '../operations/insert'; -import { AbstractOperation, type Hint } from '../operations/operation'; +import { type Hint } from '../operations/operation'; import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../operations/update'; -import type { Server } from '../sdam/server'; import type { Topology } from '../sdam/topology'; -import type { ClientSession } from '../sessions'; import { type Sort } from '../sort'; -import { type TimeoutContext } from '../timeout'; +import { TimeoutContext } from '../timeout'; import { applyRetryableWrites, getTopology, @@ -854,40 +852,6 @@ export interface BulkWriteOptions extends CommandOperationOptions { timeoutContext?: TimeoutContext; } -/** - * TODO(NODE-4063) - * BulkWrites merge complexity is implemented in executeCommands - * This provides a vehicle to treat bulkOperations like any other operation (hence "shim") - * We would like this logic to simply live inside the BulkWriteOperation class - * @internal - */ -export class BulkWriteShimOperation extends AbstractOperation { - bulkOperation: BulkOperationBase; - constructor(bulkOperation: BulkOperationBase, options: BulkWriteOptions) { - super(options); - this.bulkOperation = bulkOperation; - } - - get commandName(): string { - return 'bulkWrite' as const; - } - - async execute( - _server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - if (this.options.session == null) { - // An implicit session could have been created by 'executeOperation' - // So if we stick it on finalOptions here, each bulk operation - // will use this same session, it'll be passed in the same way - // an explicit session would be - this.options.session = session; - } - return await executeCommands(this.bulkOperation, { ...this.options, timeoutContext }); - } -} - /** @public */ export abstract class BulkOperationBase { isOrdered: boolean; @@ -1208,10 +1172,26 @@ export abstract class BulkOperationBase { } this.s.executed = true; - const finalOptions = { ...this.s.options, ...options }; - const operation = new BulkWriteShimOperation(this, finalOptions); + const finalOptions = resolveOptions(this.collection, { ...this.s.options, ...options }); + + // if there is no timeoutContext provided, create a timeoutContext and use it for + // all batches in the bulk operation + finalOptions.timeoutContext ??= TimeoutContext.create({ + session: finalOptions.session, + timeoutMS: finalOptions.timeoutMS, + serverSelectionTimeoutMS: this.collection.client.s.options.serverSelectionTimeoutMS, + waitQueueTimeoutMS: this.collection.client.s.options.waitQueueTimeoutMS + }); + + if (finalOptions.session == null) { + // if there is not an explicit session provided to `execute()`, create + // an implicit session and use that for all batches in the bulk operation + return await this.collection.client.withSession({ explicit: false }, async session => { + return await executeCommands(this, { ...finalOptions, session }); + }); + } - return await executeOperation(this.s.collection.client, operation, finalOptions.timeoutContext); + return await executeCommands(this, { ...finalOptions }); } /** diff --git a/src/collection.ts b/src/collection.ts index 1bdc89b262..5ef6ed8f2e 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -1,5 +1,10 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from './bson'; -import type { AnyBulkWriteOperation, BulkWriteOptions, BulkWriteResult } from './bulk/common'; +import type { + AnyBulkWriteOperation, + BulkOperationBase, + BulkWriteOptions, + BulkWriteResult +} from './bulk/common'; import { OrderedBulkOperation } from './bulk/ordered'; import { UnorderedBulkOperation } from './bulk/unordered'; import { ChangeStream, type ChangeStreamDocument, type ChangeStreamOptions } from './change_stream'; @@ -24,7 +29,6 @@ import type { WithoutId } from './mongo_types'; import type { AggregateOptions } from './operations/aggregate'; -import { BulkWriteOperation } from './operations/bulk_write'; import { CountOperation, type CountOptions } from './operations/count'; import { DeleteManyOperation, @@ -38,7 +42,7 @@ import { EstimatedDocumentCountOperation, type EstimatedDocumentCountOptions } from './operations/estimated_document_count'; -import { executeOperation } from './operations/execute_operation'; +import { autoConnect, executeOperation } from './operations/execute_operation'; import type { FindOptions } from './operations/find'; import { FindOneAndDeleteOperation, @@ -61,7 +65,6 @@ import { type ListIndexesOptions } from './operations/indexes'; import { - InsertManyOperation, type InsertManyResult, InsertOneOperation, type InsertOneOptions, @@ -305,14 +308,31 @@ export class Collection { docs: ReadonlyArray>, options?: BulkWriteOptions ): Promise> { - return await executeOperation( - this.client, - new InsertManyOperation( - this as TODO_NODE_3286, - docs, - resolveOptions(this, options ?? { ordered: true }) - ) as TODO_NODE_3286 - ); + if (!Array.isArray(docs)) { + throw new MongoInvalidArgumentError('Argument "docs" must be an array of documents'); + } + options = resolveOptions(this, options ?? {}); + + const acknowledged = WriteConcern.fromOptions(options)?.w !== 0; + + try { + const res = await this.bulkWrite( + docs.map(doc => ({ insertOne: { document: doc } })), + options + ); + return { + acknowledged, + insertedCount: res.insertedCount, + insertedIds: res.insertedIds + }; + } catch (err) { + if (err && err.message === 'Operation must be an object with an operation key') { + throw new MongoInvalidArgumentError( + 'Collection.insertMany() cannot be called with an array that has null/undefined values' + ); + } + throw err; + } } /** @@ -342,14 +362,28 @@ export class Collection { throw new MongoInvalidArgumentError('Argument "operations" must be an array of documents'); } - return await executeOperation( - this.client, - new BulkWriteOperation( - this as TODO_NODE_3286, - operations, - resolveOptions(this, options ?? { ordered: true }) - ) - ); + options = resolveOptions(this, options ?? {}); + + // TODO(NODE-7071): remove once the client doesn't need to be connected to construct + // bulk operations + const isConnected = this.client.topology != null; + if (!isConnected) { + await autoConnect(this.client); + } + + // Create the bulk operation + const bulk: BulkOperationBase = + options.ordered === false + ? this.initializeUnorderedBulkOp(options) + : this.initializeOrderedBulkOp(options); + + // for each op go through and add to the bulk + for (const operation of operations) { + bulk.raw(operation); + } + + // Execute the bulk + return await bulk.execute({ ...options }); } /** diff --git a/src/operations/bulk_write.ts b/src/operations/bulk_write.ts deleted file mode 100644 index 55b61ef73b..0000000000 --- a/src/operations/bulk_write.ts +++ /dev/null @@ -1,64 +0,0 @@ -import type { - AnyBulkWriteOperation, - BulkOperationBase, - BulkWriteOptions, - BulkWriteResult -} from '../bulk/common'; -import type { Collection } from '../collection'; -import type { Server } from '../sdam/server'; -import type { ClientSession } from '../sessions'; -import { type TimeoutContext } from '../timeout'; -import { AbstractOperation, Aspect, defineAspects } from './operation'; - -/** @internal */ -export class BulkWriteOperation extends AbstractOperation { - override options: BulkWriteOptions; - collection: Collection; - operations: ReadonlyArray; - - constructor( - collection: Collection, - operations: ReadonlyArray, - options: BulkWriteOptions - ) { - super(options); - this.options = options; - this.collection = collection; - this.operations = operations; - } - - override get commandName() { - return 'bulkWrite' as const; - } - - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - const coll = this.collection; - const operations = this.operations; - const options = { - ...this.options, - ...this.bsonOptions, - readPreference: this.readPreference, - timeoutContext - }; - - // Create the bulk operation - const bulk: BulkOperationBase = - options.ordered === false - ? coll.initializeUnorderedBulkOp(options) - : coll.initializeOrderedBulkOp(options); - - // for each op go through and add to the bulk - for (let i = 0; i < operations.length; i++) { - bulk.raw(operations[i]); - } - - // Execute the bulk - return await bulk.execute({ ...options, session }); - } -} - -defineAspects(BulkWriteOperation, [Aspect.WRITE_OPERATION]); diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index ed71399999..454f56daaa 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -129,7 +129,7 @@ export async function executeOperation< * Connects a client if it has not yet been connected * @internal */ -async function autoConnect(client: MongoClient): Promise { +export async function autoConnect(client: MongoClient): Promise { if (client.topology == null) { if (client.s.hasBeenClosed) { throw new MongoNotConnectedError('Client must be connected before running operations'); diff --git a/src/operations/insert.ts b/src/operations/insert.ts index 1a40763e31..588468f313 100644 --- a/src/operations/insert.ts +++ b/src/operations/insert.ts @@ -1,16 +1,14 @@ import type { Document } from '../bson'; import type { BulkWriteOptions } from '../bulk/common'; import type { Collection } from '../collection'; -import { MongoInvalidArgumentError, MongoServerError } from '../error'; +import { MongoServerError } from '../error'; import type { InferIdType } from '../mongo_types'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; import { type TimeoutContext } from '../timeout'; import { maybeAddIdToDocuments, type MongoDBNamespace } from '../utils'; -import { WriteConcern } from '../write_concern'; -import { BulkWriteOperation } from './bulk_write'; import { CommandOperation, type CommandOperationOptions } from './command'; -import { AbstractOperation, Aspect, defineAspects } from './operation'; +import { Aspect, defineAspects } from './operation'; /** @internal */ export class InsertOperation extends CommandOperation { @@ -73,7 +71,7 @@ export interface InsertOneResult { export class InsertOneOperation extends InsertOperation { constructor(collection: Collection, doc: Document, options: InsertOneOptions) { - super(collection.s.namespace, maybeAddIdToDocuments(collection, [doc], options), options); + super(collection.s.namespace, [maybeAddIdToDocuments(collection, doc, options)], options); } override async execute( @@ -105,62 +103,5 @@ export interface InsertManyResult { insertedIds: { [key: number]: InferIdType }; } -/** @internal */ -export class InsertManyOperation extends AbstractOperation { - override options: BulkWriteOptions; - collection: Collection; - docs: ReadonlyArray; - - constructor(collection: Collection, docs: ReadonlyArray, options: BulkWriteOptions) { - super(options); - - if (!Array.isArray(docs)) { - throw new MongoInvalidArgumentError('Argument "docs" must be an array of documents'); - } - - this.options = options; - this.collection = collection; - this.docs = docs; - } - - override get commandName() { - return 'insert' as const; - } - - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - const coll = this.collection; - const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference }; - const writeConcern = WriteConcern.fromOptions(options); - const bulkWriteOperation = new BulkWriteOperation( - coll, - this.docs.map(document => ({ - insertOne: { document } - })), - options - ); - - try { - const res = await bulkWriteOperation.execute(server, session, timeoutContext); - return { - acknowledged: writeConcern?.w !== 0, - insertedCount: res.insertedCount, - insertedIds: res.insertedIds - }; - } catch (err) { - if (err && err.message === 'Operation must be an object with an operation key') { - throw new MongoInvalidArgumentError( - 'Collection.insertMany() cannot be called with an array that has null/undefined values' - ); - } - throw err; - } - } -} - defineAspects(InsertOperation, [Aspect.RETRYABLE, Aspect.WRITE_OPERATION]); defineAspects(InsertOneOperation, [Aspect.RETRYABLE, Aspect.WRITE_OPERATION]); -defineAspects(InsertManyOperation, [Aspect.WRITE_OPERATION]); diff --git a/src/utils.ts b/src/utils.ts index 22cda4092b..c4a68dcd52 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1357,38 +1357,23 @@ export async function once(ee: EventEmitter, name: string, options?: Abortabl } export function maybeAddIdToDocuments( - coll: Collection, - docs: Document[], + collection: Collection, + document: Document, options: { forceServerObjectId?: boolean } -): Document[]; -export function maybeAddIdToDocuments( - coll: Collection, - docs: Document, - options: { forceServerObjectId?: boolean } -): Document; -export function maybeAddIdToDocuments( - coll: Collection, - docOrDocs: Document[] | Document, - options: { forceServerObjectId?: boolean } -): Document[] | Document { +): Document { const forceServerObjectId = - typeof options.forceServerObjectId === 'boolean' - ? options.forceServerObjectId - : coll.s.db.options?.forceServerObjectId; + options.forceServerObjectId ?? collection.s.db.options?.forceServerObjectId ?? false; // no need to modify the docs if server sets the ObjectId - if (forceServerObjectId === true) { - return docOrDocs; + if (forceServerObjectId) { + return document; } - const transform = (doc: Document): Document => { - if (doc._id == null) { - doc._id = coll.s.pkFactory.createPk(); - } + if (document._id == null) { + document._id = collection.s.pkFactory.createPk(); + } - return doc; - }; - return Array.isArray(docOrDocs) ? docOrDocs.map(transform) : transform(docOrDocs); + return document; } export async function fileIsAccessible(fileName: string, mode?: number) { diff --git a/test/integration/crud/abstract_operation.test.ts b/test/integration/crud/abstract_operation.test.ts index a30c185db6..0c26dd01cc 100644 --- a/test/integration/crud/abstract_operation.test.ts +++ b/test/integration/crud/abstract_operation.test.ts @@ -37,12 +37,6 @@ describe('abstract operation', function () { subclassType: mongodb.AggregateOperation, correctCommandName: 'aggregate' }, - { - subclassCreator: () => - new mongodb.BulkWriteOperation(collection, [{ insertOne: { document: { a: 1 } } }], {}), - subclassType: mongodb.BulkWriteOperation, - correctCommandName: 'bulkWrite' - }, { subclassCreator: () => new mongodb.CollectionsOperation(db, {}), subclassType: mongodb.CollectionsOperation, @@ -161,11 +155,6 @@ describe('abstract operation', function () { subclassType: mongodb.InsertOneOperation, correctCommandName: 'insert' }, - { - subclassCreator: () => new mongodb.InsertManyOperation(collection, [{ a: 1 }], {}), - subclassType: mongodb.InsertManyOperation, - correctCommandName: 'insert' - }, { subclassCreator: () => new mongodb.IsCappedOperation(collection, {}), subclassType: mongodb.IsCappedOperation, diff --git a/test/integration/crud/bulk.test.ts b/test/integration/crud/bulk.test.ts index c7a80ffa18..bdae64ca5f 100644 --- a/test/integration/crud/bulk.test.ts +++ b/test/integration/crud/bulk.test.ts @@ -925,7 +925,7 @@ describe('Bulk', function () { try { batch.insert({ string: hugeString }); test.ok(false); - } catch (err) {} // eslint-disable-line + } catch (err) { } // eslint-disable-line // Finish up test client.close(done); @@ -1216,34 +1216,27 @@ describe('Bulk', function () { } }); - it('should correctly execute unordered batch using w:0', { - metadata: { requires: { topology: ['single', 'replicaset', 'ssl', 'heap', 'wiredtiger'] } }, + it('should correctly execute unordered batch using w:0', async function () { + await client.connect(); + const db = client.db(); + const col = db.collection('batch_write_ordered_ops_9'); + const bulk = col.initializeUnorderedBulkOp(); + for (let i = 0; i < 100; i++) { + bulk.insert({ a: 1 }); + } - test: function (done) { - client.connect((err, client) => { - const db = client.db(); - const col = db.collection('batch_write_ordered_ops_9'); - const bulk = col.initializeUnorderedBulkOp(); - for (let i = 0; i < 100; i++) { - bulk.insert({ a: 1 }); - } + bulk.find({ b: 1 }).upsert().update({ b: 1 }); + bulk.find({ c: 1 }).delete(); - bulk.find({ b: 1 }).upsert().update({ b: 1 }); - bulk.find({ c: 1 }).delete(); + const result = await bulk.execute({ writeConcern: { w: 0 } }); + test.equal(0, result.upsertedCount); + test.equal(0, result.insertedCount); + test.equal(0, result.matchedCount); + test.ok(0 === result.modifiedCount || result.modifiedCount == null); + test.equal(0, result.deletedCount); + test.equal(false, result.hasWriteErrors()); - bulk.execute({ writeConcern: { w: 0 } }, function (err, result) { - expect(err).to.not.exist; - test.equal(0, result.upsertedCount); - test.equal(0, result.insertedCount); - test.equal(0, result.matchedCount); - test.ok(0 === result.modifiedCount || result.modifiedCount == null); - test.equal(0, result.deletedCount); - test.equal(false, result.hasWriteErrors()); - - client.close(done); - }); - }); - } + await client.close(); }); it('should provide an accessor for operations on ordered bulk ops', function (done) { diff --git a/test/mongodb.ts b/test/mongodb.ts index 45e6a6679c..e9d019d0b1 100644 --- a/test/mongodb.ts +++ b/test/mongodb.ts @@ -163,7 +163,6 @@ export * from '../src/mongo_client'; export * from '../src/mongo_logger'; export * from '../src/mongo_types'; export * from '../src/operations/aggregate'; -export * from '../src/operations/bulk_write'; export * from '../src/operations/client_bulk_write/command_builder'; export * from '../src/operations/client_bulk_write/common'; export * from '../src/operations/client_bulk_write/results_merger';