@@ -29,7 +29,6 @@ use crate::{
2929 wire:: { next_request_id, Message } ,
3030 PinnedConnectionHandle ,
3131 } ,
32- Command ,
3332 ConnectionPool ,
3433 RawCommandResponse ,
3534 StreamDescription ,
@@ -310,7 +309,6 @@ impl Client {
310309 let mut retry: Option < ExecutionRetry > = None ;
311310 let mut implicit_session: Option < ClientSession > = None ;
312311 loop {
313-
314312 if retry. is_some ( ) {
315313 op. update_for_retry ( ) ;
316314 }
@@ -395,7 +393,14 @@ impl Client {
395393 } ;
396394
397395 let details = match self
398- . execute_command_on_connection ( cmd, op, & mut conn, & mut session, retryability)
396+ . execute_operation_on_connection (
397+ op,
398+ & mut conn,
399+ & mut session,
400+ txn_number,
401+ retryability,
402+ effective_criteria,
403+ )
399404 . await
400405 {
401406 Ok ( output) => ExecutionDetails {
@@ -468,127 +473,21 @@ impl Client {
468473 session : & mut Option < & mut ClientSession > ,
469474 txn_number : Option < i64 > ,
470475 retryability : Retryability ,
476+ effective_criteria : SelectionCriteria ,
471477 ) -> Result < T :: O > {
472478 loop {
473- let stream_description = connection. stream_description ( ) ?;
474- let is_sharded = stream_description. initial_server_type == ServerType :: Mongos ;
475- let mut cmd = op. build ( stream_description) ?;
476- self . inner . topology . update_command_with_read_pref (
477- connection. address ( ) ,
478- & mut cmd,
479- op. selection_criteria ( ) ,
480- ) ;
481-
482- match session {
483- Some ( ref mut session) if op. supports_sessions ( ) && op. is_acknowledged ( ) => {
484- cmd. set_session ( session) ;
485- if let Some ( txn_number) = txn_number {
486- cmd. set_txn_number ( txn_number) ;
487- }
488- if session
489- . options ( )
490- . and_then ( |opts| opts. snapshot )
491- . unwrap_or ( false )
492- {
493- if connection
494- . stream_description ( ) ?
495- . max_wire_version
496- . unwrap_or ( 0 )
497- < 13
498- {
499- let labels: Option < Vec < _ > > = None ;
500- return Err ( Error :: new (
501- ErrorKind :: IncompatibleServer {
502- message : "Snapshot reads require MongoDB 5.0 or later" . into ( ) ,
503- } ,
504- labels,
505- ) ) ;
506- }
507- cmd. set_snapshot_read_concern ( session) ;
508- }
509- // If this is a causally consistent session, set `readConcern.afterClusterTime`.
510- // Causal consistency defaults to true, unless snapshot is true.
511- else if session. causal_consistency ( )
512- && matches ! (
513- session. transaction. state,
514- TransactionState :: None | TransactionState :: Starting
515- )
516- && op. supports_read_concern ( stream_description)
517- {
518- cmd. set_after_cluster_time ( session) ;
519- }
520-
521- match session. transaction . state {
522- TransactionState :: Starting => {
523- cmd. set_start_transaction ( ) ;
524- cmd. set_autocommit ( ) ;
525- if session. causal_consistency ( ) {
526- cmd. set_after_cluster_time ( session) ;
527- }
528-
529- if let Some ( ref options) = session. transaction . options {
530- if let Some ( ref read_concern) = options. read_concern {
531- cmd. set_read_concern_level ( read_concern. level . clone ( ) ) ;
532- }
533- }
534- if self . is_load_balanced ( ) {
535- session. pin_connection ( connection. pin ( ) ?) ;
536- } else if is_sharded {
537- session. pin_mongos ( connection. address ( ) . clone ( ) ) ;
538- }
539- session. transaction . state = TransactionState :: InProgress ;
540- }
541- TransactionState :: InProgress => cmd. set_autocommit ( ) ,
542- TransactionState :: Committed { .. } | TransactionState :: Aborted => {
543- cmd. set_autocommit ( ) ;
544-
545- // Append the recovery token to the command if we are committing or
546- // aborting on a sharded transaction.
547- if is_sharded {
548- if let Some ( ref recovery_token) = session. transaction . recovery_token
549- {
550- cmd. set_recovery_token ( recovery_token) ;
551- }
552- }
553- }
554- _ => { }
555- }
556- session. update_last_use ( ) ;
557- }
558- Some ( ref session) if !op. supports_sessions ( ) && !session. is_implicit ( ) => {
559- return Err ( ErrorKind :: InvalidArgument {
560- message : format ! ( "{} does not support sessions" , cmd. name) ,
561- }
562- . into ( ) ) ;
563- }
564- Some ( ref session) if !op. is_acknowledged ( ) && !session. is_implicit ( ) => {
565- return Err ( ErrorKind :: InvalidArgument {
566- message : "Cannot use ClientSessions with unacknowledged write concern"
567- . to_string ( ) ,
568- }
569- . into ( ) ) ;
570- }
571- _ => { }
572- }
573-
574- let session_cluster_time = session. as_ref ( ) . and_then ( |session| session. cluster_time ( ) ) ;
575- let client_cluster_time = self . inner . topology . cluster_time ( ) ;
576- let max_cluster_time =
577- std:: cmp:: max ( session_cluster_time, client_cluster_time. as_ref ( ) ) ;
578- if let Some ( cluster_time) = max_cluster_time {
579- cmd. set_cluster_time ( cluster_time) ;
580- }
479+ let cmd = self . build_command (
480+ op,
481+ connection,
482+ session,
483+ txn_number,
484+ effective_criteria. clone ( ) ,
485+ ) ?;
581486
582487 let connection_info = connection. info ( ) ;
583488 let service_id = connection. service_id ( ) ;
584489 let request_id = next_request_id ( ) ;
585-
586- if let Some ( ref server_api) = self . inner . options . server_api {
587- cmd. set_server_api ( server_api) ;
588- }
589-
590490 let should_redact = cmd. should_redact ( ) ;
591-
592491 let cmd_name = cmd. name . clone ( ) ;
593492 let target_db = cmd. target_db . clone ( ) ;
594493
@@ -627,8 +526,9 @@ impl Client {
627526 let start_time = Instant :: now ( ) ;
628527 let command_result = match connection. send_message ( message) . await {
629528 Ok ( response) => {
630- self . handle_response ( op, session, is_sharded, response)
631- . await
529+ let is_sharded =
530+ connection. stream_description ( ) ?. initial_server_type == ServerType :: Mongos ;
531+ self . parse_response ( op, session, is_sharded, response) . await
632532 }
633533 Err ( err) => Err ( err) ,
634534 } ;
@@ -703,6 +603,7 @@ impl Client {
703603 let context = ExecutionContext {
704604 connection,
705605 session : session. as_deref_mut ( ) ,
606+ effective_criteria : effective_criteria. clone ( ) ,
706607 } ;
707608
708609 match op. handle_response ( response, context) . await {
@@ -734,6 +635,128 @@ impl Client {
734635 }
735636 }
736637
638+ fn build_command < T : Operation > (
639+ & self ,
640+ op : & mut T ,
641+ connection : & mut PooledConnection ,
642+ session : & mut Option < & mut ClientSession > ,
643+ txn_number : Option < i64 > ,
644+ effective_criteria : SelectionCriteria ,
645+ ) -> Result < crate :: cmap:: Command > {
646+ let stream_description = connection. stream_description ( ) ?;
647+ let is_sharded = stream_description. initial_server_type == ServerType :: Mongos ;
648+ let mut cmd = op. build ( stream_description) ?;
649+ self . inner . topology . update_command_with_read_pref (
650+ connection. address ( ) ,
651+ & mut cmd,
652+ & effective_criteria,
653+ ) ;
654+
655+ match session {
656+ Some ( ref mut session) if op. supports_sessions ( ) && op. is_acknowledged ( ) => {
657+ cmd. set_session ( session) ;
658+ if let Some ( txn_number) = txn_number {
659+ cmd. set_txn_number ( txn_number) ;
660+ }
661+ if session
662+ . options ( )
663+ . and_then ( |opts| opts. snapshot )
664+ . unwrap_or ( false )
665+ {
666+ if connection
667+ . stream_description ( ) ?
668+ . max_wire_version
669+ . unwrap_or ( 0 )
670+ < 13
671+ {
672+ let labels: Option < Vec < _ > > = None ;
673+ return Err ( Error :: new (
674+ ErrorKind :: IncompatibleServer {
675+ message : "Snapshot reads require MongoDB 5.0 or later" . into ( ) ,
676+ } ,
677+ labels,
678+ ) ) ;
679+ }
680+ cmd. set_snapshot_read_concern ( session) ;
681+ }
682+ // If this is a causally consistent session, set `readConcern.afterClusterTime`.
683+ // Causal consistency defaults to true, unless snapshot is true.
684+ else if session. causal_consistency ( )
685+ && matches ! (
686+ session. transaction. state,
687+ TransactionState :: None | TransactionState :: Starting
688+ )
689+ && op. supports_read_concern ( stream_description)
690+ {
691+ cmd. set_after_cluster_time ( session) ;
692+ }
693+
694+ match session. transaction . state {
695+ TransactionState :: Starting => {
696+ cmd. set_start_transaction ( ) ;
697+ cmd. set_autocommit ( ) ;
698+ if session. causal_consistency ( ) {
699+ cmd. set_after_cluster_time ( session) ;
700+ }
701+
702+ if let Some ( ref options) = session. transaction . options {
703+ if let Some ( ref read_concern) = options. read_concern {
704+ cmd. set_read_concern_level ( read_concern. level . clone ( ) ) ;
705+ }
706+ }
707+ if self . is_load_balanced ( ) {
708+ session. pin_connection ( connection. pin ( ) ?) ;
709+ } else if is_sharded {
710+ session. pin_mongos ( connection. address ( ) . clone ( ) ) ;
711+ }
712+ session. transaction . state = TransactionState :: InProgress ;
713+ }
714+ TransactionState :: InProgress => cmd. set_autocommit ( ) ,
715+ TransactionState :: Committed { .. } | TransactionState :: Aborted => {
716+ cmd. set_autocommit ( ) ;
717+
718+ // Append the recovery token to the command if we are committing or aborting
719+ // on a sharded transaction.
720+ if is_sharded {
721+ if let Some ( ref recovery_token) = session. transaction . recovery_token {
722+ cmd. set_recovery_token ( recovery_token) ;
723+ }
724+ }
725+ }
726+ _ => { }
727+ }
728+ session. update_last_use ( ) ;
729+ }
730+ Some ( ref session) if !op. supports_sessions ( ) && !session. is_implicit ( ) => {
731+ return Err ( ErrorKind :: InvalidArgument {
732+ message : format ! ( "{} does not support sessions" , cmd. name) ,
733+ }
734+ . into ( ) ) ;
735+ }
736+ Some ( ref session) if !op. is_acknowledged ( ) && !session. is_implicit ( ) => {
737+ return Err ( ErrorKind :: InvalidArgument {
738+ message : "Cannot use ClientSessions with unacknowledged write concern"
739+ . to_string ( ) ,
740+ }
741+ . into ( ) ) ;
742+ }
743+ _ => { }
744+ }
745+
746+ let session_cluster_time = session. as_ref ( ) . and_then ( |session| session. cluster_time ( ) ) ;
747+ let client_cluster_time = self . inner . topology . cluster_time ( ) ;
748+ let max_cluster_time = std:: cmp:: max ( session_cluster_time, client_cluster_time. as_ref ( ) ) ;
749+ if let Some ( cluster_time) = max_cluster_time {
750+ cmd. set_cluster_time ( cluster_time) ;
751+ }
752+
753+ if let Some ( ref server_api) = self . inner . options . server_api {
754+ cmd. set_server_api ( server_api) ;
755+ }
756+
757+ Ok ( cmd)
758+ }
759+
737760 #[ cfg( feature = "in-use-encryption" ) ]
738761 fn auto_encrypt < ' a > (
739762 & ' a self ,
@@ -786,7 +809,7 @@ impl Client {
786809 . await
787810 }
788811
789- async fn handle_response < T : Operation > (
812+ async fn parse_response < T : Operation > (
790813 & self ,
791814 op : & T ,
792815 session : & mut Option < & mut ClientSession > ,
0 commit comments