481 changes: 480 additions & 1 deletion poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -15,6 +15,7 @@ pydantic-settings = "^2.10.1"
 pydantic = {extras = ["email"], version = "^2.11.7"}
 numpy = "^2.3.1"
 simpy = "^4.1.1"
+matplotlib = "^3.10.3"
 
 [tool.poetry.group.dev.dependencies]
 pytest = "^8.4.1"
28 changes: 22 additions & 6 deletions src/app/config/constants.py
@@ -216,9 +216,9 @@ class SamplePeriods(float, Enum):
     we have to extract a time series
     """
 
-    STANDARD_TIME = 0.005  # 5 MILLISECONDS
+    STANDARD_TIME = 0.01  # 10 MILLISECONDS
     MINIMUM_TIME = 0.001  # 1 MILLISECOND
-    MAXIMUM_TIME = 0.1  # 10 MILLISECONDS
+    MAXIMUM_TIME = 0.1  # 100 MILLISECONDS
 
 # ======================================================================
 # CONSTANTS FOR EVENT METRICS
@@ -244,15 +244,31 @@ class AggregatedMetricName(StrEnum):
     """aggregated metrics to calculate at the end of simulation"""
 
     LATENCY_STATS = "latency_stats"
-    THROUGHPUT_RPS = "throughput_rps"
+    THROUGHPUT = "throughput_rps"
     LLM_STATS = "llm_stats"
 
 # ======================================================================
 # CONSTANTS FOR SERVER RUNTIME
 # ======================================================================
 
 class ServerResourceName(StrEnum):
-    """Keys for each server resource type, used when building the container map."""
+    """Keys for each server resource type, used when building the container map."""
 
-    CPU = "CPU"
-    RAM = "RAM"
+    CPU = "CPU"
+    RAM = "RAM"
+
+# ======================================================================
+# CONSTANTS FOR LATENCY STATS
+# ======================================================================
+
+class LatencyKey(StrEnum):
+    """Keys for the collection of the latency stats"""
+
+    TOTAL_REQUESTS = "total_requests"
+    MEAN = "mean"
+    MEDIAN = "median"
+    STD_DEV = "std_dev"
+    P95 = "p95"
+    P99 = "p99"
+    MIN = "min"
+    MAX = "max"
47 changes: 47 additions & 0 deletions src/app/config/plot_constants.py
@@ -0,0 +1,47 @@
"""Dataclass to define a central structure to plot the metrics"""
from dataclasses import dataclass


@dataclass(frozen=True)
class PlotCfg:
"""Dataclass for the plot of the various metrics"""

no_data: str
title: str
x_label: str
y_label: str
ready_label: str | None = None
io_label: str | None = None
legend_label: str | None = None

LATENCY_PLOT = PlotCfg(
no_data="No latency data",
title="Request Latency Distribution",
x_label="Latency (s)",
y_label="Frequency",
)

THROUGHPUT_PLOT = PlotCfg(
no_data="No throughput data",
title="Throughput (RPS)",
x_label="Time (s)",
y_label="Requests/s",
)


SERVER_QUEUES_PLOT = PlotCfg(
no_data="No queue data",
title="Server Queues",
x_label="Time (s)",
y_label="Queue length",
ready_label="Ready queue",
io_label="I/O queue",
)

RAM_PLOT = PlotCfg(
no_data="No RAM data",
title="RAM Usage",
x_label="Time (s)",
y_label="RAM (MB)",
legend_label="RAM",
)
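
Because PlotCfg is frozen, each constant is an immutable bundle of labels, and covering a new metric is just another instance. A hypothetical example for a CPU-usage panel, not part of this PR:

# Hypothetical config for a CPU panel; field names match PlotCfg above.
CPU_PLOT = PlotCfg(
    no_data="No CPU data",
    title="CPU Usage",
    x_label="Time (s)",
    y_label="CPU (%)",
    legend_label="CPU",
)
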
225 changes: 225 additions & 0 deletions src/app/metrics/analyzer.py
@@ -0,0 +1,225 @@
"""Module for post-simulation analysis and visualization."""

from __future__ import annotations

from collections import defaultdict
from typing import TYPE_CHECKING

import numpy as np

from app.config.constants import LatencyKey, SampledMetricName
from app.config.plot_constants import (
LATENCY_PLOT,
RAM_PLOT,
SERVER_QUEUES_PLOT,
THROUGHPUT_PLOT,
)

if TYPE_CHECKING:

from matplotlib.axes import Axes

from app.runtime.actors.client import ClientRuntime
from app.runtime.actors.edge import EdgeRuntime
from app.runtime.actors.server import ServerRuntime
from app.schemas.simulation_settings_input import SimulationSettings


class ResultsAnalyzer:
    """Analyze and visualize the results of a completed simulation.

    This class holds the raw runtime objects and lazily computes:
    - latency statistics
    - throughput time series (RPS)
    - sampled metrics from servers and edges
    """

    # Length, in seconds, of the window used to compute throughput (RPS).
    _WINDOW_SIZE_S: float = 1.0

    def __init__(
        self,
        *,
        client: ClientRuntime,
        servers: list[ServerRuntime],
        edges: list[EdgeRuntime],
        settings: SimulationSettings,
    ) -> None:
        """
        Args:
            client: Client runtime object, containing RqsClock entries.
            servers: List of server runtime objects.
            edges: List of edge runtime objects.
            settings: Original simulation settings.

        """
        self._client = client
        self._servers = servers
        self._edges = edges
        self._settings = settings

        # Lazily computed caches
        self.latencies: list[float] | None = None
        self.latency_stats: dict[LatencyKey, float] | None = None
        self.throughput_series: tuple[list[float], list[float]] | None = None
        self.sampled_metrics: dict[str, dict[str, list[float]]] | None = None

    def process_all_metrics(self) -> None:
        """Compute all aggregated and sampled metrics if not already done."""
        if self.latency_stats is None and self._client.rqs_clock:
            self._process_event_metrics()

        if self.sampled_metrics is None:
            self._extract_sampled_metrics()

    def _process_event_metrics(self) -> None:
        """Calculate latency stats and throughput time series (RPS)."""
        # 1) Latencies
        self.latencies = [
            clock.finish - clock.start for clock in self._client.rqs_clock
        ]

        # 2) Summary stats
        if self.latencies:
            arr = np.array(self.latencies)
            self.latency_stats = {
                LatencyKey.TOTAL_REQUESTS: float(arr.size),
                LatencyKey.MEAN: float(np.mean(arr)),
                LatencyKey.MEDIAN: float(np.median(arr)),
                LatencyKey.STD_DEV: float(np.std(arr)),
                LatencyKey.P95: float(np.percentile(arr, 95)),
                LatencyKey.P99: float(np.percentile(arr, 99)),
                LatencyKey.MIN: float(np.min(arr)),
                LatencyKey.MAX: float(np.max(arr)),
            }
        else:
            self.latency_stats = {}

        # 3) Throughput per 1 s window
        completion_times = sorted(clock.finish for clock in self._client.rqs_clock)
        end_time = self._settings.total_simulation_time

        timestamps: list[float] = []
        rps_values: list[float] = []
        count = 0
        idx = 0
        current_end = ResultsAnalyzer._WINDOW_SIZE_S

        while current_end <= end_time:
            while idx < len(completion_times) and completion_times[idx] <= current_end:
                count += 1
                idx += 1
            timestamps.append(current_end)
            rps_values.append(count / ResultsAnalyzer._WINDOW_SIZE_S)
            current_end += ResultsAnalyzer._WINDOW_SIZE_S
            count = 0

        self.throughput_series = (timestamps, rps_values)
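        # Worked example (illustrative only, not part of the module): with
        # completion times [0.2, 0.4, 1.5] and a 2.0 s run, the 1 s windows
        # close at t=1.0 and t=2.0, yielding timestamps [1.0, 2.0] and
        # rps_values [2.0, 1.0].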

    def _extract_sampled_metrics(self) -> None:
        """Gather sampled metrics from servers and edges into a nested dict."""
        metrics: dict[str, dict[str, list[float]]] = defaultdict(dict)

        for server in self._servers:
            sid = server.server_config.id
            for name, values in server.enabled_metrics.items():
                metrics[name.value][sid] = values

        for edge in self._edges:
            eid = edge.edge_config.id
            for name, values in edge.enabled_metrics.items():
                metrics[name.value][eid] = values

        self.sampled_metrics = metrics

    def get_latency_stats(self) -> dict[LatencyKey, float]:
        """Return latency statistics, computing them if necessary."""
        self.process_all_metrics()
        return self.latency_stats or {}

    def get_throughput_series(self) -> tuple[list[float], list[float]]:
        """Return throughput time series (timestamps, RPS)."""
        self.process_all_metrics()
        # Fall back to an empty series when no request ever completed,
        # so callers can still render their "no data" message.
        return self.throughput_series or ([], [])

    def get_sampled_metrics(self) -> dict[str, dict[str, list[float]]]:
        """Return sampled metrics from servers and edges."""
        self.process_all_metrics()
        assert self.sampled_metrics is not None
        return self.sampled_metrics

    def plot_latency_distribution(self, ax: Axes) -> None:
        """Draw a histogram of request latencies onto the given Axes."""
        # Ensure latencies are computed before checking for data.
        self.process_all_metrics()
        if not self.latencies:
            ax.text(0.5, 0.5, LATENCY_PLOT.no_data, ha="center", va="center")
            return

        ax.hist(self.latencies, bins=50)
        ax.set_title(LATENCY_PLOT.title)
        ax.set_xlabel(LATENCY_PLOT.x_label)
        ax.set_ylabel(LATENCY_PLOT.y_label)
        ax.grid(visible=True)

    def plot_throughput(self, ax: Axes) -> None:
        """Draw throughput (RPS) over time onto the given Axes."""
        timestamps, values = self.get_throughput_series()
        if not timestamps:
            ax.text(0.5, 0.5, THROUGHPUT_PLOT.no_data, ha="center", va="center")
            return

        ax.plot(timestamps, values, marker="o", linestyle="-")
        ax.set_title(THROUGHPUT_PLOT.title)
        ax.set_xlabel(THROUGHPUT_PLOT.x_label)
        ax.set_ylabel(THROUGHPUT_PLOT.y_label)
        ax.grid(visible=True)

    def plot_server_queues(self, ax: Axes) -> None:
        """Draw server queue lengths over time onto the given Axes."""
        metrics = self.get_sampled_metrics()
        ready = metrics.get(SampledMetricName.READY_QUEUE_LEN, {})
        io_q = metrics.get(SampledMetricName.EVENT_LOOP_IO_SLEEP, {})

        if not (ready or io_q):
            ax.text(0.5, 0.5, SERVER_QUEUES_PLOT.no_data, ha="center", va="center")
            return

        # Derive the time axis from whichever series is available, so the
        # I/O queues still plot correctly when no ready-queue data exists.
        first_series = next(iter(ready.values()), None) or next(iter(io_q.values()), [])
        times = np.arange(len(first_series)) * self._settings.sample_period_s

        for sid, vals in ready.items():
            ax.plot(times, vals, label=f"{sid} {SERVER_QUEUES_PLOT.ready_label}")
        for sid, vals in io_q.items():
            ax.plot(
                times,
                vals,
                label=f"{sid} {SERVER_QUEUES_PLOT.io_label}",
                linestyle="--",
            )

        ax.set_title(SERVER_QUEUES_PLOT.title)
        ax.set_xlabel(SERVER_QUEUES_PLOT.x_label)
        ax.set_ylabel(SERVER_QUEUES_PLOT.y_label)
        ax.legend()
        ax.grid(visible=True)

    def plot_ram_usage(self, ax: Axes) -> None:
        """Draw RAM usage over time onto the given Axes."""
        metrics = self.get_sampled_metrics()
        ram = metrics.get(SampledMetricName.RAM_IN_USE, {})

        if not ram:
            ax.text(0.5, 0.5, RAM_PLOT.no_data, ha="center", va="center")
            return

        samples = len(next(iter(ram.values())))
        times = np.arange(samples) * self._settings.sample_period_s

        for sid, vals in ram.items():
            ax.plot(times, vals, label=f"{sid} {RAM_PLOT.legend_label}")

        ax.set_title(RAM_PLOT.title)
        ax.set_xlabel(RAM_PLOT.x_label)
        ax.set_ylabel(RAM_PLOT.y_label)
        ax.legend()
        ax.grid(visible=True)
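
Each plot_* method draws onto a caller-supplied Axes, so composing a dashboard is a matter of creating a figure and handing out the axes. A usage sketch, assuming `client`, `servers`, `edges`, and `settings` come from a finished simulation run:

import matplotlib.pyplot as plt

from app.metrics.analyzer import ResultsAnalyzer

analyzer = ResultsAnalyzer(
    client=client, servers=servers, edges=edges, settings=settings,
)

fig, axes = plt.subplots(2, 2, figsize=(12, 8))
analyzer.plot_latency_distribution(axes[0][0])
analyzer.plot_throughput(axes[0][1])
analyzer.plot_server_queues(axes[1][0])
analyzer.plot_ram_usage(axes[1][1])
fig.tight_layout()
plt.show()

Injecting the Axes rather than creating figures inside the analyzer keeps layout decisions with the caller and makes each plot method easy to test in isolation.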