Skip to content

refactor(NODE-7061): remove BulkWriteOperation, InsertManyOperation and BulkWriteShimOperation #4602

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 21 additions & 41 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ import type { CollationOptions, CommandOperationOptions } from '../operations/co
import { DeleteOperation, type DeleteStatement, makeDeleteStatement } from '../operations/delete';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
import { AbstractOperation, type Hint } from '../operations/operation';
import { type Hint } from '../operations/operation';
import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../operations/update';
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { type Sort } from '../sort';
import { type TimeoutContext } from '../timeout';
import { TimeoutContext } from '../timeout';
import {
applyRetryableWrites,
getTopology,
Expand Down Expand Up @@ -854,40 +852,6 @@ export interface BulkWriteOptions extends CommandOperationOptions {
timeoutContext?: TimeoutContext;
}

/**
* TODO(NODE-4063)
* BulkWrites merge complexity is implemented in executeCommands
* This provides a vehicle to treat bulkOperations like any other operation (hence "shim")
* We would like this logic to simply live inside the BulkWriteOperation class
* @internal
*/
export class BulkWriteShimOperation extends AbstractOperation {
bulkOperation: BulkOperationBase;
constructor(bulkOperation: BulkOperationBase, options: BulkWriteOptions) {
super(options);
this.bulkOperation = bulkOperation;
}

get commandName(): string {
return 'bulkWrite' as const;
}

async execute(
_server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<any> {
if (this.options.session == null) {
// An implicit session could have been created by 'executeOperation'
// So if we stick it on finalOptions here, each bulk operation
// will use this same session, it'll be passed in the same way
// an explicit session would be
this.options.session = session;
}
return await executeCommands(this.bulkOperation, { ...this.options, timeoutContext });
}
}

/** @public */
export abstract class BulkOperationBase {
isOrdered: boolean;
Expand Down Expand Up @@ -1208,10 +1172,26 @@ export abstract class BulkOperationBase {
}

this.s.executed = true;
const finalOptions = { ...this.s.options, ...options };
const operation = new BulkWriteShimOperation(this, finalOptions);
const finalOptions = resolveOptions(this.collection, { ...this.s.options, ...options });

// if there is no timeoutContext provided, create a timeoutContext and use it for
// all batches in the bulk operation
finalOptions.timeoutContext ??= TimeoutContext.create({
session: finalOptions.session,
timeoutMS: finalOptions.timeoutMS,
serverSelectionTimeoutMS: this.collection.client.s.options.serverSelectionTimeoutMS,
waitQueueTimeoutMS: this.collection.client.s.options.waitQueueTimeoutMS
});

if (finalOptions.session == null) {
// if there is not an explicit session provided to `execute()`, create
// an implicit session and use that for all batches in the bulk operation
return await this.collection.client.withSession({ explicit: false }, async session => {
return await executeCommands(this, { ...finalOptions, session });
});
}

return await executeOperation(this.s.collection.client, operation, finalOptions.timeoutContext);
return await executeCommands(this, { ...finalOptions });
}

/**
Expand Down
74 changes: 54 additions & 20 deletions src/collection.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { type BSONSerializeOptions, type Document, resolveBSONOptions } from './bson';
import type { AnyBulkWriteOperation, BulkWriteOptions, BulkWriteResult } from './bulk/common';
import type {
AnyBulkWriteOperation,
BulkOperationBase,
BulkWriteOptions,
BulkWriteResult
} from './bulk/common';
import { OrderedBulkOperation } from './bulk/ordered';
import { UnorderedBulkOperation } from './bulk/unordered';
import { ChangeStream, type ChangeStreamDocument, type ChangeStreamOptions } from './change_stream';
Expand All @@ -24,7 +29,6 @@ import type {
WithoutId
} from './mongo_types';
import type { AggregateOptions } from './operations/aggregate';
import { BulkWriteOperation } from './operations/bulk_write';
import { CountOperation, type CountOptions } from './operations/count';
import {
DeleteManyOperation,
Expand All @@ -38,7 +42,7 @@ import {
EstimatedDocumentCountOperation,
type EstimatedDocumentCountOptions
} from './operations/estimated_document_count';
import { executeOperation } from './operations/execute_operation';
import { autoConnect, executeOperation } from './operations/execute_operation';
import type { FindOptions } from './operations/find';
import {
FindOneAndDeleteOperation,
Expand All @@ -61,7 +65,6 @@ import {
type ListIndexesOptions
} from './operations/indexes';
import {
InsertManyOperation,
type InsertManyResult,
InsertOneOperation,
type InsertOneOptions,
Expand Down Expand Up @@ -305,14 +308,31 @@ export class Collection<TSchema extends Document = Document> {
docs: ReadonlyArray<OptionalUnlessRequiredId<TSchema>>,
options?: BulkWriteOptions
): Promise<InsertManyResult<TSchema>> {
return await executeOperation(
this.client,
new InsertManyOperation(
this as TODO_NODE_3286,
docs,
resolveOptions(this, options ?? { ordered: true })
) as TODO_NODE_3286
);
if (!Array.isArray(docs)) {
throw new MongoInvalidArgumentError('Argument "docs" must be an array of documents');
}
options = resolveOptions(this, options ?? {});

const acknowledged = WriteConcern.fromOptions(options)?.w !== 0;

try {
const res = await this.bulkWrite(
docs.map(doc => ({ insertOne: { document: doc } })),
options
);
return {
acknowledged,
insertedCount: res.insertedCount,
insertedIds: res.insertedIds
};
} catch (err) {
if (err && err.message === 'Operation must be an object with an operation key') {
throw new MongoInvalidArgumentError(
'Collection.insertMany() cannot be called with an array that has null/undefined values'
);
}
throw err;
}
}

/**
Expand Down Expand Up @@ -342,14 +362,28 @@ export class Collection<TSchema extends Document = Document> {
throw new MongoInvalidArgumentError('Argument "operations" must be an array of documents');
}

return await executeOperation(
this.client,
new BulkWriteOperation(
this as TODO_NODE_3286,
operations,
resolveOptions(this, options ?? { ordered: true })
)
);
options = resolveOptions(this, options ?? {});

// TODO(NODE-7071): remove once the client doesn't need to be connected to construct
// bulk operations
const isConnected = this.client.topology != null;
if (!isConnected) {
await autoConnect(this.client);
}

// Create the bulk operation
const bulk: BulkOperationBase =
options.ordered === false
? this.initializeUnorderedBulkOp(options)
: this.initializeOrderedBulkOp(options);

// for each op go through and add to the bulk
for (const operation of operations) {
bulk.raw(operation);
}

// Execute the bulk
return await bulk.execute({ ...options });
}

/**
Expand Down
64 changes: 0 additions & 64 deletions src/operations/bulk_write.ts

This file was deleted.

2 changes: 1 addition & 1 deletion src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ export async function executeOperation<
* Connects a client if it has not yet been connected
* @internal
*/
async function autoConnect(client: MongoClient): Promise<Topology> {
export async function autoConnect(client: MongoClient): Promise<Topology> {
if (client.topology == null) {
if (client.s.hasBeenClosed) {
throw new MongoNotConnectedError('Client must be connected before running operations');
Expand Down
65 changes: 3 additions & 62 deletions src/operations/insert.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import type { Document } from '../bson';
import type { BulkWriteOptions } from '../bulk/common';
import type { Collection } from '../collection';
import { MongoInvalidArgumentError, MongoServerError } from '../error';
import { MongoServerError } from '../error';
import type { InferIdType } from '../mongo_types';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { maybeAddIdToDocuments, type MongoDBNamespace } from '../utils';
import { WriteConcern } from '../write_concern';
import { BulkWriteOperation } from './bulk_write';
import { CommandOperation, type CommandOperationOptions } from './command';
import { AbstractOperation, Aspect, defineAspects } from './operation';
import { Aspect, defineAspects } from './operation';

/** @internal */
export class InsertOperation extends CommandOperation<Document> {
Expand Down Expand Up @@ -73,7 +71,7 @@ export interface InsertOneResult<TSchema = Document> {

export class InsertOneOperation extends InsertOperation {
constructor(collection: Collection, doc: Document, options: InsertOneOptions) {
super(collection.s.namespace, maybeAddIdToDocuments(collection, [doc], options), options);
super(collection.s.namespace, [maybeAddIdToDocuments(collection, doc, options)], options);
}

override async execute(
Expand Down Expand Up @@ -105,62 +103,5 @@ export interface InsertManyResult<TSchema = Document> {
insertedIds: { [key: number]: InferIdType<TSchema> };
}

/** @internal */
export class InsertManyOperation extends AbstractOperation<InsertManyResult> {
override options: BulkWriteOptions;
collection: Collection;
docs: ReadonlyArray<Document>;

constructor(collection: Collection, docs: ReadonlyArray<Document>, options: BulkWriteOptions) {
super(options);

if (!Array.isArray(docs)) {
throw new MongoInvalidArgumentError('Argument "docs" must be an array of documents');
}

this.options = options;
this.collection = collection;
this.docs = docs;
}

override get commandName() {
return 'insert' as const;
}

override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<InsertManyResult> {
const coll = this.collection;
const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference };
const writeConcern = WriteConcern.fromOptions(options);
const bulkWriteOperation = new BulkWriteOperation(
coll,
this.docs.map(document => ({
insertOne: { document }
})),
options
);

try {
const res = await bulkWriteOperation.execute(server, session, timeoutContext);
return {
acknowledged: writeConcern?.w !== 0,
insertedCount: res.insertedCount,
insertedIds: res.insertedIds
};
} catch (err) {
if (err && err.message === 'Operation must be an object with an operation key') {
throw new MongoInvalidArgumentError(
'Collection.insertMany() cannot be called with an array that has null/undefined values'
);
}
throw err;
}
}
}

defineAspects(InsertOperation, [Aspect.RETRYABLE, Aspect.WRITE_OPERATION]);
defineAspects(InsertOneOperation, [Aspect.RETRYABLE, Aspect.WRITE_OPERATION]);
defineAspects(InsertManyOperation, [Aspect.WRITE_OPERATION]);
Loading