Skip to content

Commit 8b5c30e

Browse files
committed
mgr/cephadm: swap _rm_daemon to use cephadm _orch rm-daemons
As part of this:
1) _remove_daemon is now async
2) remove_given_daemons is now async
3) _remove_daemon now calls `cephadm _orch rm-daemons` instead of `cephadm rm-daemon`
4) fixed an issue where we weren't properly building the list of daemons to remove

While _remove_daemon is now built for removing multiple daemons, as of this commit we still only ever pass it one at a time.

Signed-off-by: Adam King <[email protected]>
(cherry picked from commit 40a89d1)

Conflicts:
src/pybind/mgr/cephadm/serve.py

Resolves: rhbz#2364414
1 parent dba8b78 commit 8b5c30e

File tree

5 files changed

+71
-47
lines changed

5 files changed

+71
-47
lines changed

src/cephadm/cephadm.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3196,10 +3196,7 @@ def command_deploy(ctx):
31963196
# type: (CephadmContext) -> None
31973197
lock = FileLock(ctx, ctx.fsid)
31983198
lock.acquire()
3199-
try:
3200-
_common_deploy(ctx)
3201-
except DaemonStartException:
3202-
sys.exit(DAEMON_FAILED_ERROR)
3199+
_common_deploy(ctx)
32033200

32043201

32053202
def apply_deploy_config_to_ctx(
@@ -3249,8 +3246,6 @@ def _deploy_daemon(ctx: CephadmContext) -> Tuple[str, int]:
32493246
rc: int = 0
32503247
try:
32513248
_common_deploy(ctx)
3252-
except DaemonStartException:
3253-
rc = DAEMON_FAILED_ERROR
32543249
except Exception:
32553250
# TODO: better rc based on exception?
32563251
rc = -1
@@ -5931,6 +5926,7 @@ def _get_parser():
59315926
default=False,
59325927
action='store_true',
59335928
help='Remove max coredump size drop-in and LimitCORE=infinity from unit file'
5929+
)
59345930

59355931
parser_rm_daemon_from = subparsers_orch.add_parser(
59365932
'rm-daemons', help='remove daemons')

