Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 84 additions & 112 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { promisify } from 'util';

import { type BSONSerializeOptions, type Document, EJSON, resolveBSONOptions } from '../bson';
import type { Collection } from '../collection';
import {
type AnyError,
MongoBatchReExecutionError,
MONGODB_ERROR_CODES,
MongoInvalidArgumentError,
MongoRuntimeError,
MongoServerError,
MongoWriteConcernError
} from '../error';
Expand All @@ -22,7 +21,6 @@ import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import {
applyRetryableWrites,
type Callback,
getTopology,
hasAtomicOperators,
maybeAddIdToDocuments,
Expand Down Expand Up @@ -500,86 +498,46 @@ export function mergeBatchResults(
}
}

function executeCommands(
async function executeCommands(
bulkOperation: BulkOperationBase,
options: BulkWriteOptions,
callback: Callback<BulkWriteResult>
) {
options: BulkWriteOptions
): Promise<BulkWriteResult> {
if (bulkOperation.s.batches.length === 0) {
return callback(
undefined,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
);
return new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
}

const batch = bulkOperation.s.batches.shift() as Batch;
for (const batch of bulkOperation.s.batches) {
const finalOptions = resolveOptions(bulkOperation, {
...options,
ordered: bulkOperation.isOrdered
});

function resultHandler(err?: AnyError, result?: Document) {
// Error is a driver related error not a bulk op error, return early
if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) {
return callback(
new MongoBulkWriteError(
err,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
)
);
if (finalOptions.bypassDocumentValidation !== true) {
delete finalOptions.bypassDocumentValidation;
}

if (err instanceof MongoWriteConcernError) {
return handleMongoWriteConcernError(
batch,
bulkOperation.s.bulkResult,
bulkOperation.isOrdered,
err,
callback
);
// Is the bypassDocumentValidation options specific
if (bulkOperation.s.bypassDocumentValidation === true) {
finalOptions.bypassDocumentValidation = true;
}

// Merge the results together
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
if (bulkOperation.handleWriteError(callback, writeResult)) return;

// Execute the next command in line
executeCommands(bulkOperation, options, callback);
}

const finalOptions = resolveOptions(bulkOperation, {
...options,
ordered: bulkOperation.isOrdered
});

if (finalOptions.bypassDocumentValidation !== true) {
delete finalOptions.bypassDocumentValidation;
}

// Set an operationIf if provided
if (bulkOperation.operationId) {
resultHandler.operationId = bulkOperation.operationId;
}

// Is the bypassDocumentValidation options specific
if (bulkOperation.s.bypassDocumentValidation === true) {
finalOptions.bypassDocumentValidation = true;
}

// Is the checkKeys option disabled
if (bulkOperation.s.checkKeys === false) {
finalOptions.checkKeys = false;
}

if (finalOptions.retryWrites) {
if (isUpdateBatch(batch)) {
finalOptions.retryWrites = finalOptions.retryWrites && !batch.operations.some(op => op.multi);
// Is the checkKeys option disabled
if (bulkOperation.s.checkKeys === false) {
finalOptions.checkKeys = false;
}

if (isDeleteBatch(batch)) {
finalOptions.retryWrites =
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0);
if (finalOptions.retryWrites) {
if (isUpdateBatch(batch)) {
finalOptions.retryWrites =
finalOptions.retryWrites && !batch.operations.some(op => op.multi);
}

if (isDeleteBatch(batch)) {
finalOptions.retryWrites =
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0);
}
}
}

try {
const operation = isInsertBatch(batch)
? new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions)
: isUpdateBatch(batch)
Expand All @@ -588,38 +546,61 @@ function executeCommands(
? new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions)
: null;

if (operation != null) {
executeOperation(bulkOperation.s.collection.client, operation).then(
result => resultHandler(undefined, result),
error => resultHandler(error)
);
if (operation == null) throw new MongoRuntimeError(`Unknown batchType: ${batch.batchType}`);

let thrownError = null;
let result;
try {
result = await executeOperation(bulkOperation.s.collection.client, operation);
} catch (error) {
thrownError = error;
}

if (thrownError != null) {
if (!(thrownError instanceof MongoWriteConcernError)) {
// Error is a driver related error not a bulk op error, return early
throw new MongoBulkWriteError(
thrownError,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
);
}

if (thrownError instanceof MongoWriteConcernError) {
handleMongoWriteConcernError(
batch,
bulkOperation.s.bulkResult,
bulkOperation.isOrdered,
thrownError
);
}
}
} catch (err) {
// Force top level error
err.ok = 0;
// Merge top level error and return
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, undefined);
callback();

mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
bulkOperation.handleWriteError(writeResult);
}

bulkOperation.s.batches.length = 0;

const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
bulkOperation.handleWriteError(writeResult);
return writeResult;
}

function handleMongoWriteConcernError(
batch: Batch,
bulkResult: BulkResult,
isOrdered: boolean,
err: MongoWriteConcernError,
callback: Callback<BulkWriteResult>
) {
err: MongoWriteConcernError
): never {
mergeBatchResults(batch, bulkResult, undefined, err.result);

callback(
new MongoBulkWriteError(
{
message: err.result.writeConcernError.errmsg,
code: err.result.writeConcernError.code
},
new BulkWriteResult(bulkResult, isOrdered)
)
throw new MongoBulkWriteError(
{
message: err.result.writeConcernError.errmsg,
code: err.result.writeConcernError.code
},
new BulkWriteResult(bulkResult, isOrdered)
);
}

Expand Down Expand Up @@ -875,8 +856,6 @@ export interface BulkWriteOptions extends CommandOperationOptions {
let?: Document;
}

const executeCommandsAsync = promisify(executeCommands);

/**
* TODO(NODE-4063)
* BulkWrites merge complexity is implemented in executeCommands
Expand All @@ -895,15 +874,15 @@ export class BulkWriteShimOperation extends AbstractOperation {
return 'bulkWrite' as const;
}

execute(_server: Server, session: ClientSession | undefined): Promise<any> {
async execute(_server: Server, session: ClientSession | undefined): Promise<any> {
if (this.options.session == null) {
// An implicit session could have been created by 'executeOperation'
// So if we stick it on finalOptions here, each bulk operation
// will use this same session, it'll be passed in the same way
// an explicit session would be
this.options.session = session;
}
return executeCommandsAsync(this.bulkOperation, this.options);
return await executeCommands(this.bulkOperation, this.options);
}
}

Expand Down Expand Up @@ -1239,33 +1218,26 @@ export abstract class BulkOperationBase {
* Handles the write error before executing commands
* @internal
*/
handleWriteError(callback: Callback<BulkWriteResult>, writeResult: BulkWriteResult): boolean {
handleWriteError(writeResult: BulkWriteResult): void {
if (this.s.bulkResult.writeErrors.length > 0) {
const msg = this.s.bulkResult.writeErrors[0].errmsg
? this.s.bulkResult.writeErrors[0].errmsg
: 'write operation failed';

callback(
new MongoBulkWriteError(
{
message: msg,
code: this.s.bulkResult.writeErrors[0].code,
writeErrors: this.s.bulkResult.writeErrors
},
writeResult
)
throw new MongoBulkWriteError(
{
message: msg,
code: this.s.bulkResult.writeErrors[0].code,
writeErrors: this.s.bulkResult.writeErrors
},
writeResult
);

return true;
}

const writeConcernError = writeResult.getWriteConcernError();
if (writeConcernError) {
callback(new MongoBulkWriteError(writeConcernError, writeResult));
return true;
throw new MongoBulkWriteError(writeConcernError, writeResult);
}

return false;
}

abstract addToOperationsList(
Expand Down
7 changes: 3 additions & 4 deletions src/bulk/unordered.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import type { Collection } from '../collection';
import { MongoInvalidArgumentError } from '../error';
import type { DeleteStatement } from '../operations/delete';
import type { UpdateStatement } from '../operations/update';
import { type Callback } from '../utils';
import {
Batch,
BatchType,
Expand All @@ -20,12 +19,12 @@ export class UnorderedBulkOperation extends BulkOperationBase {
super(collection, options, false);
}

override handleWriteError(callback: Callback, writeResult: BulkWriteResult): boolean {
override handleWriteError(writeResult: BulkWriteResult): void {
if (this.s.batches.length) {
return false;
return;
}

return super.handleWriteError(callback, writeResult);
return super.handleWriteError(writeResult);
}

addToOperationsList(
Expand Down
36 changes: 14 additions & 22 deletions test/integration/crud/crud_api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1093,32 +1093,24 @@ describe('CRUD API', function () {
}
});

it('should correctly throw error on illegal callback when unordered bulkWrite encounters error', {
// Add a tag that our runner can trigger on
// in this case we are setting that node needs to be higher than 0.10.X to run
metadata: {
requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] }
},

test: async function () {
const ops = [];
// Create a set of operations that go over the 1000 limit causing two messages
let i = 0;
for (; i < 1005; i++) {
ops.push({ insertOne: { _id: i, a: i } });
}
it('should correctly throw error on illegal callback when unordered bulkWrite encounters error', async function () {
const ops = [];
// Create a set of operations that go over the 1000 limit causing two messages
let i = 0;
for (; i < 1005; i++) {
ops.push({ insertOne: { _id: i, a: i } });
}

ops.push({ insertOne: { _id: 0, a: i } });
ops.push({ insertOne: { _id: 0, a: i } });

const db = client.db();
const db = client.db();

const error = await db
.collection('t20_1')
.bulkWrite(ops, { ordered: false, writeConcern: { w: 1 } })
.catch(error => error);
const error = await db
.collection('t20_1')
.bulkWrite(ops, { ordered: false, writeConcern: { w: 1 } })
.catch(error => error);

expect(error).to.be.instanceOf(MongoError);
}
expect(error).to.be.instanceOf(MongoError);
});

it('should correctly throw error on illegal callback when ordered bulkWrite encounters error', {
Expand Down