Skip to content

Commit 643088b

Browse files
Merge branch 'main' into NODE-7083
2 parents 87c403c + 25dd18e commit 643088b

File tree

18 files changed

+455
-422
lines changed

18 files changed

+455
-422
lines changed

src/collection.ts

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -402,12 +402,7 @@ export class Collection<TSchema extends Document = Document> {
402402
): Promise<UpdateResult<TSchema>> {
403403
return await executeOperation(
404404
this.client,
405-
new UpdateOneOperation(
406-
this as TODO_NODE_3286,
407-
filter,
408-
update,
409-
resolveOptions(this, options)
410-
) as TODO_NODE_3286
405+
new UpdateOneOperation(this.s.namespace, filter, update, resolveOptions(this, options))
411406
);
412407
}
413408

@@ -425,12 +420,7 @@ export class Collection<TSchema extends Document = Document> {
425420
): Promise<UpdateResult<TSchema>> {
426421
return await executeOperation(
427422
this.client,
428-
new ReplaceOneOperation(
429-
this as TODO_NODE_3286,
430-
filter,
431-
replacement,
432-
resolveOptions(this, options)
433-
)
423+
new ReplaceOneOperation(this.s.namespace, filter, replacement, resolveOptions(this, options))
434424
);
435425
}
436426

@@ -452,12 +442,7 @@ export class Collection<TSchema extends Document = Document> {
452442
): Promise<UpdateResult<TSchema>> {
453443
return await executeOperation(
454444
this.client,
455-
new UpdateManyOperation(
456-
this as TODO_NODE_3286,
457-
filter,
458-
update,
459-
resolveOptions(this, options)
460-
) as TODO_NODE_3286
445+
new UpdateManyOperation(this.s.namespace, filter, update, resolveOptions(this, options))
461446
);
462447
}
463448

@@ -473,7 +458,7 @@ export class Collection<TSchema extends Document = Document> {
473458
): Promise<DeleteResult> {
474459
return await executeOperation(
475460
this.client,
476-
new DeleteOneOperation(this as TODO_NODE_3286, filter, resolveOptions(this, options))
461+
new DeleteOneOperation(this.s.namespace, filter, resolveOptions(this, options))
477462
);
478463
}
479464

@@ -489,7 +474,7 @@ export class Collection<TSchema extends Document = Document> {
489474
): Promise<DeleteResult> {
490475
return await executeOperation(
491476
this.client,
492-
new DeleteManyOperation(this as TODO_NODE_3286, filter, resolveOptions(this, options))
477+
new DeleteManyOperation(this.s.namespace, filter, resolveOptions(this, options))
493478
);
494479
}
495480

@@ -513,7 +498,7 @@ export class Collection<TSchema extends Document = Document> {
513498
...options,
514499
readPreference: ReadPreference.PRIMARY
515500
})
516-
) as TODO_NODE_3286
501+
)
517502
);
518503
}
519504

@@ -581,7 +566,7 @@ export class Collection<TSchema extends Document = Document> {
581566
this.client,
582567
this.s.namespace,
583568
filter,
584-
resolveOptions(this as TODO_NODE_3286, options)
569+
resolveOptions(this, options)
585570
);
586571
}
587572

src/operations/client_bulk_write/client_bulk_write.ts

Lines changed: 23 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
1-
import { MongoClientBulkWriteExecutionError, ServerType } from '../../beta';
1+
import { type Connection } from '../../cmap/connection';
22
import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
3-
import type { Server } from '../../sdam/server';
43
import type { ClientSession } from '../../sessions';
5-
import { type TimeoutContext } from '../../timeout';
64
import { MongoDBNamespace } from '../../utils';
7-
import { CommandOperation } from '../command';
5+
import { ModernizedCommandOperation } from '../command';
86
import { Aspect, defineAspects } from '../operation';
9-
import { type ClientBulkWriteCommandBuilder } from './command_builder';
7+
import { type ClientBulkWriteCommand, type ClientBulkWriteCommandBuilder } from './command_builder';
108
import { type ClientBulkWriteOptions } from './common';
119

1210
/**
1311
* Executes a single client bulk write operation within a potential batch.
1412
* @internal
1513
*/
16-
export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
14+
export class ClientBulkWriteOperation extends ModernizedCommandOperation<ClientBulkWriteCursorResponse> {
15+
override SERVER_COMMAND_RESPONSE_TYPE = ClientBulkWriteCursorResponse;
16+
1717
commandBuilder: ClientBulkWriteCommandBuilder;
1818
override options: ClientBulkWriteOptions;
1919

@@ -36,72 +36,28 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
3636
return this.commandBuilder.isBatchRetryable;
3737
}
3838

