Skip to content

Commit 51fd35d

Browse files
authored
Merge pull request ceph#57817 from phlogistonjohn/jjm-smb-sqlite
mgr/smb: add sqlite internal store backend for smb mgr module Reviewed-by: Adam King <[email protected]>
2 parents d43d1fc + d77139e commit 51fd35d

File tree

11 files changed

+1128
-26
lines changed

11 files changed

+1128
-26
lines changed

qa/suites/orch/cephadm/smb/tasks/deploy_smb_mgr_basic.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ tasks:
1717
- cephadm.configure_samba_client_container:
1818
role: host.b
1919
- cephadm:
20+
single_host_defaults: true
2021

2122
- cephadm.shell:
2223
host.a:

qa/suites/orch/cephadm/smb/tasks/deploy_smb_mgr_domain.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ tasks:
1717
- cephadm.deploy_samba_ad_dc:
1818
role: host.b
1919
- cephadm:
20+
single_host_defaults: true
2021

2122
- cephadm.shell:
2223
host.a:

qa/suites/orch/cephadm/smb/tasks/deploy_smb_mgr_res_basic.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ tasks:
1717
- cephadm.configure_samba_client_container:
1818
role: host.b
1919
- cephadm:
20+
single_host_defaults: true
2021

2122
- cephadm.shell:
2223
host.a:

qa/suites/orch/cephadm/smb/tasks/deploy_smb_mgr_res_dom.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ tasks:
1717
- cephadm.deploy_samba_ad_dc:
1818
role: host.b
1919
- cephadm:
20+
single_host_defaults: true
2021

2122
- cephadm.shell:
2223
host.a:

src/pybind/mgr/mgr_module.py

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,22 @@
11
import ceph_module # noqa
22

3-
from typing import cast, Tuple, Any, Dict, Generic, Optional, Callable, List, \
4-
Mapping, NamedTuple, Sequence, Union, Set, TYPE_CHECKING
3+
from typing import (
4+
Any,
5+
Callable,
6+
Dict,
7+
Generic,
8+
Iterator,
9+
List,
10+
Mapping,
11+
NamedTuple,
12+
Optional,
13+
Sequence,
14+
Set,
15+
TYPE_CHECKING,
16+
Tuple,
17+
Union,
18+
cast,
19+
)
520
if TYPE_CHECKING:
621
import sys
722
if sys.version_info >= (3, 8):
@@ -17,6 +32,7 @@
1732
import subprocess
1833
import threading
1934
from collections import defaultdict
35+
from contextlib import contextmanager
2036
from enum import IntEnum, Enum
2137
import os
2238
import rados
@@ -175,6 +191,13 @@ class MgrDBNotReady(RuntimeError):
175191
pass
176192

177193

194+
class MgrDBNotAllowed(MgrDBNotReady):
195+
"""A more specific subclass of MgrDBNotReady raised when mgr_pool option
196+
disabled.
197+
"""
198+
pass
199+
200+
178201
class OSDMap(ceph_module.BasePyOSDMap):
179202
def get_epoch(self) -> int:
180203
return self._get_epoch()
@@ -1307,6 +1330,7 @@ def close_db(self) -> None:
13071330
def open_db(self) -> Optional[sqlite3.Connection]:
13081331
if not self.pool_exists(self.MGR_POOL_NAME):
13091332
if not self.have_enough_osds():
1333+
self.log.warning('not enough osds to create mgr pool')
13101334
return None
13111335
self.create_mgr_pool()
13121336
uri = f"file:///{self.MGR_POOL_NAME}:{self.module_name}/main.db?vfs=ceph"
@@ -1340,12 +1364,29 @@ def db(self) -> sqlite3.Connection:
13401364
return self._db
13411365
db_allowed = self.get_ceph_option("mgr_pool")
13421366
if not db_allowed:
1343-
raise MgrDBNotReady()
1367+
raise MgrDBNotAllowed()
13441368
self._db = self.open_db()
13451369
if self._db is None:
13461370
raise MgrDBNotReady()
13471371
return self._db
13481372

