Skip to content

Commit 3480fbd

Browse files
POC
1 parent aac7629 commit 3480fbd

File tree

5 files changed

+167
-21
lines changed

5 files changed

+167
-21
lines changed

src/db.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import * as CONSTANTS from './constants';
66
import { AggregationCursor } from './cursor/aggregation_cursor';
77
import { ListCollectionsCursor } from './cursor/list_collections_cursor';
88
import { RunCommandCursor, type RunCursorCommandOptions } from './cursor/run_command_cursor';
9-
import { MongoInvalidArgumentError } from './error';
9+
import { MONGODB_ERROR_CODES, MongoInvalidArgumentError, MongoServerError } from './error';
1010
import type { MongoClient, PkFactory } from './mongo_client';
1111
import type { Abortable, TODO_NODE_3286 } from './mongo_types';
1212
import type { AggregateOptions } from './operations/aggregate';

src/operations/execute_operation.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import type { Topology } from '../sdam/topology';
2626
import type { ClientSession } from '../sessions';
2727
import { TimeoutContext } from '../timeout';
2828
import { abortable, supportsRetryableWrites } from '../utils';
29-
import { AbstractOperation, Aspect } from './operation';
29+
import { AbstractOperation, Aspect, ModernOperation } from './operation';
3030

3131
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
3232
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
@@ -85,6 +85,8 @@ export async function executeOperation<
8585
throw new MongoInvalidArgumentError('ClientSession must be from the same MongoClient');
8686
}
8787

88+
operation.session ??= session;
89+
8890
const readPreference = operation.readPreference ?? ReadPreference.primary;
8991
const inTransaction = !!session?.inTransaction();
9092

@@ -231,6 +233,8 @@ async function tryOperation<
231233
let previousOperationError: MongoError | undefined;
232234
let previousServer: ServerDescription | undefined;
233235

