-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmetrics_utils.py
More file actions
167 lines (138 loc) · 7.04 KB
/
metrics_utils.py
File metadata and controls
167 lines (138 loc) · 7.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# -*- coding: utf-8 -*-
import datetime
from dateutil import parser
import math
# --- Helper functions ---
def parse_datetime(dt_str):
    """Parse an ISO-8601-like timestamp string, returning ``None`` on failure.

    Fractional seconds longer than six digits (for example nanosecond
    precision) are truncated to microseconds before the text is handed to
    ``parser.isoparse``. Whatever follows the fraction digits -- a ``Z``
    designator or a ``+HH:MM``-style offset -- is kept, so a string that
    carries a zone yields a timezone-aware result whether or not it has
    fractional seconds.

    Args:
        dt_str: Timestamp text. Falsy values and the zero-value placeholder
            ``"0001-01-01T00:00:00Z"`` are treated as "no date".

    Returns:
        The parsed ``datetime``, or ``None`` when the input is empty, is the
        placeholder, or cannot be parsed.
    """
    if not dt_str or dt_str == "0001-01-01T00:00:00Z":
        return None
    try:
        head, dot, tail = dt_str.partition('.')
        if dot:
            # Separate the run of fraction digits from the zone suffix so that
            # trimming to microseconds never eats the timezone information.
            digits = 0
            while digits < len(tail) and tail[digits] in '0123456789':
                digits += 1
            fraction, zone = tail[:digits], tail[digits:]
            dt_str = head + '.' + fraction[:6] + zone
        # Spell the UTC designator as an explicit offset.
        if dt_str.endswith('Z'):
            dt_str = dt_str[:-1] + '+00:00'
        return parser.isoparse(dt_str)
    except Exception:
        # Deliberate best-effort: any malformed or non-string input means "no date".
        return None
def format_uptime(total_seconds):
    """Format a duration in seconds as ``'Xd Yh Zm Ws'``.

    Args:
        total_seconds: Duration in seconds (int or float). Any fraction of a
            second is truncated.

    Returns:
        The formatted string, or ``"N/A"`` when the value is ``None``, is not
        an int/float, is negative, or is not finite (NaN or infinity).
    """
    if not isinstance(total_seconds, (int, float)):
        return "N/A"
    # NaN and +inf slip past a plain "< 0" comparison but cannot be converted
    # to int, so reject them explicitly. Only floats are checked because
    # math.isfinite would overflow on very large ints.
    if isinstance(total_seconds, float) and not math.isfinite(total_seconds):
        return "N/A"
    if total_seconds < 0:
        return "N/A"

    whole_seconds = int(total_seconds)
    days, rest = divmod(whole_seconds, 86400)  # seconds per day
    hours, rest = divmod(rest, 3600)  # seconds per hour
    minutes, seconds = divmod(rest, 60)
    return f"{days}d {hours}h {minutes}m {seconds}s"
def calc_cpu_percent(current_stats, prev_stats):
    """Compute a CPU usage percentage from two consecutive stats readings.

    Both arguments are expected to be dicts with a ``"cpu_stats"`` entry that
    holds ``"cpu_usage"`` (with ``"total_usage"``) and ``"system_cpu_usage"``.
    The result is ``(usage delta / system delta) * cpu count * 100``, where
    the CPU count comes from ``"online_cpus"`` of the current reading, falling
    back to the length of ``"percpu_usage"`` and finally to 1.

    Args:
        current_stats (dict): Stats dictionary from the current reading.
        prev_stats (dict): Stats dictionary from the previous reading.

    Returns:
        float: The CPU usage percentage, never negative. ``0.0`` is returned
        when required data is missing, the deltas are unusable, or the
        calculation raises one of the handled errors.
    """
    try:
        both_dicts = isinstance(current_stats, dict) and isinstance(prev_stats, dict)
        if not both_dicts:
            return 0.0
        if not ("cpu_stats" in current_stats and "cpu_stats" in prev_stats):
            return 0.0

        now = current_stats["cpu_stats"]
        before = prev_stats["cpu_stats"]

        # Both readings need a non-empty "cpu_usage" section with "total_usage".
        usage_present = (
            now.get("cpu_usage")
            and before.get("cpu_usage")
            and "total_usage" in now["cpu_usage"]
            and "total_usage" in before["cpu_usage"]
        )
        if not usage_present:
            return 0.0

        # Without system-wide usage on both sides there is nothing to normalize against.
        sys_now = now.get("system_cpu_usage")
        sys_before = before.get("system_cpu_usage")
        if sys_now is None or sys_before is None:
            return 0.0

        # CPU count is taken from the current reading.
        n_cpus = now.get("online_cpus")
        if n_cpus is None:
            per_core = now.get("cpu_usage", {}).get("percpu_usage")
            n_cpus = len(per_core) if per_core else 1
        if n_cpus == 0:
            return 0.0

        used_now = float(now["cpu_usage"].get("total_usage", 0))
        used_before = float(before["cpu_usage"].get("total_usage", 0))
        total_now = float(sys_now or 0)
        total_before = float(sys_before or 0)

        used_delta = used_now - used_before
        total_delta = total_now - total_before

        # Only a positive system delta with a non-negative usage delta is meaningful.
        if not (total_delta > 0.0 and used_delta >= 0.0):
            return 0.0
        return max(0.0, (used_delta / total_delta) * float(n_cpus) * 100.0)
    except (KeyError, TypeError, ValueError, ZeroDivisionError, AttributeError):
        # Any of these calculation errors is reported as "no usage".
        return 0.0
