Skip to content

Commit bc168d7

Browse files
committed
mgr/smb: earmark resolver for subvolume
Signed-off-by: Avan Thakkar <[email protected]>
1 parent bd51dcf commit bc168d7

File tree

11 files changed

+355
-69
lines changed

11 files changed

+355
-69
lines changed

src/pybind/mgr/mgr_util.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
import os
22

3+
from ceph.fs.earmarking import (
4+
CephFSVolumeEarmarking,
5+
EarmarkParseError,
6+
EarmarkTopScope,
7+
EarmarkException
8+
)
9+
310
if 'UNITTEST' in os.environ:
411
import tests # noqa
512

@@ -335,6 +342,93 @@ def get_all_filesystems(self) -> List[str]:
335342
return fs_list
336343

337344

345+
class CephFSEarmarkResolver:
346+
def __init__(self, mgr: Module_T, *, client: Optional[CephfsClient] = None) -> None:
347+
self._mgr = mgr
348+
self._cephfs_client = client or CephfsClient(mgr)
349+
350+
def _extract_path_component(self, path: str, index: int) -> Optional[str]:
351+
"""
352+
Extracts a specific component from the path based on the given index.
353+
354+
:param path: The path in the format '/volumes/{subvolumegroup}/{subvolume}/..'
355+
:param index: The index of the component to extract (1 for subvolumegroup, 2 for subvolume)
356+
:return: The component at the specified index
357+
"""
358+
parts = path.strip('/').split('/')
359+
if len(parts) >= 3 and parts[0] == "volumes":
360+
return parts[index]
361+
return None
362+
363+
def _fetch_subvolumegroup_from_path(self, path: str) -> Optional[str]:
364+
"""
365+
Extracts and returns the subvolume group name from the given path.
366+
367+
:param path: The path in the format '/volumes/{subvolumegroup}/{subvolume}/..'
368+
:return: The subvolume group name
369+
"""
370+
return self._extract_path_component(path, 1)
371+
372+
def _fetch_subvolume_from_path(self, path: str) -> Optional[str]:
373+
"""
374+
Extracts and returns the subvolume name from the given path.
375+
376+
:param path: The path in the format '/volumes/{subvolumegroup}/{subvolume}/..'
377+
:return: The subvolume name
378+
"""
379+
return self._extract_path_component(path, 2)
380+
381+
def _manage_earmark(self, path: str, volume: str, operation: str, earmark: Optional[str] = None) -> Optional[str]:
382+
"""
383+
Manages (get or set) the earmark for a subvolume based on the provided parameters.
384+
385+
:param path: The path of the subvolume
386+
:param volume: The volume name
387+
:param earmark: The earmark to set (None if only getting the earmark)
388+
:return: The earmark if getting, otherwise None
389+
"""
390+
with open_filesystem(self._cephfs_client, volume) as fs:
391+
earmark_manager = CephFSVolumeEarmarking(fs, path)
392+
try:
393+
if operation == 'set' and earmark is not None:
394+
earmark_manager.set_earmark(earmark)
395+
return None
396+
elif operation == 'get':
397+
return earmark_manager.get_earmark()
398+
except EarmarkException as e:
399+
logger.error(f"Failed to manage earmark: {e}")
400+
return None
401+
return None
402+
403+
def get_earmark(self, path: str, volume: str) -> Optional[str]:
404+
"""
405+
Get earmark for a subvolume.
406+
"""
407+
return self._manage_earmark(path, volume, 'get')
408+
409+
def set_earmark(self, path: str, volume: str, earmark: str) -> None:
410+
"""
411+
Set earmark for a subvolume.
412+
"""
413+
self._manage_earmark(path, volume, 'set', earmark)
414+
415+
def check_earmark(self, earmark: str, top_level_scope: EarmarkTopScope) -> bool:
416+
"""
417+
Check if the earmark belongs to the mentioned top level scope.
418+
419+
:param earmark: The earmark string to check.
420+
:param top_level_scope: The expected top level scope.
421+
:return: True if the earmark matches the top level scope, False otherwise.
422+
"""
423+
try:
424+
parsed = CephFSVolumeEarmarking.parse_earmark(earmark)
425+
if parsed is None:
426+
return False
427+
return parsed.top == top_level_scope
428+
except EarmarkParseError:
429+
return False
430+
431+
338432
@contextlib.contextmanager
339433
def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]:
340434
"""

src/pybind/mgr/smb/handler.py

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import time
1919

2020
from ceph.deployment.service_spec import SMBSpec
21+
from ceph.fs.earmarking import EarmarkTopScope
2122

2223
from . import config_store, external, resources
2324
from .enums import (
@@ -43,6 +44,7 @@
4344
AccessAuthorizer,
4445
ConfigEntry,
4546
ConfigStore,
47+
EarmarkResolver,
4648
EntryKey,
4749
OrchSubmitter,
4850
PathResolver,
@@ -112,6 +114,22 @@ def resolve(
112114
resolve_exists = resolve
113115

114116

117+
class _FakeEarmarkResolver:
118+
"""A stub EarmarkResolver for unit testing."""
119+
120+
def __init__(self) -> None:
121+
self._earmarks: Dict[Tuple[str, str], str] = {}
122+
123+
def get_earmark(self, path: str, volume: str) -> Optional[str]:
124+
return None
125+
126+
def set_earmark(self, path: str, volume: str, earmark: str) -> None:
127+
pass
128+
129+
def check_earmark(self, earmark: str, top_level_scope: str) -> bool:
130+
return True
131+
132+
115133
class _FakeAuthorizer:
116134
"""A stub AccessAuthorizer for unit testing."""
117135

@@ -325,6 +343,7 @@ def __init__(
325343
path_resolver: Optional[PathResolver] = None,
326344
authorizer: Optional[AccessAuthorizer] = None,
327345
orch: Optional[OrchSubmitter] = None,
346+
earmark_resolver: Optional[EarmarkResolver] = None,
328347
) -> None:
329348
self.internal_store = internal_store
330349
self.public_store = public_store
@@ -336,14 +355,18 @@ def __init__(
336355
authorizer = _FakeAuthorizer()
337356
self._authorizer: AccessAuthorizer = authorizer
338357
self._orch = orch # if None, disables updating the spec via orch
358+
if earmark_resolver is None:
359+
earmark_resolver = cast(EarmarkResolver, _FakeEarmarkResolver())
360+
self._earmark_resolver = earmark_resolver
339361
log.info(
340362
'Initialized new ClusterConfigHandler with'
341363
f' internal store {self.internal_store!r},'
342364
f' public store {self.public_store!r},'
343365
f' priv store {self.priv_store!r},'
344366
f' path resolver {self._path_resolver!r},'
345367
f' authorizer {self._authorizer!r},'
346-
f' orch {self._orch!r}'
368+
f' orch {self._orch!r},'
369+
f' earmark resolver {self._earmark_resolver!r}'
347370
)
348371

349372
def apply(
@@ -474,7 +497,12 @@ def _check(
474497
elif isinstance(
475498
resource, (resources.Share, resources.RemovedShare)
476499
):
477-
_check_share(resource, staging, self._path_resolver)
500+
_check_share(
501+
resource,
502+
staging,
503+
self._path_resolver,
504+
self._earmark_resolver,
505+
)
478506
elif isinstance(resource, resources.JoinAuth):
479507
_check_join_auths(resource, staging)
480508
elif isinstance(resource, resources.UsersAndGroups):
@@ -807,7 +835,10 @@ def _check_cluster(cluster: ClusterRef, staging: _Staging) -> None:
807835

808836

809837
def _check_share(
810-
share: ShareRef, staging: _Staging, resolver: PathResolver
838+
share: ShareRef,
839+
staging: _Staging,
840+
resolver: PathResolver,
841+
earmark_resolver: EarmarkResolver,
811842
) -> None:
812843
"""Check that the share resource can be updated."""
813844
if share.intent == Intent.REMOVED:
@@ -822,7 +853,7 @@ def _check_share(
822853
)
823854
assert share.cephfs is not None
824855
try:
825-
resolver.resolve_exists(
856+
volpath = resolver.resolve_exists(
826857
share.cephfs.volume,
827858
share.cephfs.subvolumegroup,
828859
share.cephfs.subvolume,
@@ -832,6 +863,34 @@ def _check_share(
832863
raise ErrorResult(
833864
share, msg="path is not a valid directory in volume"
834865
)
866+
if earmark_resolver:
867+
earmark = earmark_resolver.get_earmark(
868+
volpath,
869+
share.cephfs.volume,
870+
)
871+
if not earmark:
872+
smb_earmark = (
873+
f"{EarmarkTopScope.SMB.value}.cluster.{share.cluster_id}"
874+
)
875+
earmark_resolver.set_earmark(
876+
volpath,
877+
share.cephfs.volume,
878+
smb_earmark,
879+
)
880+
else:
881+
if not earmark_resolver.check_earmark(
882+
earmark, EarmarkTopScope.SMB
883+
):
884+
raise ErrorResult(
885+
share,
886+
msg=f"earmark has already been set by {earmark.split('.')[0]}",
887+
)
888+
# Check if earmark is set by same cluster
889+
if earmark.split('.')[2] != share.cluster_id:
890+
raise ErrorResult(
891+
share,
892+
msg=f"earmark has already been set by smb cluster {earmark.split('.')[2]}",
893+
)
835894
name_used_by = _share_name_in_use(staging, share)
836895
if name_used_by:
837896
raise ErrorResult(

src/pybind/mgr/smb/module.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import orchestrator
66
from ceph.deployment.service_spec import PlacementSpec, SMBSpec
77
from mgr_module import MgrModule, Option, OptionLevel
8+
from mgr_util import CephFSEarmarkResolver
89

910
from . import (
1011
cli,
@@ -55,6 +56,7 @@ def __init__(self, *args: str, **kwargs: Any) -> None:
5556
path_resolver = kwargs.pop('path_resolver', None)
5657
authorizer = kwargs.pop('authorizer', None)
5758
uo = kwargs.pop('update_orchestration', None)
59+
earmark_resolver = kwargs.pop('earmark_resolver', None)
5860
super().__init__(*args, **kwargs)
5961
if internal_store is not None:
6062
self._internal_store = internal_store
@@ -69,6 +71,7 @@ def __init__(self, *args: str, **kwargs: Any) -> None:
6971
public_store or rados_store.RADOSConfigStore.init(self)
7072
)
7173
path_resolver = path_resolver or fs.CachingCephFSPathResolver(self)
74+
earmark_resolver = earmark_resolver or CephFSEarmarkResolver(self)
7275
# Why the honk is the cast needed but path_resolver doesn't need it??
7376
# Sometimes mypy drives me batty.
7477
authorizer = cast(
@@ -81,6 +84,7 @@ def __init__(self, *args: str, **kwargs: Any) -> None:
8184
path_resolver=path_resolver,
8285
authorizer=authorizer,
8386
orch=self._orch_backend(enable_orch=uo),
87+
earmark_resolver=earmark_resolver,
8488
)
8589

8690
def _backend_store(self, store_conf: str = '') -> ConfigStore:

src/pybind/mgr/smb/proto.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import sys
1616

1717
from ceph.deployment.service_spec import SMBSpec
18+
from ceph.fs.earmarking import EarmarkTopScope
1819

1920
# this uses a version check as opposed to a try/except because this
2021
# form makes mypy happy and try/except doesn't.
@@ -185,3 +186,18 @@ def authorize_entity(
185186
self, volume: str, entity: str, caps: str = ''
186187
) -> None:
187188
... # pragma: no cover
189+
190+
191+
class EarmarkResolver(Protocol):
192+
"""A protocol for a type that can resolve earmarks for subvolumes."""
193+
194+
def get_earmark(self, path: str, volume: str) -> Optional[str]:
195+
... # pragma: no cover
196+
197+
def set_earmark(self, path: str, volume: str, earmark: str) -> None:
198+
... # pragma: no cover
199+
200+
def check_earmark(
201+
self, earmark: str, top_level_scope: EarmarkTopScope
202+
) -> bool:
203+
... # pragma: no cover

src/pybind/mgr/smb/tests/test_handler.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pytest
22

33
import smb
4+
from smb.handler import _FakeEarmarkResolver
45

56

67
def _cluster(**kwargs):
@@ -880,6 +881,7 @@ def remove_smb_service(self, service_name):
880881
self.deployed.remove(service_name)
881882

882883
thandler._orch = FakeOrch()
884+
thandler._earmark_resolver = _FakeEarmarkResolver()
883885
test_apply_full_cluster_create(thandler)
884886

885887
to_apply = [

src/pybind/mgr/smb/tests/test_smb.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ def tmodule():
2626
path_resolver=smb.handler._FakePathResolver(),
2727
authorizer=smb.handler._FakeAuthorizer(),
2828
update_orchestration=False,
29+
earmark_resolver=smb.handler._FakeEarmarkResolver(),
2930
)
3031

3132

0 commit comments

Comments
 (0)