Skip to content

Commit 59f4ca0

Browse files
authored
Merge pull request #291 from powerapi-ng/refactor/k8s-monitor-process
refactor(processor/k8s): Rework Kubernetes pre-processor
2 parents 5731225 + c0b682b commit 59f4ca0

32 files changed

+1198
-1503
lines changed

src/powerapi/actor/actor.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,11 @@ def _signal_handler_setup(self):
150150
Define how to handle signal interrupts
151151
"""
152152

153-
def term_handler(_, __):
154-
self.logger.debug("Term handler")
155-
pp_handler = self.state.get_corresponding_handler(PoisonPillMessage())
156-
pp_handler.handle(PoisonPillMessage(soft=False, sender_name='Term handler'))
153+
def term_handler(signum, _):
154+
signame = signal.Signals(signum).name
155+
self.logger.debug("Received signal %s (%s), terminating actor...", signame, signum)
156+
157+
self.state.alive = False
157158
self._kill_process()
158159
sys.exit(0)
159160

src/powerapi/cli/common_cli_parsing_manager.py

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -406,19 +406,7 @@ def __init__(self):
406406

407407
subparser_k8s_pre_processor = SubgroupConfigParsingManager("k8s")
408408
subparser_k8s_pre_processor.add_argument(
409-
"a", "k8s-api-mode", help_text="k8s api mode (local, manual or cluster)"
410-
)
411-
subparser_k8s_pre_processor.add_argument(
412-
"t",
413-
"time-interval",
414-
help_text="time interval for the k8s monitoring",
415-
argument_type=int
416-
)
417-
subparser_k8s_pre_processor.add_argument(
418-
"o",
419-
"timeout-query",
420-
help_text="timeout for k8s queries",
421-
argument_type=int
409+
"a", "api-mode", help_text="k8s api mode (local, manual or cluster)"
422410
)
423411

424412
subparser_k8s_pre_processor.add_argument(
@@ -429,7 +417,7 @@ def __init__(self):
429417

430418
subparser_k8s_pre_processor.add_argument(
431419
"h",
432-
"host",
420+
"api-host",
433421
help_text="host required for k8s manual configuration",
434422
)
435423

src/powerapi/cli/generator.py

Lines changed: 45 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,17 @@
3333
from typing import Dict, Type, Callable
3434

3535
from powerapi.actor import Actor
36+
from powerapi.database import MongoDB, CsvDB, OpenTSDB, SocketDB, PrometheusDB, VirtioFSDB, FileDB
3637
from powerapi.database.influxdb2 import InfluxDB2
3738
from powerapi.exception import PowerAPIException, ModelNameAlreadyUsed, DatabaseNameDoesNotExist, ModelNameDoesNotExist, \
38-
DatabaseNameAlreadyUsed, ProcessorTypeDoesNotExist, ProcessorTypeAlreadyUsed, MonitorTypeDoesNotExist
39+
DatabaseNameAlreadyUsed, ProcessorTypeDoesNotExist, ProcessorTypeAlreadyUsed
3940
from powerapi.filter import Filter
40-
from powerapi.processor.pre.k8s.k8s_monitor import K8sMonitorAgent
41-
from powerapi.processor.pre.k8s.k8s_pre_processor_actor import K8sPreProcessorActor, TIME_INTERVAL_DEFAULT_VALUE, \
42-
TIMEOUT_QUERY_DEFAULT_VALUE
41+
from powerapi.processor.pre.k8s import K8sPreProcessorActor
4342
from powerapi.processor.pre.libvirt.libvirt_pre_processor_actor import LibvirtPreProcessorActor
4443
from powerapi.processor.processor_actor import ProcessorActor
45-
from powerapi.report import HWPCReport, PowerReport, ControlReport, ProcfsReport, Report, FormulaReport
46-
from powerapi.database import MongoDB, CsvDB, OpenTSDB, SocketDB, PrometheusDB, \
47-
VirtioFSDB, FileDB
4844
from powerapi.puller import PullerActor
4945
from powerapi.pusher import PusherActor
46+
from powerapi.report import HWPCReport, PowerReport, ControlReport, ProcfsReport, Report, FormulaReport
5047

5148
COMPONENT_TYPE_KEY = 'type'
5249
COMPONENT_MODEL_KEY = 'model'
@@ -59,23 +56,19 @@
5956
ACTOR_NAME_KEY = 'actor_name'
6057
TARGET_ACTORS_KEY = 'target_actors'
6158
REGEXP_KEY = 'regexp'
62-
K8S_API_MODE_KEY = 'k8s-api-mode'
63-
TIME_INTERVAL_KEY = 'time-interval'
64-
TIMEOUT_QUERY_KEY = 'timeout-query'
59+
6560
PULLER_NAME_KEY = 'puller'
6661
PUSHER_NAME_KEY = 'pusher'
67-
API_KEY_KEY = 'api-key'
68-
HOST_KEY = 'host'
62+
63+
K8S_API_MODE_KEY = 'api-mode'
64+
K8S_API_KEY_KEY = 'api-key'
65+
K8S_API_HOST_KEY = 'api-host'
6966

7067
LISTENER_ACTOR_KEY = 'listener_actor'
7168

7269
GENERAL_CONF_STREAM_MODE_KEY = 'stream'
7370
GENERAL_CONF_VERBOSE_KEY = 'verbose'
7471

75-
MONITOR_NAME_SUFFIX = '_monitor'
76-
MONITOR_KEY = 'monitor'
77-
K8S_COMPONENT_TYPE_VALUE = 'k8s'
78-
7972

8073
class Generator:
8174
"""
@@ -332,39 +325,48 @@ def _gen_actor(self, component_config: dict, main_config: dict, component_name:
332325

333326
class PreProcessorGenerator(ProcessorGenerator):
334327
"""
335-
Generator that initialises the pre-processor from config
328+
Generator that initialises the pre-processor from config.
336329
"""
337330

338331
def __init__(self):
339-
ProcessorGenerator.__init__(self, 'pre-processor', self._get_default_processor_factories())
332+
super().__init__('pre-processor', self._get_default_processor_factories())
340333

341334
@staticmethod
342-
def _get_default_processor_factories() -> Dict[str, Callable[[Dict], ProcessorActor]]:
335+
def _libvirt_pre_processor_factory(processor_config: dict) -> LibvirtPreProcessorActor:
336+
"""
337+
Libvirt pre-processor actor factory.
338+
:param processor_config: Pre-Processor configuration
339+
:return: Configured Libvirt pre-processor actor
340+
"""
341+
name = processor_config[ACTOR_NAME_KEY]
342+
uri = processor_config[COMPONENT_URI_KEY]
343+
regexp = processor_config[REGEXP_KEY]
344+
target_actors_name = [processor_config[PULLER_NAME_KEY]]
345+
level_logger = logging.DEBUG if processor_config[GENERAL_CONF_VERBOSE_KEY] else logging.INFO
346+
return LibvirtPreProcessorActor(name, uri, regexp, [], target_actors_name, level_logger)
347+
348+
@staticmethod
349+
def _k8s_pre_processor_factory(processor_config: Dict) -> K8sPreProcessorActor:
350+
"""
351+
Kubernetes pre-processor actor factory.
352+
:param processor_config: Pre-Processor configuration
353+
:return: Configured Kubernetes pre-processor actor
354+
"""
355+
name = processor_config[ACTOR_NAME_KEY]
356+
api_mode = processor_config.get(K8S_API_MODE_KEY, 'manual') # use manual mode by default
357+
api_host = processor_config.get(K8S_API_HOST_KEY, None)
358+
api_key = processor_config.get(K8S_API_KEY_KEY, None)
359+
target_actors_name = [processor_config[PULLER_NAME_KEY]]
360+
level_logger = logging.DEBUG if processor_config[GENERAL_CONF_VERBOSE_KEY] else logging.INFO
361+
return K8sPreProcessorActor(name, [], target_actors_name, api_mode, api_host, api_key, level_logger)
362+
363+
def _get_default_processor_factories(self) -> Dict[str, Callable[[Dict], ProcessorActor]]:
364+
"""
365+
Return the default pre-processors factory.
366+
"""
343367
return {
344-
'libvirt': lambda processor_config: LibvirtPreProcessorActor(name=processor_config[ACTOR_NAME_KEY],
345-
uri=processor_config[COMPONENT_URI_KEY],
346-
regexp=processor_config[REGEXP_KEY],
347-
target_actors_names=[processor_config
348-
[PULLER_NAME_KEY]]),
349-
'k8s': lambda processor_config: K8sPreProcessorActor(name=processor_config[ACTOR_NAME_KEY],
350-
ks8_api_mode=None if
351-
K8S_API_MODE_KEY not in processor_config else
352-
processor_config[K8S_API_MODE_KEY],
353-
time_interval=TIME_INTERVAL_DEFAULT_VALUE if
354-
TIME_INTERVAL_KEY not in processor_config else
355-
processor_config[TIME_INTERVAL_KEY],
356-
timeout_query=TIMEOUT_QUERY_DEFAULT_VALUE if
357-
TIMEOUT_QUERY_KEY not in processor_config
358-
else processor_config[TIMEOUT_QUERY_KEY],
359-
api_key=None if API_KEY_KEY not in processor_config
360-
else processor_config[API_KEY_KEY],
361-
host=None if HOST_KEY not in processor_config
362-
else processor_config[HOST_KEY],
363-
level_logger=logging.DEBUG if
364-
processor_config[GENERAL_CONF_VERBOSE_KEY] else
365-
logging.INFO,
366-
target_actors_names=[processor_config[PULLER_NAME_KEY]]
367-
)
368+
'libvirt': self._libvirt_pre_processor_factory,
369+
'k8s': self._k8s_pre_processor_factory,
368370
}
369371

370372

@@ -379,48 +381,3 @@ def __init__(self):
379381
@staticmethod
380382
def _get_default_processor_factories() -> Dict[str, Callable[[Dict], ProcessorActor]]:
381383
return {}
382-
383-
384-
class MonitorGenerator(Generator):
385-
"""
386-
Generator that initialises the monitor by using a K8sPreProcessorActor
387-
"""
388-
389-
def __init__(self):
390-
Generator.__init__(self, component_group_name=MONITOR_KEY)
391-
392-
self.monitor_factory = {
393-
K8S_COMPONENT_TYPE_VALUE: lambda monitor_config: K8sMonitorAgent(
394-
name=monitor_config[ACTOR_NAME_KEY],
395-
concerned_actor_state=monitor_config[LISTENER_ACTOR_KEY].state,
396-
level_logger=monitor_config[LISTENER_ACTOR_KEY].logger.getEffectiveLevel()
397-
)
398-
399-
}
400-
401-
def _gen_actor(self, component_config: dict, main_config: dict, component_name: str):
402-
403-
monitor_actor_type = component_config[COMPONENT_TYPE_KEY]
404-
405-
if monitor_actor_type not in self.monitor_factory:
406-
raise MonitorTypeDoesNotExist(monitor_type=monitor_actor_type)
407-
408-
component_config[ACTOR_NAME_KEY] = component_name + MONITOR_NAME_SUFFIX
409-
return self.monitor_factory[monitor_actor_type](component_config)
410-
411-
def generate_from_processors(self, processors: dict) -> dict:
412-
"""
413-
Generates monitors associated with the given processors
414-
:param dict processors: Dictionary with the processors for the generation
415-
"""
416-
417-
monitors_config = {MONITOR_KEY: {}}
418-
419-
for processor_name, processor in processors.items():
420-
421-
if isinstance(processor, K8sPreProcessorActor):
422-
monitors_config[MONITOR_KEY][processor_name + MONITOR_NAME_SUFFIX] = {
423-
COMPONENT_TYPE_KEY: K8S_COMPONENT_TYPE_VALUE,
424-
LISTENER_ACTOR_KEY: processor}
425-
426-
return self.generate(main_config=monitors_config)

src/powerapi/processor/pre/k8s/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,5 @@
2626
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
2727
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
2828
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
30+
from .actor import K8sPreProcessorActor
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Copyright (c) 2024, Inria
2+
# Copyright (c) 2024, University of Lille
3+
# All rights reserved.
4+
#
5+
# Redistribution and use in source and binary forms, with or without
6+
# modification, are permitted provided that the following conditions are met:
7+
#
8+
# * Redistributions of source code must retain the above copyright notice, this
9+
# list of conditions and the following disclaimer.
10+
#
11+
# * Redistributions in binary form must reproduce the above copyright notice,
12+
# this list of conditions and the following disclaimer in the documentation
13+
# and/or other materials provided with the distribution.
14+
#
15+
# * Neither the name of the copyright holder nor the names of its
16+
# contributors may be used to endorse or promote products derived from
17+
# this software without specific prior written permission.
18+
#
19+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
30+
31+
def is_target_a_valid_k8s_cgroups_path(target: str) -> bool:
32+
"""
33+
Checks if the provided target is a valid Kubernetes cgroups path.
34+
"""
35+
return target.startswith('/kubepods')
36+
37+
38+
def extract_container_id_from_k8s_cgroups_path(cgroups_path: str) -> str:
39+
"""
40+
Extract the container id from a Kubernetes cgroups path.
41+
:param cgroups_path: Cgroups path of the target
42+
:return: The container id
43+
"""
44+
container_id = cgroups_path.rsplit('/', 1)[1]
45+
46+
# handle cgroups v2 (systemd) paths:
47+
if container_id.endswith('.scope'):
48+
container_id = container_id.replace('.scope', '')
49+
container_id = container_id.rsplit('-', 1)[1] # remove CRI prefix (i.e. 'containerd-<id>')
50+
51+
return container_id
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
# Copyright (c) 2023, INRIA
2+
# Copyright (c) 2023, University of Lille
3+
# All rights reserved.
4+
#
5+
# Redistribution and use in source and binary forms, with or without
6+
# modification, are permitted provided that the following conditions are met:
7+
#
8+
# * Redistributions of source code must retain the above copyright notice, this
9+
# list of conditions and the following disclaimer.
10+
#
11+
# * Redistributions in binary form must reproduce the above copyright notice,
12+
# this list of conditions and the following disclaimer in the documentation
13+
# and/or other materials provided with the distribution.
14+
#
15+
# * Neither the name of the copyright holder nor the names of its
16+
# contributors may be used to endorse or promote products derived from
17+
# this software without specific prior written permission.
18+
#
19+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
30+
import logging
31+
from multiprocessing import Manager
32+
33+
from powerapi.actor import Actor
34+
from powerapi.message import StartMessage, PoisonPillMessage
35+
from powerapi.processor.processor_actor import ProcessorState, ProcessorActor
36+
from powerapi.report import HWPCReport
37+
from .handlers import K8sPreProcessorActorHWPCReportHandler
38+
from .handlers import K8sPreProcessorActorStartMessageHandler, K8sPreProcessorActorPoisonPillMessageHandler
39+
from .metadata_cache_manager import K8sMetadataCacheManager
40+
from .monitor_agent import K8sMonitorAgent
41+
42+
43+
class K8sPreProcessorState(ProcessorState):
44+
"""
45+
State of the Kubernetes pre-processor actor.
46+
"""
47+
48+
def __init__(self, actor: Actor, target_actors: list, target_actors_names: list, api_mode: str, api_host: str, api_key: str):
49+
super().__init__(actor, target_actors, target_actors_names)
50+
51+
self.api_mode = api_mode
52+
self.api_host = api_host
53+
self.api_key = api_key
54+
55+
self.manager = None
56+
self.metadata_cache_manager = None
57+
self.monitor_agent = None
58+
59+
def initialize_metadata_cache_manager(self):
60+
"""
61+
Initialize the metadata cache manager.
62+
This method should **ONLY** be called from the pre-processor actor process.
63+
"""
64+
self.manager = Manager()
65+
self.metadata_cache_manager = K8sMetadataCacheManager(self.manager)
66+
self.monitor_agent = K8sMonitorAgent(self.metadata_cache_manager, self.api_mode, self.api_host, self.api_key)
67+
68+
69+
class K8sPreProcessorActor(ProcessorActor):
70+
"""
71+
Pre-Processor Actor that adds Kubernetes related metadata to reports.
72+
"""
73+
74+
def __init__(self, name: str, target_actors: list, target_actors_names: list, api_mode: str = None, api_host: str = None,
75+
api_key: str = None, level_logger: int = logging.WARNING, timeout: int = 5000):
76+
super().__init__(name, level_logger, timeout)
77+
78+
self.state = K8sPreProcessorState(self, target_actors, target_actors_names, api_mode, api_key, api_host)
79+
80+
def setup(self):
81+
"""
82+
Set up the Kubernetes pre-processor actor.
83+
"""
84+
self.state.initialize_metadata_cache_manager()
85+
86+
self.add_handler(StartMessage, K8sPreProcessorActorStartMessageHandler(self.state))
87+
self.add_handler(HWPCReport, K8sPreProcessorActorHWPCReportHandler(self.state))
88+
self.add_handler(PoisonPillMessage, K8sPreProcessorActorPoisonPillMessageHandler(self.state))

0 commit comments

Comments
 (0)