@@ -447,7 +447,7 @@ impl Client {
447447 criteria : Option < & SelectionCriteria > ,
448448 ) -> Result < ServerAddress > {
449449 let ( server, _) = self
450- . select_server ( criteria, "Test select server" , None )
450+ . select_server ( criteria, "Test select server" , None , false )
451451 . await ?;
452452 Ok ( server. address . clone ( ) )
453453 }
@@ -460,6 +460,7 @@ impl Client {
460460 #[ allow( unused_variables) ] // we only use the operation_name for tracing.
461461 operation_name : & str ,
462462 deprioritized : Option < & ServerAddress > ,
463+ is_out_or_merge : bool ,
463464 ) -> Result < ( SelectedServer , SelectionCriteria ) > {
464465 let criteria =
465466 criteria. unwrap_or ( & SelectionCriteria :: ReadPreference ( ReadPreference :: Primary ) ) ;
@@ -488,10 +489,15 @@ impl Client {
488489 let mut watcher = self . inner . topology . watch ( ) ;
489490 loop {
490491 let state = watcher. observe_latest ( ) ;
491- for server in state. description . servers . values ( ) {
492- eprintln ! ( "at selection: {:?}" , server. hello_response( ) ) ;
493- }
494- let effective_criteria = criteria; // TODO
492+ let override_criteria;
493+ let effective_criteria = if let Some ( oc) =
494+ Self :: override_criteria ( criteria, & state. description , is_out_or_merge)
495+ {
496+ override_criteria = oc;
497+ & override_criteria
498+ } else {
499+ criteria
500+ } ;
495501 let result = server_selection:: attempt_to_select_server (
496502 effective_criteria,
497503 & state. description ,
@@ -543,6 +549,38 @@ impl Client {
543549 }
544550 }
545551
552+ /// Check to see if selection criteria need to be overridden. Currently only required for
553+ /// aggregate operations with $merge/$out stages.
554+ fn override_criteria (
555+ criteria : & SelectionCriteria ,
556+ desc : & crate :: sdam:: TopologyDescription ,
557+ is_out_or_merge : bool ,
558+ ) -> Option < SelectionCriteria > {
559+ if is_out_or_merge {
560+ eprintln ! ( "aggregate: checking override" ) ;
561+ }
562+ if !is_out_or_merge
563+ || criteria == & SelectionCriteria :: ReadPreference ( ReadPreference :: Primary )
564+ || desc. topology_type ( ) == crate :: TopologyType :: LoadBalanced
565+ {
566+ if is_out_or_merge {
567+ eprintln ! ( "aggregate: skipping override" ) ;
568+ }
569+ return None ;
570+ }
571+ for server in desc. servers . values ( ) {
572+ let _ = dbg ! ( server. hello_response( ) ) ;
573+ if let Ok ( Some ( v) ) = server. max_wire_version ( ) {
574+ static SERVER_5_0_0_WIRE_VERSION : i32 = 13 ;
575+ if v < SERVER_5_0_0_WIRE_VERSION {
576+ eprintln ! ( "aggregate: overriding criteria" ) ;
577+ return Some ( SelectionCriteria :: ReadPreference ( ReadPreference :: Primary ) ) ;
578+ }
579+ }
580+ }
581+ return None ;
582+ }
583+
546584 #[ cfg( all( test, feature = "dns-resolver" ) ) ]
547585 pub ( crate ) fn get_hosts ( & self ) -> Vec < String > {
548586 let watcher = self . inner . topology . watch ( ) ;
0 commit comments