Skip to content

Commit ca924df

Browse files
committed
feat(NODE-6329): client bulk write happy path
1 parent 8b0f354 commit ca924df

File tree

21 files changed

+2265
-5
lines changed

21 files changed

+2265
-5
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import type { Document } from '../bson';
2+
import type { MongoClient } from '../mongo_client';
3+
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
4+
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
5+
import { executeOperation } from '../operations/execute_operation';
6+
import type { ClientSession } from '../sessions';
7+
import { mergeOptions, MongoDBNamespace } from '../utils';
8+
import {
9+
AbstractCursor,
10+
type AbstractCursorOptions,
11+
type InitialCursorResponse
12+
} from './abstract_cursor';
13+
14+
/** @public */
15+
export interface ClientBulkWriteCursorOptions
16+
extends AbstractCursorOptions,
17+
ClientBulkWriteOptions {}
18+
19+
/**
20+
* @public
21+
*/
22+
export class ClientBulkWriteCursor extends AbstractCursor {
23+
public readonly command: Document;
24+
/** @internal */
25+
private clientBulkWriteOptions: ClientBulkWriteOptions;
26+
27+
/** @internal */
28+
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
29+
super(client, new MongoDBNamespace('admin'), options);
30+
31+
this.command = command;
32+
this.clientBulkWriteOptions = options;
33+
}
34+
35+
clone(): ClientBulkWriteCursor {
36+
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
37+
delete clonedOptions.session;
38+
return new ClientBulkWriteCursor(this.client, this.command, {
39+
...clonedOptions
40+
});
41+
}
42+
43+
/** @internal */
44+
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
45+
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
46+
...this.clientBulkWriteOptions,
47+
...this.cursorOptions,
48+
session
49+
});
50+
51+
const response = await executeOperation(this.client, clientBulkWriteOperation);
52+
53+
return { server: clientBulkWriteOperation.server, session, response };
54+
}
55+
}

src/index.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,18 @@ export type {
468468
AggregateOptions,
469469
DB_AGGREGATE_COLLECTION
470470
} from './operations/aggregate';
471+
export type {
472+
AnyClientBulkWriteModel,
473+
ClientBulkWriteOptions,
474+
ClientBulkWriteResult,
475+
ClientDeleteManyModel,
476+
ClientDeleteOneModel,
477+
ClientInsertOneModel,
478+
ClientReplaceOneModel,
479+
ClientUpdateManyModel,
480+
ClientUpdateOneModel,
481+
ClientWriteModel
482+
} from './operations/client_bulk_write/common';
471483
export type {
472484
CollationOptions,
473485
CommandOperation,

src/mongo_client.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ import {
3030
SeverityLevel
3131
} from './mongo_logger';
3232
import { TypedEventEmitter } from './mongo_types';
33+
import {
34+
type AnyClientBulkWriteModel,
35+
type ClientBulkWriteOptions,
36+
type ClientBulkWriteResult
37+
} from './operations/client_bulk_write/common';
38+
import { ClientBulkWriteExecutor } from './operations/client_bulk_write/executor';
3339
import { executeOperation } from './operations/execute_operation';
3440
import { RunAdminCommandOperation } from './operations/run_command';
3541
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
@@ -477,6 +483,18 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
477483
return this.s.bsonOptions;
478484
}
479485

