Skip to content

Commit 7f1f7de

Browse files
authored
Features/metrics elaboration (#8)
* introduction to the analyzer class + tests * refactor of the analyzer
1 parent 2686845 commit 7f1f7de

File tree

6 files changed

+965
-7
lines changed

6 files changed

+965
-7
lines changed

poetry.lock

Lines changed: 480 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pydantic-settings = "^2.10.1"
1515
pydantic = {extras = ["email"], version = "^2.11.7"}
1616
numpy = "^2.3.1"
1717
simpy = "^4.1.1"
18+
matplotlib = "^3.10.3"
1819

1920
[tool.poetry.group.dev.dependencies]
2021
pytest = "^8.4.1"

src/app/config/constants.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -216,9 +216,9 @@ class SamplePeriods(float, Enum):
216216
we have to extract a time series
217217
"""
218218

219-
STANDARD_TIME = 0.005 # 5 MILLISECONDS
219+
STANDARD_TIME = 0.01 # 10 MILLISECONDS
220220
MINIMUM_TIME = 0.001 # 1 MILLISECOND
221-
MAXIMUM_TIME = 0.1 # 10 MILLISECONDS
221+
MAXIMUM_TIME = 0.1 # 100 MILLISECONDS
222222

223223
# ======================================================================
224224
# CONSTANTS FOR EVENT METRICS
@@ -244,15 +244,31 @@ class AggregatedMetricName(StrEnum):
244244
"""aggregated metrics to calculate at the end of simulation"""
245245

246246
LATENCY_STATS = "latency_stats"
247-
THROUGHPUT_RPS = "throughput_rps"
247+
THROUGHPUT = "throughput_rps"
248248
LLM_STATS = "llm_stats"
249249

250250
# ======================================================================
251251
# CONSTANTS FOR SERVER RUNTIME
252252
# ======================================================================
253253

254254
class ServerResourceName(StrEnum):
255-
"""Keys for each server resource type, used when building the container map."""
255+
"""Keys for each server resource type, used when building the container map."""
256256

257-
CPU = "CPU"
258-
RAM = "RAM"
257+
CPU = "CPU"
258+
RAM = "RAM"
259+
260+
# ======================================================================
261+
# CONSTANTS FOR LATENCY STATS
262+
# ======================================================================
263+
264+
class LatencyKey(StrEnum):
265+
"""Keys for the collection of the latency stats"""
266+
267+
TOTAL_REQUESTS = "total_requests"
268+
MEAN = "mean"
269+
MEDIAN = "median"
270+
STD_DEV = "std_dev"
271+
P95 = "p95"
272+
P99 = "p99"
273+
MIN = "min"
274+
MAX = "max"

src/app/config/plot_constants.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
"""Dataclass to define a central structure to plot the metrics"""
2+
from dataclasses import dataclass
3+
4+
5+
@dataclass(frozen=True)
6+
class PlotCfg:
7+
"""Dataclass for the plot of the various metrics"""
8+
9+
no_data: str
10+
title: str
11+
x_label: str
12+
y_label: str
13+
ready_label: str | None = None
14+
io_label: str | None = None
15+
legend_label: str | None = None
16+
17+
LATENCY_PLOT = PlotCfg(
18+
no_data="No latency data",
19+
title="Request Latency Distribution",
20+
x_label="Latency (s)",
21+
y_label="Frequency",
22+
)
23+
24+
THROUGHPUT_PLOT = PlotCfg(
25+
no_data="No throughput data",
26+
title="Throughput (RPS)",
27+
x_label="Time (s)",
28+
y_label="Requests/s",
29+
)
30+
31+
32+
SERVER_QUEUES_PLOT = PlotCfg(
33+
no_data="No queue data",
34+
title="Server Queues",
35+
x_label="Time (s)",
36+
y_label="Queue length",
37+
ready_label="Ready queue",
38+
io_label="I/O queue",
39+
)
40+
41+
RAM_PLOT = PlotCfg(
42+
no_data="No RAM data",
43+
title="RAM Usage",
44+
x_label="Time (s)",
45+
y_label="RAM (MB)",
46+
legend_label="RAM",
47+
)

src/app/metrics/analyzer.py

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
"""Module for post-simulation analysis and visualization."""
2+
3+
from __future__ import annotations
4+
5+
from collections import defaultdict
6+
from typing import TYPE_CHECKING
7+
8+
import numpy as np
9+
10+
from app.config.constants import LatencyKey, SampledMetricName
11+
from app.config.plot_constants import (
12+
LATENCY_PLOT,
13+
RAM_PLOT,
14+
SERVER_QUEUES_PLOT,
15+
THROUGHPUT_PLOT,
16+
)
17+
18+
if TYPE_CHECKING:
19+
20+
from matplotlib.axes import Axes
21+
22+
from app.runtime.actors.client import ClientRuntime
23+
from app.runtime.actors.edge import EdgeRuntime
24+
from app.runtime.actors.server import ServerRuntime
25+
from app.schemas.simulation_settings_input import SimulationSettings
26+
27+
28+
class ResultsAnalyzer:
    """Analyze and visualize the results of a completed simulation.

    This class holds the raw runtime objects and lazily computes:
    - latency statistics
    - throughput time series (RPS)
    - sampled metrics from servers and edges
    """

    # Window size in seconds used to bucket completions into RPS values.
    _WINDOW_SIZE_S: float = 1

    def __init__(
        self,
        *,
        client: ClientRuntime,
        servers: list[ServerRuntime],
        edges: list[EdgeRuntime],
        settings: SimulationSettings,
    ) -> None:
        """
        Args:
            client: Client runtime object, containing RqsClock entries.
            servers: List of server runtime objects.
            edges: List of edge runtime objects.
            settings: Original simulation settings.

        """
        self._client = client
        self._servers = servers
        self._edges = edges
        self._settings = settings

        # Lazily computed caches; None until process_all_metrics() runs.
        self.latencies: list[float] | None = None
        self.latency_stats: dict[LatencyKey, float] | None = None
        self.throughput_series: tuple[list[float], list[float]] | None = None
        self.sampled_metrics: dict[str, dict[str, list[float]]] | None = None

    def process_all_metrics(self) -> None:
        """Compute all aggregated and sampled metrics if not already done."""
        # Event metrics are only computable when at least one request finished.
        if self.latency_stats is None and self._client.rqs_clock:
            self._process_event_metrics()

        if self.sampled_metrics is None:
            self._extract_sampled_metrics()

    def _process_event_metrics(self) -> None:
        """Calculate latency stats and throughput time series (RPS)."""
        # 1) Per-request latency = completion time - start time.
        self.latencies = [
            clock.finish - clock.start for clock in self._client.rqs_clock
        ]

        # 2) Summary stats (np.std defaults to population std, ddof=0).
        if self.latencies:
            arr = np.array(self.latencies)
            self.latency_stats = {
                LatencyKey.TOTAL_REQUESTS: float(arr.size),
                LatencyKey.MEAN: float(np.mean(arr)),
                LatencyKey.MEDIAN: float(np.median(arr)),
                LatencyKey.STD_DEV: float(np.std(arr)),
                LatencyKey.P95: float(np.percentile(arr, 95)),
                LatencyKey.P99: float(np.percentile(arr, 99)),
                LatencyKey.MIN: float(np.min(arr)),
                LatencyKey.MAX: float(np.max(arr)),
            }
        else:
            self.latency_stats = {}

        # 3) Throughput: count completions falling inside each fixed window.
        completion_times = sorted(clock.finish for clock in self._client.rqs_clock)
        end_time = self._settings.total_simulation_time

        window = ResultsAnalyzer._WINDOW_SIZE_S
        timestamps: list[float] = []
        rps_values: list[float] = []
        count = 0
        idx = 0
        current_end = window

        while current_end <= end_time:
            # completion_times is sorted, so a single forward scan suffices.
            while idx < len(completion_times) and completion_times[idx] <= current_end:
                count += 1
                idx += 1
            timestamps.append(current_end)
            rps_values.append(count / window)
            current_end += window
            count = 0

        self.throughput_series = (timestamps, rps_values)

    def _extract_sampled_metrics(self) -> None:
        """Gather sampled metrics from servers and edges into a nested dict.

        Resulting shape: {metric_name: {actor_id: [sampled values...]}}.
        """
        metrics: dict[str, dict[str, list[float]]] = defaultdict(dict)

        for server in self._servers:
            sid = server.server_config.id
            for name, values in server.enabled_metrics.items():
                metrics[name.value][sid] = values

        for edge in self._edges:
            eid = edge.edge_config.id
            for name, values in edge.enabled_metrics.items():
                metrics[name.value][eid] = values

        self.sampled_metrics = metrics

    def get_latency_stats(self) -> dict[LatencyKey, float]:
        """Return latency statistics, computing them if necessary."""
        self.process_all_metrics()
        return self.latency_stats or {}

    def get_throughput_series(self) -> tuple[list[float], list[float]]:
        """Return throughput time series (timestamps, RPS).

        Returns an empty series when no request ever completed — in that
        case process_all_metrics() never computed the throughput, and the
        previous bare ``assert`` crashed instead of degrading gracefully.
        """
        self.process_all_metrics()
        if self.throughput_series is None:
            return ([], [])
        return self.throughput_series

    def get_sampled_metrics(self) -> dict[str, dict[str, list[float]]]:
        """Return sampled metrics from servers and edges."""
        self.process_all_metrics()
        # _extract_sampled_metrics() always fills the cache, but avoid a
        # bare assert (stripped under -O); fall back to an empty mapping.
        return self.sampled_metrics if self.sampled_metrics is not None else {}

    def plot_latency_distribution(self, ax: Axes) -> None:
        """Draw a histogram of request latencies onto the given Axes."""
        # Populate the lazy caches first: previously a plot-first call read
        # self.latencies while it was still None and rendered "no data"
        # even though latency data existed.
        self.process_all_metrics()
        if not self.latencies:
            ax.text(0.5, 0.5, LATENCY_PLOT.no_data, ha="center", va="center")
            return

        ax.hist(self.latencies, bins=50)
        ax.set_title(LATENCY_PLOT.title)
        ax.set_xlabel(LATENCY_PLOT.x_label)
        ax.set_ylabel(LATENCY_PLOT.y_label)
        ax.grid(visible=True)

    def plot_throughput(self, ax: Axes) -> None:
        """Draw throughput (RPS) over time onto the given Axes."""
        timestamps, values = self.get_throughput_series()
        if not timestamps:
            ax.text(0.5, 0.5, THROUGHPUT_PLOT.no_data, ha="center", va="center")
            return

        ax.plot(timestamps, values, marker="o", linestyle="-")
        ax.set_title(THROUGHPUT_PLOT.title)
        ax.set_xlabel(THROUGHPUT_PLOT.x_label)
        ax.set_ylabel(THROUGHPUT_PLOT.y_label)
        ax.grid(visible=True)

    def plot_server_queues(self, ax: Axes) -> None:
        """Draw server queue lengths over time onto the given Axes."""
        metrics = self.get_sampled_metrics()
        ready = metrics.get(SampledMetricName.READY_QUEUE_LEN, {})
        io_q = metrics.get(SampledMetricName.EVENT_LOOP_IO_SLEEP, {})

        if not (ready or io_q):
            ax.text(0.5, 0.5, SERVER_QUEUES_PLOT.no_data, ha="center", va="center")
            return

        period = self._settings.sample_period_s

        # Build the time axis per series: it was previously derived from the
        # first "ready" series only, which produced a length-mismatch crash
        # in ax.plot() whenever ready was empty but the I/O dict was not.
        for sid, vals in ready.items():
            times = np.arange(len(vals)) * period
            ax.plot(times, vals, label=f"{sid} {SERVER_QUEUES_PLOT.ready_label}")
        for sid, vals in io_q.items():
            times = np.arange(len(vals)) * period
            ax.plot(
                times,
                vals,
                label=f"{sid} {SERVER_QUEUES_PLOT.io_label}",
                linestyle="--",
            )

        ax.set_title(SERVER_QUEUES_PLOT.title)
        ax.set_xlabel(SERVER_QUEUES_PLOT.x_label)
        ax.set_ylabel(SERVER_QUEUES_PLOT.y_label)
        ax.legend()
        ax.grid(visible=True)

    def plot_ram_usage(self, ax: Axes) -> None:
        """Draw RAM usage over time onto the given Axes."""
        metrics = self.get_sampled_metrics()
        ram = metrics.get(SampledMetricName.RAM_IN_USE, {})

        if not ram:
            ax.text(0.5, 0.5, RAM_PLOT.no_data, ha="center", va="center")
            return

        period = self._settings.sample_period_s

        # Time axis per series, consistent with plot_server_queues, so
        # servers with differing sample counts cannot cause a mismatch.
        for sid, vals in ram.items():
            times = np.arange(len(vals)) * period
            ax.plot(times, vals, label=f"{sid} {RAM_PLOT.legend_label}")

        ax.set_title(RAM_PLOT.title)
        ax.set_xlabel(RAM_PLOT.x_label)
        ax.set_ylabel(RAM_PLOT.y_label)
        ax.legend()
        ax.grid(visible=True)

0 commit comments

Comments
 (0)