236+
const isModernOperation = operation instanceof ModernOperation;
237+
234238
for (let tries = 0; tries < maxTries; tries++) {
235239
if (previousOperationError) {
236240
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
@@ -280,7 +284,17 @@ async function tryOperation<
280284
if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
281285
operation.resetBatch();
282286
}
283-
return await operation.execute(server, session, timeoutContext);
287+
288+
if (!isModernOperation) {
289+
return await operation.execute(server, session, timeoutContext);
290+
}
291+
292+
try {
293+
const result = await server.modernCommand(operation, timeoutContext);
294+
return operation.handleOk(result) as TResult;
295+
} catch (error) {
296+
operation.handleError(error);
297+
}
284298
} catch (operationError) {
285299
if (!(operationError instanceof MongoError)) throw operationError;
286300
if (

src/operations/operation.ts

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import { type Connection, type MongoError } from '..';
12
import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '../bson';
23
import { type Abortable } from '../mongo_types';
34
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
4-
import type { Server } from '../sdam/server';
5+
import type { Server, ServerCommandOptions } from '../sdam/server';
56
import type { ClientSession } from '../sessions';
67
import { type TimeoutContext } from '../timeout';
78
import type { MongoDBNamespace } from '../utils';
@@ -108,6 +109,10 @@ export abstract class AbstractOperation<TResult = any> {
108109
return this._session;
109110
}
110111

112+
set session(session: ClientSession) {
113+
this._session = session;
114+
}
115+
111116
clearSession() {
112117
this._session = undefined;
113118
}
@@ -125,6 +130,37 @@ export abstract class AbstractOperation<TResult = any> {
125130
}
126131
}
127132

133+
export abstract class ModernOperation<T> extends AbstractOperation<T> {
134+
/** this will never be used - but we must implement it to satisfy AbstractOperation's interface */
135+
override execute(
136+
_server: Server,
137+
_session: ClientSession | undefined,
138+
_timeoutContext: TimeoutContext
139+
): Promise<T> {
140+
throw new Error('cannot execute!!');
141+
}
142+
143+
abstract buildCommand(connection: Connection, session?: ClientSession): Document;
144+
145+
abstract buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions;
146+
147+
/**
148+
* Optional - if the operation performs error handling, such as wrapping or renaming the error,
149+
* this method can be overridden.
150+
*/
151+
handleOk(response: Document) {
152+
return response;
153+
}
154+
155+
/**
156+
* Optional - if the operation performs post-processing
157+
* on the result document, this method can be overridden.
158+
*/
159+
handleError(error: MongoError): void {
160+
throw error;
161+
}
162+
}
163+
128164
export function defineAspects(
129165
operation: { aspects?: Set<symbol> },
130166
aspects: symbol | symbol[] | Set<symbol>

src/operations/search_indexes/drop.ts

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,29 @@
1+
import { type Connection, type MongoError } from '../..';
12
import type { Document } from '../../bson';
23
import type { Collection } from '../../collection';
34
import { MONGODB_ERROR_CODES, MongoServerError } from '../../error';
4-
import type { Server } from '../../sdam/server';
5+
import type { Server, ServerCommandOptions } from '../../sdam/server';
56
import type { ClientSession } from '../../sessions';
67
import { type TimeoutContext } from '../../timeout';
7-
import { AbstractOperation } from '../operation';
8+
import { AbstractOperation, ModernOperation } from '../operation';
89

910
/** @internal */
10-
export class DropSearchIndexOperation extends AbstractOperation<void> {
11+
export class DropSearchIndexOperation extends ModernOperation<void> {
1112
private readonly collection: Collection;
1213
private readonly name: string;
1314

1415
constructor(collection: Collection, name: string) {
1516
super();
1617
this.collection = collection;
1718
this.name = name;
19+
this.ns = collection.fullNamespace;
1820
}
1921

2022
override get commandName() {
2123
return 'dropSearchIndex' as const;
2224
}
2325

24-
override async execute(
25-
server: Server,
26-
session: ClientSession | undefined,
27-
timeoutContext: TimeoutContext
28-
): Promise<void> {
26+
override buildCommand(_connection: Connection, _session?: ClientSession): Document {
2927
const namespace = this.collection.fullNamespace;
3028

3129
const command: Document = {
@@ -35,15 +33,18 @@ export class DropSearchIndexOperation extends AbstractOperation<void> {
3533
if (typeof this.name === 'string') {
3634
command.name = this.name;
3735
}
36+
return command;
37+
}
38+
39+
override buildOptions(timeoutContext: TimeoutContext): ServerCommandOptions {
40+
return { session: this.session, timeoutContext };
41+
}
3842

39-
try {
40-
await server.command(namespace, command, { session, timeoutContext });
41-
} catch (error) {
42-
const isNamespaceNotFoundError =
43-
error instanceof MongoServerError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound;
44-
if (!isNamespaceNotFoundError) {
45-
throw error;
46-
}
43+
override handleError(error: MongoError): void {
44+
const isNamespaceNotFoundError =
45+
error instanceof MongoServerError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound;
46+
if (!isNamespaceNotFoundError) {
47+
throw error;
4748
}
4849
}
4950
}

src/sdam/server.ts

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ import {
3838
import type { ServerApi } from '../mongo_client';
3939
import { type Abortable, TypedEventEmitter } from '../mongo_types';
4040
import type { GetMoreOptions } from '../operations/get_more';
41+
import { type ModernOperation } from '../operations/operation';
4142
import type { ClientSession } from '../sessions';
42-
import { type TimeoutContext } from '../timeout';
43+
import { Timeout, type TimeoutContext } from '../timeout';
4344
import { isTransactionCommand } from '../transactions';
4445
import {
4546
abortable,
@@ -277,6 +278,100 @@ export class Server extends TypedEventEmitter<ServerEvents> {
277278
}
278279
}
279280

281+
public async modernCommand(
282+
operation: ModernOperation<any>,
283+
timeoutContext: TimeoutContext
284+
): Promise<Document> {
285+
if (this.s.state === STATE_CLOSING || this.s.state === STATE_CLOSED) {
286+
throw new MongoServerClosedError();
287+
}
288+
const session = operation.session;
289+
290+
let conn = session?.pinnedConnection;
291+
292+
this.incrementOperationCount();
293+
if (conn == null) {
294+
try {
295+
conn = await this.pool.checkOut({ timeoutContext });
296+
} catch (checkoutError) {
297+
this.decrementOperationCount();
298+
if (!(checkoutError instanceof PoolClearedError)) this.handleError(checkoutError);
299+
throw checkoutError;
300+
}
301+
}
302+
303+
const cmd = operation.buildCommand(conn, session);
304+
const options = operation.buildOptions(timeoutContext);
305+
const ns = operation.ns;
306+
307+
if (this.loadBalanced && isPinnableCommand(cmd, session)) {
308+
session?.pin(conn);
309+
}
310+
311+
options.directConnection = this.topology.s.options.directConnection;
312+
313+
// There are cases where we need to flag the read preference not to get sent in
314+
// the command, such as pre-5.0 servers attempting to perform an aggregate write
315+
// with a non-primary read preference. In this case the effective read preference
316+
// (primary) is not the same as the provided and must be removed completely.
317+
if (options.omitReadPreference) {
318+
delete options.readPreference;
319+
}
320+
321+
if (this.description.iscryptd) {
322+
options.omitMaxTimeMS = true;
323+
}
324+
325+
let reauthPromise: Promise<void> | null = null;
326+
327+
try {
328+
try {
329+
const res = await conn.command(ns, cmd, options);
330+
throwIfWriteConcernError(res);
331+
return res;
332+
} catch (commandError) {
333+
throw this.decorateCommandError(conn, cmd, options, commandError);
334+
}
335+
} catch (operationError) {
336+
if (
337+
operationError instanceof MongoError &&
338+
operationError.code === MONGODB_ERROR_CODES.Reauthenticate
339+
) {
340+
reauthPromise = this.pool.reauthenticate(conn);
341+
reauthPromise.then(undefined, error => {
342+
reauthPromise = null;
343+
squashError(error);
344+
});
345+
346+
await abortable(reauthPromise, options);
347+
reauthPromise = null; // only reachable if reauth succeeds
348+
349+
try {
350+
const res = await conn.command(ns, cmd, options);
351+
throwIfWriteConcernError(res);
352+
return res;
353+
} catch (commandError) {
354+
throw this.decorateCommandError(conn, cmd, options, commandError);
355+
}
356+
} else {
357+
throw operationError;
358+
}
359+
} finally {
360+
this.decrementOperationCount();
361+
if (session?.pinnedConnection !== conn) {
362+
if (reauthPromise != null) {
363+
// The reauth promise only exists if it hasn't thrown.
364+
const checkBackIn = () => {
365+
this.pool.checkIn(conn);
366+
};
367+
void reauthPromise.then(checkBackIn, checkBackIn);
368+
} else {
369+
this.pool.checkIn(conn);
370+
}
371+
}
372+
}
373+
}
374+
280375
public async command<T extends MongoDBResponseConstructor>(
281376
ns: MongoDBNamespace,
282377
command: Document,

0 commit comments

Comments
 (0)