486+
/**
487+
* Executes a client bulk write operation, available on server 8.0+.
488+
* @param models - The client bulk write models.
489+
* @param options - The client bulk write options.
490+
*/
491+
async bulkWrite(
492+
models: AnyClientBulkWriteModel[],
493+
options?: ClientBulkWriteOptions
494+
): Promise<ClientBulkWriteResult> {
495+
return await new ClientBulkWriteExecutor(this, models, options).execute();
496+
}
497+
480498
/**
481499
* Connect to MongoDB using a url
482500
*
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { type Document } from 'bson';
2+
3+
import { CursorResponse } from '../../cmap/wire_protocol/responses';
4+
import type { Server } from '../../sdam/server';
5+
import type { ClientSession } from '../../sessions';
6+
import { AbstractOperation, Aspect, defineAspects } from '../operation';
7+
import { type ClientBulkWriteOptions } from './common';
8+
9+
export class ClientBulkWriteOperation extends AbstractOperation<CursorResponse> {
10+
command: Document;
11+
override options: ClientBulkWriteOptions;
12+
13+
override get commandName() {
14+
return 'bulkWrite' as const;
15+
}
16+
17+
constructor(command: Document, options: ClientBulkWriteOptions) {
18+
super(options);
19+
this.command = command;
20+
this.options = options;
21+
}
22+
23+
override async execute(
24+
server: Server,
25+
session: ClientSession | undefined
26+
): Promise<CursorResponse> {
27+
return await server.command(
28+
this.ns,
29+
this.command,
30+
{
31+
...this.options,
32+
...this.bsonOptions,
33+
documentsReturnedIn: 'firstBatch',
34+
session
35+
},
36+
CursorResponse
37+
);
38+
}
39+
}
40+
41+
defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION]);

src/operations/client_bulk_write/common.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,77 @@ export type AnyClientBulkWriteModel =
144144
| ClientUpdateManyModel
145145
| ClientDeleteOneModel
146146
| ClientDeleteManyModel;
147+
148+
/** @public */
149+
export interface ClientBulkWriteResult {
150+
/**
151+
* The total number of documents inserted across all insert operations.
152+
*/
153+
insertedCount: number;
154+
/**
155+
* The total number of documents upserted across all update operations.
156+
*/
157+
upsertedCount: number;
158+
/**
159+
* The total number of documents matched across all update operations.
160+
*/
161+
matchedCount: number;
162+
/**
163+
* The total number of documents modified across all update operations.
164+
*/
165+
modifiedCount: number;
166+
/**
167+
* The total number of documents deleted across all delete operations.
168+
*/
169+
deletedCount: number;
170+
}
171+
172+
export interface VerboseClientBulkWriteResult extends ClientBulkWriteResult {
173+
/**
174+
* The results of each individual insert operation that was successfully performed.
175+
*/
176+
insertResults: Map<number, ClientInsertOneResult>;
177+
/**
178+
* The results of each individual update operation that was successfully performed.
179+
*/
180+
updateResults: Map<number, ClientUpdateResult>;
181+
/**
182+
* The results of each individual delete operation that was successfully performed.
183+
*/
184+
deleteResults: Map<number, ClientDeleteResult>;
185+
}
186+
187+
export interface ClientInsertOneResult {
188+
/**
189+
* The _id of the inserted document.
190+
*/
191+
insertedId: any;
192+
}
193+
194+
export interface ClientUpdateResult {
195+
/**
196+
* The number of documents that matched the filter.
197+
*/
198+
matchedCount: number;
199+
200+
/**
201+
* The number of documents that were modified.
202+
*/
203+
modifiedCount: number;
204+
205+
/**
206+
* The _id field of the upserted document if an upsert occurred.
207+
*
208+
* It MUST be possible to discern between a BSON Null upserted ID value and this field being
209+
* unset. If necessary, drivers MAY add a didUpsert boolean field to differentiate between
210+
* these two cases.
211+
*/
212+
upsertedId?: any;
213+
}
214+
215+
export interface ClientDeleteResult {
216+
/**
217+
* The number of documents that were deleted.
218+
*/
219+
deletedCount: number;
220+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
2+
import { type MongoClient } from '../../mongo_client';
3+
import { ClientBulkWriteCommandBuilder } from './command_builder';
4+
import {
5+
type AnyClientBulkWriteModel,
6+
type ClientBulkWriteOptions,
7+
type ClientBulkWriteResult
8+
} from './common';
9+
import { ClientBulkWriteResultsMerger } from './results_merger';
10+
11+
export class ClientBulkWriteExecutor {
12+
client: MongoClient;
13+
options: ClientBulkWriteOptions;
14+
operations: AnyClientBulkWriteModel[];
15+
16+
constructor(
17+
client: MongoClient,
18+
operations: AnyClientBulkWriteModel[],
19+
options?: ClientBulkWriteOptions
20+
) {
21+
this.client = client;
22+
this.options = options || {};
23+
this.operations = operations;
24+
}
25+
26+
async execute(): Promise<ClientBulkWriteResult> {
27+
const commmandBuilder = new ClientBulkWriteCommandBuilder(this.operations, this.options);
28+
const commands = commmandBuilder.buildCommands();
29+
const resultsMerger = new ClientBulkWriteResultsMerger(this.options);
30+
for (const command of commands) {
31+
const cursor = new ClientBulkWriteCursor(this.client, command, this.options);
32+
for (const docs of await cursor.toArray()) {
33+
resultsMerger.merge(docs);
34+
}
35+
}
36+
return resultsMerger.result;
37+
}
38+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { type Document } from '../../bson';
2+
import { type ClientBulkWriteOptions, type ClientBulkWriteResult } from './common';
3+
4+
export class ClientBulkWriteResultsMerger {
5+
result: ClientBulkWriteResult;
6+
options: ClientBulkWriteOptions;
7+
8+
constructor(options: ClientBulkWriteOptions) {
9+
this.options = options;
10+
this.result = {
11+
insertedCount: 0,
12+
upsertedCount: 0,
13+
matchedCount: 0,
14+
modifiedCount: 0,
15+
deletedCount: 0
16+
};
17+
}
18+
19+
merge(documents: Document[]): ClientBulkWriteResultsMerger {
20+
for (const document of documents) {
21+
document;
22+
}
23+
return this;
24+
}
25+
}

test/integration/crud/crud.spec.test.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,14 @@ import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';
55

66
const clientBulkWriteTests = new RegExp(
77
[
8-
'client bulk write delete with collation',
9-
'client bulk write delete with hint',
108
'client bulkWrite operations support errorResponse assertions',
119
'an individual operation fails during an ordered bulkWrite',
1210
'an individual operation fails during an unordered bulkWrite',
1311
'detailed results are omitted from error when verboseResults is false',
1412
'a top-level failure occurs during a bulkWrite',
1513
'a bulk write with only errors does not report a partial result',
1614
'an empty list of write models is a client-side error',
17-
'a write concern error occurs during a bulkWrite',
18-
'client bulkWrite'
15+
'a write concern error occurs during a bulkWrite'
1916
].join('|')
2017
);
2118

test/mongodb.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ export * from '../src/operations/aggregate';
159159
export * from '../src/operations/bulk_write';
160160
export * from '../src/operations/client_bulk_write/command_builder';
161161
export * from '../src/operations/client_bulk_write/common';
162+
export * from '../src/operations/client_bulk_write/results_merger';
162163
export * from '../src/operations/collections';
163164
export * from '../src/operations/command';
164165
export * from '../src/operations/count';

0 commit comments

Comments
 (0)