1373+
@contextmanager
1374+
def exclusive_db_access(self) -> Iterator[sqlite3.Connection]:
1375+
"""Context manager that grants exclusive access to the manager module sqlite3
1376+
db connection, while establishing a new db transaction.
1377+
"""
1378+
with self._db_lock, self.db:
1379+
yield self.db
1380+
1381+
@contextmanager
1382+
def exclusive_db_cursor(self) -> Iterator[sqlite3.Cursor]:
1383+
"""Context manager that yields a db cursor after getting exclusive
1384+
access to the manager module sqlite3 connection and a new db
1385+
transaction.
1386+
"""
1387+
with self.exclusive_db_access() as db:
1388+
yield db.cursor()
1389+
13491390
@property
13501391
def release_name(self) -> str:
13511392
"""

src/pybind/mgr/smb/config_store.py

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
from typing import Collection, Dict, Iterator
1+
from typing import Collection, Dict, Iterator, Optional
22

3-
from .proto import ConfigEntry, EntryKey, Simplified
3+
from .proto import ConfigEntry, ConfigStore, EntryKey, FindParams, Simplified
44

55

66
class MemConfigEntry:
@@ -97,3 +97,83 @@ def contents(self, ns: str) -> Collection[str]:
9797

9898
def __iter__(self) -> Iterator[EntryKey]:
9999
return iter(self._entries.keys())
100+
101+
102+
class ObjectCachingEntry:
103+
"""A config entry that wraps a different ConfigEntry and caches the
104+
simplified object. If the object is set the cache will be updated. If the
105+
object is removed the cached object will be forgotten. The cached object
106+
can be manually reset with the `clear_cached_obj` method.
107+
"""
108+
109+
def __init__(
110+
self, base_entry: ConfigEntry, *, obj: Optional[Simplified] = None
111+
) -> None:
112+
self._base = base_entry
113+
self._obj = obj
114+
115+
def clear_cached_obj(self) -> None:
116+
self._obj = None
117+
118+
def set(self, obj: Simplified) -> None:
119+
self._obj = None # if base.set fails, obj will be left unset
120+
self._base.set(obj)
121+
self._obj = obj
122+
123+
def get(self) -> Simplified:
124+
if self._obj is not None:
125+
return self._obj
126+
self._obj = self._base.get()
127+
return self._obj
128+
129+
def remove(self) -> bool:
130+
self._obj = None
131+
return self._base.remove()
132+
133+
def exists(self) -> bool:
134+
return self._base.exists()
135+
136+
@property
137+
def uri(self) -> str:
138+
return self._base.uri
139+
140+
@property
141+
def full_key(self) -> EntryKey:
142+
return self._base.full_key
143+
144+
145+
def find_in_store(
146+
store: ConfigStore, ns: str, params: FindParams
147+
) -> Collection[ConfigEntry]:
148+
"""Given a ConfigStore and namespace within that store, search the stored
149+
objects for matching parameters. Params is a dict that will be compared to
150+
the same keys/attributes of the objects being searched. Only exact matches
151+
will be returned.
152+
If the store implements the FindingConfigStore protocol the operation
153+
of finding
154+
"""
155+
# is it a FindingConfigStore?
156+
_find_entries = getattr(store, 'find_entries', None)
157+
if _find_entries:
158+
try:
159+
return _find_entries(ns, params)
160+
except NotImplementedError:
161+
# Allow the store to reject any of the ns/params/whatnot with a
162+
# NotImplementedError even if it implements the find_entries
163+
# function. This will fall back to the simple-but-slow approach of
164+
# deserializing and examining every object.
165+
pass
166+
return _find_in_store(store, ns, params)
167+
168+
169+
def _find_in_store(
170+
store: ConfigStore, ns: str, params: FindParams
171+
) -> Collection[ConfigEntry]:
172+
"""Fallback mode for find_in_store."""
173+
found = []
174+
for sub_key in store.contents(ns):
175+
entry = store[(ns, sub_key)]
176+
obj = entry.get()
177+
if all(obj[pkey] == pval for pkey, pval in params.items()):
178+
found.append(ObjectCachingEntry(entry, obj=obj))
179+
return found

src/pybind/mgr/smb/handler.py

Lines changed: 88 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
cast,
1313
)
1414

15+
import contextlib
1516
import logging
17+
import operator
1618
import time
1719

1820
from ceph.deployment.service_spec import SMBSpec
@@ -21,6 +23,7 @@
2123
from .enums import (
2224
AuthMode,
2325
CephFSStorageProvider,
26+
ConfigNS,
2427
Intent,
2528
JoinSourceType,
2629
LoginAccess,
@@ -355,24 +358,33 @@ def apply(
355358
incoming = order_resources(inputs)
356359
for resource in incoming:
357360
staging.stage(resource)
358-
for resource in incoming:
359-
results.append(
360-
self._check(resource, staging, create_only=create_only)
361-
)
361+
with _store_transaction(staging.destination_store):
362+
for resource in incoming:
363+
results.append(
364+
self._check(
365+
resource, staging, create_only=create_only
366+
)
367+
)
362368
except ErrorResult as err:
363369
results.append(err)
364370
except Exception as err:
365371
log.exception("error updating resource")
366-
result = ErrorResult(resource, msg=str(err))
372+
msg = str(err)
373+
if not msg:
374+
# handle the case where the exception has no text
375+
msg = f"error updating resource: {type(err)} (see logs for details)"
376+
result = ErrorResult(resource, msg=msg)
367377
results.append(result)
368378
if results.success:
369379
log.debug(
370380
'successfully updated %s resources. syncing changes to public stores',
371381
len(list(results)),
372382
)
373-
results = staging.save()
374-
_prune_linked_entries(staging)
375-
self._sync_modified(results)
383+
with _store_transaction(staging.destination_store):
384+
results = staging.save()
385+
_prune_linked_entries(staging)
386+
with _store_transaction(staging.destination_store):
387+
self._sync_modified(results)
376388
return results
377389

378390
def cluster_ids(self) -> List[str]:
@@ -394,13 +406,15 @@ def user_and_group_ids(self) -> List[str]:
394406
return list(UsersAndGroupsEntry.ids(self.internal_store))
395407

396408
def all_resources(self) -> List[SMBResource]:
397-
return self._search_resources(_Matcher())
409+
with _store_transaction(self.internal_store):
410+
return self._search_resources(_Matcher())
398411

399412
def matching_resources(self, names: List[str]) -> List[SMBResource]:
400413
matcher = _Matcher()
401414
for name in names:
402415
matcher.parse(name)
403-
return self._search_resources(matcher)
416+
with _store_transaction(self.internal_store):
417+
return self._search_resources(matcher)
404418

405419
def _search_resources(self, matcher: _Matcher) -> List[SMBResource]:
406420
log.debug("performing search with matcher: %s", matcher)
@@ -822,6 +836,58 @@ def _check_share(
822836
raise ErrorResult(
823837
share, msg="path is not a valid directory in volume"
824838
)
839+
name_used_by = _share_name_in_use(staging, share)
840+
if name_used_by:
841+
raise ErrorResult(
842+
share,
843+
msg="share name already in use",
844+
status={"conflicting_share_id": name_used_by},
845+
)
846+
847+
848+
def _share_name_in_use(
849+
staging: _Staging, share: resources.Share
850+
) -> Optional[str]:
851+
"""Returns the share_id value if the share's name is already in
852+
use by a different share in the cluster. Returns None if no other
853+
shares are using the name.
854+
"""
855+
share_ids = (share.cluster_id, share.share_id)
856+
share_ns = str(ConfigNS.SHARES)
857+
# look for any duplicate names in the staging area.
858+
# these items are already in memory
859+
for ekey, res in staging.incoming.items():
860+
if ekey[0] != share_ns:
861+
continue # not a share
862+
assert isinstance(res, resources.Share)
863+
if (res.cluster_id, res.share_id) == share_ids:
864+
continue # this share
865+
if (res.cluster_id, res.name) == (share.cluster_id, share.name):
866+
return res.share_id
867+
# look for any duplicate names in the underyling store
868+
found = config_store.find_in_store(
869+
staging.destination_store,
870+
share_ns,
871+
{'cluster_id': share.cluster_id, 'name': share.name},
872+
)
873+
# remove any shares that are deleted in staging
874+
found_curr = [
875+
entry for entry in found if entry.full_key not in staging.deleted
876+
]
877+
# remove self-share from list
878+
id_pair = operator.itemgetter('cluster_id', 'share_id')
879+
found_curr = [
880+
entry for entry in found_curr if id_pair(entry.get()) != share_ids
881+
]
882+
if not found_curr:
883+
return None
884+
if len(found_curr) != 1:
885+
# this should not normally happen
886+
log.warning(
887+
'multiple shares with one name in cluster: %s',
888+
' '.join(s.get()['share_id'] for s in found_curr),
889+
)
890+
return found_curr[0].get()['share_id']
825891

826892

827893
def _check_join_auths(
@@ -1244,3 +1310,15 @@ def _cephx_data_entity(cluster_id: str) -> str:
12441310
use for data access.
12451311
"""
12461312
return f'client.smb.fs.cluster.{cluster_id}'
1313+
1314+
1315+
@contextlib.contextmanager
1316+
def _store_transaction(store: ConfigStore) -> Iterator[None]:
1317+
transaction = getattr(store, 'transaction', None)
1318+
if not transaction:
1319+
log.debug("No transaction support for store")
1320+
yield None
1321+
return
1322+
log.debug("Using store transaction")
1323+
with transaction():
1324+
yield None

0 commit comments

Comments
 (0)