src/pybind/mgr/cephadm/inventory.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ def __init__(
6767
self.hostname = hostname
6868

6969

70-
<<<<<<< HEAD
7170
class CoredumpctlOverrides:
7271
"""
7372
Class to track supported coredumpctl override parameters
@@ -112,7 +111,8 @@ def __eq__(self, other: Any) -> bool:
112111
return False
113112
return True
114113
return NotImplemented
115-
=======
114+
115+
116116
class DaemonDeployQueue:
117117
"""
118118
Queue of daemons cephadm thinks it should deploy
@@ -166,7 +166,6 @@ def get_queued_daemon_placements_by_service(self, service_name: str) -> List[Dae
166166

167167
def clear_queued_daemons(self) -> None:
168168
self._daemons = {}
169-
>>>>>>> 9c228c014be (mgr/cephadm: add DaemonDeployQueue class)
170169

171170

172171
class DaemonRemovalQueue:

src/pybind/mgr/cephadm/module.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2814,7 +2814,7 @@ def remove_daemons(self, names):
28142814
if not args:
28152815
raise OrchestratorError('Unable to find daemon(s) %s' % (names))
28162816
self.log.info('Remove daemons %s' % ' '.join([a[0] for a in args]))
2817-
return self._remove_daemons(args)
2817+
return [msg for msgs in self._remove_daemons(args) for msg in msgs]
28182818

28192819
@handle_orch_error
28202820
def remove_service(self, service_name: str, force: bool = False) -> str:
@@ -3173,8 +3173,8 @@ def _calc_daemon_deps(self,
31733173
return sorted(deps)
31743174

31753175
@forall_hosts
3176-
def _remove_daemons(self, name: str, host: str) -> str:
3177-
return CephadmServe(self)._remove_daemon(name, host)
3176+
def _remove_daemons(self, name: str, host: str) -> List[str]:
3177+
return self.wait_async(CephadmServe(self)._remove_daemon([name], host))
31783178

31793179
def _check_pool_exists(self, pool: str, service_name: str) -> None:
31803180
logger.info(f'Checking pool "{pool}" exists for service {service_name}')

src/pybind/mgr/cephadm/serve.py

Lines changed: 63 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ def _check_for_moved_osds(self) -> None:
611611
for e in error:
612612
assert e.hostname
613613
try:
614-
self._remove_daemon(e.name(), e.hostname, no_post_remove=True)
614+
self.mgr.wait_async(self._remove_daemon([e.name()], e.hostname, no_post_removals={e.name(): True}))
615615
self.mgr.events.for_daemon(
616616
e.name(), 'INFO', f"Removed duplicated daemon on host '{e.hostname}'")
617617
except OrchestratorError as ex:
@@ -709,7 +709,7 @@ async def _parallel_deploy_and_remove(
709709
if daemons_placed:
710710
r = True
711711
hosts_altered.update(daemon_deployed_hosts)
712-
removed_daemons, removed_daemon_hosts = self.remove_given_daemons(to_remove)
712+
removed_daemons, removed_daemon_hosts = await self.remove_given_daemons(to_remove)
713713
if removed_daemons:
714714
r = True
715715
hosts_altered.update(conflict_hosts_altered)
@@ -1648,8 +1648,7 @@ async def _create_daemon(self,
16481648
),
16491649
init_containers=init_containers,
16501650
),
1651-
config_blobs=daemon_spec.final_config,
1652-
use_current_daemon_image=reconfig,
1651+
config_blobs=daemon_spec.final_config
16531652
)
16541653
)
16551654

@@ -1798,50 +1797,80 @@ def _setup_init_containers(
17981797
ic_params.append(ic.to_json(flatten_args=True))
17991798
return ic_meta
18001799

1801-
def _remove_daemon(self, name: str, host: str, no_post_remove: bool = False) -> str:
1800+
async def _remove_daemon(
1801+
self,
1802+
names: List[str],
1803+
host: str,
1804+
no_post_removals: Optional[Dict[str, bool]] = None
1805+
) -> List[str]:
18021806
"""
1803-
Remove a daemon
1807+
Remove daemons
18041808
"""
1805-
dd = self.mgr.cache.get_daemon(name)
1806-
daemon_type = dd.daemon_type
1807-
daemon_id = dd.daemon_id
1808-
assert (daemon_type is not None and daemon_id is not None)
1809-
daemon = orchestrator.DaemonDescription(
1810-
daemon_type=daemon_type,
1811-
daemon_id=daemon_id,
1812-
service_name=dd.service_name(),
1813-
hostname=host)
1814-
1815-
with set_exception_subject('service', daemon.service_id(), overwrite=True):
1816-
1817-
service_registry.get_service(daemon_type_to_service(daemon_type)).pre_remove(daemon)
1818-
# NOTE: we are passing the 'force' flag here, which means
1819-
# we can delete a mon instances data.
1820-
if dd.ports:
1821-
args = ['--name', name, '--force', '--tcp-ports', ' '.join(map(str, dd.ports))]
1822-
else:
1823-
args = ['--name', name, '--force']
1824-
1809+
if not no_post_removals:
1810+
no_post_removals = {}
1811+
# Json blob specifying the removal info for each daemon.
1812+
# Currently that just consists of a name and list of ports.
1813+
# If it gets more complex we should add a class to
1814+
# exchange.Deploy to encapsulate it
1815+
daemon_rm_info: List[Dict[str, Union[str, List[str]]]] = []
1816+
for name in names:
1817+
dd = self.mgr.cache.get_daemon(name)
1818+
daemon_type = dd.daemon_type
1819+
daemon_id = dd.daemon_id
1820+
assert (daemon_type is not None and daemon_id is not None)
1821+
daemon = orchestrator.DaemonDescription(
1822+
daemon_type=daemon_type,
1823+
daemon_id=daemon_id,
1824+
service_name=dd.service_name(),
1825+
hostname=host)
1826+
with set_exception_subject('service', daemon.service_id(), overwrite=True):
1827+
service_registry.get_service(daemon_type_to_service(daemon_type)).pre_remove(daemon)
1828+
daemon_rm_info.append({'name': name, 'tcp_ports': [str(port) for port in (dd.ports or [])]})
18251829
self.log.info('Removing daemon %s from %s -- ports %s' % (name, host, dd.ports))
1826-
with self.mgr.async_timeout_handler(host, f'cephadm rm-daemon (daemon {name})'):
1827-
out, err, code = self.mgr.wait_async(self._run_cephadm(
1828-
host, name, 'rm-daemon', args))
1829-
if not code:
1830+
1831+
out, err, code = await self._run_cephadm(
1832+
host,
1833+
name,
1834+
['_orch', 'rm-daemons'],
1835+
[],
1836+
stdin=json.dumps(daemon_rm_info),
1837+
error_ok=True
1838+
)
1839+
1840+
if len(out) != 1: # _run_cephadm puts the output into a list, we should only get one thing here
1841+
raise OrchestratorError(f'Got unexpected non-length 1 list of output in _remove_daemon: {out}')
1842+
results: Dict[str, int] = json.loads(out[0])
1843+
msgs: List[str] = []
1844+
for name in names:
1845+
dd = self.mgr.cache.get_daemon(name)
1846+
daemon_type = dd.daemon_type
1847+
daemon_id = dd.daemon_id
1848+
assert (daemon_type is not None and daemon_id is not None)
1849+
daemon = orchestrator.DaemonDescription(
1850+
daemon_type=daemon_type,
1851+
daemon_id=daemon_id,
1852+
service_name=dd.service_name(),
1853+
hostname=host)
1854+
if results[name] == 0:
18301855
# remove item from cache
18311856
self.mgr.cache.rm_daemon(host, name)
1832-
self.mgr.cache.invalidate_host_daemons(host)
18331857

1834-
if not no_post_remove:
1858+
if name not in no_post_removals or not no_post_removals[name]:
18351859
if daemon_type not in ['iscsi']:
18361860
service_registry.get_service(daemon_type_to_service(
18371861
daemon_type)).post_remove(daemon, is_failed_deploy=False)
18381862
else:
18391863
self.mgr.scheduled_async_actions.append(lambda: service_registry.get_service(daemon_type_to_service(
18401864
daemon_type)).post_remove(daemon, is_failed_deploy=False))
18411865
self.mgr._kick_serve_loop()
1842-
18431866
self.mgr.recently_altered_daemons[name] = datetime_now()
1844-
return "Removed {} from host '{}'".format(name, host)
1867+
msg = "Removed {} from host '{}'".format(name, host)
1868+
self.mgr.log.info(msg)
1869+
msgs.append(msg)
1870+
1871+
self.mgr.cache.invalidate_host_daemons(host)
1872+
1873+
return msgs
18451874

18461875
async def _run_cephadm_json(self,
18471876
host: str,

src/pybind/mgr/cephadm/services/osd.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,7 @@ def process_removal_queue(self) -> bool:
892892
assert osd.hostname is not None
893893

894894
if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'):
895-
CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
895+
self.mgr.wait_async(CephadmServe(self.mgr)._remove_daemon([f'osd.{osd.osd_id}'], osd.hostname))
896896
logger.info(f"Successfully removed {osd} on {osd.hostname}")
897897
result = True
898898
else:

0 commit comments

Comments
 (0)