Skip to content

Commit d4d4bc6

Browse files
committed
refactor: build commands with connection
1 parent d9a6e72 commit d4d4bc6

File tree

7 files changed

+155
-247
lines changed

7 files changed

+155
-247
lines changed

src/cursor/client_bulk_write_cursor.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import type { Document } from '../bson';
21
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
32
import { MongoClientBulkWriteCursorError } from '../error';
43
import type { MongoClient } from '../mongo_client';
54
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
5+
import { type ClientBulkWriteCommandBuilder } from '../operations/client_bulk_write/command_builder';
66
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
77
import { executeOperation } from '../operations/execute_operation';
88
import type { ClientSession } from '../sessions';
@@ -24,17 +24,21 @@ export interface ClientBulkWriteCursorOptions
2424
* @internal
2525
*/
2626
export class ClientBulkWriteCursor extends AbstractCursor {
27-
public readonly command: Document;
27+
commandBuilder: ClientBulkWriteCommandBuilder;
2828
/** @internal */
2929
private cursorResponse?: ClientBulkWriteCursorResponse;
3030
/** @internal */
3131
private clientBulkWriteOptions: ClientBulkWriteOptions;
3232

3333
/** @internal */
34-
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
34+
constructor(
35+
client: MongoClient,
36+
commandBuilder: ClientBulkWriteCommandBuilder,
37+
options: ClientBulkWriteOptions = {}
38+
) {
3539
super(client, new MongoDBNamespace('admin', '$cmd'), options);
3640

37-
this.command = command;
41+
this.commandBuilder = commandBuilder;
3842
this.clientBulkWriteOptions = options;
3943
}
4044

@@ -49,17 +53,21 @@ export class ClientBulkWriteCursor extends AbstractCursor {
4953
);
5054
}
5155

56+
get operations(): [] {
57+
return [];
58+
}
59+
5260
clone(): ClientBulkWriteCursor {
5361
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
5462
delete clonedOptions.session;
55-
return new ClientBulkWriteCursor(this.client, this.command, {
63+
return new ClientBulkWriteCursor(this.client, this.commandBuilder, {
5664
...clonedOptions
5765
});
5866
}
5967

6068
/** @internal */
6169
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
62-
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
70+
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.commandBuilder, {
6371
...this.clientBulkWriteOptions,
6472
...this.cursorOptions,
6573
session
Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,28 @@
1-
import { type Document } from 'bson';
2-
1+
import { ServerType } from '../../beta';
32
import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
43
import type { Server } from '../../sdam/server';
54
import type { ClientSession } from '../../sessions';
65
import { MongoDBNamespace } from '../../utils';
76
import { CommandOperation } from '../command';
87
import { Aspect, defineAspects } from '../operation';
8+
import { type ClientBulkWriteCommandBuilder } from './command_builder';
99
import { type ClientBulkWriteOptions } from './common';
1010

1111
/**
1212
* Executes a single client bulk write operation within a potential batch.
1313
* @internal
1414
*/
1515
export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
16-
command: Document;
16+
commandBuilder: ClientBulkWriteCommandBuilder;
1717
override options: ClientBulkWriteOptions;
1818

1919
override get commandName() {
2020
return 'bulkWrite' as const;
2121
}
2222

23-
constructor(command: Document, options: ClientBulkWriteOptions) {
23+
constructor(commandBuilder: ClientBulkWriteCommandBuilder, options: ClientBulkWriteOptions) {
2424
super(undefined, options);
25-
this.command = command;
25+
this.commandBuilder = commandBuilder;
2626
this.options = options;
2727
this.ns = new MongoDBNamespace('admin', '$cmd');
2828
}
@@ -37,9 +37,34 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
3737
server: Server,
3838
session: ClientSession | undefined
3939
): Promise<ClientBulkWriteCursorResponse> {
40-
return await super.executeCommand(server, session, this.command, ClientBulkWriteCursorResponse);
40+
let command;
41+
42+
if (server.description.type === ServerType.LoadBalancer) {
43+
// Checkout a connection to build the command.
44+
const connection = await server.pool.checkOut();
45+
command = this.commandBuilder.buildBatch(
46+
connection.hello?.maxMessageSizeBytes,
47+
connection.hello?.maxWriteBatchSize
48+
);
49+
// Pin the connection to the session so it get used to execute the command and we do not
50+
// perform a double check-in/check-out.
51+
session?.pin(connection);
52+
} else {
53+
// At this point we have a server and the auto connect code has already
54+
// run in executeOperation, so the server description will be populated.
55+
// We can use that to build the command.
56+
command = this.commandBuilder.buildBatch(
57+
server.description.maxMessageSizeBytes,
58+
server.description.maxWriteBatchSize
59+
);
60+
}
61+
return await super.executeCommand(server, session, command, ClientBulkWriteCursorResponse);
4162
}
4263
}
4364

4465
// Skipping the collation as it goes on the individual ops.
45-
defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION, Aspect.SKIP_COLLATION]);
66+
defineAspects(ClientBulkWriteOperation, [
67+
Aspect.WRITE_OPERATION,
68+
Aspect.SKIP_COLLATION,
69+
Aspect.CURSOR_CREATING
70+
]);

