@@ -38,8 +38,9 @@ import {
3838import type { ServerApi } from '../mongo_client' ;
3939import { type Abortable , TypedEventEmitter } from '../mongo_types' ;
4040import type { GetMoreOptions } from '../operations/get_more' ;
41+ import { type ModernOperation } from '../operations/operation' ;
4142import type { ClientSession } from '../sessions' ;
42- import { type TimeoutContext } from '../timeout' ;
43+ import { Timeout , type TimeoutContext } from '../timeout' ;
4344import { isTransactionCommand } from '../transactions' ;
4445import {
4546 abortable ,
@@ -277,6 +278,100 @@ export class Server extends TypedEventEmitter<ServerEvents> {
277278 }
278279 }
279280
281+ public async modernCommand (
282+ operation : ModernOperation < any > ,
283+ timeoutContext : TimeoutContext
284+ ) : Promise < Document > {
285+ if ( this . s . state === STATE_CLOSING || this . s . state === STATE_CLOSED ) {
286+ throw new MongoServerClosedError ( ) ;
287+ }
288+ const session = operation . session ;
289+
290+ let conn = session ?. pinnedConnection ;
291+
292+ this . incrementOperationCount ( ) ;
293+ if ( conn == null ) {
294+ try {
295+ conn = await this . pool . checkOut ( { timeoutContext } ) ;
296+ } catch ( checkoutError ) {
297+ this . decrementOperationCount ( ) ;
298+ if ( ! ( checkoutError instanceof PoolClearedError ) ) this . handleError ( checkoutError ) ;
299+ throw checkoutError ;
300+ }
301+ }
302+
303+ const cmd = operation . buildCommand ( conn , session ) ;
304+ const options = operation . buildOptions ( timeoutContext ) ;
305+ const ns = operation . ns ;
306+
307+ if ( this . loadBalanced && isPinnableCommand ( cmd , session ) ) {
308+ session ?. pin ( conn ) ;
309+ }
310+
311+ options . directConnection = this . topology . s . options . directConnection ;
312+
313+ // There are cases where we need to flag the read preference not to get sent in
314+ // the command, such as pre-5.0 servers attempting to perform an aggregate write
315+ // with a non-primary read preference. In this case the effective read preference
316+ // (primary) is not the same as the provided and must be removed completely.
317+ if ( options . omitReadPreference ) {
318+ delete options . readPreference ;
319+ }
320+
321+ if ( this . description . iscryptd ) {
322+ options . omitMaxTimeMS = true ;
323+ }
324+
325+ let reauthPromise : Promise < void > | null = null ;
326+
327+ try {
328+ try {
329+ const res = await conn . command ( ns , cmd , options ) ;
330+ throwIfWriteConcernError ( res ) ;
331+ return res ;
332+ } catch ( commandError ) {
333+ throw this . decorateCommandError ( conn , cmd , options , commandError ) ;
334+ }
335+ } catch ( operationError ) {
336+ if (
337+ operationError instanceof MongoError &&
338+ operationError . code === MONGODB_ERROR_CODES . Reauthenticate
339+ ) {
340+ reauthPromise = this . pool . reauthenticate ( conn ) ;
341+ reauthPromise . then ( undefined , error => {
342+ reauthPromise = null ;
343+ squashError ( error ) ;
344+ } ) ;
345+
346+ await abortable ( reauthPromise , options ) ;
347+ reauthPromise = null ; // only reachable if reauth succeeds
348+
349+ try {
350+ const res = await conn . command ( ns , cmd , options ) ;
351+ throwIfWriteConcernError ( res ) ;
352+ return res ;
353+ } catch ( commandError ) {
354+ throw this . decorateCommandError ( conn , cmd , options , commandError ) ;
355+ }
356+ } else {
357+ throw operationError ;
358+ }
359+ } finally {
360+ this . decrementOperationCount ( ) ;
361+ if ( session ?. pinnedConnection !== conn ) {
362+ if ( reauthPromise != null ) {
363+ // The reauth promise only exists if it hasn't thrown.
364+ const checkBackIn = ( ) => {
365+ this . pool . checkIn ( conn ) ;
366+ } ;
367+ void reauthPromise . then ( checkBackIn , checkBackIn ) ;
368+ } else {
369+ this . pool . checkIn ( conn ) ;
370+ }
371+ }
372+ }
373+ }
374+
280375 public async command < T extends MongoDBResponseConstructor > (
281376 ns : MongoDBNamespace ,
282377 command : Document ,
0 commit comments