Skip to content

Commit f860da3

Browse files
sambacc: add permissions module
The permissions module holds types that can be used to manage the permissions on share directories. The current implemetations include a no-op class, a initialize-posix-permissions-once class, and a always-set-posix-permissions class. Signed-off-by: John Mulligan <[email protected]>
1 parent c729f82 commit f860da3

File tree

1 file changed

+170
-0
lines changed

1 file changed

+170
-0
lines changed

sambacc/permissions.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
#
2+
# sambacc: a samba container configuration tool
3+
# Copyright (C) 2022 John Mulligan
4+
#
5+
# This program is free software: you can redistribute it and/or modify
6+
# it under the terms of the GNU General Public License as published by
7+
# the Free Software Foundation, either version 3 of the License, or
8+
# (at your option) any later version.
9+
#
10+
# This program is distributed in the hope that it will be useful,
11+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
# GNU General Public License for more details.
14+
#
15+
# You should have received a copy of the GNU General Public License
16+
# along with this program. If not, see <http://www.gnu.org/licenses/>
17+
#
18+
19+
from __future__ import annotations
20+
21+
import datetime
22+
import errno
23+
import logging
24+
import os
25+
import typing
26+
27+
import xattr # type: ignore
28+
29+
_logger = logging.getLogger(__name__)
30+
31+
32+
class PermissionsHandler(typing.Protocol):
33+
def has_status(self) -> bool:
34+
"""Return true if the path has status metadata."""
35+
... # pragma: no cover
36+
37+
def status_ok(self) -> bool:
38+
"""Return true if status is OK (no changes are needed)."""
39+
... # pragma: no cover
40+
41+
def update(self) -> None:
42+
"""Update the permissions as needed."""
43+
... # pragma: no cover
44+
45+
def path(self) -> str:
46+
"""Return the path under consideration."""
47+
... # pragma: no cover
48+
49+
50+
class NoopPermsHandler:
51+
def __init__(
52+
self,
53+
path: str,
54+
status_xattr: str,
55+
options: typing.Dict[str, str],
56+
root: str = "/",
57+
) -> None:
58+
self._path = path
59+
60+
def path(self) -> str:
61+
return self._path
62+
63+
def has_status(self) -> bool:
64+
return False
65+
66+
def status_ok(self) -> bool:
67+
return True
68+
69+
def update(self) -> None:
70+
pass
71+
72+
73+
class InitPosixPermsHandler:
74+
"""Initialize posix permissions on a share (directory).
75+
76+
This handler sets posix permissions only.
77+
78+
It will only set the permissions when the status xattr does not
79+
match the expected prefix value. This prevents it from overwiting
80+
permissions that may have been changed intentionally after
81+
share initialization.
82+
"""
83+
84+
_default_mode = 0o777
85+
_default_status_prefix = "v1"
86+
87+
def __init__(
88+
self,
89+
path: str,
90+
status_xattr: str,
91+
options: typing.Dict[str, str],
92+
root: str = "/",
93+
) -> None:
94+
self._path = path
95+
self._root = root
96+
self._xattr = status_xattr
97+
try:
98+
self._mode = int(options["mode"], 8)
99+
except KeyError:
100+
self._mode = self._default_mode
101+
try:
102+
self._prefix = options["status_prefix"]
103+
except KeyError:
104+
self._prefix = self._default_status_prefix
105+
106+
def path(self) -> str:
107+
return self._path
108+
109+
def _full_path(self) -> str:
110+
return os.path.join(self._root, self._path.lstrip("/"))
111+
112+
def has_status(self):
113+
try:
114+
self._get_status()
115+
return True
116+
except KeyError:
117+
return False
118+
119+
def status_ok(self):
120+
try:
121+
sval = self._get_status()
122+
except KeyError:
123+
return False
124+
curr_prefix = sval.split("/")[0]
125+
return curr_prefix == self._prefix
126+
127+
def update(self):
128+
if self.status_ok():
129+
return
130+
self._set_perms()
131+
self._set_status()
132+
133+
def _get_status(self) -> str:
134+
path = self._full_path()
135+
_logger.debug("reading xattr %r: %r", self._xattr, path)
136+
try:
137+
value = xattr.get(path, self._xattr, nofollow=True)
138+
except OSError as err:
139+
if err.errno == errno.ENODATA:
140+
raise KeyError(self._xattr)
141+
raise
142+
return value.decode("utf8")
143+
144+
def _set_perms(self) -> None:
145+
# yeah, this is really simple compared to all the state management
146+
# stuff.
147+
path = self._full_path()
148+
os.chmod(path, self._mode, follow_symlinks=False)
149+
150+
def _timestamp(self) -> str:
151+
return datetime.datetime.now().strftime("%s")
152+
153+
def _set_status(self) -> None:
154+
# we save the marker prefix followed by a timestamp as a debugging hint
155+
ts = self._timestamp()
156+
val = f"{self._prefix}/{ts}"
157+
path = self._full_path()
158+
_logger.debug("setting xattr %r=%r: %r", self._xattr, val, self._path)
159+
return xattr.set(path, self._xattr, val, nofollow=True)
160+
161+
162+
class AlwaysPosixPermsHandler(InitPosixPermsHandler):
163+
"""Works like the init handler, but always sets the permissions,
164+
even if the status xattr exists and is valid.
165+
May be useful for testing and debugging.
166+
"""
167+
168+
def update(self):
169+
self._set_perms()
170+
self._set_status()

0 commit comments

Comments
 (0)