39-
/**
40-
* Execute the command. Superclass will handle write concern, etc.
41-
* @param server - The server.
42-
* @param session - The session.
43-
* @returns The response.
44-
*/
45-
override async execute(
46-
server: Server,
47-
session: ClientSession | undefined,
48-
timeoutContext: TimeoutContext
49-
): Promise<ClientBulkWriteCursorResponse> {
50-
let command;
39+
override handleOk(
40+
response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>
41+
): ClientBulkWriteCursorResponse {
42+
return response;
43+
}
5144

52-
if (server.description.type === ServerType.LoadBalancer) {
53-
if (session) {
54-
let connection;
55-
if (!session.pinnedConnection) {
56-
// Checkout a connection to build the command.
57-
connection = await server.pool.checkOut({ timeoutContext });
58-
// Pin the connection to the session so it get used to execute the command and we do not
59-
// perform a double check-in/check-out.
60-
session.pin(connection);
61-
} else {
62-
connection = session.pinnedConnection;
63-
}
64-
command = this.commandBuilder.buildBatch(
65-
connection.hello?.maxMessageSizeBytes,
66-
connection.hello?.maxWriteBatchSize,
67-
connection.hello?.maxBsonObjectSize
68-
);
69-
} else {
70-
throw new MongoClientBulkWriteExecutionError(
71-
'Session provided to the client bulk write operation must be present.'
72-
);
73-
}
74-
} else {
75-
// At this point we have a server and the auto connect code has already
76-
// run in executeOperation, so the server description will be populated.
77-
// We can use that to build the command.
78-
if (
79-
!server.description.maxWriteBatchSize ||
80-
!server.description.maxMessageSizeBytes ||
81-
!server.description.maxBsonObjectSize
82-
) {
83-
throw new MongoClientBulkWriteExecutionError(
84-
'In order to execute a client bulk write, both maxWriteBatchSize, maxMessageSizeBytes and maxBsonObjectSize must be provided by the servers hello response.'
85-
);
86-
}
87-
command = this.commandBuilder.buildBatch(
88-
server.description.maxMessageSizeBytes,
89-
server.description.maxWriteBatchSize,
90-
server.description.maxBsonObjectSize
91-
);
92-
}
45+
override buildCommandDocument(
46+
connection: Connection,
47+
_session?: ClientSession
48+
): ClientBulkWriteCommand {
49+
const command = this.commandBuilder.buildBatch(
50+
connection.description.maxMessageSizeBytes,
51+
connection.description.maxWriteBatchSize,
52+
connection.description.maxBsonObjectSize
53+
);
9354

94-
// Check after the batch is built if we cannot retry it and override the option.
55+
// Check _after_ the batch is built if we cannot retry it and override the option.
9556
if (!this.canRetryWrite) {
9657
this.options.willRetryWrite = false;
9758
}
98-
return await super.executeCommand(
99-
server,
100-
session,
101-
command,
102-
timeoutContext,
103-
ClientBulkWriteCursorResponse
104-
);
59+
60+
return command;
10561
}
10662
}
10763

src/operations/count.ts

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1+
import { type Connection } from '..';
12
import type { Document } from '../bson';
3+
import { MongoDBResponse } from '../cmap/wire_protocol/responses';
24
import type { Collection } from '../collection';
3-
import type { Server } from '../sdam/server';
45
import type { ClientSession } from '../sessions';
5-
import { type TimeoutContext } from '../timeout';
66
import type { MongoDBNamespace } from '../utils';
7-
import { CommandOperation, type CommandOperationOptions } from './command';
7+
import { type CommandOperationOptions, ModernizedCommandOperation } from './command';
88
import { Aspect, defineAspects } from './operation';
99

1010
/** @public */
@@ -22,7 +22,8 @@ export interface CountOptions extends CommandOperationOptions {
2222
}
2323

