From bc86aa8688470f92b1b3f4f2778fba275fff738e Mon Sep 17 00:00:00 2001 From: Aleksey Mikhaylov Date: Fri, 27 Sep 2024 01:51:51 +0600 Subject: [PATCH] Refactor running processes metric --- aqueduct/flow.py | 19 +++++++++++-------- aqueduct/metrics/collect.py | 11 ++++++----- aqueduct/metrics/export.py | 5 ++--- aqueduct/metrics/processes.py | 24 ------------------------ setup.py | 2 +- 5 files changed, 20 insertions(+), 41 deletions(-) delete mode 100644 aqueduct/metrics/processes.py diff --git a/aqueduct/flow.py b/aqueduct/flow.py index 9b4d3b8..4a23782 100644 --- a/aqueduct/flow.py +++ b/aqueduct/flow.py @@ -25,7 +25,6 @@ from .metrics.collect import Collector, TasksStats from .metrics.export import Exporter from .metrics.manager import get_metrics_manager -from .metrics.processes import ProcessesStats from .metrics.queue import TaskMetricsQueue from .metrics.timer import timeit from .multiprocessing import ( @@ -255,7 +254,7 @@ async def _check_memory_usage(self, sleep_sec: float = 1.): nprocs_memory_sum = 0 for process in processes: memory = process.memory_info().rss - nprocs_memory_sum += memory + nprocs_memory_sum += memory metrics.add(flow_step_name, memory) all_memory_usage += nprocs_memory_sum if len(processes) != 1: @@ -264,6 +263,14 @@ async def _check_memory_usage(self, sleep_sec: float = 1.): self._metrics_manager.collector.add_memory_usage(metrics) await asyncio.sleep(sleep_sec) + async def _count_processes(self, sleep_sec: float = 1.): + pod_name = os.environ.get('K8S_POD', 'local') + while self.state != FlowState.STOPPED: + metrics = MetricsItems() + metrics.add(f'running.{pod_name}', len(mp.active_children())) + self._metrics_manager.collector.add_processes_count(metrics) + await asyncio.sleep(sleep_sec) + def _run_steps(self, timeout: Optional[int]): if len(self._steps) == 0: log.info('Flow has zero steps -> do nothing') @@ -331,7 +338,9 @@ def _run_tasks(self): self._tasks.append(asyncio.ensure_future(self._check_is_alive())) 
self._metrics_manager.start(queues_info=self._get_queues_info()) + self._tasks.append(asyncio.ensure_future(self._check_memory_usage())) + self._tasks.append(asyncio.ensure_future(self._count_processes())) def _get_queues_info(self) -> Dict[mp.Queue, str]: """Returns queues between Step handlers and its names. @@ -412,7 +421,6 @@ async def _check_is_alive(self, sleep_sec: float = 1.): If at least one process is not alive, it stops Flow. """ while self.state != FlowState.STOPPED: - processes_stats = ProcessesStats() for handler, context in self._contexts.items(): for proc in context.processes: if not proc.is_alive(): @@ -420,12 +428,7 @@ async def _check_is_alive(self, sleep_sec: float = 1.): handler_name = handler.__class__.__name__ log.error('The process %s for %s handler is dead', proc.pid, handler_name) - processes_stats.add_dead_process() - self._metrics_manager.collector.add_processes_stats(processes_stats) await self.stop(graceful=False) - else: - processes_stats.add_running_process() - self._metrics_manager.collector.add_processes_stats(processes_stats) await asyncio.sleep(sleep_sec) @staticmethod diff --git a/aqueduct/metrics/collect.py b/aqueduct/metrics/collect.py index a3b9bbc..029fa04 100644 --- a/aqueduct/metrics/collect.py +++ b/aqueduct/metrics/collect.py @@ -5,7 +5,6 @@ from . 
import IMetricsItems from .base import MetricsItems, MetricsTypes -from .processes import ProcessesStats from .task import TasksMetricsStorage @@ -31,18 +30,20 @@ def __init__(self): self.queue_sizes = MetricsItems() self.tasks_stats = TasksStats() self.memory_usage = MetricsItems() - self.processes_stats = ProcessesStats() + self.processes_count = MetricsItems() def extend(self, storage: TasksMetricsStorage): super().extend(storage) if isinstance(storage, AqueductMetricsStorage): self.queue_sizes.extend(storage.queue_sizes) self.tasks_stats.extend(storage.tasks_stats) - self.processes_stats.extend(storage.processes_stats) def extend_memory_usage(self, metrics: MetricsItems): self.memory_usage.extend(metrics) + def extend_processes_count(self, metrics: MetricsItems): + self.processes_count.extend(metrics) + class Collector: def __init__(self, collectible_metrics: Iterable[MetricsTypes] = None, @@ -73,8 +74,8 @@ def add_tasks_stats(self, stats: TasksStats): def add_memory_usage(self, metrics: MetricsItems): self._metrics.extend_memory_usage(metrics) - def add_processes_stats(self, stats: ProcessesStats): - self._metrics.processes_stats.extend(stats) + def add_processes_count(self, metrics: MetricsItems): + self._metrics.extend_processes_count(metrics) def extract_metrics(self) -> AqueductMetricsStorage: metrics = self._metrics diff --git a/aqueduct/metrics/export.py b/aqueduct/metrics/export.py index b107ffd..4061183 100644 --- a/aqueduct/metrics/export.py +++ b/aqueduct/metrics/export.py @@ -77,9 +77,8 @@ def export(self, metrics: AqueductMetricsStorage): for name, memory_usage in metrics.memory_usage.items: self.target.timing(f'{self.prefix}.{MEMORY_USAGE_PREFIX}.{name}', memory_usage) - for name, cnt in metrics.processes_stats.items: - if cnt > 0: - self.target.count(f'{self.prefix}.{PROCESSES_PREFIX}.{name}', cnt) + for name, processes_count in metrics.processes_count.items: + self.target.timing(f'{self.prefix}.{PROCESSES_PREFIX}.{name}', processes_count) class 
DummyExporter(Exporter): diff --git a/aqueduct/metrics/processes.py b/aqueduct/metrics/processes.py deleted file mode 100644 index 79dfed3..0000000 --- a/aqueduct/metrics/processes.py +++ /dev/null @@ -1,24 +0,0 @@ -from dataclasses import asdict, dataclass -from typing import Iterable, Tuple - -from . import IMetricsItems - - -@dataclass -class ProcessesStats(IMetricsItems): - dead: int = 0 - running: int = 0 - - @property - def items(self) -> Iterable[Tuple[str, int]]: - return asdict(self).items() - - def extend(self, stats: 'ProcessesStats'): - self.dead += stats.dead - self.running += stats.running - - def add_dead_process(self): - self.dead += 1 - - def add_running_process(self): - self.running += 1 diff --git a/setup.py b/setup.py index 8267679..17a26f8 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ setup( name='aqueduct', packages=find_packages(), - version='1.11.7', + version='1.11.8', license='MIT', license_files='LICENSE.txt', author='Data Science SWAT',