Skip to content

Commit 62ee30c

Browse files
All aworking
1 parent e75c6a0 commit 62ee30c

File tree

5 files changed

+52
-37
lines changed

5 files changed

+52
-37
lines changed

src/bulk/common.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import type { Server } from '../sdam/server';
2020
import type { Topology } from '../sdam/topology';
2121
import type { ClientSession } from '../sessions';
2222
import { type Sort } from '../sort';
23-
import { type TimeoutContext } from '../timeout';
23+
import { TimeoutContext } from '../timeout';
2424
import {
2525
applyRetryableWrites,
2626
getTopology,
@@ -1208,10 +1208,26 @@ export abstract class BulkOperationBase {
12081208
}
12091209

12101210
this.s.executed = true;
1211-
const finalOptions = { ...this.s.options, ...options };
1212-
const operation = new BulkWriteShimOperation(this, finalOptions);
1211+
const finalOptions = resolveOptions(this.collection, { ...this.s.options, ...options });
1212+
1213+
// if there is no timeoutContext provided, create a timeoutContext and use it for
1214+
// all batches in the bulk operation
1215+
finalOptions.timeoutContext ??= TimeoutContext.create({
1216+
session: finalOptions.session,
1217+
timeoutMS: finalOptions.timeoutMS,
1218+
serverSelectionTimeoutMS: this.collection.client.s.options.serverSelectionTimeoutMS,
1219+
waitQueueTimeoutMS: this.collection.client.s.options.waitQueueTimeoutMS
1220+
});
1221+
1222+
if (finalOptions.session == null) {
1223+
// if there is not an explicit session provided to `execute()`, create
1224+
// an implicit session and use that for all batches in the bulk operation
1225+
return await this.collection.client.withSession({ explicit: false }, async session => {
1226+
return await executeCommands(this, { ...finalOptions, session });
1227+
});
1228+
}
12131229

1214-
return await executeOperation(this.s.collection.client, operation, finalOptions.timeoutContext);
1230+
return await executeCommands(this, { ...finalOptions });
12151231
}
12161232

12171233
/**

src/collection.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import {
4242
EstimatedDocumentCountOperation,
4343
type EstimatedDocumentCountOptions
4444
} from './operations/estimated_document_count';
45-
import { executeOperation } from './operations/execute_operation';
45+
import { autoConnect, executeOperation } from './operations/execute_operation';
4646
import type { FindOptions } from './operations/find';
4747
import {
4848
FindOneAndDeleteOperation,
@@ -311,7 +311,7 @@ export class Collection<TSchema extends Document = Document> {
311311
if (!Array.isArray(docs)) {
312312
throw new MongoInvalidArgumentError('Argument "docs" must be an array of documents');
313313
}
314-
options = options ?? {};
314+
options = resolveOptions(this, options ?? {});
315315

316316
const acknowledged = WriteConcern.fromOptions(options)?.w !== 0;
317317

@@ -362,7 +362,14 @@ export class Collection<TSchema extends Document = Document> {
362362
throw new MongoInvalidArgumentError('Argument "operations" must be an array of documents');
363363
}
364364

365-
options = options ?? {};
365+
options = resolveOptions(this, options ?? {});
366+
367+
// TODO(NODE-7071): remove once the client doesn't need to be connected to construct
368+
// bulk operations
369+
const isConnected = this.client.topology != null;
370+
if (!isConnected) {
371+
await autoConnect(this.client);
372+
}
366373

367374
// Create the bulk operation
368375
const bulk: BulkOperationBase =
@@ -371,8 +378,8 @@ export class Collection<TSchema extends Document = Document> {
371378
: this.initializeOrderedBulkOp(options);
372379

373380
// for each op go through and add to the bulk
374-
for (let i = 0; i < operations.length; i++) {
375-
bulk.raw(operations[i]);
381+
for (const operation of operations) {
382+
bulk.raw(operation);
376383
}
377384

378385
// Execute the bulk

src/operations/execute_operation.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ export async function executeOperation<
129129
* Connects a client if it has not yet been connected
130130
* @internal
131131
*/
132-
async function autoConnect(client: MongoClient): Promise<Topology> {
132+
export async function autoConnect(client: MongoClient): Promise<Topology> {
133133
if (client.topology == null) {
134134
if (client.s.hasBeenClosed) {
135135
throw new MongoNotConnectedError('Client must be connected before running operations');

test/integration/crud/bulk.test.ts

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -925,7 +925,7 @@ describe('Bulk', function () {
925925
try {
926926
batch.insert({ string: hugeString });
927927
test.ok(false);
928-
} catch (err) {} // eslint-disable-line
928+
} catch (err) { } // eslint-disable-line
929929

930930
// Finish up test
931931
client.close(done);
@@ -1216,34 +1216,27 @@ describe('Bulk', function () {
12161216
}
12171217
});
12181218

1219-
it('should correctly execute unordered batch using w:0', {
1220-
metadata: { requires: { topology: ['single', 'replicaset', 'ssl', 'heap', 'wiredtiger'] } },
1219+
it('should correctly execute unordered batch using w:0', async function () {
1220+
await client.connect();
1221+
const db = client.db();
1222+
const col = db.collection('batch_write_ordered_ops_9');
1223+
const bulk = col.initializeUnorderedBulkOp();
1224+
for (let i = 0; i < 100; i++) {
1225+
bulk.insert({ a: 1 });
1226+
}
12211227

1222-
test: function (done) {
1223-
client.connect((err, client) => {
1224-
const db = client.db();
1225-
const col = db.collection('batch_write_ordered_ops_9');
1226-
const bulk = col.initializeUnorderedBulkOp();
1227-
for (let i = 0; i < 100; i++) {
1228-
bulk.insert({ a: 1 });
1229-
}
1228+
bulk.find({ b: 1 }).upsert().update({ b: 1 });
1229+
bulk.find({ c: 1 }).delete();
12301230

1231-
bulk.find({ b: 1 }).upsert().update({ b: 1 });
1232-
bulk.find({ c: 1 }).delete();
1231+
const result = await bulk.execute({ writeConcern: { w: 0 } });
1232+
test.equal(0, result.upsertedCount);
1233+
test.equal(0, result.insertedCount);
1234+
test.equal(0, result.matchedCount);
1235+
test.ok(0 === result.modifiedCount || result.modifiedCount == null);
1236+
test.equal(0, result.deletedCount);
1237+
test.equal(false, result.hasWriteErrors());
12331238

1234-
bulk.execute({ writeConcern: { w: 0 } }, function (err, result) {
1235-
expect(err).to.not.exist;
1236-
test.equal(0, result.upsertedCount);
1237-
test.equal(0, result.insertedCount);
1238-
test.equal(0, result.matchedCount);
1239-
test.ok(0 === result.modifiedCount || result.modifiedCount == null);
1240-
test.equal(0, result.deletedCount);
1241-
test.equal(false, result.hasWriteErrors());
1242-
1243-
client.close(done);
1244-
});
1245-
});
1246-
}
1239+
await client.close();
12471240
});
12481241

12491242
it('should provide an accessor for operations on ordered bulk ops', function (done) {

test/mongodb.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,6 @@ export * from '../src/mongo_client';
163163
export * from '../src/mongo_logger';
164164
export * from '../src/mongo_types';
165165
export * from '../src/operations/aggregate';
166-
export * from '../src/operations/bulk_write';
167166
export * from '../src/operations/client_bulk_write/command_builder';
168167
export * from '../src/operations/client_bulk_write/common';
169168
export * from '../src/operations/client_bulk_write/results_merger';

0 commit comments

Comments
 (0)