Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ export class FindOperators {

/** Add a single update operation to the bulk operation */
updateOne(updateDocument: Document | Document[]): BulkOperationBase {
if (!hasAtomicOperators(updateDocument)) {
if (!hasAtomicOperators(updateDocument, this.bulkOperation.bsonOptions)) {
throw new MongoInvalidArgumentError('Update document requires atomic operators');
}

Expand Down Expand Up @@ -1115,7 +1115,7 @@ export abstract class BulkOperationBase {
...op.updateOne,
multi: false
});
if (!hasAtomicOperators(updateStatement.u)) {
if (!hasAtomicOperators(updateStatement.u, this.bsonOptions)) {
throw new MongoInvalidArgumentError('Update document requires atomic operators');
}
return this.addToOperationsList(BatchType.UPDATE, updateStatement);
Expand All @@ -1129,7 +1129,7 @@ export abstract class BulkOperationBase {
...op.updateMany,
multi: true
});
if (!hasAtomicOperators(updateStatement.u)) {
if (!hasAtomicOperators(updateStatement.u, this.bsonOptions)) {
throw new MongoInvalidArgumentError('Update document requires atomic operators');
}
return this.addToOperationsList(BatchType.UPDATE, updateStatement);
Expand Down
37 changes: 23 additions & 14 deletions src/operations/client_bulk_write/command_builder.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BSON, type Document } from '../../bson';
import { BSON, type BSONSerializeOptions, type Document } from '../../bson';
import { DocumentSequence } from '../../cmap/commands';
import { MongoAPIError, MongoInvalidArgumentError } from '../../error';
import { type PkFactory } from '../../mongo_client';
Expand Down Expand Up @@ -128,7 +128,7 @@ export class ClientBulkWriteCommandBuilder {

if (nsIndex != null) {
// Build the operation and serialize it to get the bytes buffer.
const operation = buildOperation(model, nsIndex, this.pkFactory);
const operation = buildOperation(model, nsIndex, this.pkFactory, this.options);
let operationBuffer;
try {
operationBuffer = BSON.serialize(operation);
Expand Down Expand Up @@ -159,7 +159,12 @@ export class ClientBulkWriteCommandBuilder {
// construct our nsInfo and ops documents and buffers.
namespaces.set(ns, currentNamespaceIndex);
const nsInfo = { ns: ns };
const operation = buildOperation(model, currentNamespaceIndex, this.pkFactory);
const operation = buildOperation(
model,
currentNamespaceIndex,
this.pkFactory,
this.options
);
let nsInfoBuffer;
let operationBuffer;
try {
Expand Down Expand Up @@ -339,9 +344,10 @@ export interface ClientUpdateOperation {
*/
export const buildUpdateOneOperation = (
model: ClientUpdateOneModel<Document>,
index: number
index: number,
options: BSONSerializeOptions
): ClientUpdateOperation => {
return createUpdateOperation(model, index, false);
return createUpdateOperation(model, index, false, options);
};

/**
Expand All @@ -352,17 +358,18 @@ export const buildUpdateOneOperation = (
*/
export const buildUpdateManyOperation = (
model: ClientUpdateManyModel<Document>,
index: number
index: number,
options: BSONSerializeOptions
): ClientUpdateOperation => {
return createUpdateOperation(model, index, true);
return createUpdateOperation(model, index, true, options);
};

/**
* Validate the update document.
* @param update - The update document.
*/
function validateUpdate(update: Document) {
if (!hasAtomicOperators(update)) {
function validateUpdate(update: Document, options: BSONSerializeOptions) {
if (!hasAtomicOperators(update, options)) {
throw new MongoAPIError(
'Client bulk write update models must only contain atomic modifiers (start with $) and must not be empty.'
);
Expand All @@ -375,13 +382,14 @@ function validateUpdate(update: Document) {
function createUpdateOperation(
model: ClientUpdateOneModel<Document> | ClientUpdateManyModel<Document>,
index: number,
multi: boolean
multi: boolean,
options: BSONSerializeOptions
): ClientUpdateOperation {
// Update documents provided in UpdateOne and UpdateMany write models are
// required only to contain atomic modifiers (i.e. keys that start with "$").
// Drivers MUST throw an error if an update document is empty or if the
// document's first key does not start with "$".
validateUpdate(model.update);
validateUpdate(model.update, options);
const document: ClientUpdateOperation = {
update: index,
multi: multi,
Expand Down Expand Up @@ -459,7 +467,8 @@ export const buildReplaceOneOperation = (
export function buildOperation(
model: AnyClientBulkWriteModel<Document>,
index: number,
pkFactory: PkFactory
pkFactory: PkFactory,
options: BSONSerializeOptions
): Document {
switch (model.name) {
case 'insertOne':
Expand All @@ -469,9 +478,9 @@ export function buildOperation(
case 'deleteMany':
return buildDeleteManyOperation(model, index);
case 'updateOne':
return buildUpdateOneOperation(model, index);
return buildUpdateOneOperation(model, index, options);
case 'updateMany':
return buildUpdateManyOperation(model, index);
return buildUpdateManyOperation(model, index, options);
case 'replaceOne':
return buildReplaceOneOperation(model, index);
}
Expand Down
2 changes: 1 addition & 1 deletion src/operations/find_and_modify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ export class FindOneAndUpdateOperation extends FindAndModifyOperation {
throw new MongoInvalidArgumentError('Argument "update" must be an object');
}

if (!hasAtomicOperators(update)) {
if (!hasAtomicOperators(update, options)) {
throw new MongoInvalidArgumentError('Update document requires atomic operators');
}

Expand Down
4 changes: 2 additions & 2 deletions src/operations/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ export class UpdateOneOperation extends UpdateOperation {
options
);

if (!hasAtomicOperators(update)) {
if (!hasAtomicOperators(update, options)) {
throw new MongoInvalidArgumentError('Update document requires atomic operators');
}
}
Expand Down Expand Up @@ -179,7 +179,7 @@ export class UpdateManyOperation extends UpdateOperation {
options
);

if (!hasAtomicOperators(update)) {
if (!hasAtomicOperators(update, options)) {
throw new MongoInvalidArgumentError('Update document requires atomic operators');
}
}
Expand Down
20 changes: 19 additions & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,10 @@ export function calculateDurationInMs(started: number | undefined): number {
}

/** @internal */
export function hasAtomicOperators(doc: Document | Document[]): boolean {
export function hasAtomicOperators(
doc: Document | Document[],
options?: CommandOperationOptions
): boolean {
if (Array.isArray(doc)) {
for (const document of doc) {
if (hasAtomicOperators(document)) {
Expand All @@ -487,6 +490,21 @@ export function hasAtomicOperators(doc: Document | Document[]): boolean {
}

const keys = Object.keys(doc);
// In this case we need to throw if all the atomic operators are undefined.
if (options?.ignoreUndefined) {
let allUndefined = true;
for (const key of keys) {
// eslint-disable-next-line no-restricted-syntax
if (doc[key] !== undefined) {
allUndefined = false;
break;
}
}
if (allUndefined) {
throw new MongoInvalidArgumentError('All atomic operators provided have undefined values.');
}
}

return keys.length > 0 && keys[0][0] === '$';
}

Expand Down
50 changes: 50 additions & 0 deletions test/integration/crud/bulk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,56 @@ describe('Bulk', function () {
client = null;
});

describe('#bulkWrite', function () {
context('when including an update with all undefined atomic operators', function () {
context('when ignoreUndefined is true', function () {
context('when performing an update many', function () {
it('throws an error', async function () {
const collection = client.db('test').collection('test');
const error = await collection
.bulkWrite(
[
{
updateMany: {
filter: { age: { $lte: 5 } },
update: { $set: undefined, $unset: undefined }
}
}
],
{ ignoreUndefined: true }
)
.catch(error => error);
expect(error.message).to.include(
'All atomic operators provided have undefined values.'
);
});
});

context('when performing an update one', function () {
it('throws an error', async function () {
const collection = client.db('test').collection('test');
const error = await collection
.bulkWrite(
[
{
updateOne: {
filter: { age: { $lte: 5 } },
update: { $set: undefined, $unset: undefined }
}
}
],
{ ignoreUndefined: true }
)
.catch(error => error);
expect(error.message).to.include(
'All atomic operators provided have undefined values.'
);
});
});
});
});
});

describe('BulkOperationBase', () => {
describe('#raw()', function () {
context('when called with an undefined operation', function () {
Expand Down
58 changes: 57 additions & 1 deletion test/integration/crud/client_bulk_write.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,63 @@ describe('Client Bulk Write', function () {

afterEach(async function () {
await client?.close();
await clearFailPoint(this.configuration);
await clearFailPoint(this.configuration).catch(() => null);
});

describe('#bulkWrite', function () {
context('when ignoreUndefined is true', function () {
context('when including an update with all undefined atomic operators', function () {
context('when performing an update many', function () {
beforeEach(async function () {
client = this.configuration.newClient();
});

it('throws an error', async function () {
const error = await client
.bulkWrite(
[
{
name: 'updateMany',
namespace: 'foo.bar',
filter: { age: { $lte: 5 } },
update: { $set: undefined, $unset: undefined }
}
],
{ ignoreUndefined: true }
)
.catch(error => error);
expect(error.message).to.include(
'All atomic operators provided have undefined values.'
);
});
});

context('when performing an update one', function () {
beforeEach(async function () {
client = this.configuration.newClient();
});

it('throws an error', async function () {
const error = await client
.bulkWrite(
[
{
name: 'updateOne',
namespace: 'foo.bar',
filter: { age: { $lte: 5 } },
update: { $set: undefined, $unset: undefined }
}
],
{ ignoreUndefined: true }
)
.catch(error => error);
expect(error.message).to.include(
'All atomic operators provided have undefined values.'
);
});
});
});
});
});

describe('CSOT enabled', function () {
Expand Down
Loading