Skip to content

Commit 2c71930

Browse files
cephadm: add listing_updaters module
Add a small suite of new daemon status updater subclasses to a listing_updaters.py. These classes were not put directly in listing.py as I wanted to keep that module smaller with few complex dependencies. These updater classes, based on the temporary _update_daemon_and_container_status and _update_legacy_status functions in cephadm.py will be used to replace those functions in a future change. Signed-off-by: John Mulligan <[email protected]>
1 parent ae23ced commit 2c71930

File tree

1 file changed

+362
-0
lines changed

1 file changed

+362
-0
lines changed
Lines changed: 362 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,362 @@
1+
# Additional types to help with container & daemon listing
2+
3+
from typing import Any, Dict, List, Optional
4+
5+
import json
6+
import logging
7+
import os
8+
9+
from .call_wrappers import call, CallVerbosity
10+
from .container_engines import (
11+
normalize_container_id,
12+
parsed_container_cpu_perc,
13+
parsed_container_mem_usage,
14+
)
15+
from .container_types import get_container_stats
16+
from .context import CephadmContext
17+
from .daemon_identity import DaemonIdentity
18+
from .daemons import (
19+
CephIscsi,
20+
CephNvmeof,
21+
CustomContainer,
22+
Monitoring,
23+
NFSGanesha,
24+
SMB,
25+
SNMPGateway,
26+
MgmtGateway,
27+
OAuth2Proxy,
28+
)
29+
from .daemons.ceph import ceph_daemons
30+
from .data_utils import normalize_image_digest, try_convert_datetime
31+
from .file_utils import get_file_timestamp
32+
from .listing import DaemonStatusUpdater
33+
from .systemd import check_unit
34+
35+
36+
logger = logging.getLogger()
37+
38+
39+
class CoreStatusUpdater(DaemonStatusUpdater):
40+
def __init__(self, keep_container_info: str = '') -> None:
41+
# set keep_container_info to a custom key that will be used to cache
42+
# the ContainerInfo object in the status dict.
43+
self.keep_container_info = keep_container_info
44+
45+
def update(
46+
self,
47+
val: Dict[str, Any],
48+
ctx: CephadmContext,
49+
identity: DaemonIdentity,
50+
data_dir: str,
51+
) -> None:
52+
enabled, state, _ = check_unit(ctx, identity.unit_name)
53+
val['enabled'] = enabled
54+
val['state'] = state
55+
56+
container_id = image_name = image_id = version = None
57+
start_stamp = None
58+
daemon_dir = os.path.join(
59+
data_dir, identity.fsid, identity.daemon_name
60+
)
61+
cinfo = get_container_stats(ctx, identity)
62+
if cinfo:
63+
if self.keep_container_info:
64+
val[self.keep_container_info] = cinfo
65+
container_id = cinfo.container_id
66+
image_name = cinfo.image_name
67+
image_id = cinfo.image_id
68+
version = cinfo.version
69+
image_id = normalize_container_id(image_id)
70+
start_stamp = try_convert_datetime(cinfo.start)
71+
else:
72+
vfile = os.path.join(daemon_dir, 'unit.image')
73+
try:
74+
with open(vfile, 'r') as f:
75+
image_name = f.read().strip() or None
76+
except IOError:
77+
pass
78+
79+
# unit.meta?
80+
mfile = os.path.join(daemon_dir, 'unit.meta')
81+
try:
82+
with open(mfile, 'r') as f:
83+
meta = json.loads(f.read())
84+
val.update(meta)
85+
except IOError:
86+
pass
87+
88+
val['container_id'] = container_id
89+
val['container_image_name'] = image_name
90+
val['container_image_id'] = image_id
91+
val['version'] = version
92+
val['started'] = start_stamp
93+
val['created'] = get_file_timestamp(
94+
os.path.join(daemon_dir, 'unit.created')
95+
)
96+
val['deployed'] = get_file_timestamp(
97+
os.path.join(daemon_dir, 'unit.image')
98+
)
99+
val['configured'] = get_file_timestamp(
100+
os.path.join(daemon_dir, 'unit.configured')
101+
)
102+
103+
def legacy_update(
104+
self,
105+
val: Dict[str, Any],
106+
ctx: CephadmContext,
107+
fsid: str,
108+
daemon_type: str,
109+
name: str,
110+
data_dir: str,
111+
) -> None:
112+
cache = getattr(self, '_cache', {})
113+
setattr(self, '_cache', cache)
114+
legacy_unit_name = val['name']
115+
(val['enabled'], val['state'], _) = check_unit(ctx, legacy_unit_name)
116+
if not cache.get('host_version'):
117+
try:
118+
out, err, code = call(
119+
ctx,
120+
['ceph', '-v'],
121+
verbosity=CallVerbosity.QUIET,
122+
)
123+
if not code and out.startswith('ceph version '):
124+
cache['host_version'] = out.split(' ')[2]
125+
except Exception:
126+
pass
127+
val['host_version'] = cache.get('host_version')
128+
129+
130+
class DigestsStatusUpdater(DaemonStatusUpdater):
131+
def __init__(self) -> None:
132+
self.seen_digests: Dict[str, List[str]] = {}
133+
134+
def update(
135+
self,
136+
val: Dict[str, Any],
137+
ctx: CephadmContext,
138+
identity: DaemonIdentity,
139+
data_dir: str,
140+
) -> None:
141+
# collect digests for this image id
142+
assert 'container_image_digests' not in val
143+
image_id = val.get('container_image_id', None)
144+
if not image_id:
145+
val['container_image_digests'] = None
146+
return # container info missing or no longer running?
147+
container_path = ctx.container_engine.path
148+
image_digests = self.seen_digests.get(image_id)
149+
if not image_digests:
150+
out, err, code = call(
151+
ctx,
152+
[
153+
container_path,
154+
'image',
155+
'inspect',
156+
image_id,
157+
'--format',
158+
'{{.RepoDigests}}',
159+
],
160+
verbosity=CallVerbosity.QUIET,
161+
)
162+
if not code:
163+
image_digests = list(
164+
set(
165+
map(
166+
normalize_image_digest,
167+
out.strip()[1:-1].split(' '),
168+
)
169+
)
170+
)
171+
self.seen_digests[image_id] = image_digests
172+
val['container_image_digests'] = image_digests
173+
174+
175+
class VersionStatusUpdater(DaemonStatusUpdater):
176+
def __init__(self) -> None:
177+
self.seen_versions: Dict[str, Optional[str]] = {}
178+
179+
def update(
180+
self,
181+
val: Dict[str, Any],
182+
ctx: CephadmContext,
183+
identity: DaemonIdentity,
184+
data_dir: str,
185+
) -> None:
186+
# collect digests for this image id
187+
container_path = ctx.container_engine.path
188+
image_id = val.get('container_image_id', None)
189+
version = val.get('version', None)
190+
daemon_type = identity.daemon_type
191+
container_id = val['container_id']
192+
if not image_id and not version:
193+
return # container info missing or no longer running?
194+
# identify software version inside the container (if we can)
195+
if not version or '.' not in version:
196+
version = self.seen_versions.get(image_id, None)
197+
if daemon_type == NFSGanesha.daemon_type:
198+
version = NFSGanesha.get_version(ctx, container_id)
199+
if daemon_type == CephIscsi.daemon_type:
200+
version = CephIscsi.get_version(ctx, container_id)
201+
if daemon_type == CephNvmeof.daemon_type:
202+
version = CephNvmeof.get_version(ctx, container_id)
203+
if daemon_type == SMB.daemon_type:
204+
version = SMB.get_version(ctx, container_id)
205+
elif not version:
206+
if daemon_type in ceph_daemons():
207+
out, err, code = call(
208+
ctx,
209+
[
210+
container_path,
211+
'exec',
212+
container_id,
213+
'ceph',
214+
'-v',
215+
],
216+
verbosity=CallVerbosity.QUIET,
217+
)
218+
if not code and out.startswith('ceph version '):
219+
version = out.split(' ')[2]
220+
self.seen_versions[image_id] = version
221+
elif daemon_type == 'grafana':
222+
out, err, code = call(
223+
ctx,
224+
[
225+
container_path,
226+
'exec',
227+
container_id,
228+
'grafana',
229+
'server',
230+
'-v',
231+
],
232+
verbosity=CallVerbosity.QUIET,
233+
)
234+
if not code and out.startswith('Version '):
235+
version = out.split(' ')[1]
236+
self.seen_versions[image_id] = version
237+
elif daemon_type in [
238+
'prometheus',
239+
'alertmanager',
240+
'node-exporter',
241+
'loki',
242+
'promtail',
243+
]:
244+
version = Monitoring.get_version(
245+
ctx, container_id, daemon_type
246+
)
247+
self.seen_versions[image_id] = version
248+
elif daemon_type == 'haproxy':
249+
out, err, code = call(
250+
ctx,
251+
[
252+
container_path,
253+
'exec',
254+
container_id,
255+
'haproxy',
256+
'-v',
257+
],
258+
verbosity=CallVerbosity.QUIET,
259+
)
260+
if (
261+
not code
262+
and out.startswith('HA-Proxy version ')
263+
or out.startswith('HAProxy version ')
264+
):
265+
version = out.split(' ')[2]
266+
self.seen_versions[image_id] = version
267+
elif daemon_type == 'keepalived':
268+
out, err, code = call(
269+
ctx,
270+
[
271+
container_path,
272+
'exec',
273+
container_id,
274+
'keepalived',
275+
'--version',
276+
],
277+
verbosity=CallVerbosity.QUIET,
278+
)
279+
if not code and err.startswith('Keepalived '):
280+
version = err.split(' ')[1]
281+
if version[0] == 'v':
282+
version = version[1:]
283+
self.seen_versions[image_id] = version
284+
elif daemon_type == CustomContainer.daemon_type:
285+
# Because a custom container can contain
286+
# everything, we do not know which command
287+
# to execute to get the version.
288+
pass
289+
elif daemon_type == SNMPGateway.daemon_type:
290+
version = SNMPGateway.get_version(
291+
ctx, identity.fsid, identity.daemon_id
292+
)
293+
self.seen_versions[image_id] = version
294+
elif daemon_type == MgmtGateway.daemon_type:
295+
version = MgmtGateway.get_version(ctx, container_id)
296+
self.seen_versions[image_id] = version
297+
elif daemon_type == OAuth2Proxy.daemon_type:
298+
version = OAuth2Proxy.get_version(ctx, container_id)
299+
self.seen_versions[image_id] = version
300+
else:
301+
logger.warning(
302+
'version for unknown daemon type %s' % daemon_type
303+
)
304+
val['version'] = version
305+
306+
307+
class MemUsageStatusUpdater(DaemonStatusUpdater):
308+
def __init__(self) -> None:
309+
self._loaded = False
310+
self.seen_memusage_cid_len: int = 0
311+
self.seen_memusage: Dict[str, int] = {}
312+
313+
def _load(self, ctx: CephadmContext) -> None:
314+
if self._loaded:
315+
return
316+
length, memusage = parsed_container_mem_usage(ctx)
317+
self.seen_memusage_cid_len = length
318+
self.seen_memusage = memusage
319+
self._loaded = True
320+
321+
def update(
322+
self,
323+
val: Dict[str, Any],
324+
ctx: CephadmContext,
325+
identity: DaemonIdentity,
326+
data_dir: str,
327+
) -> None:
328+
container_id = val.get('container_id', None)
329+
if container_id:
330+
self._load(ctx)
331+
val['memory_usage'] = self.seen_memusage.get(
332+
container_id[0 : self.seen_memusage_cid_len]
333+
)
334+
335+
336+
class CPUUsageStatusUpdater(DaemonStatusUpdater):
337+
def __init__(self) -> None:
338+
self._loaded = False
339+
self.seen_cpuperc_cid_len: int = 0
340+
self.seen_cpuperc: Dict[str, str] = {}
341+
342+
def _load(self, ctx: CephadmContext) -> None:
343+
if self._loaded:
344+
return
345+
length, cpuperc = parsed_container_cpu_perc(ctx)
346+
self.seen_cpuperc_cid_len = length
347+
self.seen_cpuperc = cpuperc
348+
self._loaded = True
349+
350+
def update(
351+
self,
352+
val: Dict[str, Any],
353+
ctx: CephadmContext,
354+
identity: DaemonIdentity,
355+
data_dir: str,
356+
) -> None:
357+
container_id = val.get('container_id', None)
358+
if container_id:
359+
self._load(ctx)
360+
val['cpu_percentage'] = self.seen_cpuperc.get(
361+
container_id[0 : self.seen_cpuperc_cid_len]
362+
)

0 commit comments

Comments
 (0)