def calc_mem_percent_usage(d):
    """Derive memory usage figures from a stats dictionary.

    Reads ``"usage"`` and ``"limit"`` from the ``"memory_stats"`` entry of *d*.

    Args:
        d: Stats dictionary; anything that is not a dict yields the fallback.

    Returns:
        tuple: ``(percent, usage_mib)`` where *percent* is usage relative to
        the limit clamped to the range 0-100, and *usage_mib* is the usage in
        MiB rounded to two decimals. ``(0.0, 0)`` is returned when the input
        is not a dict, a value is missing, the limit is not positive, or the
        calculation raises one of the handled errors.
    """
    fallback = (0.0, 0)
    bytes_per_mib = 1024 * 1024
    try:
        if not isinstance(d, dict):
            return fallback

        memory = d.get("memory_stats", {})
        used_bytes = memory.get("usage")
        cap_bytes = memory.get("limit")

        # A missing value or a non-positive limit makes the ratio meaningless.
        if used_bytes is None or cap_bytes is None or cap_bytes <= 0:
            return fallback

        raw_percent = (used_bytes / cap_bytes) * 100.0
        used_mib = round(used_bytes / bytes_per_mib, 2)
        clamped_percent = max(0.0, min(raw_percent, 100.0))
        return clamped_percent, used_mib
    except (KeyError, TypeError, ValueError, ZeroDivisionError, AttributeError):
        return fallback
def calc_net_io(stats):
    """Sum cumulative network I/O across all interfaces, in MiB.

    Adds up ``'rx_bytes'`` and ``'tx_bytes'`` of every dict-valued entry under
    the ``'networks'`` key of *stats* and converts both totals from bytes to
    units of 1024 * 1024 bytes.

    Args:
        stats: Stats dictionary; anything that is not a dict yields ``(0, 0)``.

    Returns:
        tuple: ``(received, transmitted)``, each rounded to two decimals.
        ``(0, 0)`` is returned when the input is not a dict or any error
        occurs while summing.
    """
    bytes_per_mib = 1024 * 1024
    try:
        if not isinstance(stats, dict):
            return 0, 0

        networks = stats.get('networks', {})
        interfaces = []
        if networks and isinstance(networks, dict):
            # Entries that are not dicts are skipped.
            interfaces = [counters for counters in networks.values()
                          if isinstance(counters, dict)]

        received = sum(counters.get('rx_bytes', 0) for counters in interfaces)
        transmitted = sum(counters.get('tx_bytes', 0) for counters in interfaces)
        return round(received / bytes_per_mib, 2), round(transmitted / bytes_per_mib, 2)
    except Exception:
        # Best-effort: malformed counters are reported as zero traffic.
        return 0, 0
def calc_block_io(stats):
    """Sum cumulative block I/O from a stats dictionary, in MiB.

    Walks the ``'io_service_bytes_recursive'`` list under ``'blkio_stats'``
    and totals the ``'value'`` of entries whose ``'op'`` is ``read`` or
    ``write`` (compared case-insensitively). Totals are converted from bytes
    to units of 1024 * 1024 bytes.

    Args:
        stats: Stats dictionary; anything that is not a dict yields ``(0, 0)``.

    Returns:
        tuple: ``(read, written)``, each rounded to two decimals. ``(0, 0)``
        is returned when the input is not a dict or any error occurs.
    """
    bytes_per_mib = 1024 * 1024
    bytes_read = 0
    bytes_written = 0
    try:
        if not isinstance(stats, dict):
            return 0, 0

        blkio = stats.get('blkio_stats', {})
        entries = None
        if blkio and isinstance(blkio, dict):
            entries = blkio.get('io_service_bytes_recursive', [])

        if entries and isinstance(entries, list):
            for item in entries:
                # Only dict entries carrying both an operation and a value count.
                if not (isinstance(item, dict) and 'op' in item and 'value' in item):
                    continue
                operation = item['op'].lower()
                if operation == 'read':
                    bytes_read += item['value']
                elif operation == 'write':
                    bytes_written += item['value']

        return round(bytes_read / bytes_per_mib, 2), round(bytes_written / bytes_per_mib, 2)
    except Exception:
        # Best-effort: malformed entries are reported as zero I/O.
        return 0, 0