Commit b3b6577 (merge of 2 parents: ff024eb + bf10110)

Merge pull request ceph#61783 from adk3798/cephadm-nvmeof-one-daemon-per-node

mgr/cephadm: block deploying nvmeof daemons of different groups on same host

Reviewed-by: John Mulligan <[email protected]>
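
In effect, after this change cephadm treats any host running an nvmeof daemon of one group as off-limits for nvmeof daemons of every other group. A sketch of a spec pair the scheduler now keeps on disjoint hosts (hypothetical pool/group names, constructed the same way the new unit test below builds its specs):

    from ceph.deployment.service_spec import ServiceSpec

    # Two nvmeof services with different "group" attributes; after this
    # change the scheduler will never co-locate their daemons on one host.
    spec_a = ServiceSpec(service_type='nvmeof', service_id='a.groupA', pool='a', group='groupA')
    spec_b = ServiceSpec(service_type='nvmeof', service_id='b.groupB', pool='b', group='groupB')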

6 files changed (+227, -6 lines)

src/pybind/mgr/cephadm/schedule.py

Lines changed: 27 additions & 4 deletions
@@ -153,13 +153,15 @@ def __init__(self,
                  primary_daemon_type: Optional[str] = None,
                  per_host_daemon_type: Optional[str] = None,
                  rank_map: Optional[Dict[int, Dict[int, Optional[str]]]] = None,
+                 blocking_daemon_hosts: Optional[List[orchestrator.HostSpec]] = None,
                  ):
         assert spec
         self.spec = spec  # type: ServiceSpec
         self.primary_daemon_type = primary_daemon_type or spec.service_type
         self.hosts: List[orchestrator.HostSpec] = hosts
         self.unreachable_hosts: List[orchestrator.HostSpec] = unreachable_hosts
         self.draining_hosts: List[orchestrator.HostSpec] = draining_hosts
+        self.blocking_daemon_hosts: List[orchestrator.HostSpec] = blocking_daemon_hosts or []
         self.filter_new_host = filter_new_host
         self.service_name = spec.service_name()
         self.daemons = daemons
@@ -333,10 +335,28 @@ def expand_candidates(ls: List[DaemonPlacement], num: int) -> List[DaemonPlacement]:
         existing = existing_active + existing_standby

         # build to_add
+        blocking_daemon_hostnames = [
+            h.hostname for h in self.blocking_daemon_hosts
+        ]
+        unreachable_hostnames = [
+            h.hostname for h in self.unreachable_hosts
+        ]
         if not count:
-            to_add = [dd for dd in others if dd.hostname not in [
-                h.hostname for h in self.unreachable_hosts]]
+            to_add = [
+                dd for dd in others if (
+                    dd.hostname not in blocking_daemon_hostnames
+                    and dd.hostname not in unreachable_hostnames
+                )
+            ]
         else:
+            if blocking_daemon_hostnames:
+                to_remove.extend([
+                    dd for dd in existing if dd.hostname in blocking_daemon_hostnames
+                ])
+                existing = [
+                    dd for dd in existing if dd.hostname not in blocking_daemon_hostnames
+                ]
+
             # The number of new slots that need to be selected in order to fulfill count
             need = count - len(existing)

@@ -356,7 +376,7 @@ def expand_candidates(ls: List[DaemonPlacement], num: int) -> List[DaemonPlacement]:
             for dp in matching_dps:
                 if need <= 0:
                     break
-                if dp.hostname in related_service_hosts and dp.hostname not in [h.hostname for h in self.unreachable_hosts]:
+                if dp.hostname in related_service_hosts and dp.hostname not in unreachable_hostnames:
                     logger.debug(f'Preferring {dp.hostname} for service {self.service_name} as related daemons have been placed there')
                     to_add.append(dp)
                     need -= 1  # this is last use of need so it can work as a counter
@@ -370,7 +390,10 @@ def expand_candidates(ls: List[DaemonPlacement], num: int) -> List[DaemonPlacement]:
             for dp in others:
                 if need <= 0:
                     break
-                if dp.hostname not in [h.hostname for h in self.unreachable_hosts]:
+                if (
+                    dp.hostname not in unreachable_hostnames
+                    and dp.hostname not in blocking_daemon_hostnames
+                ):
                     to_add.append(dp)
                     need -= 1  # this is last use of need in this function so it can work as a counter
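
Read on its own, the scheduling change above does two things: hosts in blocking_daemon_hosts are filtered out of every candidate pool for new daemons, and under a count-based placement any existing daemon on a blocking host is moved to to_remove before the remaining need is computed. A minimal self-contained sketch of those semantics (plain-Python stand-ins, not the real HostAssignment API):

    from typing import List, NamedTuple, Tuple


    class Daemon(NamedTuple):
        name: str
        hostname: str


    def apply_blocking(existing: List[Daemon], candidates: List[str],
                       blocking: List[str]) -> Tuple[List[Daemon], List[str], List[Daemon]]:
        # daemons already running on blocking hosts are slated for removal ...
        to_remove = [d for d in existing if d.hostname in blocking]
        kept = [d for d in existing if d.hostname not in blocking]
        # ... and blocking hosts never receive new daemons
        addable = [h for h in candidates if h not in blocking]
        return kept, addable, to_remove


    # host2 becomes blocking: its crash daemon is displaced, host1/host3 stay eligible
    kept, addable, to_remove = apply_blocking(
        [Daemon('crash.host2', 'host2')], ['host1', 'host3'], ['host2'])
    assert [d.name for d in to_remove] == ['crash.host2']
    assert addable == ['host1', 'host3']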

src/pybind/mgr/cephadm/serve.py

Lines changed: 3 additions & 0 deletions
@@ -755,6 +755,8 @@ def _apply_service(self, spec: ServiceSpec) -> bool:

         svc = service_registry.get_service(service_type)
         daemons = self.mgr.cache.get_daemons_by_service(service_name)
+
+        blocking_daemon_hosts = svc.get_blocking_daemon_hosts(service_name)
         related_service_daemons = self.mgr.cache.get_related_service_daemons(spec)

         public_networks: List[str] = []
@@ -824,6 +826,7 @@ def has_interface_for_vip(host: str, sspec: ServiceSpec) -> bool:
             ) == 'agent' else self.mgr.cache.get_schedulable_hosts(),
             unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
             draining_hosts=self.mgr.cache.get_draining_hosts(),
+            blocking_daemon_hosts=blocking_daemon_hosts,
             daemons=daemons,
             related_service_daemons=related_service_daemons,
             networks=self.mgr.cache.networks,

src/pybind/mgr/cephadm/services/cephadmservice.py

Lines changed: 9 additions & 1 deletion
@@ -23,7 +23,12 @@
 )
 from ceph.deployment.utils import is_ipv6, unwrap_ipv6
 from mgr_util import build_url, merge_dicts
-from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
+from orchestrator import (
+    OrchestratorError,
+    DaemonDescription,
+    DaemonDescriptionStatus,
+    HostSpec
+)
 from orchestrator._interface import daemon_type_to_service
 from cephadm import utils
 from .service_registry import register_cephadm_service
@@ -581,6 +586,9 @@ def ignore_possible_stray(
         """
         return False

+    def get_blocking_daemon_hosts(self, service_name: str) -> List[HostSpec]:
+        return []
+

 class CephService(CephadmService):
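
Design note: this hook follows cephadm's usual base-class pattern. CephadmService supplies the empty default so the scheduler can call it unconditionally, and a service type opts in by overriding it (in this commit, only NvmeofService below does). A hypothetical override, for illustration only, assuming the module's imports above:

    class MyService(CephService):
        def get_blocking_daemon_hosts(self, service_name: str) -> List[HostSpec]:
            # hosts returned here are excluded from placement for this service
            return [HostSpec(hostname='some-host')]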

src/pybind/mgr/cephadm/services/nvmeof.py

Lines changed: 25 additions & 1 deletion
@@ -7,7 +7,12 @@
 from mgr_module import HandleCommandResult
 from ceph.deployment.service_spec import NvmeofServiceSpec

-from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
+from orchestrator import (
+    OrchestratorError,
+    DaemonDescription,
+    DaemonDescriptionStatus,
+    HostSpec,
+)
 from .cephadmservice import CephadmDaemonDeploySpec, CephService
 from .service_registry import register_cephadm_service
 from .. import utils
@@ -234,3 +239,22 @@ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
         _, _, err = self.mgr.mon_command(cmd)
         if err:
             self.mgr.log.error(f"Unable to send monitor command {cmd}, error {err}")
+
+    def get_blocking_daemon_hosts(self, service_name: str) -> List[HostSpec]:
+        # we should not deploy nvmeof daemons on hosts that have nvmeof daemons
+        # from services with a different "group" attribute (as recommended by
+        # the nvmeof team)
+        spec = cast(NvmeofServiceSpec, self.mgr.spec_store[service_name].spec)
+        nvmeof_group = cast(NvmeofServiceSpec, spec).group
+        blocking_daemons: List[DaemonDescription] = []
+        other_group_nvmeof_services = [
+            nspec for nspec in self.mgr.spec_store.get_specs_by_type('nvmeof').values()
+            if cast(NvmeofServiceSpec, nspec).group != nvmeof_group
+        ]
+        for other_group_nvmeof_service in other_group_nvmeof_services:
+            blocking_daemons += self.mgr.cache.get_daemons_by_service(other_group_nvmeof_service.service_name())
+        blocking_daemon_hosts = [
+            HostSpec(hostname=blocking_daemon.hostname)
+            for blocking_daemon in blocking_daemons if blocking_daemon.hostname is not None
+        ]
+        return blocking_daemon_hosts
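
Concretely: if service 'nvmeof.foo.foo' carries group foo and 'nvmeof.bar.bar' carries group bar, get_blocking_daemon_hosts('nvmeof.foo.foo') returns a HostSpec for every host currently running a nvmeof.bar.bar daemon, and vice versa. The new test_nvmeof_build_blocking_daemon_hosts test below walks through exactly that scenario across three hosts.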

src/pybind/mgr/cephadm/tests/test_cephadm.py

Lines changed: 59 additions & 0 deletions
@@ -15,8 +15,10 @@
     PrivKey,
     CERT_STORE_CERT_PREFIX,
     CERT_STORE_KEY_PREFIX,
+    SpecDescription,
 )
 from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims
+from cephadm.services.nvmeof import NvmeofService
 from cephadm.utils import SpecialHostLabels

 try:
@@ -3052,3 +3054,60 @@ def test_process_ls_output(self, cephadm_module):
         assert osd.cpu_percentage == '6.54%'
         assert osd.memory_usage == 73410805
         assert osd.created == str_to_datetime('2023-09-22T22:41:03.615080Z')
+
+    @mock.patch("cephadm.inventory.HostCache.get_daemons_by_service")
+    @mock.patch("cephadm.inventory.SpecStore.get_specs_by_type")
+    @mock.patch("cephadm.inventory.SpecStore.__getitem__")
+    def test_nvmeof_build_blocking_daemon_hosts(
+        self,
+        _spec_store_get_item,
+        _get_specs_by_type,
+        _get_daemons_by_service,
+        cephadm_module: CephadmOrchestrator
+    ):
+        # for nvmeof, the blocking daemon host list should be all hosts with an nvmeof
+        # daemon that belongs to a service with a different "group" parameter
+        nvmeof_services = [
+            ServiceSpec(service_type='nvmeof', pool='foo', group='foo', service_id='foo.foo'),
+            ServiceSpec(service_type='nvmeof', pool='bar', group='bar', service_id='bar.bar')
+        ]
+        nvmeof_foo_daemons = [
+            DaemonDescription(daemon_type='nvmeof', daemon_id='foo.foo.host1', hostname='host1'),
+            DaemonDescription(daemon_type='nvmeof', daemon_id='foo.foo.host2', hostname='host2')
+        ]
+        nvmeof_bar_daemons = [
+            DaemonDescription(daemon_type='nvmeof', daemon_id='bar.bar.host3', hostname='host3')
+        ]
+
+        def _get_nvmeof_specs(sname) -> SpecDescription:
+            if sname == 'nvmeof.foo.foo':
+                return SpecDescription(
+                    nvmeof_services[0], {}, None, None
+                )
+            elif sname == 'nvmeof.bar.bar':
+                return SpecDescription(
+                    nvmeof_services[1], {}, None, None
+                )
+
+        def _get_nvmeof_daemons(sname) -> List[DaemonDescription]:
+            if sname == 'nvmeof.foo.foo':
+                return nvmeof_foo_daemons
+            elif sname == 'nvmeof.bar.bar':
+                return nvmeof_bar_daemons
+
+        _get_specs_by_type.return_value = {
+            'nvmeof.foo.foo': nvmeof_services[0],
+            'nvmeof.bar.bar': nvmeof_services[1],
+        }
+        _spec_store_get_item.side_effect = _get_nvmeof_specs
+        _get_daemons_by_service.side_effect = _get_nvmeof_daemons
+
+        # first test for nvmeof.foo.foo, which should get its blocking hosts
+        # from nvmeof.bar.bar's daemons
+        nvmeof_foo_blocking_hosts = NvmeofService(cephadm_module).get_blocking_daemon_hosts('nvmeof.foo.foo')
+        assert set([h.hostname for h in nvmeof_foo_blocking_hosts]) == set(['host3'])
+
+        # now test for nvmeof.bar.bar, which should get its blocking hosts
+        # from nvmeof.foo.foo's daemons
+        nvmeof_bar_blocking_hosts = NvmeofService(cephadm_module).get_blocking_daemon_hosts('nvmeof.bar.bar')
+        assert set([h.hostname for h in nvmeof_bar_blocking_hosts]) == set(['host1', 'host2'])

src/pybind/mgr/cephadm/tests/test_scheduling.py

Lines changed: 104 additions & 0 deletions
@@ -1753,3 +1753,107 @@ def test_placement_regex_host_pattern(service_type, placement, hosts, expected_add):
         daemons=[],
     ).place()
     assert sorted([h.hostname for h in to_add]) == expected_add
+
+
+class BlockingDaemonHostsTest(NamedTuple):
+    service_type: str
+    placement: PlacementSpec
+    hosts: List[str]
+    unreachable_hosts: List[str]
+    blocking_daemon_hosts: List[str]
+    daemons: List[DaemonDescription]
+    expected_add: List[List[str]]
+    expected_remove: List[List[str]]
+
+
+@pytest.mark.parametrize("service_type,placement,hosts,unreachable_hosts,blocking_daemon_hosts,daemons,expected_add,expected_remove",
+                         [
+                             BlockingDaemonHostsTest(
+                                 'crash',
+                                 PlacementSpec(count=3),
+                                 'host1 host2 host3'.split(),
+                                 [],
+                                 ['host1'],
+                                 [],
+                                 [['host2', 'host3']],
+                                 [[]],
+                             ),
+                             BlockingDaemonHostsTest(
+                                 'crash',
+                                 PlacementSpec(hosts=['host2', 'host3']),
+                                 'host1 host2 host3'.split(),
+                                 [],
+                                 ['host2'],
+                                 [DaemonDescription('crash', 'host1', 'host1')],
+                                 [['host3']],
+                                 [['crash.host1']],
+                             ),
+                             BlockingDaemonHostsTest(
+                                 'crash',
+                                 PlacementSpec(hosts=['host1', 'host2', 'host3', 'host4']),
+                                 'host1 host2 host3 host4'.split(),
+                                 ['host1'],
+                                 ['host2'],
+                                 [DaemonDescription('crash', 'host3', 'host3')],
+                                 [['host4']],
+                                 [[]],
+                             ),
+                             BlockingDaemonHostsTest(
+                                 'crash',
+                                 PlacementSpec(count=4),
+                                 'host1 host2 host3 host4'.split(),
+                                 ['host4'],
+                                 ['host2'],
+                                 [DaemonDescription('crash', 'host3', 'host3')],
+                                 [['host1']],
+                                 [[]],
+                             ),
+                             BlockingDaemonHostsTest(
+                                 'crash',
+                                 PlacementSpec(hosts=['host1', 'host2', 'host3', 'host4']),
+                                 'host1 host2 host3 host4'.split(),
+                                 ['host1'],
+                                 ['host4'],
+                                 [DaemonDescription('crash', 'host2', 'host2')],
+                                 [['host3']],
+                                 [[]],
+                             ),
+                             BlockingDaemonHostsTest(
+                                 'crash',
+                                 PlacementSpec(count=2),
+                                 'host1 host2 host3'.split(),
+                                 [],
+                                 ['host2'],
+                                 [
+                                     DaemonDescription('crash', 'host2', 'host2'),
+                                     DaemonDescription('crash', 'host3', 'host3')
+                                 ],
+                                 [['host1']],
+                                 [['crash.host2']],
+                             ),
+                         ])
+def test_blocking_daemon_host(
+    service_type,
+    placement,
+    hosts,
+    unreachable_hosts,
+    blocking_daemon_hosts,
+    daemons,
+    expected_add,
+    expected_remove
+):
+
+    spec = ServiceSpec(service_type=service_type,
+                       service_id=None,
+                       placement=placement)
+
+    hosts, to_add, to_remove = HostAssignment(
+        spec=spec,
+        hosts=[HostSpec(h) for h in hosts],
+        unreachable_hosts=[HostSpec(h) for h in unreachable_hosts],
+        draining_hosts=[],
+        blocking_daemon_hosts=[HostSpec(h) for h in blocking_daemon_hosts],
+        daemons=daemons,
+    ).place()
+    assert sorted([h.hostname for h in to_add]) in expected_add
+    assert sorted([h.name() for h in to_remove]) in expected_remove
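
The six cases above cover blocking hosts under both count-based and explicit-host placements, blocking combined with unreachable hosts, and the removal path: a daemon already running on a blocking host (the final crash.host2 case) lands in to_remove rather than being left in place.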