2424
/** @internal */
25-
export class CountOperation extends CommandOperation<number> {
25+
export class CountOperation extends ModernizedCommandOperation<number> {
26+
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;
2627
override options: CountOptions;
2728
collectionName?: string;
2829
query: Document;
@@ -39,11 +40,7 @@ export class CountOperation extends CommandOperation<number> {
3940
return 'count' as const;
4041
}
4142

42-
override async execute(
43-
server: Server,
44-
session: ClientSession | undefined,
45-
timeoutContext: TimeoutContext
46-
): Promise<number> {
43+
override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document {
4744
const options = this.options;
4845
const cmd: Document = {
4946
count: this.collectionName,
@@ -66,8 +63,11 @@ export class CountOperation extends CommandOperation<number> {
6663
cmd.maxTimeMS = options.maxTimeMS;
6764
}
6865

69-
const result = await super.executeCommand(server, session, cmd, timeoutContext);
70-
return result ? result.n : 0;
66+
return cmd;
67+
}
68+
69+
override handleOk(response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>): number {
70+
return response.getNumber('n') ?? 0;
7171
}
7272
}
7373

src/operations/create_collection.ts

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1+
import { type Connection } from '..';
12
import type { Document } from '../bson';
23
import {
34
MIN_SUPPORTED_QE_SERVER_VERSION,
45
MIN_SUPPORTED_QE_WIRE_VERSION
56
} from '../cmap/wire_protocol/constants';
7+
import { MongoDBResponse } from '../cmap/wire_protocol/responses';
68
import { Collection } from '../collection';
79
import type { Db } from '../db';
810
import { MongoCompatibilityError } from '../error';
911
import type { PkFactory } from '../mongo_client';
10-
import type { Server } from '../sdam/server';
1112
import type { ClientSession } from '../sessions';
1213
import { TimeoutContext } from '../timeout';
13-
import { CommandOperation, type CommandOperationOptions } from './command';
14+
import { maxWireVersion } from '../utils';
15+
import { type CommandOperationOptions, ModernizedCommandOperation } from './command';
1416
import { executeOperation } from './execute_operation';
1517
import { CreateIndexesOperation } from './indexes';
1618
import { Aspect, defineAspects } from './operation';
@@ -110,7 +112,8 @@ const INVALID_QE_VERSION =
110112
'Driver support of Queryable Encryption is incompatible with server. Upgrade server to use Queryable Encryption.';
111113

112114
/** @internal */
113-
export class CreateCollectionOperation extends CommandOperation<Collection> {
115+
export class CreateCollectionOperation extends ModernizedCommandOperation<Collection> {
116+
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;
114117
override options: CreateCollectionOptions;
115118
db: Db;
116119
name: string;
@@ -127,25 +130,19 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
127130
return 'create' as const;
128131
}
129132

130-
override async execute(
131-
server: Server,
132-
session: ClientSession | undefined,
133-
timeoutContext: TimeoutContext
134-
): Promise<Collection> {
135-
const db = this.db;
136-
const name = this.name;
137-
const options = this.options;
138-
139-
const cmd: Document = { create: name };
140-
for (const [option, value] of Object.entries(options)) {
141-
if (value != null && typeof value !== 'function' && !ILLEGAL_COMMAND_FIELDS.has(option)) {
142-
cmd[option] = value;
143-
}
144-
}
133+
override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document {
134+
const isOptionValid = ([k, v]: [k: string, v: unknown]) =>
135+
v != null && typeof v !== 'function' && !ILLEGAL_COMMAND_FIELDS.has(k);
136+
return {
137+
create: this.name,
138+
...Object.fromEntries(Object.entries(this.options).filter(isOptionValid))
139+
};
140+
}
145141

146-
// otherwise just execute the command
147-
await super.executeCommand(server, session, cmd, timeoutContext);
148-
return new Collection(db, name, options);
142+
override handleOk(
143+
_response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>
144+
): Collection<Document> {
145+
return new Collection(this.db, this.name, this.options);
149146
}
150147
}
151148

@@ -167,23 +164,17 @@ export async function createCollections<TSchema extends Document>(
167164

168165
if (encryptedFields) {
169166
class CreateSupportingFLEv2CollectionOperation extends CreateCollectionOperation {
170-
override execute(
171-
server: Server,
172-
session: ClientSession | undefined,
173-
timeoutContext: TimeoutContext
174-
): Promise<Collection> {
175-
// Creating a QE collection required min server of 7.0.0
176-
// TODO(NODE-5353): Get wire version information from connection.
167+
override buildCommandDocument(connection: Connection, session?: ClientSession): Document {
177168
if (
178-
!server.loadBalanced &&
179-
server.description.maxWireVersion < MIN_SUPPORTED_QE_WIRE_VERSION
169+
!connection.description.loadBalanced &&
170+
maxWireVersion(connection) < MIN_SUPPORTED_QE_WIRE_VERSION
180171
) {
181172
throw new MongoCompatibilityError(
182173
`${INVALID_QE_VERSION} The minimum server version required is ${MIN_SUPPORTED_QE_SERVER_VERSION}`
183174
);
184175
}
185176

186-
return super.execute(server, session, timeoutContext);
177+
return super.buildCommandDocument(connection, session);
187178
}
188179
}
189180

0 commit comments

Comments
 (0)