Skip to content

Commit 5d79902

Browse files
refactor(NODE-7061): remove BulkWriteOperation, InsertManyOperation and BulkWriteShimOperation (#4602)
1 parent 5bfeb97 commit 5d79902

File tree

9 files changed

+108
-251
lines changed

9 files changed

+108
-251
lines changed

src/bulk/common.ts

Lines changed: 21 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,11 @@ import type { CollationOptions, CommandOperationOptions } from '../operations/co
1414
import { DeleteOperation, type DeleteStatement, makeDeleteStatement } from '../operations/delete';
1515
import { executeOperation } from '../operations/execute_operation';
1616
import { InsertOperation } from '../operations/insert';
17-
import { AbstractOperation, type Hint } from '../operations/operation';
17+
import { type Hint } from '../operations/operation';
1818
import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../operations/update';
19-
import type { Server } from '../sdam/server';
2019
import type { Topology } from '../sdam/topology';
21-
import type { ClientSession } from '../sessions';
2220
import { type Sort } from '../sort';
23-
import { type TimeoutContext } from '../timeout';
21+
import { TimeoutContext } from '../timeout';
2422
import {
2523
applyRetryableWrites,
2624
getTopology,
@@ -854,40 +852,6 @@ export interface BulkWriteOptions extends CommandOperationOptions {
854852
timeoutContext?: TimeoutContext;
855853
}
856854

857-
/**
858-
* TODO(NODE-4063)
859-
* BulkWrites merge complexity is implemented in executeCommands
860-
* This provides a vehicle to treat bulkOperations like any other operation (hence "shim")
861-
* We would like this logic to simply live inside the BulkWriteOperation class
862-
* @internal
863-
*/
864-
export class BulkWriteShimOperation extends AbstractOperation {
865-
bulkOperation: BulkOperationBase;
866-
constructor(bulkOperation: BulkOperationBase, options: BulkWriteOptions) {
867-
super(options);
868-
this.bulkOperation = bulkOperation;
869-
}
870-
871-
get commandName(): string {
872-
return 'bulkWrite' as const;
873-
}
874-
875-
async execute(
876-
_server: Server,
877-
session: ClientSession | undefined,
878-
timeoutContext: TimeoutContext
879-
): Promise<any> {
880-
if (this.options.session == null) {
881-
// An implicit session could have been created by 'executeOperation'
882-
// So if we stick it on finalOptions here, each bulk operation
883-
// will use this same session, it'll be passed in the same way
884-
// an explicit session would be
885-
this.options.session = session;
886-
}
887-
return await executeCommands(this.bulkOperation, { ...this.options, timeoutContext });
888-
}
889-
}
890-
891855
/** @public */
892856
export abstract class BulkOperationBase {
893857
isOrdered: boolean;
@@ -1208,10 +1172,26 @@ export abstract class BulkOperationBase {
12081172
}
12091173

12101174
this.s.executed = true;
1211-
const finalOptions = { ...this.s.options, ...options };
1212-
const operation = new BulkWriteShimOperation(this, finalOptions);
1175+
const finalOptions = resolveOptions(this.collection, { ...this.s.options, ...options });
1176+
1177+
// if there is no timeoutContext provided, create a timeoutContext and use it for
1178+
// all batches in the bulk operation
1179+
finalOptions.timeoutContext ??= TimeoutContext.create({
1180+
session: finalOptions.session,
1181+
timeoutMS: finalOptions.timeoutMS,
1182+
serverSelectionTimeoutMS: this.collection.client.s.options.serverSelectionTimeoutMS,
1183+
waitQueueTimeoutMS: this.collection.client.s.options.waitQueueTimeoutMS
1184+
});
1185+
1186+
if (finalOptions.session == null) {
1187+
// if there is not an explicit session provided to `execute()`, create
1188+
// an implicit session and use that for all batches in the bulk operation
1189+
return await this.collection.client.withSession({ explicit: false }, async session => {
1190+
return await executeCommands(this, { ...finalOptions, session });
1191+
});
1192+
}
12131193

1214-
return await executeOperation(this.s.collection.client, operation, finalOptions.timeoutContext);
1194+
return await executeCommands(this, { ...finalOptions });
12151195
}
12161196

12171197
/**

src/collection.ts

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import { type BSONSerializeOptions, type Document, resolveBSONOptions } from './bson';
2-
import type { AnyBulkWriteOperation, BulkWriteOptions, BulkWriteResult } from './bulk/common';
2+
import type {
3+
AnyBulkWriteOperation,
4+
BulkOperationBase,
5+
BulkWriteOptions,
6+
BulkWriteResult
7+
} from './bulk/common';
38
import { OrderedBulkOperation } from './bulk/ordered';
49
import { UnorderedBulkOperation } from './bulk/unordered';
510
import { ChangeStream, type ChangeStreamDocument, type ChangeStreamOptions } from './change_stream';
@@ -24,7 +29,6 @@ import type {
2429
WithoutId
2530
} from './mongo_types';
2631
import type { AggregateOptions } from './operations/aggregate';
27-
import { BulkWriteOperation } from './operations/bulk_write';
2832
import { CountOperation, type CountOptions } from './operations/count';
2933
import {
3034
DeleteManyOperation,
@@ -38,7 +42,7 @@ import {
3842
EstimatedDocumentCountOperation,
3943
type EstimatedDocumentCountOptions
4044
} from './operations/estimated_document_count';
41-
import { executeOperation } from './operations/execute_operation';
45+
import { autoConnect, executeOperation } from './operations/execute_operation';
4246
import type { FindOptions } from './operations/find';
4347
import {
4448
FindOneAndDeleteOperation,
@@ -61,7 +65,6 @@ import {
6165
type ListIndexesOptions
6266
} from './operations/indexes';
6367
import {
64-
InsertManyOperation,
6568
type InsertManyResult,
6669
InsertOneOperation,
6770
type InsertOneOptions,
@@ -305,14 +308,31 @@ export class Collection<TSchema extends Document = Document> {
305308
docs: ReadonlyArray<OptionalUnlessRequiredId<TSchema>>,
306309
options?: BulkWriteOptions
307310
): Promise<InsertManyResult<TSchema>> {
308-
return await executeOperation(
309-
this.client,
310-
new InsertManyOperation(
311-
this as TODO_NODE_3286,
312-
docs,
313-
resolveOptions(this, options ?? { ordered: true })
314-
) as TODO_NODE_3286
315-
);
311+
if (!Array.isArray(docs)) {
312+
throw new MongoInvalidArgumentError('Argument "docs" must be an array of documents');
313+
}
314+
options = resolveOptions(this, options ?? {});
315+
316+
const acknowledged = WriteConcern.fromOptions(options)?.w !== 0;
317+
318+
try {
319+
const res = await this.bulkWrite(
320+
docs.map(doc => ({ insertOne: { document: doc } })),
321+
options
322+
);
323+
return {
324+
acknowledged,
325+
insertedCount: res.insertedCount,
326+
insertedIds: res.insertedIds
327+
};
328+
} catch (err) {
329+
if (err && err.message === 'Operation must be an object with an operation key') {
330+
throw new MongoInvalidArgumentError(
331+
'Collection.insertMany() cannot be called with an array that has null/undefined values'
332+
);
333+
}
334+
throw err;
335+
}
316336
}
317337

318338
/**
@@ -342,14 +362,28 @@ export class Collection<TSchema extends Document = Document> {
342362
throw new MongoInvalidArgumentError('Argument "operations" must be an array of documents');
343363
}
344364

345-
return await executeOperation(
346-
this.client,
347-
new BulkWriteOperation(
348-
this as TODO_NODE_3286,
349-
operations,
350-
resolveOptions(this, options ?? { ordered: true })
351-
)
352-
);
365+
options = resolveOptions(this, options ?? {});
366+
367+
// TODO(NODE-7071): remove once the client doesn't need to be connected to construct
368+
// bulk operations
369+
const isConnected = this.client.topology != null;
370+
if (!isConnected) {
371+
await autoConnect(this.client);
372+
}
373+
374+
// Create the bulk operation
375+
const bulk: BulkOperationBase =
376+
options.ordered === false
377+
? this.initializeUnorderedBulkOp(options)
378+
: this.initializeOrderedBulkOp(options);
379+
380+
// for each op go through and add to the bulk
381+
for (const operation of operations) {
382+
bulk.raw(operation);
383+
}
384+
385+
// Execute the bulk
386+
return await bulk.execute({ ...options });
353387
}
354388

355389
/**

src/operations/bulk_write.ts

Lines changed: 0 additions & 64 deletions
This file was deleted.

src/operations/execute_operation.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ export async function executeOperation<
129129
* Connects a client if it has not yet been connected
130130
* @internal
131131
*/
132-
async function autoConnect(client: MongoClient): Promise<Topology> {
132+
export async function autoConnect(client: MongoClient): Promise<Topology> {
133133
if (client.topology == null) {
134134
if (client.s.hasBeenClosed) {
135135
throw new MongoNotConnectedError('Client must be connected before running operations');

src/operations/insert.ts

Lines changed: 3 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
import type { Document } from '../bson';
22
import type { BulkWriteOptions } from '../bulk/common';
33
import type { Collection } from '../collection';
4-
import { MongoInvalidArgumentError, MongoServerError } from '../error';
4+
import { MongoServerError } from '../error';
55
import type { InferIdType } from '../mongo_types';
66
import type { Server } from '../sdam/server';
77
import type { ClientSession } from '../sessions';
88
import { type TimeoutContext } from '../timeout';
99
import { maybeAddIdToDocuments, type MongoDBNamespace } from '../utils';
10-
import { WriteConcern } from '../write_concern';
11-
import { BulkWriteOperation } from './bulk_write';
1210
import { CommandOperation, type CommandOperationOptions } from './command';
13-
import { AbstractOperation, Aspect, defineAspects } from './operation';
11+
import { Aspect, defineAspects } from './operation';
1412

1513
/** @internal */
1614
export class InsertOperation extends CommandOperation<Document> {
@@ -73,7 +71,7 @@ export interface InsertOneResult<TSchema = Document> {
7371

7472
export class InsertOneOperation extends InsertOperation {
7573
constructor(collection: Collection, doc: Document, options: InsertOneOptions) {
76-
super(collection.s.namespace, maybeAddIdToDocuments(collection, [doc], options), options);
74+
super(collection.s.namespace, [maybeAddIdToDocuments(collection, doc, options)], options);
7775
}
7876

7977
override async execute(
@@ -105,62 +103,5 @@ export interface InsertManyResult<TSchema = Document> {
105103
insertedIds: { [key: number]: InferIdType<TSchema> };
106104
}
107105

108-
/** @internal */
109-
export class InsertManyOperation extends AbstractOperation<InsertManyResult> {
110-
override options: BulkWriteOptions;
111-
collection: Collection;
112-
docs: ReadonlyArray<Document>;
113-
114-
constructor(collection: Collection, docs: ReadonlyArray<Document>, options: BulkWriteOptions) {
115-
super(options);
116-
117-
if (!Array.isArray(docs)) {
118-
throw new MongoInvalidArgumentError('Argument "docs" must be an array of documents');
119-
}
120-
121-
this.options = options;
122-
this.collection = collection;
123-
this.docs = docs;
124-
}
125-
126-
override get commandName() {
127-
return 'insert' as const;
128-
}
129-
130-
override async execute(
131-
server: Server,
132-
session: ClientSession | undefined,
133-
timeoutContext: TimeoutContext
134-
): Promise<InsertManyResult> {
135-
const coll = this.collection;
136-
const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference };
137-
const writeConcern = WriteConcern.fromOptions(options);
138-
const bulkWriteOperation = new BulkWriteOperation(
139-
coll,
140-
this.docs.map(document => ({
141-
insertOne: { document }
142-
})),
143-
options
144-
);
145-
146-
try {
147-
const res = await bulkWriteOperation.execute(server, session, timeoutContext);
148-
return {
149-
acknowledged: writeConcern?.w !== 0,
150-
insertedCount: res.insertedCount,
151-
insertedIds: res.insertedIds
152-
};
153-
} catch (err) {
154-
if (err && err.message === 'Operation must be an object with an operation key') {
155-
throw new MongoInvalidArgumentError(
156-
'Collection.insertMany() cannot be called with an array that has null/undefined values'
157-
);
158-
}
159-
throw err;
160-
}
161-
}
162-
}
163-
164106
defineAspects(InsertOperation, [Aspect.RETRYABLE, Aspect.WRITE_OPERATION]);
165107
defineAspects(InsertOneOperation, [Aspect.RETRYABLE, Aspect.WRITE_OPERATION]);
166-
defineAspects(InsertManyOperation, [Aspect.WRITE_OPERATION]);

0 commit comments

Comments
 (0)