@@ -641,18 +641,23 @@ def _apply_all_services(self) -> bool:
641641 self .mgr .apply_spec_fails = []
642642 hosts_altered : Set [str ] = set ()
643643 all_conflicting_daemons : List [orchestrator .DaemonDescription ] = []
644+ all_daemons_needing_fencing : List [orchestrator .DaemonDescription ] = []
644645 all_daemons_to_deploy : List [CephadmDaemonDeploySpec ] = []
645646 all_daemons_to_remove : List [orchestrator .DaemonDescription ] = []
647+ services_in_need_of_fencing : List [Tuple [ServiceSpec , Dict [int , Dict [int , Optional [str ]]]]] = []
646648 for spec in specs :
647649 try :
648650 # this will populate daemon deploy/removal queue for this service spec
649651 rank_map = self ._apply_service (spec )
650652 # this finalizes what to deploy for this service
651- conflicting_daemons , daemons_to_deploy , daemons_to_remove = self .prepare_daemons_to_add_and_remove_by_service (spec , rank_map )
653+ conflicting_daemons , daemons_to_deploy , daemons_to_remove , daemons_to_fence , service_needs_fencing = self .prepare_daemons_to_add_and_remove_by_service (spec , rank_map )
652654 # compiles all these lists so we can split on host instead of service
653655 all_conflicting_daemons .extend (conflicting_daemons )
656+ all_daemons_needing_fencing .extend (daemons_to_fence )
654657 all_daemons_to_deploy .extend (daemons_to_deploy )
655- all_daemons_to_remove .extend (all_daemons_to_remove )
658+ all_daemons_to_remove .extend (daemons_to_remove )
659+ if service_needs_fencing and rank_map is not None :
660+ services_in_need_of_fencing .append ((spec , rank_map ))
656661 except Exception as e :
657662 msg = f'Failed to apply { spec .service_name ()} spec { spec } : { str (e )} '
658663 self .log .exception (msg )
@@ -675,12 +680,17 @@ def _apply_all_services(self) -> bool:
675680
676681 async def _parallel_deploy_and_remove (
677682 hostname : str ,
683+ to_fence : List [orchestrator .DaemonDescription ],
678684 conflicts : List [orchestrator .DaemonDescription ],
679685 to_deploy : List [CephadmDaemonDeploySpec ],
680686 to_remove : List [orchestrator .DaemonDescription ]
681687 ) -> Tuple [bool , Set [str ], List [str ]]:
682688 r : bool = False
683- removed_conflict_daemons , conflict_hosts_altered = self .remove_given_daemons (conflicts )
689+ removed_fencing_daemons , fencing_hosts_altered = await self .remove_given_daemons (to_fence )
690+ if removed_fencing_daemons :
691+ r = True
692+ hosts_altered .update (fencing_hosts_altered )
693+ removed_conflict_daemons , conflict_hosts_altered = await self .remove_given_daemons (conflicts )
684694 if removed_conflict_daemons :
685695 r = True
686696 hosts_altered .update (conflict_hosts_altered )
@@ -695,24 +705,30 @@ async def _parallel_deploy_and_remove(
695705 return (r , hosts_altered , daemon_place_fails )
696706
697707 async def _deploy_and_remove_all (
708+ all_needing_fencing : List [orchestrator .DaemonDescription ],
698709 all_conflicts : List [orchestrator .DaemonDescription ],
699710 all_to_deploy : List [CephadmDaemonDeploySpec ],
700711 all_to_remove : List [orchestrator .DaemonDescription ]
701712 ) -> List [Tuple [bool , Set [str ], List [str ]]]:
702713 futures = []
703714
704715 for host in self .mgr .cache .get_hosts ():
716+ need_fencing_daemons = [dd for dd in all_needing_fencing if dd .hostname == host ]
705717 conflicting_daemons = [dd for dd in all_conflicts if dd .hostname == host ]
706718 daemons_to_deploy = [dd for dd in all_to_deploy if dd .host == host ]
707719 daemons_to_remove = [dd for dd in all_to_remove if dd .hostname == host ]
708- futures .append (_parallel_deploy_and_remove (host , conflicting_daemons , daemons_to_deploy , daemons_to_remove ))
720+ futures .append (_parallel_deploy_and_remove (host , need_fencing_daemons , conflicting_daemons , daemons_to_deploy , daemons_to_remove ))
709721
710722 return await gather (* futures )
711723
712- deploy_names = [d .name for d in all_daemons_to_deploy ]
713- rm_names = [d .name () for d in all_conflicting_daemons ] + [d .name () for d in all_daemons_to_remove ]
724+ deploy_names = [d .name () for d in all_daemons_to_deploy ]
725+ rm_names = (
726+ [d .name () for d in all_daemons_needing_fencing ]
727+ + [d .name () for d in all_conflicting_daemons ]
728+ + [d .name () for d in all_daemons_to_remove ]
729+ )
714730 with self .mgr .async_timeout_handler (cmd = f'cephadm deploying ({ deploy_names } and removing { rm_names } daemons)' ):
715- results = self .mgr .wait_async (_deploy_and_remove_all (all_conflicting_daemons , all_daemons_to_deploy , all_daemons_to_remove ))
731+ results = self .mgr .wait_async (_deploy_and_remove_all (all_daemons_needing_fencing , all_conflicting_daemons , all_daemons_to_deploy , all_daemons_to_remove ))
716732
717733 if any (res [0 ] for res in results ):
718734 changed = True
@@ -725,6 +741,11 @@ async def _deploy_and_remove_all(
725741 for place_failures in [res [2 ] for res in results ]:
726742 placement_failures .extend (place_failures )
727743
744+ for spec , ranking_map in services_in_need_of_fencing :
745+ daemons = self .mgr .cache .get_daemons_by_service (spec .service_name ())
746+ svc = service_registry .get_service (spec .service_type )
747+ svc .fence_old_ranks (spec , ranking_map , len (daemons ))
748+
728749 if placement_failures :
729750 self .mgr .set_health_warning ('CEPHADM_DAEMON_PLACE_FAIL' , f'Failed to place { len (placement_failures )} daemon(s)' , len (
730751 placement_failures ), placement_failures )
@@ -999,13 +1020,17 @@ def gather_conflicting_daemons_for_service(self, spec: ServiceSpec) -> List[orch
9991020 break
10001021 return conflict_daemons
10011022
1002- def handle_slot_names_and_rank_map_for_service (self , spec : ServiceSpec , rank_map : Optional [Dict [int , Dict [int , Optional [str ]]]]) -> List [DaemonPlacement ]:
1003- service_type = spec .service_type
1023+ def handle_slot_names_and_rank_map_for_service (
1024+ self ,
1025+ spec : ServiceSpec ,
1026+ rank_map : Optional [Dict [int , Dict [int , Optional [str ]]]]
1027+ ) -> Tuple [bool , List [DaemonPlacement ], List [orchestrator .DaemonDescription ]]:
10041028 service_name = spec .service_name ()
10051029 slots_to_add = self .mgr .daemon_deploy_queue .get_queued_daemon_placements_by_service (spec .service_name ())
10061030 daemons_to_remove = self .mgr .daemon_removal_queue .get_queued_daemon_descriptions_by_service (spec .service_name ())
1007- svc = service_registry .get_service (service_type )
10081031 daemons = self .mgr .cache .get_daemons_by_service (service_name )
1032+ daemons_to_fence : List [orchestrator .DaemonDescription ] = []
1033+ needs_fencing : bool = False
10091034 # assign names
10101035 for i in range (len (slots_to_add )):
10111036 slot = slots_to_add [i ]
@@ -1030,16 +1055,13 @@ def handle_slot_names_and_rank_map_for_service(self, spec: ServiceSpec, rank_map
10301055 # next mgr will clean up.
10311056 self .mgr .spec_store .save_rank_map (spec .service_name (), rank_map )
10321057
1033- # remove daemons now, since we are going to fence them anyway
1034- for d in daemons_to_remove :
1035- assert d . hostname is not None
1036- self . _remove_daemon ( d . name (), d . hostname )
1037- daemons_to_remove = []
1058+ # record daemons as needing to be fenced if this service
1059+ # has a rank map and daemons to fence
1060+ if daemons_to_remove :
1061+ daemons_to_fence = daemons_to_remove
1062+ needs_fencing = True
10381063
1039- # fence them
1040- svc .fence_old_ranks (spec , rank_map , len (slots_to_add ) + len (daemons ))
1041-
1042- return slots_to_add
1064+ return needs_fencing , slots_to_add , daemons_to_fence
10431065
10441066 def build_ok_for_removal_list_by_service (self , spec : ServiceSpec ) -> List [orchestrator .DaemonDescription ]:
10451067 service_type = spec .service_type
@@ -1175,7 +1197,7 @@ def prepare_daemons_to_add_and_remove_by_service(
11751197 self ,
11761198 spec : ServiceSpec ,
11771199 rank_map : Optional [Dict [int , Dict [int , Optional [str ]]]]
1178- ) -> Tuple [List [orchestrator .DaemonDescription ], List [CephadmDaemonDeploySpec ], List [orchestrator .DaemonDescription ]]:
1200+ ) -> Tuple [List [orchestrator .DaemonDescription ], List [CephadmDaemonDeploySpec ], List [orchestrator .DaemonDescription ], List [ orchestrator . DaemonDescription ], bool ]:
11791201 service_type = spec .service_type
11801202 service_name = spec .service_name ()
11811203 svc = service_registry .get_service (service_type )
@@ -1187,11 +1209,11 @@ def prepare_daemons_to_add_and_remove_by_service(
11871209 final_count = len (daemons ) + len (slots_to_add ) - len (daemons_to_remove )
11881210 if service_type in ['mon' , 'mgr' ] and final_count < 1 :
11891211 self .log .debug ('cannot scale mon|mgr below 1)' )
1190- return ([], [], [])
1212+ return ([], [], [], [], False )
11911213
11921214 try :
11931215 # assign names
1194- slots_to_add = self .handle_slot_names_and_rank_map_for_service (spec , rank_map )
1216+ needs_fencing , slots_to_add , daemons_to_fence = self .handle_slot_names_and_rank_map_for_service (spec , rank_map )
11951217
11961218 # create daemons
11971219 conflicting_daemons = self .gather_conflicting_daemons_for_service (spec )
@@ -1218,7 +1240,14 @@ def prepare_daemons_to_add_and_remove_by_service(
12181240 self .mgr .log .error (f'Hit an exception preparing daemons for creation/removal: { str (e )} ' )
12191241 raise
12201242
1221- return conflicting_daemons , daemon_specs_to_deploy , daemons_to_remove
1243+ # verify no two lists contain the same daemon
1244+ # prioritize fencing > conflicts > general daemon to remove
1245+ fencing_names = [d .name () for d in daemons_to_fence ]
1246+ conflicting_daemons = [d for d in conflicting_daemons if d .name () not in fencing_names ]
1247+ conflicting_names = [d .name () for d in conflicting_daemons ]
1248+ daemons_to_remove = [d for d in daemons_to_remove if d .name () not in (fencing_names + conflicting_names )]
1249+
1250+ return conflicting_daemons , daemon_specs_to_deploy , daemons_to_remove , daemons_to_fence , needs_fencing
12221251
12231252 def _find_orphans_to_remove (self ) -> List [orchestrator .DaemonDescription ]:
12241253 daemons = self .mgr .cache .get_daemons ()
@@ -1253,15 +1282,6 @@ def _check_daemons(self) -> None:
12531282 if dd .hostname in self .mgr .offline_hosts :
12541283 continue
12551284
1256- < << << << HEAD
1257- if not spec and dd .daemon_type not in ['mon' , 'mgr' , 'osd' ]:
1258- # (mon and mgr specs should always exist; osds aren't matched
1259- # to a service spec)
1260- self .log .info ('Removing orphan daemon %s...' % dd .name ())
1261- self ._remove_daemon (dd .name (), dd .hostname )
1262-
1263- == == == =
1264- >> >> >> > 470 b2d226c9 (mgr / cephadm : separate orphan cleanup code into its own functions )
12651285 # ignore unmanaged services
12661286 if spec and spec .unmanaged :
12671287 continue
0 commit comments