Skip to content

Commit 18d6c8f

Browse files
All aworking
1 parent e75c6a0 commit 18d6c8f

File tree

5 files changed

+53
-74
lines changed

5 files changed

+53
-74
lines changed

src/bulk/common.ts

Lines changed: 21 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,11 @@ import type { CollationOptions, CommandOperationOptions } from '../operations/co
1414
import { DeleteOperation, type DeleteStatement, makeDeleteStatement } from '../operations/delete';
1515
import { executeOperation } from '../operations/execute_operation';
1616
import { InsertOperation } from '../operations/insert';
17-
import { AbstractOperation, type Hint } from '../operations/operation';
17+
import { type Hint } from '../operations/operation';
1818
import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../operations/update';
19-
import type { Server } from '../sdam/server';
2019
import type { Topology } from '../sdam/topology';
21-
import type { ClientSession } from '../sessions';
2220
import { type Sort } from '../sort';
23-
import { type TimeoutContext } from '../timeout';
21+
import { TimeoutContext } from '../timeout';
2422
import {
2523
applyRetryableWrites,
2624
getTopology,
@@ -854,40 +852,6 @@ export interface BulkWriteOptions extends CommandOperationOptions {
854852
timeoutContext?: TimeoutContext;
855853
}
856854

857-
/**
858-
* TODO(NODE-4063)
859-
* BulkWrites merge complexity is implemented in executeCommands
860-
* This provides a vehicle to treat bulkOperations like any other operation (hence "shim")
861-
* We would like this logic to simply live inside the BulkWriteOperation class
862-
* @internal
863-
*/
864-
export class BulkWriteShimOperation extends AbstractOperation {
865-
bulkOperation: BulkOperationBase;
866-
constructor(bulkOperation: BulkOperationBase, options: BulkWriteOptions) {
867-
super(options);
868-
this.bulkOperation = bulkOperation;
869-
}
870-
871-
get commandName(): string {
872-
return 'bulkWrite' as const;
873-
}
874-
875-
async execute(
876-
_server: Server,
877-
session: ClientSession | undefined,
878-
timeoutContext: TimeoutContext
879-
): Promise<any> {
880-
if (this.options.session == null) {
881-
// An implicit session could have been created by 'executeOperation'
882-
// So if we stick it on finalOptions here, each bulk operation
883-
// will use this same session, it'll be passed in the same way
884-
// an explicit session would be
885-
this.options.session = session;
886-
}
887-
return await executeCommands(this.bulkOperation, { ...this.options, timeoutContext });
888-
}
889-
}
890-
891855
/** @public */
892856
export abstract class BulkOperationBase {
893857
isOrdered: boolean;
@@ -1208,10 +1172,26 @@ export abstract class BulkOperationBase {
12081172
}
12091173

12101174
this.s.executed = true;
1211-
const finalOptions = { ...this.s.options, ...options };
1212-
const operation = new BulkWriteShimOperation(this, finalOptions);
1175+
const finalOptions = resolveOptions(this.collection, { ...this.s.options, ...options });
1176+
1177+
// if there is no timeoutContext provided, create a timeoutContext and use it for
1178+
// all batches in the bulk operation
1179+
finalOptions.timeoutContext ??= TimeoutContext.create({
1180+
session: finalOptions.session,
1181+
timeoutMS: finalOptions.timeoutMS,
1182+
serverSelectionTimeoutMS: this.collection.client.s.options.serverSelectionTimeoutMS,
1183+
waitQueueTimeoutMS: this.collection.client.s.options.waitQueueTimeoutMS
1184+
});
1185+
1186+
if (finalOptions.session == null) {
1187+
// if there is not an explicit session provided to `execute()`, create
1188+
// an implicit session and use that for all batches in the bulk operation
1189+
return await this.collection.client.withSession({ explicit: false }, async session => {
1190+
return await executeCommands(this, { ...finalOptions, session });
1191+
});
1192+
}
12131193

1214-
return await executeOperation(this.s.collection.client, operation, finalOptions.timeoutContext);
1194+
return await executeCommands(this, { ...finalOptions });
12151195
}
12161196

12171197
/**

src/collection.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import {
4242
EstimatedDocumentCountOperation,
4343
type EstimatedDocumentCountOptions
4444
} from './operations/estimated_document_count';
45-
import { executeOperation } from './operations/execute_operation';
45+
import { autoConnect, executeOperation } from './operations/execute_operation';
4646
import type { FindOptions } from './operations/find';
4747
import {
4848
FindOneAndDeleteOperation,
@@ -311,7 +311,7 @@ export class Collection<TSchema extends Document = Document> {
311311
if (!Array.isArray(docs)) {
312312
throw new MongoInvalidArgumentError('Argument "docs" must be an array of documents');
313313
}
314-
options = options ?? {};
314+
options = resolveOptions(this, options ?? {});
315315

316316
const acknowledged = WriteConcern.fromOptions(options)?.w !== 0;
317317

@@ -362,7 +362,14 @@ export class Collection<TSchema extends Document = Document> {
362362
throw new MongoInvalidArgumentError('Argument "operations" must be an array of documents');
363363
}
364364

365-
options = options ?? {};
365+
options = resolveOptions(this, options ?? {});
366+
367+
// TODO(NODE-7071): remove once the client doesn't need to be connected to construct
368+
// bulk operations
369+
const isConnected = this.client.topology != null;
370+
if (!isConnected) {
371+
await autoConnect(this.client);
372+
}
366373

367374
// Create the bulk operation
368375
const bulk: BulkOperationBase =
@@ -371,8 +378,8 @@ export class Collection<TSchema extends Document = Document> {
371378
: this.initializeOrderedBulkOp(options);
372379

373380
// for each op go through and add to the bulk
374-
for (let i = 0; i < operations.length; i++) {
375-
bulk.raw(operations[i]);
381+
for (const operation of operations) {
382+
bulk.raw(operation);
376383
}
377384

378385
// Execute the bulk

src/operations/execute_operation.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ export async function executeOperation<
129129
* Connects a client if it has not yet been connected
130130
* @internal
131131
*/
132-
async function autoConnect(client: MongoClient): Promise<Topology> {
132+
export async function autoConnect(client: MongoClient): Promise<Topology> {
133133
if (client.topology == null) {
134134
if (client.s.hasBeenClosed) {
135135
throw new MongoNotConnectedError('Client must be connected before running operations');

test/integration/crud/bulk.test.ts

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -925,7 +925,7 @@ describe('Bulk', function () {
925925
try {
926926
batch.insert({ string: hugeString });
927927
test.ok(false);
928-
} catch (err) {} // eslint-disable-line
928+
} catch (err) { } // eslint-disable-line
929929

930930
// Finish up test
931931
client.close(done);
@@ -1216,34 +1216,27 @@ describe('Bulk', function () {
12161216
}
12171217
});
12181218

1219-
it('should correctly execute unordered batch using w:0', {
1220-
metadata: { requires: { topology: ['single', 'replicaset', 'ssl', 'heap', 'wiredtiger'] } },
1219+
it('should correctly execute unordered batch using w:0', async function () {
1220+
await client.connect();
1221+
const db = client.db();
1222+
const col = db.collection('batch_write_ordered_ops_9');
1223+
const bulk = col.initializeUnorderedBulkOp();
1224+
for (let i = 0; i < 100; i++) {
1225+
bulk.insert({ a: 1 });
1226+
}
12211227

1222-
test: function (done) {
1223-
client.connect((err, client) => {
1224-
const db = client.db();
1225-
const col = db.collection('batch_write_ordered_ops_9');
1226-
const bulk = col.initializeUnorderedBulkOp();
1227-
for (let i = 0; i < 100; i++) {
1228-
bulk.insert({ a: 1 });
1229-
}
1228+
bulk.find({ b: 1 }).upsert().update({ b: 1 });
1229+
bulk.find({ c: 1 }).delete();
12301230

1231-
bulk.find({ b: 1 }).upsert().update({ b: 1 });
1232-
bulk.find({ c: 1 }).delete();
1231+
const result = await bulk.execute({ writeConcern: { w: 0 } });
1232+
test.equal(0, result.upsertedCount);
1233+
test.equal(0, result.insertedCount);
1234+
test.equal(0, result.matchedCount);
1235+
test.ok(0 === result.modifiedCount || result.modifiedCount == null);
1236+
test.equal(0, result.deletedCount);
1237+
test.equal(false, result.hasWriteErrors());
12331238

1234-
bulk.execute({ writeConcern: { w: 0 } }, function (err, result) {
1235-
expect(err).to.not.exist;
1236-
test.equal(0, result.upsertedCount);
1237-
test.equal(0, result.insertedCount);
1238-
test.equal(0, result.matchedCount);
1239-
test.ok(0 === result.modifiedCount || result.modifiedCount == null);
1240-
test.equal(0, result.deletedCount);
1241-
test.equal(false, result.hasWriteErrors());
1242-
1243-
client.close(done);
1244-
});
1245-
});
1246-
}
1239+
await client.close();
12471240
});
12481241

12491242
it('should provide an accessor for operations on ordered bulk ops', function (done) {

test/mongodb.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,6 @@ export * from '../src/mongo_client';
163163
export * from '../src/mongo_logger';
164164
export * from '../src/mongo_types';
165165
export * from '../src/operations/aggregate';
166-
export * from '../src/operations/bulk_write';
167166
export * from '../src/operations/client_bulk_write/command_builder';
168167
export * from '../src/operations/client_bulk_write/common';
169168
export * from '../src/operations/client_bulk_write/results_merger';

0 commit comments

Comments
 (0)