src/operations/client_bulk_write/command_builder.ts

Lines changed: 48 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ export class ClientBulkWriteCommandBuilder {
3838
models: AnyClientBulkWriteModel[];
3939
options: ClientBulkWriteOptions;
4040
pkFactory: PkFactory;
41+
currentModelIndex: number;
4142

4243
/**
4344
* Create the command builder.
@@ -51,6 +52,7 @@ export class ClientBulkWriteCommandBuilder {
5152
this.models = models;
5253
this.options = options;
5354
this.pkFactory = pkFactory ?? DEFAULT_PK_FACTORY;
55+
this.currentModelIndex = 0;
5456
}
5557

5658
/**
@@ -65,68 +67,54 @@ export class ClientBulkWriteCommandBuilder {
6567
}
6668

6769
/**
68-
* Build the bulk write commands from the models.
70+
* Determines if there is another batch to process.
71+
* @returns True if not all batches have been built.
6972
*/
70-
buildCommands(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand[] {
71-
// Iterate the models to build the ops and nsInfo fields.
72-
// We need to do this in a loop which creates one command each up
73-
// to the max bson size or max message size.
74-
const commands: ClientBulkWriteCommand[] = [];
75-
let currentCommandLength = 0;
73+
hasNextBatch(): boolean {
74+
return this.currentModelIndex < this.models.length;
75+
}
76+
77+
/**
78+
* Build a single batch of a client bulk write command.
79+
* @param maxMessageSizeBytes - The max message size in bytes.
80+
* @param maxWriteBatchSize - The max write batch size.
81+
* @returns The client bulk write command.
82+
*/
83+
buildBatch(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand {
84+
let commandLength = 0;
7685
let currentNamespaceIndex = 0;
77-
let currentCommand: ClientBulkWriteCommand = this.baseCommand();
86+
const command: ClientBulkWriteCommand = this.baseCommand();
7887
const namespaces = new Map<string, number>();
7988

80-
for (const model of this.models) {
89+
while (this.currentModelIndex < this.models.length) {
90+
const model = this.models[this.currentModelIndex];
8191
const ns = model.namespace;
82-
const index = namespaces.get(ns);
83-
84-
/**
85-
* Convenience function for resetting everything when a new batch
86-
* is started.
87-
*/
88-
const reset = () => {
89-
commands.push(currentCommand);
90-
namespaces.clear();
91-
currentNamespaceIndex = 0;
92-
currentCommand = this.baseCommand();
93-
namespaces.set(ns, currentNamespaceIndex);
94-
};
92+
const nsIndex = namespaces.get(ns);
9593

96-
if (index != null) {
97-
// Pushing to the ops document sequence returns the bytes length added.
98-
const operation = buildOperation(model, index, this.pkFactory);
94+
if (nsIndex != null) {
95+
// Build the operation and serialize it to get the bytes buffer.
96+
const operation = buildOperation(model, nsIndex, this.pkFactory);
9997
const operationBuffer = BSON.serialize(operation);
10098

101-
// Check if the operation buffer can fit in the current command. If it can,
99+
// Check if the operation buffer can fit in the command. If it can,
102100
// then add the operation to the document sequence and increment the
103101
// current length as long as the ops don't exceed the maxWriteBatchSize.
104102
if (
105-
currentCommandLength + operationBuffer.length < maxMessageSizeBytes &&
106-
currentCommand.ops.documents.length < maxWriteBatchSize
103+
commandLength + operationBuffer.length < maxMessageSizeBytes &&
104+
command.ops.documents.length < maxWriteBatchSize
107105
) {
108106
// Pushing to the ops document sequence returns the total byte length of the document sequence.
109-
currentCommandLength =
110-
MESSAGE_OVERHEAD_BYTES + this.addOperation(currentCommand, operation, operationBuffer);
107+
commandLength = MESSAGE_OVERHEAD_BYTES + command.ops.push(operation, operationBuffer);
108+
// Increment the builder's current model index.
109+
this.currentModelIndex++;
111110
} else {
112-
// We need to batch. Push the current command to the commands
113-
// array and create a new current command. We aslo need to clear the namespaces
114-
// map for the new command.
115-
reset();
116-
117-
const nsInfo = { ns: ns };
118-
const nsInfoBuffer = BSON.serialize(nsInfo);
119-
currentCommandLength =
120-
MESSAGE_OVERHEAD_BYTES +
121-
this.addOperationAndNsInfo(
122-
currentCommand,
123-
operation,
124-
operationBuffer,
125-
nsInfo,
126-
nsInfoBuffer
127-
);
111+
// The operation cannot fit in the current command and will need to
112+
// go in the next batch. Exit the loop.
113+
break;
128114
}
129115
} else {
116+
// The namespace is not already in the nsInfo so we will set it in the map, and
117+
// construct our nsInfo and ops documents and buffers.
130118
namespaces.set(ns, currentNamespaceIndex);
131119
const nsInfo = { ns: ns };
132120
const nsInfoBuffer = BSON.serialize(nsInfo);
@@ -138,68 +126,26 @@ export class ClientBulkWriteCommandBuilder {
138126
// sequences and increment the current length as long as the ops don't exceed
139127
// the maxWriteBatchSize.
140128
if (
141-
currentCommandLength + nsInfoBuffer.length + operationBuffer.length <
142-
maxMessageSizeBytes &&
143-
currentCommand.ops.documents.length < maxWriteBatchSize
129+
commandLength + nsInfoBuffer.length + operationBuffer.length < maxMessageSizeBytes &&
130+
command.ops.documents.length < maxWriteBatchSize
144131
) {
145-
currentCommandLength =
132+
// Pushing to the ops document sequence returns the total byte length of the document sequence.
133+
commandLength =
146134
MESSAGE_OVERHEAD_BYTES +
147-
this.addOperationAndNsInfo(
148-
currentCommand,
149-
operation,
150-
operationBuffer,
151-
nsInfo,
152-
nsInfoBuffer
153-
);
135+
command.nsInfo.push(nsInfo, nsInfoBuffer) +
136+
command.ops.push(operation, operationBuffer);
137+
// We've added a new namespace, increment the namespace index.
138+
currentNamespaceIndex++;
139+
// Increment the builder's current model index.
140+
this.currentModelIndex++;
154141
} else {
155-
// We need to batch. Push the current command to the commands
156-
// array and create a new current command. Aslo clear the namespaces map.
157-
reset();
158-
159-
currentCommandLength =
160-
MESSAGE_OVERHEAD_BYTES +
161-
this.addOperationAndNsInfo(
162-
currentCommand,
163-
operation,
164-
operationBuffer,
165-
nsInfo,
166-
nsInfoBuffer
167-
);
142+
// The operation cannot fit in the current command and will need to
143+
// go in the next batch. Exit the loop.
144+
break;
168145
}
169-
// We've added a new namespace, increment the namespace index.
170-
currentNamespaceIndex++;
171146
}
172147
}
173-
174-
// After we've finisihed iterating all the models put the last current command
175-
// only if there are operations in it.
176-
if (currentCommand.ops.documents.length > 0) {
177-
commands.push(currentCommand);
178-
}
179-
180-
return commands;
181-
}
182-
183-
private addOperation(
184-
command: ClientBulkWriteCommand,
185-
operation: Document,
186-
operationBuffer: Uint8Array
187-
): number {
188-
// Pushing to the ops document sequence returns the total byte length of the document sequence.
189-
return command.ops.push(operation, operationBuffer);
190-
}
191-
192-
private addOperationAndNsInfo(
193-
command: ClientBulkWriteCommand,
194-
operation: Document,
195-
operationBuffer: Uint8Array,
196-
nsInfo: Document,
197-
nsInfoBuffer: Uint8Array
198-
): number {
199-
// Pushing to the nsInfo document sequence returns the total byte length of the document sequence.
200-
const nsInfoLength = command.nsInfo.push(nsInfo, nsInfoBuffer);
201-
const opsLength = this.addOperation(command, operation, operationBuffer);
202-
return nsInfoLength + opsLength;
148+
return command;
203149
}
204150

205151
private baseCommand(): ClientBulkWriteCommand {

0 commit comments

Comments
 (0)