Skip to content
This repository was archived by the owner on Aug 6, 2025. It is now read-only.

Commit f75c12b

Browse files
authored
DOP-3633: Use bulkWrites for asset upsertion (#814)
1 parent dcb59a1 commit f75c12b

File tree

4 files changed

+38
-22
lines changed

4 files changed

+38
-22
lines changed

modules/persistence/src/services/assets/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import AdmZip from 'adm-zip';
2-
import { upsert } from '../connector';
2+
import { bulkUpsert } from '../connector';
33

44
const COLLECTION_NAME = 'assets';
55

@@ -14,8 +14,8 @@ const assetsFromZip = (zip: AdmZip) => {
1414

1515
export const upsertAssets = async (zip: AdmZip) => {
1616
try {
17-
const assets = await assetsFromZip(zip);
18-
return Promise.all(assets.map((asset) => upsert(asset, COLLECTION_NAME, asset._id)));
17+
const assets = assetsFromZip(zip);
18+
return bulkUpsert(assets, COLLECTION_NAME);
1919
} catch (error) {
2020
console.error(`Error at upsertion time for ${COLLECTION_NAME}: ${error}`);
2121
throw error;

modules/persistence/src/services/connector/index.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
// If no, the helper should be implemented in that service, not here
55

66
import * as mongodb from 'mongodb';
7-
import { ObjectId, Db } from 'mongodb';
7+
import { ObjectId, Db, Document } from 'mongodb';
88
import { db as poolDb } from './pool';
99

1010
// We should only ever have one client active at a time.
@@ -51,15 +51,23 @@ export const insert = async (docs: any[], collection: string, buildId: ObjectId)
5151
};
5252

5353
// Upsert wrapper, requires an _id field.
54-
export const upsert = async (payload: any, collection: string, _id: string | ObjectId) => {
54+
export const bulkUpsert = async (items: Document[], collection: string) => {
5555
const upsertSession = await db();
5656
try {
57-
const query = { _id };
58-
const update = { $set: payload };
59-
const options = { upsert: true };
60-
return await upsertSession.collection(collection).updateOne(query, update, options);
57+
const operations: mongodb.AnyBulkWriteOperation[] = [];
58+
items.forEach((item: Document) => {
59+
const op = {
60+
updateOne: {
61+
filter: { _id: item._id },
62+
update: { $set: item },
63+
upsert: true,
64+
},
65+
};
66+
operations.push(op);
67+
});
68+
return upsertSession.collection(collection).bulkWrite(operations);
6169
} catch (error) {
62-
console.error(`Error at upsertion time for ${collection}: ${error}`);
70+
console.error(`Error at bulk upsertion time for ${collection}: ${error}`);
6371
throw error;
6472
}
6573
};

modules/persistence/src/services/metadata/repos_branches/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { pool, db } from '../../connector';
1+
import { pool } from '../../connector';
22
import { Metadata } from '..';
33
import { project } from '../ToC';
44
import { WithId } from 'mongodb';

modules/persistence/tests/services/connector.test.ts

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { ObjectID } from 'bson';
2-
import { db, insert, upsert } from '../../src/services/connector';
2+
import { bulkUpsert, db, insert } from '../../src/services/connector';
33

44
const mockConnect = jest.fn();
55
const mockDb = jest.fn();
66
const mockCollection = jest.fn();
77
const mockInsertMany = jest.fn();
8-
const mockUpdateOne = jest.fn();
8+
const mockBulkWrite = jest.fn();
99
const mockClose = jest.fn();
1010

1111
// below is a "jest mock" of a mongodb client
@@ -31,8 +31,8 @@ jest.mock('mongodb', () => ({
3131
async insertMany() {
3232
return mockInsertMany();
3333
}
34-
async updateOne(...args) {
35-
return mockUpdateOne(...args);
34+
async bulkWrite(...args) {
35+
return mockBulkWrite(...args);
3636
}
3737
close() {
3838
mockClose();
@@ -118,20 +118,28 @@ describe('Connector module', () => {
118118
});
119119
});
120120

121-
describe('upsert', () => {
122-
const payload = { name: 'upsert-doc' };
121+
describe('bulkUpsert', () => {
122+
const payload = { _id: 'test-id', name: 'upsert-doc' };
123123
const collection = 'metadata';
124-
const id = 'test-id';
124+
125125
test('it calls on collection to update one with upsert option true', async () => {
126-
await upsert(payload, collection, id);
126+
await bulkUpsert([payload], collection);
127127
expect(mockCollection).toBeCalledWith(collection);
128-
expect(mockUpdateOne).toBeCalledWith({ _id: 'test-id' }, { $set: { name: 'upsert-doc' } }, { upsert: true });
128+
expect(mockBulkWrite).toBeCalledWith([
129+
{
130+
updateOne: {
131+
filter: { _id: payload._id },
132+
update: { $set: payload },
133+
upsert: true,
134+
},
135+
},
136+
]);
129137
});
130138

131139
test('it throws error on updateone error', async () => {
132-
mockUpdateOne.mockRejectedValueOnce(new Error('test error') as never);
140+
mockBulkWrite.mockRejectedValueOnce(new Error('test error') as never);
133141
try {
134-
await upsert(payload, collection, id);
142+
await bulkUpsert([payload], collection);
135143
} catch (e) {
136144
expect(e.message).toEqual('test error');
137145
}

0 commit comments

Comments
 (0)