Skip to content

Commit 5f261ca

Browse files
committed
feat(NODE-6337): implement client bulk write batching
1 parent bb3bae8 commit 5f261ca

File tree

9 files changed

+438
-192
lines changed

9 files changed

+438
-192
lines changed

src/cmap/commands.ts

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,67 @@ export interface OpMsgOptions {
429429

430430
/** @internal */
431431
export class DocumentSequence {
432+
field: string;
432433
documents: Document[];
434+
serializedDocumentsLength: number;
435+
private chunks: Uint8Array[];
436+
private header?: Buffer;
433437

434-
constructor(documents: Document[]) {
435-
this.documents = documents;
438+
/**
439+
* Create a new document sequence for the provided field.
440+
* @param field - The field it will replace.
441+
*/
442+
constructor(field: string, documents?: Document[]) {
443+
this.field = field;
444+
this.documents = [];
445+
this.chunks = [];
446+
this.serializedDocumentsLength = 0;
447+
this.init();
448+
if (documents) {
449+
for (const doc of documents) {
450+
this.push(doc, BSON.serialize(doc));
451+
}
452+
}
453+
}
454+
455+
/**
456+
* Initialize the buffer chunks.
457+
*/
458+
private init() {
459+
// Document sequences start with type 1 at the first byte.
460+
const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
461+
buffer[0] = 1;
462+
// Third part is the field name at offset 5 with trailing null byte.
463+
encodeUTF8Into(buffer, `${this.field}\0`, 5);
464+
this.chunks.push(buffer);
465+
this.header = buffer;
466+
}
467+
468+
/**
469+
* Push a document and its already-serialized bytes to the document sequence.
470+
* Updates the section length in the header and returns the current serialized length of all documents.
471+
* @param document - The document to add.
472+
* @param buffer - The serialized document in raw BSON.
473+
* @returns The serialized documents length.
474+
*/
475+
push(document: Document, buffer: Uint8Array): number {
476+
this.serializedDocumentsLength += buffer.length;
477+
// Push the document.
478+
this.documents.push(document);
479+
// Push the document raw bson.
480+
this.chunks.push(buffer);
481+
// Write the new length.
482+
this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
483+
return this.serializedDocumentsLength;
484+
}
485+
486+
/**
487+
* Get the fully serialized bytes for the document sequence section.
488+
* @returns The section bytes.
489+
*/
490+
toBin(): Uint8Array {
491+
// TODO: What to do if no documents?
492+
return Buffer.concat(this.chunks);
436493
}
437494
}
438495

@@ -543,21 +600,7 @@ export class OpMsgRequest {
543600
const chunks = [];
544601
for (const [key, value] of Object.entries(document)) {
545602
if (value instanceof DocumentSequence) {
546-
// Document sequences starts with type 1 at the first byte.
547-
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
548-
buffer[0] = 1;
549-
// Third part is the field name at offset 5 with trailing null byte.
550-
encodeUTF8Into(buffer, `${key}\0`, 5);
551-
chunks.push(buffer);
552-
// Fourth part are the documents' bytes.
553-
let docsLength = 0;
554-
for (const doc of value.documents) {
555-
const docBson = this.serializeBson(doc);
556-
docsLength += docBson.length;
557-
chunks.push(docBson);
558-
}
559-
// Second part of the sequence is the length at offset 1;
560-
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
603+
chunks.push(value.toBin());
561604
// Why are we removing the field from the command? This is because it needs to be
562605
// removed in the OP_MSG request first section, and DocumentSequence is not a
563606
// BSON type and is specific to the MongoDB wire protocol so there's nothing

src/operations/client_bulk_write/command_builder.ts

Lines changed: 87 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { type Document } from '../../bson';
1+
import { BSON, type Document } from '../../bson';
22
import { DocumentSequence } from '../../cmap/commands';
33
import { type PkFactory } from '../../mongo_client';
44
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
@@ -28,6 +28,11 @@ export interface ClientBulkWriteCommand {
2828
comment?: any;
2929
}
3030

31+
/**
32+
* The bytes overhead for the extra fields added post command generation.
33+
*/
34+
const MESSAGE_OVERHEAD_BYTES = 1000;
35+
3136
/** @internal */
3237
export class ClientBulkWriteCommandBuilder {
3338
models: AnyClientBulkWriteModel[];
@@ -62,32 +67,101 @@ export class ClientBulkWriteCommandBuilder {
6267
/**
6368
* Build the bulk write commands from the models.
6469
*/
65-
buildCommands(): ClientBulkWriteCommand[] {
70+
buildCommands(maxMessageSizeBytes?: number): ClientBulkWriteCommand[] {
71+
// If we don't know the maxMessageSizeBytes or for some reason it's 0
72+
// then we cannot calculate the batch.
73+
if (!maxMessageSizeBytes) {
74+
throw new Error('');
75+
}
76+
6677
// Iterate the models to build the ops and nsInfo fields.
67-
const operations = [];
78+
// We need to do this in a loop which creates one command each up
79+
// to the max bson size or max message size.
80+
const commands: ClientBulkWriteCommand[] = [];
81+
let currentCommandLength = MESSAGE_OVERHEAD_BYTES;
6882
let currentNamespaceIndex = 0;
83+
let currentCommand: ClientBulkWriteCommand = this.baseCommand();
6984
const namespaces = new Map<string, number>();
70-
for (const model of this.models) {
85+
for (const [modelIndex, model] of this.models.entries()) {
7186
const ns = model.namespace;
7287
const index = namespaces.get(ns);
7388
if (index != null) {
74-
operations.push(buildOperation(model, index, this.pkFactory));
89+
// Build the operation and serialize it so its size can be checked before adding it.
90+
const operation = buildOperation(model, index, this.pkFactory);
91+
const operationBuffer = BSON.serialize(operation);
92+
93+
// Check if the operation buffer can fit in the current command. If it can,
94+
// then add the operation to the document sequence and increment the
95+
// current length.
96+
if (currentCommandLength + operationBuffer.length < maxMessageSizeBytes) {
97+
// Pushing to the ops document sequence returns the sequence's cumulative serialized length.
98+
const opsLength = currentCommand.ops.push(operation, operationBuffer);
99+
currentCommandLength += opsLength;
100+
101+
// If this is the last model in the array, push the current command.
102+
if (modelIndex === this.models.length - 1) {
103+
commands.push(currentCommand);
104+
}
105+
} else {
106+
// We need to batch. Push the current command to the commands
107+
// array and create a new current command if there are more models
108+
// that need to be iterated.
109+
commands.push(currentCommand);
110+
if (modelIndex < this.models.length - 1) {
111+
currentCommand = this.baseCommand();
112+
}
113+
}
75114
} else {
76115
namespaces.set(ns, currentNamespaceIndex);
77-
operations.push(buildOperation(model, currentNamespaceIndex, this.pkFactory));
78-
currentNamespaceIndex++;
116+
const nsInfo = { ns: ns };
117+
const nsInfoBuffer = BSON.serialize(nsInfo);
118+
const operation = buildOperation(model, currentNamespaceIndex, this.pkFactory);
119+
const operationBuffer = BSON.serialize(operation);
120+
121+
// Check if the operation and nsInfo buffers can fit in the command. If they
122+
// can, then add the operation and nsInfo to their respective document
123+
// sequences and increment the current length.
124+
if (
125+
currentCommandLength + nsInfoBuffer.length + operationBuffer.length <
126+
maxMessageSizeBytes
127+
) {
128+
// Pushing to the nsInfo document sequence returns the sequence's cumulative serialized length.
129+
const nsInfoLength = currentCommand.nsInfo.push(nsInfo, nsInfoBuffer);
130+
currentCommandLength += nsInfoLength;
131+
132+
// Pushing to the ops document sequence returns the sequence's cumulative serialized length.
133+
const opsLength = currentCommand.ops.push(operation, operationBuffer);
134+
currentCommandLength += opsLength;
135+
136+
// We've added a new namespace, increment the namespace index.
137+
currentNamespaceIndex++;
138+
139+
// If this is the last model in the array, push the current command.
140+
if (modelIndex === this.models.length - 1) {
141+
commands.push(currentCommand);
142+
}
143+
} else {
144+
// We need to batch. Push the current command to the commands
145+
// array and create a new current command if there are more models
146+
// that need to be iterated.
147+
commands.push(currentCommand);
148+
if (modelIndex < this.models.length - 1) {
149+
currentCommand = this.baseCommand();
150+
}
151+
}
79152
}
80153
}
81154

82-
const nsInfo = Array.from(namespaces.keys(), ns => ({ ns }));
155+
return commands;
156+
}
83157

84-
// The base command.
158+
private baseCommand(): ClientBulkWriteCommand {
85159
const command: ClientBulkWriteCommand = {
86160
bulkWrite: 1,
87161
errorsOnly: this.errorsOnly,
88162
ordered: this.options.ordered ?? true,
89-
ops: new DocumentSequence(operations),
90-
nsInfo: new DocumentSequence(nsInfo)
163+
ops: new DocumentSequence('ops'),
164+
nsInfo: new DocumentSequence('nsInfo')
91165
};
92166
// Add bypassDocumentValidation if it was present in the options.
93167
if (this.options.bypassDocumentValidation != null) {
@@ -103,7 +177,8 @@ export class ClientBulkWriteCommandBuilder {
103177
if (this.options.comment !== undefined) {
104178
command.comment = this.options.comment;
105179
}
106-
return [command];
180+
181+
return command;
107182
}
108183
}
109184

src/operations/client_bulk_write/executor.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ export class ClientBulkWriteExecutor {
5757
this.options,
5858
pkFactory
5959
);
60-
const commands = commandBuilder.buildCommands();
60+
const commands = commandBuilder.buildCommands(
61+
this.client.topology?.description.maxMessageSizeBytes
62+
);
6163
if (this.options.writeConcern?.w === 0) {
6264
return await executeUnacknowledged(this.client, this.options, commands);
6365
}
@@ -75,10 +77,14 @@ async function executeAcknowledged(
7577
): Promise<ClientBulkWriteResult> {
7678
const resultsMerger = new ClientBulkWriteResultsMerger(options);
7779
// For each command we will create and exhaust a cursor for the results.
80+
let currentBatchOffset = 0;
7881
for (const command of commands) {
7982
const cursor = new ClientBulkWriteCursor(client, command, options);
8083
const docs = await cursor.toArray();
81-
resultsMerger.merge(command.ops.documents, cursor.response, docs);
84+
const operations = command.ops.documents;
85+
resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);
86+
// Set the new batch offset so we can map back to the index in the original models.
87+
currentBatchOffset += operations.length;
8288
}
8389
return resultsMerger.result;
8490
}

src/operations/client_bulk_write/results_merger.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@ export class ClientBulkWriteResultsMerger {
4242

4343
/**
4444
* Merge the results in the cursor to the existing result.
45+
* @param currentBatchOffset - The offset index to the original models.
4546
* @param response - The cursor response.
4647
* @param documents - The documents in the cursor.
4748
* @returns The current result.
4849
*/
4950
merge(
51+
currentBatchOffset: number,
5052
operations: Document[],
5153
response: ClientBulkWriteCursorResponse,
5254
documents: Document[]
@@ -67,7 +69,9 @@ export class ClientBulkWriteResultsMerger {
6769
const operation = operations[document.idx];
6870
// Handle insert results.
6971
if ('insert' in operation) {
70-
this.result.insertResults?.set(document.idx, { insertedId: operation.document._id });
72+
this.result.insertResults?.set(document.idx + currentBatchOffset, {
73+
insertedId: operation.document._id
74+
});
7175
}
7276
// Handle update results.
7377
if ('update' in operation) {
@@ -80,11 +84,13 @@ export class ClientBulkWriteResultsMerger {
8084
if (document.upserted) {
8185
result.upsertedId = document.upserted._id;
8286
}
83-
this.result.updateResults?.set(document.idx, result);
87+
this.result.updateResults?.set(document.idx + currentBatchOffset, result);
8488
}
8589
// Handle delete results.
8690
if ('delete' in operation) {
87-
this.result.deleteResults?.set(document.idx, { deletedCount: document.n });
91+
this.result.deleteResults?.set(document.idx + currentBatchOffset, {
92+
deletedCount: document.n
93+
});
8894
}
8995
}
9096
}

src/sdam/server_description.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ export class ServerDescription {
6969
setVersion: number | null;
7070
electionId: ObjectId | null;
7171
logicalSessionTimeoutMinutes: number | null;
72+
/** The max message size in bytes for the server. */
73+
maxMessageSizeBytes: number;
7274

7375
// NOTE: does this belong here? It seems we should gossip the cluster time at the CMAP level
7476
$clusterTime?: ClusterTime;
@@ -111,6 +113,7 @@ export class ServerDescription {
111113
this.setVersion = hello?.setVersion ?? null;
112114
this.electionId = hello?.electionId ?? null;
113115
this.logicalSessionTimeoutMinutes = hello?.logicalSessionTimeoutMinutes ?? null;
116+
this.maxMessageSizeBytes = hello?.maxMessageSizeBytes ?? 0;
114117
this.primary = hello?.primary ?? null;
115118
this.me = hello?.me?.toLowerCase() ?? null;
116119
this.$clusterTime = hello?.$clusterTime ?? null;

src/sdam/topology_description.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ export class TopologyDescription {
4343
heartbeatFrequencyMS: number;
4444
localThresholdMS: number;
4545
commonWireVersion: number;
46+
maxMessageSizeBytes?: number;
4647

4748
/**
4849
* Create a TopologyDescription
@@ -71,6 +72,16 @@ export class TopologyDescription {
7172

7273
// determine server compatibility
7374
for (const serverDescription of this.servers.values()) {
75+
// Find the lowest maxMessageSizeBytes from all the servers.
76+
if (this.maxMessageSizeBytes == null) {
77+
this.maxMessageSizeBytes = serverDescription.maxMessageSizeBytes;
78+
} else {
79+
this.maxMessageSizeBytes = Math.min(
80+
this.maxMessageSizeBytes,
81+
serverDescription.maxMessageSizeBytes
82+
);
83+
}
84+
7485
// Load balancer mode is always compatible.
7586
if (
7687
serverDescription.type === ServerType.Unknown ||

test/unit/cmap/commands.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ describe('commands', function () {
1515
context('when there is one document sequence', function () {
1616
const command = {
1717
test: 1,
18-
field: new DocumentSequence([{ test: 1 }])
18+
field: new DocumentSequence('field', [{ test: 1 }])
1919
};
2020
const msg = new OpMsgRequest('admin', command, {});
2121
const buffers = msg.toBin();
@@ -53,8 +53,8 @@ describe('commands', function () {
5353
context('when there are multiple document sequences', function () {
5454
const command = {
5555
test: 1,
56-
fieldOne: new DocumentSequence([{ test: 1 }]),
57-
fieldTwo: new DocumentSequence([{ test: 1 }])
56+
fieldOne: new DocumentSequence('fieldOne', [{ test: 1 }]),
57+
fieldTwo: new DocumentSequence('fieldTwo', [{ test: 1 }])
5858
};
5959
const msg = new OpMsgRequest('admin', command, {});
6060
const buffers = msg.toBin();

0 commit comments

Comments
 (0)