diff --git a/asyncflow_queue_limit/asyncflow_mm1_notebook.ipynb b/asyncflow_queue_limit/asyncflow_mm1_notebook.ipynb new file mode 100644 index 0000000..4d02abb --- /dev/null +++ b/asyncflow_queue_limit/asyncflow_mm1_notebook.ipynb @@ -0,0 +1,650 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# AsyncFlow — MM1 Theory vs Simulation (Guided Notebook)\n", + "\n", + "This notebook shows how to:\n", + "\n", + "1. Make imports work inside a notebook (src-layout or package install)\n", + "2. Build a **single-server** scenario compatible with **M/M/1** assumptions\n", + "3. Run the simulation and collect results\n", + "4. Compare theory vs observed KPIs (pretty-printed table)\n", + "5. Plot the standard dashboards (latency, throughput, server time series)\n", + "\n", + "> Tip: run this notebook from your project **root folder**.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "c3a69413", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import importlib, asyncflow, asyncflow.analysis\n", + "importlib.reload(asyncflow)\n", + "importlib.reload(asyncflow.analysis)" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "tags": [ + "imports" + ] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Imports OK.\n" + ] + } + ], + "source": [ + "import matplotlib.pyplot as plt\n", + "import simpy\n", + "\n", + "# Public AsyncFlow API\n", + "from asyncflow import AsyncFlow, SimulationRunner, Sweep\n", + "from asyncflow.components import Client, Server, Edge, Endpoint\n", + "from asyncflow.settings import SimulationSettings\n", + "from asyncflow.workload import RqsGenerator\n", + "from asyncflow.analysis import MM1, ResultsAnalyzer, SweepAnalyzer\n", + "\n", + "print(\"Imports OK.\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1) Build an M/M/1-friendly scenario\n", + "\n", + "* **Single server with exponential CPU service**\n", + " One server, one endpoint, exactly **one CPU-bound step** with an **Exponential** service-time RV (mean $E[S]$). No RAM/IO steps in the pipeline.\n", + "\n", + "* **No load balancer**\n", + " Topology has **exactly one server** and **no LB** (no fan-out, no parallelism).\n", + "\n", + "* **Deterministic, very small network latency**\n", + " All edges use a **fixed latency** $\\ll 1\\,\\mathrm{ms}$ so queueing is dominated by CPU service (closer to textbook M/M/1).\n", + "\n", + "* **“Poisson arrivals” via the generator (what we actually sample)**\n", + " We use a **two-stage, windowed sampler**: every user-sampling window $\\Delta$ we draw the active users $U$ (Poisson or Normal, per config). **Within that window**, arrivals are a **homogeneous Poisson process** with rate $\\Lambda = U \\cdot \\lambda_r/60$ (where $\\lambda_r$ is requests/min/user). If $U$ changes between windows, the overall process becomes a **piecewise-constant (mixed/Cox) Poisson** rather than one global Poisson.\n", + " *Implications:* with **small $\\Delta$**, **Poisson users**, **long runs**, and **tiny edge latency**, this closely matches M/M/1. Larger $\\Delta$, Normal users, or short horizons can introduce small, expected deviations in $\\lambda, W, L$ (especially during warm-up).\n", + "\n", + "\n", + "```mermaid\n", + "graph LR;\n", + " %% Definiamo i nodi con i loro dettagli\n", + " rqs1[\"RqsGenerator
id: rqs-1\"]\n", + " client1[\"Client
id: client-1\"]\n", + " app1[\"Server
id: app-1
Endpoint: /api\"]\n", + "\n", + " %% Definiamo le connessioni (edge) con latenza\n", + " rqs1 -- \"Edge: gen-client
Latency: 0.0001\" --> client1;\n", + " client1 -- \"Request
Edge: client-app
Latency: 0.0001\" --> app1;\n", + " app1 -- \"Response
Edge: app-client
Latency: 0.0001\" --> client1;" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "tags": [ + "build" + ] + }, + "outputs": [], + "source": [ + "def build_payload():\n", + " generator = RqsGenerator(\n", + " id=\"rqs-1\",\n", + " avg_active_users={\"mean\": 100},\n", + " avg_request_per_minute_per_user={\"mean\": 20},\n", + " user_sampling_window=60,\n", + " )\n", + "\n", + " client = Client(id=\"client-1\")\n", + "\n", + " endpoint = Endpoint(\n", + " endpoint_name=\"/api\",\n", + " probability=1.0,\n", + " steps=[\n", + " {\n", + " \"kind\": \"initial_parsing\", # CPU-bound step\n", + " \"step_operation\": {\n", + " \"cpu_time\": {\"mean\": 0.015, \"distribution\": \"exponential\"},\n", + " },\n", + " },\n", + " ],\n", + " )\n", + "\n", + " server = Server(\n", + " id=\"app-1\",\n", + " server_resources={\"cpu_cores\": 1, \"ram_mb\": 2048},\n", + " endpoints=[endpoint],\n", + " )\n", + "\n", + " e_gen_client = Edge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\", latency=0.0001, dropout_rate=0.0)\n", + " e_client_app = Edge(id=\"client-app\", source=\"client-1\", target=\"app-1\", latency=0.0001, dropout_rate=0.0)\n", + " e_app_client = Edge(id=\"app-client\", source=\"app-1\", target=\"client-1\", latency=0.0001, dropout_rate=0.0)\n", + "\n", + " settings = SimulationSettings(\n", + " total_simulation_time=900,\n", + " sample_period_s=0.05,\n", + " )\n", + "\n", + " payload = (\n", + " AsyncFlow()\n", + " .add_generator(generator)\n", + " .add_client(client)\n", + " .add_servers(server)\n", + " .add_edges(e_gen_client, e_client_app, e_app_client)\n", + " .add_simulation_settings(settings)\n", + " ).build_payload()\n", + " return payload\n", + "\n", + "payload = build_payload()\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2) Run the simulation\n" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": { + "tags": [ + "run" + ] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Done.\n" + ] + } + ], + "source": [ + "env = simpy.Environment()\n", + "runner = SimulationRunner(env=env, simulation_input=payload)\n", + "results: ResultsAnalyzer = runner.run()\n", + "print(\"Done.\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3) MM1 theory vs observed comparison \n", + "If the payload violates MM1 assumptions, a readable error is shown instead.\n", + "## Variables (what they represent)\n", + "\n", + "* **λ (lambda)**: mean **arrival rate**, in requests/second.\n", + "* **μ (mu)**: mean **service rate**, in requests/second (= 1 / mean service time).\n", + "* **ρ (rho)**: **utilization** of the server, ρ = λ / μ (unitless).\n", + "* **W**: **mean time in system** (end-to-end latency, queue + service), in seconds.\n", + "* **Wq**: **mean waiting time in queue** (before service), in seconds.\n", + "* **L**: **mean number in system** (in queue + in service), unitless.\n", + "* **Lq**: **mean number in queue**, unitless.\n", + "* **E\\[S]**: **mean service time** at the server (CPU only), in seconds.\n", + "\n", + "\n", + "> In the comparison table you’ll see two columns: **Theory** (closed-form M/M/1 values) and **Observed** (estimates from the run). The run is a single execution; “Theory” is the model prediction, “Observed” is what was measured.\n", + "\n", + "---\n", + "\n", + "## How we compute the **Theory** column (M/M/1)\n", + "\n", + "1. **Predicted arrival rate**\n", + "\n", + "$$\n", + "\\lambda_{\\text{Theory}} \\;=\\; \n", + "\\frac{\\texttt{avg\\_active\\_users.mean}\\times \\texttt{avg\\_request\\_per\\_minute\\_per\\_user.mean}}{60}\n", + "$$\n", + "\n", + "2. **Predicted service rate** (from the **CPU exponential step** with mean $E[S]$)\n", + "\n", + "$$\n", + "\\mu_{\\text{Theory}} \\;=\\; \\frac{1}{E[S]}\n", + "$$\n", + "\n", + "3. **M/M/1 closed forms** (valid when $\\lambda_{\\text{Theory}} < \\mu_{\\text{Theory}}$)\n", + "\n", + "$$\n", + "\\begin{aligned}\n", + "\\rho_{\\text{Theory}} &= \\frac{\\lambda_{\\text{Theory}}}{\\mu_{\\text{Theory}}} \\\\\n", + "W_{\\text{Theory}} &= \\frac{1}{\\mu_{\\text{Theory}} - \\lambda_{\\text{Theory}}} \\\\\n", + "W_{q,\\text{Theory}} &= \\frac{\\rho_{\\text{Theory}}}{\\mu_{\\text{Theory}} - \\lambda_{\\text{Theory}}} \\\\\n", + "L_{\\text{Theory}} &= \\lambda_{\\text{Theory}}\\, W_{\\text{Theory}} \\;=\\; \\frac{\\rho_{\\text{Theory}}}{1-\\rho_{\\text{Theory}}} \\\\\n", + "L_{q,\\text{Theory}} &= \\lambda_{\\text{Theory}}\\, W_{q,\\text{Theory}} \\;=\\; \\frac{\\rho_{\\text{Theory}}^{2}}{1-\\rho_{\\text{Theory}}}\n", + "\\end{aligned}\n", + "$$\n", + "\n", + "If $\\lambda_{\\text{Theory}} \\ge \\mu_{\\text{Theory}}$, the system is **unstable** and $W, W_q, L, L_q$ **diverge** (we display them as $+\\infty$).\n", + "\n", + "---\n", + "\n", + "## How we compute the **Observed** column (from the run)\n", + "\n", + "After the `ResultsAnalyzer` processes metrics:\n", + "\n", + "1. **Observed arrival rate** (mean throughput across time windows)\n", + "\n", + "$$\n", + "\\lambda_{\\text{Observed}} \\;=\\; \\text{mean}\\big(\\text{windowed RPS series}\\big)\n", + "$$\n", + "\n", + "2. **Observed time in system** (client end-to-end latency)\n", + "\n", + "$$\n", + "W_{\\text{Observed}} \\;=\\; \\text{mean}\\big(\\text{client latencies}\\big)\n", + "$$\n", + "\n", + "3. **Observed service rate** (from server service times)\n", + "\n", + "$$\n", + "\\overline{S}=\\text{mean}(\\text{service\\_time}), \n", + "\\qquad\n", + "\\mu_{\\text{Observed}}=\n", + "\\begin{cases}\n", + "1/\\overline{S} & \\overline{S}>0\\\\\n", + "+\\infty & \\overline{S}=0\n", + "\\end{cases}\n", + "$$\n", + "\n", + "4. **Observed waiting time in queue** (from server queue wait times)\n", + "\n", + "$$\n", + "W_{q,\\text{Observed}} \\;=\\; \\text{mean}(\\text{waiting\\_time})\n", + "$$\n", + "\n", + "5. **Little’s law (observed)**\n", + "\n", + "$$\n", + "L_{\\text{Observed}}=\\lambda_{\\text{Observed}}\\, W_{\\text{Observed}},\n", + "\\qquad\n", + "L_{q,\\text{Observed}}=\\lambda_{\\text{Observed}}\\, W_{q,\\text{Observed}}\n", + "$$\n", + "\n", + "6. **Observed utilization**\n", + "\n", + "$$\n", + "\\rho_{\\text{Observed}}=\n", + "\\begin{cases}\n", + "\\lambda_{\\text{Observed}} / \\mu_{\\text{Observed}} & \\mu_{\\text{Observed}} \\not\\in \\{0,+\\infty\\}\\\\\n", + "0 & \\text{otherwise}\n", + "\\end{cases}\n", + "$$\n", + "\n", + "> **Why small deltas appear:** warm-up effects, the user-sampling window (piecewise-constant rate), finite simulation horizon, and a (small) deterministic network latency naturally introduce small Theory vs Observed gaps. Increasing the simulation time and reducing network latency typically shrinks these deltas.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": { + "tags": [ + "mm1" + ] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "====================================================================\n", + "MM1 - Theory vs Observed\n", + "--------------------------------------------------------------------\n", + "sym metric theory observed abs rel%\n", + "--------------------------------------------------------------------\n", + "λ Arrival rate (1/s) 33.333333 33.204444 -0.128889 -0.39\n", + "μ Service rate (1/s) 66.666667 66.638004 -0.028663 -0.04\n", + "rho Utilization 0.500000 0.498281 -0.001719 -0.34\n", + "L Mean items in system 1.000000 1.034536 0.034536 3.45\n", + "Lq Mean items in queue 0.500000 0.526294 0.026294 5.26\n", + "W Mean time in system (s) 0.030000 0.031157 0.001157 3.86\n", + "Wq Mean waiting time (s) 0.015000 0.015850 0.000850 5.67\n", + "====================================================================\n" + ] + } + ], + "source": [ + "mm1 = MM1()\n", + "if mm1.is_compatible(payload):\n", + " mm1.print_comparison(payload, results) # ✅ metodo esistente\n", + "else:\n", + " print(\"Payload is not compatible with M/M/1:\")\n", + " for reason in mm1.explain_incompatibilities(payload):\n", + " print(\" -\", reason)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 4) Plot dashboards\n", + "\n", + "**System-level and per-server charts**\n", + "\n", + "Beyond the two main panels (latency histogram + throughput time series), AsyncFlow records **rich time series and per-request distributions** that make the system behavior easy to read. In your scenario (single server, exponential CPU only, no I/O/RAM), you’ll see:\n", + "\n", + "* **System dashboard**\n", + "\n", + " * **Request Latency Distribution**: end-to-end histogram (client→server→client) with **mean, P50, P95, P99** markers. Here latency is dominated by CPU service + short queue; vertical lines highlight tail behavior.\n", + " * **Throughput (RPS)**: windowed time series with **mean, P95, max**. Great for spotting stability, oscillations, and warm-up.\n", + "\n", + "* **Server time-series dashboard (for `app-1`)**\n", + "\n", + " * **Ready queue length**: CPU queue over time with **mean/min/max**. With ρ≈0.5 the mean queue ≈0.5, consistent with M/M/1.\n", + " * **I/O queue length**: flat at zero (no I/O step in the pipeline).\n", + " * **RAM in use**: flat at zero (no RAM step in the pipeline).\n", + "\n", + "* **Server event-metrics dashboard**\n", + "\n", + " * **Server-side latency**: histogram of (waiting + service) at the server.\n", + " * **CPU service time**: histogram of **service\\_time** (Exp \\~15 ms) with P95/P99.\n", + " * **CPU waiting time**: histogram of queue **waiting\\_time**; shows the heavy tail under bursts.\n", + " * **I/O time**: flat at zero (no I/O).\n", + "\n", + "#### What you “get for free” from the collected data\n", + "\n", + "* **Distributions** (per-request arrays): end-to-end latency, server latency, **service\\_time**, **waiting\\_time**, (optional) **io\\_time** ⇒ percentiles, variance, pre/post comparisons.\n", + "* **Time series** (periodic sampling): **ready\\_queue\\_len**, **event\\_loop\\_io\\_sleep** (if I/O exists), **ram\\_in\\_use**, **edge\\_concurrent\\_connection**, plus **throughput series** to estimate observed λ.\n", + "* **Derived checks**: automatic **Little’s Law** sanity (L≈λW, Lq≈λWq), observed utilization **ρ̂ = λ̂/μ̂**, and the **MM1 theory vs observed** comparison table you printed.\n", + "\n", + "> In this specific setup, I/O and RAM panels are flat by design; add I/O or RAM steps and those plots will populate accordingly.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "tags": [ + "plots" + ] + }, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# 4.1 System dashboard: latency + throughput\n", + "fig_sys, axes_sys = plt.subplots(1, 2, figsize=(12, 4.5), dpi=140)\n", + "results.plot_latency_distribution(axes_sys[0])\n", + "results.plot_throughput(axes_sys[1])\n", + "fig_sys.tight_layout()\n", + "plt.show()\n", + "\n", + "# 4.2 Server time-series and event-metric dashboards\n", + "sids = results.list_server_ids()\n", + "if sids:\n", + " sid = sids[0]\n", + " fig_ts, axes_ts = plt.subplots(2, 2, figsize=(12, 8), dpi=140)\n", + " axes_ts[1, 1].axis(\"off\")\n", + " results.plot_server_timeseries_dashboard(\n", + " ax_ready=axes_ts[0, 0],\n", + " ax_io=axes_ts[0, 1],\n", + " ax_ram=axes_ts[1, 0],\n", + " server_id=sid,\n", + " )\n", + " fig_ts.tight_layout()\n", + " plt.show()\n", + "\n", + " fig_ev, axes_ev = plt.subplots(2, 2, figsize=(12, 8), dpi=140)\n", + " results.plot_server_event_metrics_dashboard(\n", + " ax_latency_hist=axes_ev[0, 0],\n", + " ax_service_hist=axes_ev[0, 1],\n", + " ax_io_hist=axes_ev[1, 0],\n", + " ax_wait_hist=axes_ev[1, 1],\n", + " server_id=sid,\n", + " )\n", + " fig_ev.tight_layout()\n", + " plt.show()\n", + "else:\n", + " print(\"No servers present in the topology.\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "8328475f", + "metadata": {}, + "source": [ + "\n", + "## 5) Sweep over mean concurrent users\n", + "We iterate the scenario by changing the *mean concurrent users* from 10 to 200 (step 5).\n", + "For each grid point we run a fresh simulation and keep the ResultsAnalyzer.\n", + "Then we wrap everything in `SweepAnalyzer`, which caches the data for plotting." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "c9063bbe", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Sweep points: 41\n", + "Server IDs detected: ['app-1']\n" + ] + } + ], + "source": [ + "payload_base = build_payload()\n", + "\n", + "sweeper = Sweep()\n", + "pairs = sweeper.sweep_on_user(\n", + " payload=payload_base,\n", + " user_lower_bound=50,\n", + " user_upper_bound=250,\n", + " step=5,\n", + ")\n", + "\n", + "# Wrap with the sweep analyzer and pre-collect/caches\n", + "sweep = SweepAnalyzer(pairs)\n", + "sweep.precollect()\n", + "\n", + "print(f\"Sweep points: {len(pairs)}\")\n", + "if pairs:\n", + " print(\"Server IDs detected:\", pairs[0][1].list_server_ids())\n" + ] + }, + { + "cell_type": "markdown", + "id": "dae40bfc", + "metadata": {}, + "source": [ + "## 6) Global plots (system-level)\n", + "We plot: \n", + " - Throughput (mean RPS) vs users\n", + " - Mean latency (W) vs users\n", + " - latency percentiles vs users.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "48716bc8", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "fig_global = sweep.plot_global_dashboard()\n", + "plt.show()\n", + "\n", + "# 6.2 Latency percentiles vs users (P50, P95, P99)\n", + "fig_pct, ax_pct = plt.subplots(1, 1, figsize=(6.5, 4.0), dpi=130)\n", + "sweep.plot_global_latency_percentiles(ax_pct)\n", + "fig_pct.tight_layout()\n", + "plt.show()\n" + ] + }, + { + "cell_type": "markdown", + "id": "71b7199b", + "metadata": {}, + "source": [ + "\n", + "## 7) Per-server overlays\n", + "We plot per-server curves over users (utilization ρ_i, waiting time Wq_i, service rate μ_i, throughput λ_i).\n", + "If multiple servers exist, overlays show a line per server.\n", + "Below we also show the *single-server* case by explicitly passing the server id.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "9b9f0236", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/tmp/ipykernel_127715/23993299.py:28: UserWarning: This figure includes Axes that are not compatible with tight_layout, so results might be incorrect.\n", + " fig.tight_layout()\n" + ] + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# 7.1 Single-server overlays (explicit server id), if present\n", + "server_ids = pairs[0][1].list_server_ids() if pairs else []\n", + "if server_ids:\n", + " sid = server_ids[0]\n", + "\n", + " fig = plt.figure(figsize=(12, 10), dpi=130)\n", + " gs = fig.add_gridspec(nrows=3, ncols=2, hspace=0.35, wspace=0.25)\n", + "\n", + " # Row 1 (2 charts)\n", + " ax11 = fig.add_subplot(gs[0, 0])\n", + " ax12 = fig.add_subplot(gs[0, 1])\n", + "\n", + " # Row 2 (2 charts)\n", + " ax21 = fig.add_subplot(gs[1, 0])\n", + " ax22 = fig.add_subplot(gs[1, 1])\n", + "\n", + " # Row 3 (1 chart spanning both columns)\n", + " ax3 = fig.add_subplot(gs[2, :])\n", + "\n", + " # Plots\n", + " sweep.plot_server_utilization_overlay(ax11, server_ids=[sid])\n", + " sweep.plot_server_waiting_time_overlay(ax12, server_ids=[sid])\n", + " sweep.plot_server_service_rate_overlay(ax21, server_ids=[sid])\n", + " sweep.plot_server_throughput_overlay(ax22, server_ids=[sid])\n", + " sweep.plot_server_latency_overlay(ax3, server_ids=[sid]) # full-width\n", + "\n", + " fig.suptitle(f\"Per-server overlays — {sid}\", y=0.98)\n", + " fig.tight_layout()\n", + " plt.show()\n", + "else:\n", + " print(\"No servers present — skipping per-server overlays.\")\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "asyncflow-sim-py3.12 (3.12.3)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/builder_input/event_injection/lb_two_servers.py b/examples/builder_input/event_injection/lb_two_servers.py index 8af411f..fa1e82e 100644 --- a/examples/builder_input/event_injection/lb_two_servers.py +++ b/examples/builder_input/event_injection/lb_two_servers.py @@ -37,8 +37,8 @@ from asyncflow.workload import RqsGenerator # Runner + Analyzer -from asyncflow.metrics.analyzer import ResultsAnalyzer -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer +from asyncflow.runner.simulation import SimulationRunner def build_and_run() -> ResultsAnalyzer: diff --git a/examples/builder_input/event_injection/single_server.py b/examples/builder_input/event_injection/single_server.py index 0c514b2..dadae75 100644 --- a/examples/builder_input/event_injection/single_server.py +++ b/examples/builder_input/event_injection/single_server.py @@ -45,8 +45,8 @@ from asyncflow.workload import RqsGenerator # Runner + Analyzer -from asyncflow.runtime.simulation_runner import SimulationRunner -from asyncflow.metrics.analyzer import ResultsAnalyzer +from asyncflow.runner.simulation import SimulationRunner +from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer def build_and_run() -> ResultsAnalyzer: diff --git a/examples/builder_input/load_balancer/two_servers.py b/examples/builder_input/load_balancer/two_servers.py index a57d090..eebced4 100644 --- a/examples/builder_input/load_balancer/two_servers.py +++ b/examples/builder_input/load_balancer/two_servers.py @@ -51,8 +51,8 @@ from asyncflow.workload import RqsGenerator # Runner + Analyzer -from asyncflow.runtime.simulation_runner import SimulationRunner -from asyncflow.metrics.analyzer import ResultsAnalyzer +from asyncflow.runner.simulation import SimulationRunner +from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer def main() -> None: diff --git a/examples/builder_input/single_server/builder_service_plots.png b/examples/builder_input/single_server/builder_service_plots.png index 31c230e..8a0f270 100644 Binary files a/examples/builder_input/single_server/builder_service_plots.png and b/examples/builder_input/single_server/builder_service_plots.png differ diff --git a/examples/builder_input/single_server/single_server.py b/examples/builder_input/single_server/single_server.py index 7fb7e99..f405748 100644 --- a/examples/builder_input/single_server/single_server.py +++ b/examples/builder_input/single_server/single_server.py @@ -40,8 +40,8 @@ from asyncflow.workload import RqsGenerator # Runner + Analyzer -from asyncflow.runtime.simulation_runner import SimulationRunner -from asyncflow.metrics.analyzer import ResultsAnalyzer +from asyncflow.runner.simulation import SimulationRunner +from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer def build_and_run() -> ResultsAnalyzer: diff --git a/examples/yaml_input/event_injections/heavy_single_server.py b/examples/yaml_input/event_injections/heavy_single_server.py index 72605af..5ac9b16 100644 --- a/examples/yaml_input/event_injections/heavy_single_server.py +++ b/examples/yaml_input/event_injections/heavy_single_server.py @@ -19,8 +19,8 @@ import matplotlib.pyplot as plt import simpy -from asyncflow.metrics.analyzer import ResultsAnalyzer -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer +from asyncflow.runner.simulation import SimulationRunner def main() -> None: diff --git a/examples/yaml_input/event_injections/lb_two_servers.py b/examples/yaml_input/event_injections/lb_two_servers.py index a2b666c..700f42d 100644 --- a/examples/yaml_input/event_injections/lb_two_servers.py +++ b/examples/yaml_input/event_injections/lb_two_servers.py @@ -15,8 +15,8 @@ import matplotlib.pyplot as plt import simpy -from asyncflow.metrics.analyzer import ResultsAnalyzer -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer +from asyncflow.runner.simulation import SimulationRunner def main() -> None: diff --git a/examples/yaml_input/event_injections/single_server.py b/examples/yaml_input/event_injections/single_server.py index 58d1603..3f5b182 100644 --- a/examples/yaml_input/event_injections/single_server.py +++ b/examples/yaml_input/event_injections/single_server.py @@ -19,8 +19,8 @@ import matplotlib.pyplot as plt import simpy -from asyncflow.metrics.analyzer import ResultsAnalyzer -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer +from asyncflow.runner.simulation import SimulationRunner def main() -> None: diff --git a/examples/yaml_input/load_balancer/two_servers.py b/examples/yaml_input/load_balancer/two_servers.py index a6fb125..622d277 100644 --- a/examples/yaml_input/load_balancer/two_servers.py +++ b/examples/yaml_input/load_balancer/two_servers.py @@ -23,8 +23,8 @@ import simpy import matplotlib.pyplot as plt -from asyncflow.runtime.simulation_runner import SimulationRunner -from asyncflow.metrics.analyzer import ResultsAnalyzer +from asyncflow.runner.simulation import SimulationRunner +from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer def main() -> None: diff --git a/examples/yaml_input/single_server/single_server.py b/examples/yaml_input/single_server/single_server.py index 722de75..8e03779 100644 --- a/examples/yaml_input/single_server/single_server.py +++ b/examples/yaml_input/single_server/single_server.py @@ -44,8 +44,8 @@ import matplotlib.pyplot as plt # The only imports a user needs to run a simulation -from asyncflow.metrics.analyzer import ResultsAnalyzer -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer +from asyncflow.runner.simulation import SimulationRunner # --- Basic Logging Setup --- logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") diff --git a/src/asyncflow/__init__.py b/src/asyncflow/__init__.py index 0f38c83..bcaa60c 100644 --- a/src/asyncflow/__init__.py +++ b/src/asyncflow/__init__.py @@ -2,6 +2,7 @@ from __future__ import annotations from asyncflow.builder.asyncflow_builder import AsyncFlow -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.runner.simulation import SimulationRunner +from asyncflow.runner.sweep import Sweep -__all__ = ["AsyncFlow", "SimulationRunner"] +__all__ = ["AsyncFlow", "SimulationRunner", "Sweep"] diff --git a/src/asyncflow/analysis/__init__.py b/src/asyncflow/analysis/__init__.py index 825de6e..e7338af 100644 --- a/src/asyncflow/analysis/__init__.py +++ b/src/asyncflow/analysis/__init__.py @@ -1,5 +1,7 @@ """Public module exposing the results analyzer""" -from asyncflow.metrics.analyzer import ResultsAnalyzer +from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer +from asyncflow.metrics.sweep_analyzer import SweepAnalyzer +from asyncflow.queue_theory_analysis.mm1 import MM1 -__all__ = ["ResultsAnalyzer"] +__all__ = ["MM1", "ResultsAnalyzer", "SweepAnalyzer"] diff --git a/src/asyncflow/config/constants.py b/src/asyncflow/config/constants.py index fb23820..a1a8e8b 100644 --- a/src/asyncflow/config/constants.py +++ b/src/asyncflow/config/constants.py @@ -213,7 +213,7 @@ class SamplePeriods(float, Enum): STANDARD_TIME = 0.01 # 10 MILLISECONDS MINIMUM_TIME = 0.001 # 1 MILLISECOND - MAXIMUM_TIME = 0.1 # 100 MILLISECONDS + MAXIMUM_TIME = 0.5 # 500 MILLISECONDS # ====================================================================== # CONSTANTS FOR EVENT METRICS @@ -227,7 +227,13 @@ class EventMetricName(StrEnum): # Mandatory RQS_CLOCK = "rqs_clock" # useful to collect starting and finishing time of rqs - # Not mandatory + RQS_SERVER_CLOCK = "rqs_server_clock" #useful for latency and throughput of the server + SERVICE_TIME = "service_time" + IO_TIME = "io_time" + WAITING_TIME = "waiting_time" + + + # Not mandatory now not implemented LLM_COST = "llm_cost" @@ -240,6 +246,14 @@ class AggregatedMetricName(StrEnum): LATENCY_STATS = "latency_stats" THROUGHPUT = "throughput_rps" + SERVER_THROUGHPUT = "server_throughput" + SERVER_LATENCY_STATS = "server_latency_stats" + SERVICE_TIME_STATS = "service_time_stats" + IO_TIME_STATS = "io_time_stats" + WAITING_TIME_STATS = "waiting_time_stats" + UTILIZATION = "utilization" + + # now not implemented LLM_STATS = "llm_stats" # ====================================================================== diff --git a/src/asyncflow/metrics/client.py b/src/asyncflow/metrics/client.py index 2e49638..c2632d2 100644 --- a/src/asyncflow/metrics/client.py +++ b/src/asyncflow/metrics/client.py @@ -9,7 +9,9 @@ class RqsClock(NamedTuple): """ structure to register time of generation and - time of elaboration for each request + time of elaboration for each request during + all the cycle of elaboration starting and ending + with the client """ start: float diff --git a/src/asyncflow/metrics/server.py b/src/asyncflow/metrics/server.py index 6ebb96e..516680f 100644 --- a/src/asyncflow/metrics/server.py +++ b/src/asyncflow/metrics/server.py @@ -1,9 +1,9 @@ """ initialization of the structure to gather the sampled metrics -for the server of the system +and event metrics for the server of the system """ - from collections.abc import Iterable +from dataclasses import dataclass from asyncflow.config.constants import SampledMetricName @@ -32,3 +32,14 @@ def build_server_metrics( metric: [] for metric in SERVER_METRICS if metric in enabled_sample_metrics } + +# For the client we choosed a named tuple, here we prefer +# a dataclass because we need mutability since start and +# are updated in two different steps +@dataclass +class ServerClock: + """Server-side request timing: start + finish.""" + + start: float + finish: float | None = None + diff --git a/src/asyncflow/metrics/analyzer.py b/src/asyncflow/metrics/simulation_analyzer.py similarity index 66% rename from src/asyncflow/metrics/analyzer.py rename to src/asyncflow/metrics/simulation_analyzer.py index b9a6ea2..9b82dc6 100644 --- a/src/asyncflow/metrics/analyzer.py +++ b/src/asyncflow/metrics/simulation_analyzer.py @@ -3,11 +3,15 @@ from __future__ import annotations from collections import defaultdict -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, TypedDict import numpy as np -from asyncflow.config.constants import LatencyKey, SampledMetricName +from asyncflow.config.constants import ( + EventMetricName, + LatencyKey, + SampledMetricName, +) from asyncflow.config.plot_constants import ( LATENCY_PLOT, RAM_PLOT, @@ -15,6 +19,7 @@ THROUGHPUT_PLOT, PlotCfg, ) +from asyncflow.metrics.server import ServerClock if TYPE_CHECKING: # Standard library typing imports in type-checking block (TC003). @@ -29,10 +34,22 @@ from asyncflow.schemas.settings.simulation import SimulationSettings -# Short alias to keep signatures within 88 chars (E501). +# Short alias to keep signatures within 88 chars. Series = tuple[list[float], list[float]] +class ServerArrays(TypedDict): + """Object to collect relevant data for each server""" + + latencies: list[float] + service_time: list[float] + io_time: list[float] + waiting_time: list[float] + finish_times: list[float] + +ServerArraysMap = dict[str, ServerArrays] + + class ResultsAnalyzer: """Analyze and visualize the results of a completed simulation. @@ -68,18 +85,122 @@ def __init__( self.throughput_series: Series | None = None # Sampled metrics are stored with string metric keys for simplicity. self.sampled_metrics: dict[str, dict[str, list[float]]] | None = None + # Per-server, per-request arrays (filled lazily by _collect_server_event_arrays) + # Map: server_id -> { + # 'latencies': list[float], # server-side (finish - start) + # 'service_time': list[float], + # 'io_time': list[float], + # 'waiting_time': list[float], + # 'finish_times': list[float], # for per-server throughput + # } + self.server_event_arrays: ServerArraysMap | None = None # ───────────────────────────────────────────── # Core computation # ───────────────────────────────────────────── def process_all_metrics(self) -> None: """Compute all aggregated and sampled metrics if not already done.""" + # Client-side: end-to-end latencies + 1s throughput if self.latency_stats is None and self._client.rqs_clock: self._process_event_metrics() + # Sampled time series from servers/edges (RAM, queues, etc.) if self.sampled_metrics is None: self._extract_sampled_metrics() + # Per-server per-request arrays (service/io/wait/server-latency/finishes) + self.get_server_event_arrays() # single call, handles lazy init + + def _build_server_event_arrays(self) -> ServerArraysMap: + """Pure builder: returns {server_id -> arrays} without mutating self.""" + out: ServerArraysMap = {} + + for srv in self._servers: + sid = srv.server_config.id + latencies: list[float] = [] + service: list[float] = [] + io_w: list[float] = [] + wait: list[float] = [] + finishes: list[float] = [] + + # srv.server_rqs_clock: Mapping[int, MetricBucket] + for bucket in srv.server_rqs_clock.values(): + # Server clock (if present and completed) + clock = bucket.get(EventMetricName.RQS_SERVER_CLOCK) + if isinstance(clock, ServerClock) and clock.finish is not None: + latencies.append(float(clock.finish - clock.start)) + finishes.append(float(clock.finish)) + + # Accumulators are floats in the bucket + st = bucket.get(EventMetricName.SERVICE_TIME, 0.0) + if isinstance(st, float): + service.append(st) + + it = bucket.get(EventMetricName.IO_TIME, 0.0) + if isinstance(it, float): + io_w.append(it) + + wt = bucket.get(EventMetricName.WAITING_TIME, 0.0) + if isinstance(wt, float): + wait.append(wt) + + out[sid] = ServerArrays( + latencies=latencies, + service_time=service, + io_time=io_w, + waiting_time=wait, + finish_times=finishes, + ) + + return out + + def _ensure_server_arrays(self) -> ServerArraysMap: + """Ensure self.server_event_arrays is built exactly once, and return it.""" + if self.server_event_arrays is None: + self.server_event_arrays = self._build_server_event_arrays() + return self.server_event_arrays + + def get_server_event_arrays(self) -> ServerArraysMap: + """Return {server_id -> per-request arrays} (computed lazily).""" + return self._ensure_server_arrays() + + def get_server_throughput_series( + self, server_id: str, *, window_s: float | None = None, + ) -> Series: + """ + Return (timestamps, RPS) for a single server + in fixed windows (default 1s) + """ + if window_s is None: + window_s = ResultsAnalyzer._WINDOW_SIZE_S + + arrays = self.get_server_event_arrays().get(server_id) + if arrays is None: + return ([], []) + + finishes = sorted(arrays["finish_times"]) + if not finishes: + return ([], []) + + end_time = self._settings.total_simulation_time + timestamps: list[float] = [] + rps_values: list[float] = [] + idx = 0 + window = float(window_s) + current_end = window + + while current_end <= end_time: + count = 0 + while idx < len(finishes) and finishes[idx] <= current_end: + count += 1 + idx += 1 + timestamps.append(current_end) + rps_values.append(count / window) + current_end += window + + return (timestamps, rps_values) + + def _process_event_metrics(self) -> None: """Calculate latency stats and throughput time series (1s RPS).""" # 1) Latencies @@ -533,7 +654,6 @@ def plot_single_server_io_queue(self, ax: Axes, server_id: str) -> None: leg.get_frame().set_facecolor("white") - def plot_single_server_ram(self, ax: Axes, server_id: str) -> None: """Plot RAM usage with mean/min/max lines and a single legend box with values. No trend/ewma, no legend entry for the main series. @@ -587,3 +707,187 @@ def plot_single_server_ram(self, ax: Axes, server_id: str) -> None: fontsize=9.5, ) leg.get_frame().set_facecolor("white") + + # ------------------------------------------------- + # SERVER METRICS PLOT + #-------------------------------------------------- + + def _plot_histogram_with_overlays( + self, + ax: Axes, + data: list[float], + *, + title: str, + xlabel: str, + show_p50: bool = False, + ) -> None: + """Render a histogram with mean/(optional)P50/P95/P99 overlays + and a compact legend. + """ + if not data: + ax.text(0.5, 0.5, "No data", ha="center", va="center") + ax.set_title(title) + ax.set_xlabel(xlabel) + ax.set_ylabel("count") + ax.grid(visible=True) + return + + # Colors consistent with the rest of the module + col_mean = "#d62728" # red + col_p50 = "#ff7f0e" # orange + col_p95 = "#2ca02c" # green + col_p99 = "#9467bd" # purple + hist_color = "#1f77b4" # soft blue + + arr = np.asarray(data, dtype=float) + v_mean = float(np.mean(arr)) + v_p95 = float(np.percentile(arr, 95)) + v_p99 = float(np.percentile(arr, 99)) + + # Histogram (subtle to let overlays stand out) + ax.hist( + arr, bins=50, color=hist_color, alpha=0.40, + edgecolor="none", zorder=1, + ) + + # Overlays + ax.axvline( + v_mean, color=col_mean, linestyle=":", linewidth=1.8, + alpha=0.95, zorder=3, + ) + handles: list[Line2D] = [] + + # Legend handles (dummy lines with values) + h_mean = ax.plot( + [], [], color=col_mean, linestyle=":", linewidth=2.4, + label=f"mean = {v_mean:.3f}", + )[0] + handles.append(h_mean) + + if show_p50: + v_p50 = float(np.percentile(arr, 50)) + ax.axvline( + v_p50, color=col_p50, linestyle="-.", linewidth=1.6, + alpha=0.90, zorder=3, + ) + h_p50 = ax.plot( + [], [], color=col_p50, linestyle="-.", linewidth=2.4, + label=f"P50 = {v_p50:.3f}", + )[0] + handles.append(h_p50) + + ax.axvline( + v_p95, color=col_p95, linestyle="--", linewidth=1.6, + alpha=0.90, zorder=3, + ) + ax.axvline( + v_p99, color=col_p99, linestyle="--", linewidth=1.6, + alpha=0.90, zorder=3, + ) + + h_p95 = ax.plot( + [], [], color=col_p95, linestyle="--", linewidth=2.4, + label=f"P95 = {v_p95:.3f}", + )[0] + h_p99 = ax.plot( + [], [], color=col_p99, linestyle="--", linewidth=2.4, + label=f"P99 = {v_p99:.3f}", + )[0] + handles.extend([h_p95, h_p99]) + + # Titles / labels / grid + ax.set_title(title) + ax.set_xlabel(xlabel) + ax.set_ylabel("count") + ax.grid(visible=True) + + # Legend (top-right) with readable background + leg = ax.legend( + handles=handles, + loc="upper right", + bbox_to_anchor=(0.98, 0.98), + borderaxespad=0.0, + framealpha=0.90, + fancybox=True, + handlelength=2.6, + fontsize=9.5, + ) + leg.get_frame().set_facecolor("white") + + + def plot_server_event_metrics_dashboard( + self, + ax_latency_hist: Axes, + ax_service_hist: Axes, + ax_io_hist: Axes, + ax_wait_hist: Axes, + server_id: str, + ) -> None: + """Dashboard of per-request distributions for a single server: + - server-side latency (finish - start) + - accumulated SERVICE_TIME (CPU) + - accumulated IO_TIME + - accumulated WAITING_TIME + """ + arrays = self.get_server_event_arrays().get(server_id, None) + if arrays is None: + # Graceful empty state for all panes + for ax, msg in [ + (ax_latency_hist, "No server-side latencies"), + (ax_service_hist, "No service-time samples"), + (ax_io_hist, "No I/O-time samples"), + (ax_wait_hist, "No waiting-time samples"), + ]: + ax.text(0.5, 0.5, msg, ha="center", va="center") + ax.grid(visible=True) + return + + # 1) Server-side latency histogram (mean/P50/P95/P99) + self._plot_histogram_with_overlays( + ax_latency_hist, + arrays["latencies"], + title=f"Server latency — {server_id}", + xlabel="seconds", + show_p50=True, + ) + + # 2) CPU service time (mean/P95/P99) + self._plot_histogram_with_overlays( + ax_service_hist, + arrays["service_time"], + title=f"CPU service time — {server_id}", + xlabel="seconds", + show_p50=False, + ) + + # 3) I/O wait time (mean/P95/P99) + self._plot_histogram_with_overlays( + ax_io_hist, + arrays["io_time"], + title=f"I/O time — {server_id}", + xlabel="seconds", + show_p50=False, + ) + + # 4) CPU waiting time (mean/P95/P99) + self._plot_histogram_with_overlays( + ax_wait_hist, + arrays["waiting_time"], + title=f"CPU waiting time — {server_id}", + xlabel="seconds", + show_p50=False, + ) + + def plot_server_timeseries_dashboard( + self, + ax_ready: Axes, + ax_io: Axes, + ax_ram: Axes, + server_id: str, + ) -> None: + """Quick dashboard for one server: Ready queue, I/O queue, and RAM series.""" + # Reuse existing single-plot helpers for consistency. + self.plot_single_server_ready_queue(ax_ready, server_id) + self.plot_single_server_io_queue(ax_io, server_id) + self.plot_single_server_ram(ax_ram, server_id) + diff --git a/src/asyncflow/metrics/sweep_analyzer.py b/src/asyncflow/metrics/sweep_analyzer.py new file mode 100644 index 0000000..6d50dd3 --- /dev/null +++ b/src/asyncflow/metrics/sweep_analyzer.py @@ -0,0 +1,455 @@ +""" +SweepAnalyzer — build plots from a sweep over *mean concurrent users*. + +Global +------ +- Throughput (mean RPS) vs. users +- Mean latency (W) vs. users +- (Opzionale) Latency percentiles vs. users + +Per-server (overlay) +-------------------- +- Utilization rho_i vs. users +- Waiting time Wq_i vs. users +- Service rate mu_i vs. users +- Throughput lambda_i vs. users +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Literal, cast + +import matplotlib.pyplot as plt + +from asyncflow.config.constants import LatencyKey + +if TYPE_CHECKING: # pragma: no cover + from collections.abc import Iterable + + from matplotlib.axes import Axes + from matplotlib.figure import Figure + + from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer + + +# ────────────────────────────────────────────────────────────────────── +# Data containers +# ────────────────────────────────────────────────────────────────────── + +@dataclass(frozen=True) +class GlobalPoint: + """One sweep point (global/system metrics collected today).""" + + users: int + lambda_rps: float # mean throughput (RPS) + W: float # mean end-to-end latency (s) + p50: float # latency median (s) + p95: float # latency P95 (s) + p99: float # latency P99 (s) + + +@dataclass(frozen=True) +class ServerPoint: + """One sweep point for a single server.""" + + users: int + server_id: str + lambda_rps: float # server throughput (estimated) + mu_rps: float # 1 / mean(service_time) + rho: float # lambda / mu + Wq: float # mean waiting time (s) + service_mean_s: float # mean service time (s) + completions: int # number of completed requests + server_latency_mean_s: float + + +# ────────────────────────────────────────────────────────────────────── +# Analyzer +# ────────────────────────────────────────────────────────────────────── + +class SweepAnalyzer: + """ + Build plots from a sweep over *mean concurrent users*. + + Input + ----- + pairs : Iterable[tuple[int, ResultsAnalyzer]] + Output of Sweep.sweep_on_user(...): (users, analyzer). + + Caching + ------- + Collections are private and executed once. All plotters read from caches. + """ + + def __init__(self, pairs: Iterable[tuple[int, ResultsAnalyzer]]) -> None: + """Initialize with (users, ResultsAnalyzer) pairs and prepare caches.""" + self._pairs: list[tuple[int, ResultsAnalyzer]] = sorted( + (int(u), ra) for (u, ra) in pairs + ) + # Caches + self._global_points: list[GlobalPoint] = [] + self._server_points: dict[str, list[ServerPoint]] = {} + self._collected_global: bool = False + self._collected_servers: bool = False + + # ────────────────────────────────────────────────────────────────── + # Public convenience + # ────────────────────────────────────────────────────────────────── + + def precollect(self) -> None: + """Warm both caches once (optional).""" + self._ensure_global_collected() + self._ensure_servers_collected() + + # ────────────────────────────────────────────────────────────────── + # Private collectors (one-time) + # ────────────────────────────────────────────────────────────────── + + def _ensure_global_collected(self) -> None: + if self._collected_global: + return + self._collect_global() + self._collected_global = True + + def _ensure_servers_collected(self) -> None: + if self._collected_servers: + return + self._collect_servers() + self._collected_servers = True + + def _collect_global(self) -> None: + """Compute global/system metrics (throughput & latency) once.""" + out: list[GlobalPoint] = [] + + for users, ra in self._pairs: + # Ensure metrics are computed + ra.process_all_metrics() + + # λ: mean throughput from the time series + _, rps_series = ra.get_throughput_series() + if rps_series: + lambda_rps = float(sum(rps_series)) / float(len(rps_series)) + else: + lambda_rps = 0.0 + + # Latency stats → W and percentiles + lat = ra.get_latency_stats() + w_mean = float(lat.get(LatencyKey.MEAN, 0.0)) + p50 = float(lat.get(LatencyKey.MEDIAN, 0.0)) + p95 = float(lat.get(LatencyKey.P95, 0.0)) + p99 = float(lat.get(LatencyKey.P99, 0.0)) + + out.append( + GlobalPoint( + users=users, + lambda_rps=lambda_rps, + W=w_mean, + p50=p50, + p95=p95, + p99=p99, + ), + ) + + self._global_points = out + + def _collect_servers(self) -> None: + """ + Compute per-server metrics across the sweep once. + + Server throughput λᵢ is estimated via completions split: + λᵢ ≈ (nᵢ / Σⱼ nⱼ) · λ_tot + """ + points: dict[str, list[ServerPoint]] = {} + + for users, ra in self._pairs: + ra.process_all_metrics() + + # Global lambda for proportional split + _, rps_series = ra.get_throughput_series() + lambda_tot = ( + float(sum(rps_series)) / float(len(rps_series)) + if rps_series + else 0.0 + ) + + arrays_map = cast( + "dict[str, dict[str, list[float]]]", + ra.get_server_event_arrays(), + ) + sids = ra.list_server_ids() + + total_compl = 0 + per_server_compl: dict[str, int] = {} + for sid in sids: + arr_for_count: dict[str, list[float]] = arrays_map.get(sid, {}) + n = len(arr_for_count.get("finish_times", [])) + per_server_compl[sid] = n + total_compl += n + + for sid in sids: + arr: dict[str, list[float]] = arrays_map.get(sid, {}) + + # μᵢ from mean(service_time) + s_vals: list[float] = arr.get("service_time", []) + if s_vals: + s_mean = float(sum(s_vals)) / float(len(s_vals)) + mu = (1.0 / s_mean) if s_mean > 0.0 else float("inf") + else: + s_mean = 0.0 + mu = float("inf") + + # Wqᵢ from mean(waiting_time) + wq_vals: list[float] = arr.get("waiting_time", []) + wq_mean = ( + float(sum(wq_vals)) / float(len(wq_vals)) + if wq_vals + else 0.0 + ) + + # server latency + lat_vals: list[float] = arr.get("latencies", []) + if lat_vals: + server_lat_mean = float(sum(lat_vals)) / float(len(lat_vals)) + else: + server_lat_mean = wq_mean + ( + 1.0 / mu if mu not in (0.0, float("inf")) else s_mean) + + # λᵢ via proportional split of completions + n_i = per_server_compl.get(sid, 0) + if total_compl > 0: + lambda_i = (n_i / float(total_compl)) * lambda_tot + else: + lambda_i = 0.0 + + # ρᵢ = λᵢ / μᵢ + rho = (lambda_i / mu) if mu not in (0.0, float("inf")) else 0.0 + + points.setdefault(sid, []).append( + ServerPoint( + users=users, + server_id=sid, + lambda_rps=lambda_i, + mu_rps=mu, + rho=rho, + Wq=wq_mean, + service_mean_s=s_mean, + completions=n_i, + server_latency_mean_s=server_lat_mean, + ), + ) + + self._server_points = points + + # ────────────────────────────────────────────────────────────────── + # Global plotters (cached) + # ────────────────────────────────────────────────────────────────── + + def plot_global_throughput(self, ax: Axes) -> None: + """Plot mean throughput (RPS) vs. mean concurrent users.""" + self._ensure_global_collected() + pts = self._global_points + xs = [p.users for p in pts] + ys = [p.lambda_rps for p in pts] + ax.plot(xs, ys, marker="o") + ax.set_title("Throughput (mean RPS) vs. concurrent users") + ax.set_xlabel("Mean concurrent users") + ax.set_ylabel("RPS") + ax.grid(visible=True, alpha=0.3) + + def plot_global_latency(self, ax: Axes) -> None: + """Plot mean system time (W) vs. mean concurrent users.""" + self._ensure_global_collected() + pts = self._global_points + xs = [p.users for p in pts] + ys = [p.W for p in pts] + ax.plot(xs, ys, marker="o") + ax.set_title("Mean system time (W) vs. concurrent users") + ax.set_xlabel("Mean concurrent users") + ax.set_ylabel("W (seconds)") + ax.grid(visible=True, alpha=0.3) + + def plot_global_latency_percentiles(self, ax: Axes) -> None: + """Plot P50, P95, P99 latency vs. mean concurrent users.""" + self._ensure_global_collected() + pts = self._global_points + xs = [p.users for p in pts] + ax.plot(xs, [p.p50 for p in pts], marker="o", label="P50") + ax.plot(xs, [p.p95 for p in pts], marker="o", label="P95") + ax.plot(xs, [p.p99 for p in pts], marker="o", label="P99") + ax.set_title("Latency percentiles vs. concurrent users") + ax.set_xlabel("Mean concurrent users") + ax.set_ylabel("Latency (seconds)") + ax.legend() + ax.grid(visible=True, alpha=0.3) + + def plot_global_dashboard(self) -> Figure: + """1x2 dashboard: throughput and mean latency (W).""" + fig, axes = plt.subplots(1, 2, figsize=(12, 4.5), dpi=130) + self.plot_global_throughput(axes[0]) + self.plot_global_latency(axes[1]) + fig.tight_layout() + return fig + + # ────────────────────────────────────────────────────────────────── + # Per-server plotters (overlay; cached) + # ────────────────────────────────────────────────────────────────── + + def _select_top_servers( + self, + by: Literal["rho", "Wq", "mu", "lambda"], + max_servers: int, + ) -> list[str]: + """Pick up to `max_servers` “hottest” servers at max users (from cache).""" + self._ensure_servers_collected() + data = self._server_points + if not data: + return [] + + max_users = max( + (pt.users for pts in data.values() for pt in pts), + default=0, + ) + scores: list[tuple[str, float]] = [] + for sid, pts in data.items(): + at_max = [p for p in pts if p.users == max_users] + if not at_max: + continue + p = at_max[-1] + if by == "rho": + val = p.rho + elif by == "Wq": + val = p.Wq + elif by == "mu": + val = -p.mu_rps if p.mu_rps not in (0.0, float("inf")) else -0.0 + else: # "lambda" + val = p.lambda_rps + scores.append((sid, float(val))) + + scores.sort(key=lambda x: x[1], reverse=True) + return [sid for sid, _ in scores[:max_servers]] + + def plot_server_utilization_overlay( + self, + ax: Axes, + *, + max_servers: int = 5, + server_ids: list[str] | None = None, + ) -> None: + """Overlay of server utilization rho vs. users (auto-picks hottest)""" + self._ensure_servers_collected() + ids = server_ids or self._select_top_servers("rho", max_servers) + for sid in sorted(ids): + pts = self._server_points.get(sid, []) + xs = [p.users for p in pts] + ys = [p.rho for p in pts] + ax.plot(xs, ys, marker="o", label=sid) + ax.set_title("Server utilization (rho) vs. concurrent users") + ax.set_xlabel("Mean concurrent users") + ax.set_ylabel("rho") + if ids: + ax.legend() + ax.grid(visible=True, alpha=0.3) + + def plot_server_waiting_time_overlay( + self, + ax: Axes, + *, + max_servers: int = 5, + server_ids: list[str] | None = None, + ) -> None: + """Overlay of server waiting time Wqᵢ vs. users (auto-picks hottest)""" + self._ensure_servers_collected() + ids = server_ids or self._select_top_servers("Wq", max_servers) + for sid in sorted(ids): + pts = self._server_points.get(sid, []) + xs = [p.users for p in pts] + ys = [p.Wq for p in pts] + ax.plot(xs, ys, marker="o", label=sid) + ax.set_title("Server waiting time (Wq) vs. concurrent users") + ax.set_xlabel("Mean concurrent users") + ax.set_ylabel("Wq (seconds)") + if ids: + ax.legend() + ax.grid(visible=True, alpha=0.3) + + def plot_server_service_rate_overlay( + self, + ax: Axes, + *, + max_servers: int = 5, + server_ids: list[str] | None = None, + ) -> None: + """Overlay of server service rate μ vs. users (auto-picks hottest)""" + self._ensure_servers_collected() + ids = server_ids or self._select_top_servers("mu", max_servers) + for sid in sorted(ids): + pts = self._server_points.get(sid, []) + xs = [p.users for p in pts] + ys = [p.mu_rps for p in pts] + ax.plot(xs, ys, marker="o", label=sid) + ax.set_title("Server service rate (mu) vs. concurrent users") + ax.set_xlabel("Mean concurrent users") + ax.set_ylabel("mu (1/s)") + if ids: + ax.legend() + ax.grid(visible=True, alpha=0.3) + + def plot_server_throughput_overlay( + self, + ax: Axes, + *, + max_servers: int = 5, + server_ids: list[str] | None = None, + ) -> None: + """Overlay of server throughput λ vs. users (auto-picks hottest by default).""" + self._ensure_servers_collected() + ids = server_ids or self._select_top_servers("lambda", max_servers) + for sid in sorted(ids): + pts = self._server_points.get(sid, []) + xs = [p.users for p in pts] + ys = [p.lambda_rps for p in pts] + ax.plot(xs, ys, marker="o", label=sid) + ax.set_title("Server throughput (lambda) vs. concurrent users") + ax.set_xlabel("Mean concurrent users") + ax.set_ylabel("lambda (1/s)") + if ids: + ax.legend() + ax.grid(visible=True, alpha=0.3) + + def plot_server_latency_overlay( + self, ax: Axes, *, max_servers: int = 5, server_ids: list[str] | None = None, + ) -> None: + """Plot of the latency vs concurrent user""" + self._ensure_servers_collected() + ids = server_ids or self._select_top_servers("Wq", max_servers) + for sid in sorted(ids): + pts = self._server_points.get(sid, []) + xs = [p.users for p in pts] + ys = [p.server_latency_mean_s for p in pts] + ax.plot(xs, ys, marker="o", label=sid) + ax.set_title("Server latency (waiting+service) vs. concurrent users") + ax.set_xlabel("Mean concurrent users") + ax.set_ylabel("Server latency (s)") + if ids: + ax.legend() + ax.grid(visible=True, alpha=0.3) + + + def plot_server_dashboard(self) -> Figure: + """2x3 per-server overlay: rho_i, Wq_i, mu_i, lambda_i, server latency.""" + fig, axes = plt.subplots(2, 3, figsize=(16, 8), dpi=130) + + # Row 1 + self.plot_server_utilization_overlay(axes[0, 0]) + self.plot_server_waiting_time_overlay(axes[0, 1]) + self.plot_server_service_rate_overlay(axes[0, 2]) + + # Row 2 + self.plot_server_throughput_overlay(axes[1, 0]) + self.plot_server_latency_overlay(axes[1, 1]) + axes[1, 2].axis("off") # keep layout symmetric + + fig.tight_layout() + return fig + diff --git a/src/asyncflow/queue_theory_analysis/base.py b/src/asyncflow/queue_theory_analysis/base.py new file mode 100644 index 0000000..5de4bbb --- /dev/null +++ b/src/asyncflow/queue_theory_analysis/base.py @@ -0,0 +1,37 @@ +"""Base interfaces for queueing-theory analyzers. + +Each concrete analyzer (e.g. MM1) must: +- declare its compatibility rules against an AsyncFlow payload +- compute closed-form KPIs when assumptions are satisfied +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from asyncflow.schemas.payload import SimulationPayload + + +class QueueTheoryBase(ABC): + """Abstract base for all queue-theory analyzers.""" + + @abstractmethod + def explain_incompatibilities(self, payload: SimulationPayload) -> list[str]: + """Return a list of human-readable reasons why the payload is incompatible. + Empty list means 'compatible'. + """ + + def is_compatible(self, payload: SimulationPayload) -> bool: + """Shorthand boolean check.""" + return not self.explain_incompatibilities(payload) + + def validate_or_raise(self, payload: SimulationPayload) -> None: + """Raise ValueError with a compact message if incompatible.""" + errs = self.explain_incompatibilities(payload) + if errs: + bullet = "\n - " + msg = "Payload is not compatible with this queueing model:" + bullet + msg += bullet.join(errs) + raise ValueError(msg) diff --git a/src/asyncflow/queue_theory_analysis/mm1.py b/src/asyncflow/queue_theory_analysis/mm1.py new file mode 100644 index 0000000..6c22b64 --- /dev/null +++ b/src/asyncflow/queue_theory_analysis/mm1.py @@ -0,0 +1,423 @@ +""" +Check if asyncflow under the hypothesis of a mm1 queue +reproduce the theory. +""" + +from __future__ import annotations + +import sys +from typing import TYPE_CHECKING, TextIO, TypedDict, cast + +from asyncflow.config.constants import ( + Distribution, + EndpointStepCPU, + LatencyKey, + StepOperation, +) +from asyncflow.queue_theory_analysis.base import QueueTheoryBase +from asyncflow.schemas.common.random_variables import RVConfig + +if TYPE_CHECKING: + from collections.abc import Callable + + from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer + from asyncflow.schemas.payload import SimulationPayload + + +class MM1Results(TypedDict): + """Closed-form KPIs for an M/M/1 queue.""" + + lambda_rate: float # arrival rate (1/s) + mu_rate: float # service rate (1/s) + rho: float # utilization + L: float # mean items in system + Lq: float # mean items in queue + W: float # mean time in system (s) + Wq: float # mean waiting time (s) + + +class KPIRow(TypedDict): + """One formatted row for theory vs observed comparison.""" + + symbol: str + name: str + theory: float | str + observed: float | str + abs_diff: float | str + rel_diff_pct: float | str # percentage (e.g., "3.2" means +3.2%) + + +class MM1(QueueTheoryBase): + """Analyzer for the M/M/1 queue with strict model checks.""" + + # Upper bound for "negligible" deterministic network latency + MAX_EDGE_LATENCY_S: float = 1e-3 # 1 ms + + # ────────────────────────────────────────────────────────────────── + # Compatibility checks split into helpers to keep cyclomatic low + # ────────────────────────────────────────────────────────────────── + def _check_topology(self, payload: SimulationPayload) -> list[str]: + errs: list[str] = [] + nodes = payload.topology_graph.nodes + if len(nodes.servers) != 1: + errs.append("requires exactly one server (no parallel servers).") + if nodes.load_balancer is not None: + errs.append("load balancer must be absent (fan-out not allowed).") + return errs + + def _check_generator(self, payload: SimulationPayload) -> list[str]: + errs: list[str] = [] + gen = payload.rqs_input + if gen.avg_active_users.distribution != Distribution.POISSON: + errs.append("avg_active_users must be Poisson.") + if gen.avg_request_per_minute_per_user.distribution != Distribution.POISSON: + errs.append("avg_request_per_minute_per_user must be Poisson.") + if gen.avg_active_users.mean <= 0: + errs.append("avg_active_users.mean must be > 0.") + if gen.avg_request_per_minute_per_user.mean <= 0: + errs.append("avg_request_per_minute_per_user.mean must be > 0.") + return errs + + def _check_edges(self, payload: SimulationPayload) -> list[str]: + errs: list[str] = [] + for edge in payload.topology_graph.edges: + latency = edge.latency + if isinstance(latency, RVConfig): + errs.append( + f"edge '{edge.id}' latency must be deterministic (<=1ms), " + "not a random variable.", + ) + continue + if float(latency) > self.MAX_EDGE_LATENCY_S: + errs.append( + f"edge '{edge.id}' deterministic latency must be <= 1 ms.", + ) + return errs + + def _check_server_model(self, payload: SimulationPayload) -> list[str]: + errs: list[str] = [] + srv = payload.topology_graph.nodes.servers[0] + if len(srv.endpoints) != 1: + errs.append("server must expose exactly one endpoint.") + return errs + + steps = srv.endpoints[0].steps + if len(steps) != 1: + errs.append("endpoint must contain exactly one step.") + return errs + + step = steps[0] + if not isinstance(step.kind, EndpointStepCPU): + errs.append("the single step must be CPU-bound.") + return errs + + op_key, op_data = next(iter(step.step_operation.items())) + if op_key is not StepOperation.CPU_TIME: + errs.append("CPU step must use CPU_TIME as its operation.") + return errs + + # Must be exponential RV (not deterministic) + if not isinstance(op_data, RVConfig): + errs.append("service time must be an exponential RVConfig.") + return errs + if op_data.distribution != Distribution.EXPONENTIAL: + errs.append("service time distribution must be exponential.") + if op_data.mean <= 0: + errs.append("service time mean must be > 0.") + return errs + + # ------------- Compatibility (public) -------------------------------- + def explain_incompatibilities( + self, payload: SimulationPayload, + ) -> list[str]: + """Collect and return all MM1 assumption violations.""" + errors: list[str] = [] + errors.extend(self._check_topology(payload)) + errors.extend(self._check_generator(payload)) + errors.extend(self._check_edges(payload)) + # Only check server model if we do have servers + if payload.topology_graph.nodes.servers: + errors.extend(self._check_server_model(payload)) + return errors + + # ────────────────────────────────────────────────────────────────── + # Closed forms + # ────────────────────────────────────────────────────────────────── + def _arrival_rate_lambda(self, payload: SimulationPayload) -> float: + """λ = users_mean * rpm_per_user / 60.""" + gen = payload.rqs_input + users = float(gen.avg_active_users.mean) + rpm = float(gen.avg_request_per_minute_per_user.mean) + return users * rpm / 60.0 + + def _service_rate_mu(self, payload: SimulationPayload) -> float: + """μ = 1 / E[S] from the single CPU exponential step.""" + srv = payload.topology_graph.nodes.servers[0] + step = srv.endpoints[0].steps[0] + op_key, op_val = next(iter(step.step_operation.items())) + assert op_key is StepOperation.CPU_TIME + rv = cast("RVConfig", op_val) + assert rv.distribution is Distribution.EXPONENTIAL + return 1.0 / float(rv.mean) + + def _theoretical_kpis(self, payload: SimulationPayload) -> MM1Results: + """Closed-form KPIs. For rho>=1 returns +inf for divergent metrics.""" + self.validate_or_raise(payload) + + lam = self._arrival_rate_lambda(payload) + mu = self._service_rate_mu(payload) + rho = lam / mu + + if rho >= 1.0: + inf = float("inf") + return MM1Results( + lambda_rate=lam, + mu_rate=mu, + rho=rho, + L=inf, + Lq=inf, + W=inf, + Wq=inf, + ) + + l_sys = rho / (1.0 - rho) + lq = (rho * rho) / (1.0 - rho) + w_sys = 1.0 / (mu - lam) + wq = rho / (mu - lam) + + return MM1Results( + lambda_rate=lam, + mu_rate=mu, + rho=rho, + L=l_sys, + Lq=lq, + W=w_sys, + Wq=wq, + ) + + def evaluate(self, payload: SimulationPayload) -> MM1Results: + """Public entry-point: return closed-form KPIs for this payload.""" + return self._theoretical_kpis(payload) + + + # ────────────────────────────────────────────────────────────────── + # Observed KPIs from a run (no private members) + # ────────────────────────────────────────────────────────────────── + def _observed_kpis(self, ra: ResultsAnalyzer) -> MM1Results: + """ + Empirical KPIs from the analyzer: + - lambda_hat: average throughput across windows + - mu_hat: 1 / mean(service_time) + - W_hat: mean end-to-end latency (client) + - Wq_hat: mean waiting_time (server arrays) + - L_hat: lambda_hat * W_hat (Little's law) + - Lq_hat: lambda_hat * Wq_hat + """ + ra.process_all_metrics() + + # λ̂ via throughput series (mean of window RPS) + ts, rps = ra.get_throughput_series() + lambda_hat = (sum(rps) / len(rps)) if rps else 0.0 + + # Ŵ from latency stats + lat_stats = ra.get_latency_stats() + w_hat = float(lat_stats.get(LatencyKey.MEAN, 0.0)) + + # Per-server arrays (first server if present) + server_ids = ra.list_server_ids() + arrays_map = ra.get_server_event_arrays() + arrays = arrays_map.get(server_ids[0]) if server_ids else None + + # mean service time and wait + if arrays and arrays["service_time"]: + s_vals = arrays["service_time"] + s_mean = float(sum(s_vals)) / float(len(s_vals)) + else: + s_mean = 0.0 + mu_hat = (1.0 / s_mean) if s_mean > 0.0 else float("inf") + + if arrays and arrays["waiting_time"]: + wq_vals = arrays["waiting_time"] + wq_hat = float(sum(wq_vals)) / float(len(wq_vals)) + else: + wq_hat = 0.0 + + l_hat = lambda_hat * w_hat + lq_hat = lambda_hat * wq_hat + rho_hat = ( + lambda_hat / mu_hat if mu_hat not in (0.0, float("inf")) else 0.0 + ) + + return MM1Results( + lambda_rate=lambda_hat, + mu_rate=mu_hat, + rho=rho_hat, + L=l_hat, + Lq=lq_hat, + W=w_hat, + Wq=wq_hat, + ) + + # ────────────────────────────────────────────────────────────────── + # Comparison table + # ────────────────────────────────────────────────────────────────── + @staticmethod + def _safe_delta(theory: float, obs: float) -> tuple[str, str, str]: + """Return (theory_str, abs_diff_str, rel_diff_str) with inf-safe logic.""" + def fmt(x: float) -> str: + return "∞" if x == float("inf") else f"{x:.6f}" + + th_s = fmt(theory) + if theory == float("inf"): + return th_s, "—", "—" + + abs_d = obs - theory + rel = (abs_d / theory * 100.0) if theory != 0.0 else float("inf") + rel_s = "∞" if rel == float("inf") else f"{rel:.2f}" + return th_s, f"{abs_d:.6f}", rel_s + + def compare_against_run( + self, + payload: SimulationPayload, + ra: ResultsAnalyzer, + ) -> list[KPIRow]: + """ + Build a table with theory vs observed and absolute/relative deltas. + + Returns + ------- + list[KPIRow] + Rows in a stable order suitable for printing or DataFrame usage. + + """ + self.validate_or_raise(payload) + + th = self._theoretical_kpis(payload) + ob = self._observed_kpis(ra) + + rows: list[KPIRow] = [] + + def add(symbol: str, name: str, getter: Callable[[MM1Results], float]) -> None: + th_v = float(getter(th)) + ob_v = float(getter(ob)) + th_s, abs_s, rel_s = self._safe_delta(th_v, ob_v) + rows.append( + KPIRow( + symbol=symbol, + name=name, + theory=th_s, + observed=f"{ob_v:.6f}", + abs_diff=abs_s, + rel_diff_pct=rel_s, + ), + ) + + add("λ", "Arrival rate (1/s)", lambda m: m["lambda_rate"]) + add("μ", "Service rate (1/s)", lambda m: m["mu_rate"]) + add("rho", "Utilization", lambda m: m["rho"]) + add("L", "Mean items in system", lambda m: m["L"]) + add("Lq", "Mean items in queue", lambda m: m["Lq"]) + add("W", "Mean time in system (s)", lambda m: m["W"]) + add("Wq", "Mean waiting time (s)", lambda m: m["Wq"]) + + return rows + + # ────────────────────────────────────────────────────────────────── + # Pretty printing + # ────────────────────────────────────────────────────────────────── + + @staticmethod + def _format_rows_table(rows: list[KPIRow]) -> str: + """ + Return a compact ASCII table for `compare_against_run(...)` rows. + + The layout is stable, with right-aligned numeric columns and widths + computed from the data for nice alignment in plain-text consoles. + """ + # Extract as strings (observed/theory already formatted in rows). + data: list[tuple[str, str, str, str, str, str]] = [ + ( + r["symbol"], + r["name"], + str(r["theory"]), + str(r["observed"]), + str(r["abs_diff"]), + str(r["rel_diff_pct"]), + ) + for r in rows + ] + + headers = ("sym", "metric", "theory", "observed", "abs", "rel%") + + # Compute column widths. + w_sym = max(len(headers[0]), *(len(d[0]) for d in data)) + w_met = max(len(headers[1]), *(len(d[1]) for d in data)) + w_th = max(len(headers[2]), *(len(d[2]) for d in data)) + w_ob = max(len(headers[3]), *(len(d[3]) for d in data)) + w_abs = max(len(headers[4]), *(len(d[4]) for d in data)) + w_rel = max(len(headers[5]), *(len(d[5]) for d in data)) + + # Title and separators sized to the header length. + header_line = ( + f"{headers[0]:<{w_sym}} {headers[1]:<{w_met}} " + f"{headers[2]:>{w_th}} {headers[3]:>{w_ob}} " + f"{headers[4]:>{w_abs}} {headers[5]:>{w_rel}}" + ) + sep = "-" * len(header_line) + title = "MM1 - Theory vs Observed" + top = "=" * max(len(title), len(header_line)) + + lines: list[str] = [ + top, + title, + sep, + header_line, + sep, + ] + + for sym, met, th, ob, ad, rd in data: + lines.append( + f"{sym:<{w_sym}} {met:<{w_met}} " + f"{th:>{w_th}} {ob:>{w_ob}} {ad:>{w_abs}} {rd:>{w_rel}}", + ) + + lines.append(top) + return "\n".join(lines) + + def compare_and_format( + self, + payload: SimulationPayload, + ra: ResultsAnalyzer, + ) -> str: + """ + Convenience: run `compare_against_run()` and return a formatted table. + + Use this when you want a ready-to-print string for logs/CLI output. + """ + rows = self.compare_against_run(payload, ra) + return self._format_rows_table(rows) + + def print_comparison( + self, + payload: SimulationPayload, + ra: ResultsAnalyzer, + *, + file: TextIO | None = None, + ) -> None: + """ + Print a pretty 'theory vs observed' table to the given stream. + + Parameters + ---------- + payload : SimulationPayload + The validated simulation payload. + ra : ResultsAnalyzer + Results analyzer with processed metrics. + file : TextIO | None + Output stream (defaults to stdout). + + """ + out = self.compare_and_format(payload, ra) + stream: TextIO = sys.stdout if file is None else file + print(out, file=stream) + diff --git a/src/asyncflow/runtime/simulation_runner.py b/src/asyncflow/runner/simulation.py similarity index 97% rename from src/asyncflow/runtime/simulation_runner.py rename to src/asyncflow/runner/simulation.py index 08ae0df..8669266 100644 --- a/src/asyncflow/runtime/simulation_runner.py +++ b/src/asyncflow/runner/simulation.py @@ -12,8 +12,8 @@ import simpy import yaml -from asyncflow.metrics.analyzer import ResultsAnalyzer from asyncflow.metrics.collector import SampledMetricCollector +from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer from asyncflow.resources.registry import ResourcesRuntime from asyncflow.runtime.actors.client import ClientRuntime from asyncflow.runtime.actors.edge import EdgeRuntime @@ -199,9 +199,6 @@ def _build_load_balancer(self) -> None: lb_box=self._make_inbox(), ) - - - def _build_edges(self) -> None: """Initialization of the edges runtime dictionary from the input data""" # We need to merge all previous dictionary for the nodes to assign @@ -397,5 +394,13 @@ def from_yaml( payload = SimulationPayload.model_validate(data) return cls(env=env, simulation_input=payload) + # Method usefull to pass to the sweep class a payload + # directly from a yaml + @classmethod + def payload_from_yaml(cls, yaml_path: str | Path) -> SimulationPayload: + """Helper to return a valid payload""" + data = yaml.safe_load(Path(yaml_path).read_text()) + return SimulationPayload.model_validate(data) + diff --git a/src/asyncflow/runner/sweep.py b/src/asyncflow/runner/sweep.py new file mode 100644 index 0000000..ed87819 --- /dev/null +++ b/src/asyncflow/runner/sweep.py @@ -0,0 +1,129 @@ +""" +class to define method to iterate over some variables of the input +to evaluate how a given scenario with different initial conditions +(for example the number of concurrent users), behave. It is really +useful to find insights and analyze eventual breakpoint on a given +topology. Right now the class will accept only as a varying parameter +the concurrent users, in the future we will extend it to arbitrary +parameters. +""" + + +import simpy + +from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer +from asyncflow.runner.simulation import SimulationRunner +from asyncflow.schemas.payload import SimulationPayload + + +class Sweep: + """ + Class to manage scenario when we want to iterate over a + set of initial data to see for example the impact on a defined + topology varying the initial workload + """ + + def __init__( + self, + *, + # passing the object class not instance of the class + # Why: + # - Each sweep run must be isolated: fresh Environment, fresh state, + # fresh queues. + # - Reusing a single instance would carry state from the previous run + # (SimPy processes, resources, partial metrics, RNG state, etc.) + # → tainted results. + # - By passing the CLASS we can instantiate on demand inside the loop, + # guaranteeing a fresh object for every grid point. + simulation_cls: type[SimulationRunner] = SimulationRunner, + ) -> None: + """ + Instantiation of the sweep class + Args: + simulation_cls (type[SimulationRunner], optional): object of + the SimulationRunner class + """ + self.simulation_cls = simulation_cls + + # to trace the last grid + self._last_users_grid: list[int] = [] + + # --------------------------------------------------- + # Helpers + # --------------------------------------------------- + + @staticmethod + def _default_env_factory() -> simpy.Environment: + """Ritorna un Environment nuovo e pulito per ogni run.""" + return simpy.Environment() + + + + #---------------------------------------------------- + # Method to iterate over the users + # --------------------------------------------------- + + def sweep_on_user( + self, + # we pass a validated payload from yaml or from + # the pythonic builder + payload: SimulationPayload, + user_lower_bound: int, + user_upper_bound: int, + step: int, + ) -> list[tuple[int, ResultsAnalyzer]]: + """ + Function to prepare a list of results analzyer + with all the data necessary to evaluate how the + topology react on a given scenario by varying the + average concurrent users + """ + # Error handling to have a coherent interval + if step <= 0: + msg = "step must be > 0" + raise ValueError(msg) + + if user_lower_bound <= 0 or user_upper_bound <= 0: + msg = "The lower and upper bound must be strictly bigger than 0" + raise ValueError(msg) + + if user_upper_bound < user_lower_bound: + msg = "user_upper_bound must be >= user_lower_bound" + raise ValueError(msg) + + # definition of the grid + users_grid: list[int] = list( + range(user_lower_bound, user_upper_bound + 1, step)) + self._last_users_grid = users_grid.copy() + + # last grid used + self._last_users_grid = users_grid[:] + + results: list[tuple[int, ResultsAnalyzer]] = [] + + # Iteration to populate the list + for users in users_grid: + # 1) payload override + payload = payload.model_copy(deep=True) + payload.rqs_input.avg_active_users = ( + payload.rqs_input.avg_active_users.model_copy( + update={"mean": users}, + ) +) + + # 2) instantiation of the new object for the simulation run + runner = self.simulation_cls( + env=self._default_env_factory(), + simulation_input=payload, + ) + + analyzer = runner.run() + + # 3) Accumulation of the analyzer + results.append((users, analyzer)) + + return results + + + + diff --git a/src/asyncflow/runtime/actors/routing/lb_algorithms.py b/src/asyncflow/runtime/actors/routing/lb_algorithms.py index 47f950d..49763ba 100644 --- a/src/asyncflow/runtime/actors/routing/lb_algorithms.py +++ b/src/asyncflow/runtime/actors/routing/lb_algorithms.py @@ -35,7 +35,6 @@ def round_robin( return value - LB_TABLE: dict[LbAlgorithmsName, Callable[[OrderedDict[str, EdgeRuntime]], EdgeRuntime]] = { LbAlgorithmsName.LEAST_CONNECTIONS: least_connections, diff --git a/src/asyncflow/runtime/actors/server.py b/src/asyncflow/runtime/actors/server.py index 2fae18e..524cab3 100644 --- a/src/asyncflow/runtime/actors/server.py +++ b/src/asyncflow/runtime/actors/server.py @@ -3,8 +3,9 @@ during the simulation """ - -from collections.abc import Generator +from collections import defaultdict +from collections.abc import Generator, Mapping +from types import MappingProxyType from typing import cast import numpy as np @@ -15,12 +16,13 @@ EndpointStepCPU, EndpointStepIO, EndpointStepRAM, + EventMetricName, SampledMetricName, ServerResourceName, StepOperation, SystemNodes, ) -from asyncflow.metrics.server import build_server_metrics +from asyncflow.metrics.server import ServerClock, build_server_metrics from asyncflow.resources.server_containers import ServerContainers from asyncflow.runtime.actors.edge import EdgeRuntime from asyncflow.runtime.rqs_state import RequestState @@ -29,10 +31,34 @@ from asyncflow.schemas.settings.simulation import SimulationSettings from asyncflow.schemas.topology.nodes import Server +# Initialization of the nested dict to collect the metrics +# for the server +MetricValue = ServerClock | float +MetricBucket = dict[EventMetricName, MetricValue] class ServerRuntime: """class to define the server during the simulation""" + @staticmethod + def _new_metric_bucket() -> MetricBucket: + """ + Factory for a per-request metric bucket. + Returns a fresh dict pre-populated with cumulative metrics that + always start at 0.0 (I/O time, waiting time, service time). + Event-specific clocks (e.g. RQS_SERVER_CLOCK) are added later + when the request is actually dispatched. + This function is used as the `default_factory` for the + `_server_rqs_clock` defaultdict, so each new request id gets + its own independent bucket automatically. + """ + return { + EventMetricName.IO_TIME: 0.0, + EventMetricName.WAITING_TIME: 0.0, + EventMetricName.SERVICE_TIME: 0.0, + # RQS_SERVER_CLOCK will be added in the dispatcher + } + + def __init__( # noqa: PLR0913 self, *, @@ -79,6 +105,9 @@ def __init__( # noqa: PLR0913 settings.enabled_sample_metrics, ) + self._server_rqs_clock: defaultdict[int, MetricBucket] + self._server_rqs_clock = defaultdict(self._new_metric_bucket) + # ------------------------------------------------------------------ # HELPERS # ------------------------------------------------------------------ @@ -118,6 +147,46 @@ def _compute_latency_io( # Main function to elaborate a request # ------------------------------------------------------------------- + def _dispatcher(self) -> Generator[simpy.Event, None, None]: + """ + The main dispatcher loop. It pulls requests from the inbox and + spawns a new '_handle_request' process for each one. + """ + # we assume in the current model that there is a one + # to one correspondence between cpu cores and workers + # before entering in the loop in the current implementation + # we reserve the ram necessary to run the processes + if self.server_config.ram_per_process: + processes_ram = ( + self.server_config.ram_per_process * + self.server_config.server_resources.cpu_cores + ) + + yield self.server_resources[ + ServerResourceName.RAM.value + ].get(processes_ram) + + + while True: + # Wait for a request to arrive in the server's inbox + raw_state = yield self.server_box.get() + request_state = cast("RequestState", raw_state) + + # Start the collection of the metric initializing + # the principal key that is the unique id of the + # state elaborated + bucket = self._server_rqs_clock[request_state.id] + bucket[EventMetricName.RQS_SERVER_CLOCK] = ServerClock( + start=self.env.now, + ) + + # Spawn a new, independent process to handle this request + self.env.process(self._handle_request(request_state)) + + def start(self) -> simpy.Process: + """Generate the process to simulate the server inside simpy env""" + return self.env.process(self._dispatcher()) + # right now we disable the warnings but a refactor will be done soon def _handle_request( # noqa: PLR0915, PLR0912, C901 self, @@ -199,6 +268,7 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901 core_locked = False is_in_io_queue = False waiting_cpu = False + wait_start: float | None = None # --- Step Execution: CPU & I/O dynamics --- @@ -251,6 +321,11 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901 is_in_io_queue = False self._el_io_queue_len -= 1 + # core_locked is a local variable just for the single request + # if the request already block the core so we avoid all the if + # conditions and we add the coroutine, if it is not blocked, we + # have to ask for a core, because it might be occupy from another + # request if not core_locked: # simpy create an event and if it can be satisfied is triggered cpu_req = self.server_resources[ServerResourceName.CPU.value].get(1) @@ -258,6 +333,7 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901 # no trigger ready queue without execution if not cpu_req.triggered: waiting_cpu = True + wait_start = self.env.now self._el_ready_queue_len += 1 # at this point wait for the cpu @@ -265,6 +341,18 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901 # here the cpu is free if waiting_cpu: + assert wait_start is not None + bucket = self._server_rqs_clock[state.id] + + # mypy assert + value = bucket[EventMetricName.WAITING_TIME] + assert isinstance(value, float) + + # assign delta + bucket[EventMetricName.WAITING_TIME] = ( + value + (self.env.now - wait_start) + ) + wait_start = None waiting_cpu = False self._el_ready_queue_len -= 1 @@ -273,6 +361,16 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901 cpu_time = self._compute_latency_cpu( step.step_operation[StepOperation.CPU_TIME], ) + + bucket = self._server_rqs_clock[state.id] + + # mypy assertion + value = bucket[EventMetricName.SERVICE_TIME] + assert isinstance(value, float) + + # delta assignment + bucket[EventMetricName.SERVICE_TIME] = value + cpu_time + # Execute the step giving back the control to the simpy env yield self.env.timeout(cpu_time) @@ -300,6 +398,14 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901 is_in_io_queue = True self._el_io_queue_len += 1 + bucket = self._server_rqs_clock[state.id] + + # assert for mypy + value = bucket[EventMetricName.IO_TIME] + assert isinstance(value, float) + + # assign the delta + bucket[EventMetricName.IO_TIME] = value + io_time yield self.env.timeout(io_time) if core_locked: @@ -314,17 +420,19 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901 waiting_cpu = False self._el_ready_queue_len -= 1 - if total_ram: - self._ram_in_use -= total_ram yield self.server_resources[ServerResourceName.RAM.value].put(total_ram) + bucket = self._server_rqs_clock[state.id] + clock = cast("ServerClock", bucket[EventMetricName.RQS_SERVER_CLOCK]) + clock.finish = self.env.now + assert self.out_edge is not None self.out_edge.transport(state) - # we need three accessor because we need to read these private attribute + # we need these accessor because we need to read these private attribute # in the sampled metric collector @property def ready_queue_len(self) -> int: @@ -346,36 +454,28 @@ def enabled_metrics(self) -> dict[SampledMetricName, list[float | int]]: """Read-only access to the metric store.""" return self._server_enabled_metrics - - - def _dispatcher(self) -> Generator[simpy.Event, None, None]: - """ - The main dispatcher loop. It pulls requests from the inbox and - spawns a new '_handle_request' process for each one. + @property + def server_rqs_clock(self) -> Mapping[int, MetricBucket]: """ - # we assume in the current model that there is a one - # to one correspondence between cpu cores and workers - # before entering in the loop in the current implementation - # we reserve the ram necessary to run the processes - if self.server_config.ram_per_process: - processes_ram = ( - self.server_config.ram_per_process * - self.server_config.server_resources.cpu_cores - ) + Read-only snapshot of the per-request server metrics. + + Returns + ------- + Mapping[int, MetricBucket] + A mapping from request id → metric bucket, where each bucket is a + dict[EventMetricName, float | ServerClock]. The top-level mapping is + immutable (cannot add/remove keys) and is created from a shallow copy + to avoid defaultdict autovivification. + + Notes + ----- + This is a *snapshot* of the current state: as the server runs, the + underlying buckets may continue to change. + Buckets themselves are not frozen; **do not mutate them** from callers. + Treat the returned structure as read-only. - yield self.server_resources[ - ServerResourceName.RAM.value - ].get(processes_ram) + """ + return MappingProxyType(dict(self._server_rqs_clock)) - while True: - # Wait for a request to arrive in the server's inbox - raw_state = yield self.server_box.get() - request_state = cast("RequestState", raw_state) - # Spawn a new, independent process to handle this request - self.env.process(self._handle_request(request_state)) - - def start(self) -> simpy.Process: - """Generate the process to simulate the server inside simpy env""" - return self.env.process(self._dispatcher()) diff --git a/src/asyncflow/schemas/settings/simulation.py b/src/asyncflow/schemas/settings/simulation.py index 7f0d145..4dbf5d4 100644 --- a/src/asyncflow/schemas/settings/simulation.py +++ b/src/asyncflow/schemas/settings/simulation.py @@ -32,6 +32,10 @@ class SimulationSettings(BaseModel): enabled_event_metrics: set[EventMetricName] = Field( default_factory=lambda: { EventMetricName.RQS_CLOCK, + EventMetricName.RQS_SERVER_CLOCK, + EventMetricName.SERVICE_TIME, + EventMetricName.IO_TIME, + EventMetricName.WAITING_TIME, }, description="Which per-event KPIs to collect by default.", ) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 2c94d6f..eb148a5 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -7,7 +7,7 @@ import pytest import simpy -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.runner.simulation import SimulationRunner if TYPE_CHECKING: from collections.abc import Callable diff --git a/tests/integration/event_injection/lb_two_servers.py b/tests/integration/event_injection/lb_two_servers.py index 3c29e95..2a3e86a 100644 --- a/tests/integration/event_injection/lb_two_servers.py +++ b/tests/integration/event_injection/lb_two_servers.py @@ -21,7 +21,7 @@ import simpy from asyncflow.config.constants import Distribution, EventDescription, LatencyKey -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.runner.simulation import SimulationRunner from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.events.injection import EventInjection from asyncflow.schemas.payload import SimulationPayload @@ -38,7 +38,7 @@ from asyncflow.schemas.workload.rqs_generator import RqsGenerator if TYPE_CHECKING: - from asyncflow.metrics.analyzer import ResultsAnalyzer + from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer def _server(sid: str) -> Server: diff --git a/tests/integration/event_injection/single_server.py b/tests/integration/event_injection/single_server.py index 4431786..c6fe33a 100644 --- a/tests/integration/event_injection/single_server.py +++ b/tests/integration/event_injection/single_server.py @@ -20,7 +20,7 @@ import simpy from asyncflow.config.constants import Distribution, EventDescription, LatencyKey -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.runner.simulation import SimulationRunner from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.events.injection import EventInjection from asyncflow.schemas.payload import SimulationPayload @@ -37,7 +37,7 @@ from asyncflow.schemas.workload.rqs_generator import RqsGenerator if TYPE_CHECKING: - from asyncflow.metrics.analyzer import ResultsAnalyzer + from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer def _server(sid: str) -> Server: diff --git a/tests/integration/load_balancer/test_lb_basic.py b/tests/integration/load_balancer/test_lb_basic.py index 0bcebad..cd21509 100644 --- a/tests/integration/load_balancer/test_lb_basic.py +++ b/tests/integration/load_balancer/test_lb_basic.py @@ -24,7 +24,7 @@ SampledMetricName, StepOperation, ) -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.runner.simulation import SimulationRunner from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.payload import SimulationPayload from asyncflow.schemas.settings.simulation import SimulationSettings @@ -44,7 +44,7 @@ from asyncflow.schemas.workload.rqs_generator import RqsGenerator if TYPE_CHECKING: - from asyncflow.metrics.analyzer import ResultsAnalyzer + from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer def _server(server_id: str) -> Server: diff --git a/tests/integration/minimal/conftest.py b/tests/integration/minimal/conftest.py index f29bf49..3c1aebe 100644 --- a/tests/integration/minimal/conftest.py +++ b/tests/integration/minimal/conftest.py @@ -15,7 +15,7 @@ import simpy from asyncflow.config.constants import TimeDefaults -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.runner.simulation import SimulationRunner from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.workload.rqs_generator import RqsGenerator diff --git a/tests/integration/minimal/test_minimal.py b/tests/integration/minimal/test_minimal.py index 7ae9507..0688706 100644 --- a/tests/integration/minimal/test_minimal.py +++ b/tests/integration/minimal/test_minimal.py @@ -18,8 +18,8 @@ import pytest import simpy -from asyncflow.metrics.analyzer import ResultsAnalyzer -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer +from asyncflow.runner.simulation import SimulationRunner if TYPE_CHECKING: from asyncflow.schemas.payload import SimulationPayload diff --git a/tests/integration/single_server/conftest.py b/tests/integration/single_server/conftest.py index f45633a..7ecf298 100644 --- a/tests/integration/single_server/conftest.py +++ b/tests/integration/single_server/conftest.py @@ -16,7 +16,7 @@ import simpy if TYPE_CHECKING: # heavy imports only when type-checking - from asyncflow.runtime.simulation_runner import SimulationRunner + from asyncflow.runner.simulation import SimulationRunner # --------------------------------------------------------------------------- # @@ -38,7 +38,7 @@ def runner(env: simpy.Environment) -> SimulationRunner: :pymeth:`SimulationRunner.from_yaml`. """ # import deferred to avoid ruff TC001 - from asyncflow.runtime.simulation_runner import SimulationRunner # noqa: PLC0415 + from asyncflow.runner.simulation import SimulationRunner # noqa: PLC0415 yaml_path: Path = ( Path(__file__).parent / "data" / "single_server.yml" diff --git a/tests/integration/single_server/test_int_single_server.py b/tests/integration/single_server/test_int_single_server.py index efb1ef9..5498611 100644 --- a/tests/integration/single_server/test_int_single_server.py +++ b/tests/integration/single_server/test_int_single_server.py @@ -18,8 +18,8 @@ from asyncflow.config.constants import LatencyKey, SampledMetricName if TYPE_CHECKING: # only needed for type-checking - from asyncflow.metrics.analyzer import ResultsAnalyzer - from asyncflow.runtime.simulation_runner import SimulationRunner + from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer + from asyncflow.runner.simulation import SimulationRunner # --------------------------------------------------------------------------- # diff --git a/tests/system/test_sys_ev_inj_lb_two_servers.py b/tests/system/test_sys_ev_inj_lb_two_servers.py index 15e978b..91ba0e0 100644 --- a/tests/system/test_sys_ev_inj_lb_two_servers.py +++ b/tests/system/test_sys_ev_inj_lb_two_servers.py @@ -33,12 +33,12 @@ from asyncflow import AsyncFlow from asyncflow.components import Client, Edge, Endpoint, LoadBalancer, Server from asyncflow.config.constants import LatencyKey -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.runner.simulation import SimulationRunner from asyncflow.settings import SimulationSettings from asyncflow.workload import RqsGenerator if TYPE_CHECKING: - from asyncflow.metrics.analyzer import ResultsAnalyzer + from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer from asyncflow.schemas.payload import SimulationPayload pytestmark = [ diff --git a/tests/system/test_sys_ev_inj_single_server.py b/tests/system/test_sys_ev_inj_single_server.py index e1132c3..da5afb7 100644 --- a/tests/system/test_sys_ev_inj_single_server.py +++ b/tests/system/test_sys_ev_inj_single_server.py @@ -36,12 +36,12 @@ from asyncflow import AsyncFlow from asyncflow.components import Client, Edge, Endpoint, Server from asyncflow.config.constants import LatencyKey -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.runner.simulation import SimulationRunner from asyncflow.settings import SimulationSettings from asyncflow.workload import RqsGenerator if TYPE_CHECKING: - from asyncflow.metrics.analyzer import ResultsAnalyzer + from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer from asyncflow.schemas.payload import SimulationPayload pytestmark = [ diff --git a/tests/system/test_sys_lb_two_servers.py b/tests/system/test_sys_lb_two_servers.py index b273065..47bb0ee 100644 --- a/tests/system/test_sys_lb_two_servers.py +++ b/tests/system/test_sys_lb_two_servers.py @@ -27,13 +27,13 @@ from asyncflow import AsyncFlow from asyncflow.components import Client, Edge, Endpoint, LoadBalancer, Server from asyncflow.config.constants import LatencyKey -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.runner.simulation import SimulationRunner from asyncflow.settings import SimulationSettings from asyncflow.workload import RqsGenerator if TYPE_CHECKING: # Imported only for type checking (ruff: TC001) - from asyncflow.metrics.analyzer import ResultsAnalyzer + from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer from asyncflow.schemas.payload import SimulationPayload pytestmark = [ diff --git a/tests/system/test_sys_single_server.py b/tests/system/test_sys_single_server.py index ff2cd32..c8d2034 100644 --- a/tests/system/test_sys_single_server.py +++ b/tests/system/test_sys_single_server.py @@ -26,13 +26,13 @@ from asyncflow import AsyncFlow from asyncflow.components import Client, Edge, Endpoint, Server from asyncflow.config.constants import LatencyKey -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.runner.simulation import SimulationRunner from asyncflow.settings import SimulationSettings from asyncflow.workload import RqsGenerator if TYPE_CHECKING: # Imported only for type checking (ruff: TC001) - from asyncflow.metrics.analyzer import ResultsAnalyzer + from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer from asyncflow.schemas.payload import SimulationPayload pytestmark = [ diff --git a/tests/unit/metrics/test_analyzer.py b/tests/unit/metrics/test_simulation_analyzer.py similarity index 65% rename from tests/unit/metrics/test_analyzer.py rename to tests/unit/metrics/test_simulation_analyzer.py index 901b646..4afc6a8 100644 --- a/tests/unit/metrics/test_analyzer.py +++ b/tests/unit/metrics/test_simulation_analyzer.py @@ -19,7 +19,9 @@ from matplotlib.figure import Figure from asyncflow.analysis import ResultsAnalyzer +from asyncflow.config.constants import EventMetricName from asyncflow.enums import SampledMetricName +from asyncflow.metrics.server import ServerClock if TYPE_CHECKING: from asyncflow.runtime.actors.client import ClientRuntime @@ -73,6 +75,9 @@ def __init__(self, identifier: str, metrics: dict[str, list[float]]) -> None: self.enabled_metrics = { DummyName(name): values for name, values in metrics.items() } + self.server_rqs_clock: dict[ + int, + dict[EventMetricName, float | ServerClock]] = {} class DummyEdgeConfig: @@ -288,3 +293,145 @@ def test_plot_single_server_ram( assert any(lbl.lower().startswith("max") for lbl in labels) assert len(labels) == 3 +# --------------------------------------------------------------- +# Test server event metric +# --------------------------------------------------------------- + +def _mk_bucket( + st: float, io: float, wt: float, start: float, finish: float, +) -> dict[EventMetricName, float | ServerClock]: + """Helper to build one metric bucket like the real server does.""" + return { + EventMetricName.SERVICE_TIME: st, + EventMetricName.IO_TIME: io, + EventMetricName.WAITING_TIME: wt, + EventMetricName.RQS_SERVER_CLOCK: ServerClock(start=start, finish=finish), + } + + +def test_get_server_event_arrays_extracts_fields( + sim_settings: SimulationSettings) -> None: + """Analyzer should extract per-request arrays for a server from its buckets.""" + sim_settings.total_simulation_time = 2 + sim_settings.sample_period_s = 0.5 + + client = DummyClient([]) + + srv = DummyServer("srvA", { + "ready_queue_len": [0, 0], + "event_loop_io_sleep": [0, 1], + "ram_in_use": [128.0, 256.0], + }) + # Populate per-request buckets (two requests) + srv.server_rqs_clock = { + 1: _mk_bucket(0.004, 0.010, 0.001, 0.100, 0.115), # latency 0.015 + 2: _mk_bucket(0.006, 0.020, 0.000, 0.300, 0.326), # latency 0.026 + } + + an = ResultsAnalyzer( + client=cast("ClientRuntime", client), + servers=[cast("ServerRuntime", srv)], + edges=[], + settings=sim_settings, + ) + + arrays = an.get_server_event_arrays() + assert "srvA" in arrays + a = arrays["srvA"] + + # Order of buckets is not relevant; compare sorted values + assert sorted(a["service_time"]) == [0.004, 0.006] + assert sorted(a["io_time"]) == [0.010, 0.020] + assert sorted(a["waiting_time"]) == [0.000, 0.001] + assert pytest.approx(sorted(a["latencies"])) == sorted([0.015, 0.026]) + assert pytest.approx(sorted(a["finish_times"])) == sorted([0.115, 0.326]) + + +def test_get_server_throughput_series_per_server( + sim_settings: SimulationSettings) -> None: + """Throughput per-server should count completions within each fixed window.""" + sim_settings.total_simulation_time = 3 + sim_settings.sample_period_s = 0.5 + client = DummyClient([]) + + srv = DummyServer("srvT", {}) + # Three completions at 0.8s, 1.2s, 2.6s + srv.server_rqs_clock = { + 10: _mk_bucket(0.001, 0.002, 0.000, 0.00, 0.80), + 11: _mk_bucket(0.001, 0.002, 0.000, 0.90, 1.20), + 12: _mk_bucket(0.001, 0.002, 0.000, 2.30, 2.60), + } + + an = ResultsAnalyzer( + client=cast("ClientRuntime", client), + servers=[cast("ServerRuntime", srv)], + edges=[], + settings=sim_settings, + ) + + # 1s windows → boundaries at 1.0, 2.0, 3.0 → counts [1,1,1] + ts1, rps1 = an.get_server_throughput_series("srvT", window_s=1.0) + assert ts1 == [1.0, 2.0, 3.0] + assert rps1 == [1.0, 1.0, 1.0] + + # 0.5s windows → boundaries 0.5,1.0,1.5,2.0,2.5,3.0 + # counts per window [0,1,1,0,0,1] → rates [0,2,2,0,0,2] + ts2, rps2 = an.get_server_throughput_series("srvT", window_s=0.5) + assert ts2[:6] == [0.5, 1.0, 1.5, 2.0, 2.5, 3.0] + assert rps2[:6] == [0.0, 2.0, 2.0, 0.0, 0.0, 2.0] + + +def test_plot_server_event_metrics_dashboard_smoke_and_legends( + sim_settings: SimulationSettings, +) -> None: + """Dashboard (latency/service/io/wait) should set titles and show a legend.""" + sim_settings.total_simulation_time = 1 + client = DummyClient([]) + + srv = DummyServer("srvZ", {}) + srv.server_rqs_clock = { + 1: _mk_bucket(0.003, 0.012, 0.000, 0.10, 0.115), + 2: _mk_bucket(0.007, 0.018, 0.002, 0.20, 0.230), + 3: _mk_bucket(0.005, 0.010, 0.001, 0.30, 0.315), + } + + an = ResultsAnalyzer( + client=cast("ClientRuntime", client), + servers=[cast("ServerRuntime", srv)], + edges=[], + settings=sim_settings, + ) + + fig = Figure() + ax_lat, ax_svc, ax_io, ax_wait = fig.subplots(2, 2).ravel() + an.plot_server_event_metrics_dashboard(ax_lat, ax_svc, ax_io, ax_wait, "srvZ") + + # Titles contain expected labels + assert "Server latency — srvZ" in ax_lat.get_title() + assert "CPU service time — srvZ" in ax_svc.get_title() + assert "I/O time — srvZ" in ax_io.get_title() + assert "CPU waiting time — srvZ" in ax_wait.get_title() + + # Legends exist and contain at least 'mean' (and 'P50' on latency pane) + for ax in (ax_lat, ax_svc, ax_io, ax_wait): + lg = ax.get_legend() + assert lg is not None + labels = [t.get_text().lower() for t in lg.get_texts()] + assert any(lbl.startswith("mean") for lbl in labels) + # Latency pane also shows P50 + lat_labels = [t.get_text() for t in ax_lat.get_legend().get_texts()] + assert any("P50" in s for s in lat_labels) + + +def test_plot_server_timeseries_dashboard_sets_titles( + analyzer_with_metrics: ResultsAnalyzer, +) -> None: + """Time-series dashboard for a server wires the three single-plot helpers.""" + fig = Figure() + ax_ready, ax_io, ax_ram = fig.subplots(1, 3) + analyzer_with_metrics.plot_server_timeseries_dashboard( + ax_ready, ax_io, ax_ram, "srvX") + + assert "Ready Queue" in ax_ready.get_title() + assert "I/O Queue" in ax_io.get_title() + assert "RAM" in ax_ram.get_title() diff --git a/tests/unit/metrics/test_sweep_analyzer.py b/tests/unit/metrics/test_sweep_analyzer.py new file mode 100644 index 0000000..2717f4c --- /dev/null +++ b/tests/unit/metrics/test_sweep_analyzer.py @@ -0,0 +1,261 @@ +"""Unit tests for SweepAnalyzer (global and per-server collections & plots).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, cast + +import matplotlib as mpl +import matplotlib.pyplot as plt +import pytest + +from asyncflow.config.constants import LatencyKey +from asyncflow.metrics.sweep_analyzer import SweepAnalyzer + +# Headless backend for CI +mpl.use("Agg") + +if TYPE_CHECKING: # pragma: no cover + from collections.abc import Iterable + + from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer + + +# --------------------------------------------------------------------------- +# Fakes +# --------------------------------------------------------------------------- + + +class _FakeResultsAnalyzer: + """Minimal fake of ResultsAnalyzer for SweepAnalyzer tests.""" + + def __init__( + self, + *, + rps_series: list[float], + latency_stats: dict[LatencyKey, float], + server_ids: list[str], + server_arrays: dict[str, dict[str, list[float]]], + ) -> None: + self._rps_series = list(rps_series) + self._latency_stats = dict(latency_stats) + self._server_ids = list(server_ids) + self._server_arrays = { + sid: {k: list(v) for k, v in arrays.items()} + for sid, arrays in server_arrays.items() + } + self.process_calls = 0 + + # Public API mirrored from the real analyzer + + def process_all_metrics(self) -> None: + self.process_calls += 1 + + def get_throughput_series(self) -> tuple[list[float], list[float]]: + """Return (timestamps, rps). We only care about the RPS values.""" + n = len(self._rps_series) + timestamps = [float(i + 1) for i in range(n)] + return timestamps, list(self._rps_series) + + def get_latency_stats(self) -> dict[LatencyKey, float]: + return dict(self._latency_stats) + + def list_server_ids(self) -> list[str]: + return list(self._server_ids) + + def get_server_event_arrays(self) -> dict[str, dict[str, list[float]]]: + return { + sid: {k: list(v) for k, v in arrays.items()} + for sid, arrays in self._server_arrays.items() + } + + +def _cast_pair( + users: int, + fra: _FakeResultsAnalyzer, +) -> tuple[int, ResultsAnalyzer]: + """Cast helper to satisfy mypy on constructor signature.""" + return users, cast("ResultsAnalyzer", fra) + + +# --------------------------------------------------------------------------- +# Tests — Global collection and plots +# --------------------------------------------------------------------------- + + +def test_global_collection_and_plots_match_expected_values() -> None: + """Global throughput mean and mean latency must match the inputs.""" + # Pair 1: mean RPS = 100, mean latency = 0.05 + ra1 = _FakeResultsAnalyzer( + rps_series=[100.0, 110.0, 90.0], + latency_stats={ + LatencyKey.MEAN: 0.05, + LatencyKey.MEDIAN: 0.04, + LatencyKey.P95: 0.10, + LatencyKey.P99: 0.20, + }, + server_ids=["s1"], + server_arrays={ + "s1": { + "service_time": [0.01], + "waiting_time": [0.005], + "finish_times": [0.1, 0.2], + }, + }, + ) + + # Pair 2: mean RPS = 200, mean latency = 0.06 + ra2 = _FakeResultsAnalyzer( + rps_series=[200.0, 190.0, 210.0], + latency_stats={ + LatencyKey.MEAN: 0.06, + LatencyKey.MEDIAN: 0.05, + LatencyKey.P95: 0.12, + LatencyKey.P99: 0.22, + }, + server_ids=["s1"], + server_arrays={ + "s1": { + "service_time": [0.01], + "waiting_time": [0.004], + "finish_times": [0.3, 0.4], + }, + }, + ) + + sa = SweepAnalyzer( + [_cast_pair(10, ra1), _cast_pair(20, ra2)], + ) + + # Global dashboard and line data + fig = sa.plot_global_dashboard() + axes = fig.get_axes() + assert len(axes) == 2 + + # Throughput axis + thr_line = axes[0].lines[0] + x_thr = list(cast("Iterable[float]", thr_line.get_xdata())) + y_thr = list(cast("Iterable[float]", thr_line.get_ydata())) + assert x_thr == [10, 20] + assert pytest.approx(y_thr) == [100.0, 200.0] + + # Latency axis + lat_line = axes[1].lines[0] + x_lat = list(cast("Iterable[float]", lat_line.get_xdata())) + y_lat = list(cast("Iterable[float]", lat_line.get_ydata())) + assert x_lat == [10, 20] + assert pytest.approx(y_lat) == [0.05, 0.06] + + plt.close(fig) + + +# --------------------------------------------------------------------------- +# Tests — Per-server collection and overlays +# --------------------------------------------------------------------------- + + +def test_server_overlays_compute_lambda_mu_rho_and_wq() -> None: + """Per-server metrics must follow completions split and means.""" + # lambda_tot = 100 (mean of [100, 100]) + # completions: s1=60, s2=40 -> lambda1=60, lambda2=40 + # mu1 = 1/0.01 = 100, mu2 = 1/0.02 = 50 + # rho1 = 0.6, rho2 = 0.8 + # wq means from waiting_time arrays + ra = _FakeResultsAnalyzer( + rps_series=[100.0, 100.0], + latency_stats={LatencyKey.MEAN: 0.05}, + server_ids=["s1", "s2"], + server_arrays={ + "s1": { + "service_time": [0.01] * 5, + "waiting_time": [0.005] * 5, + "finish_times": [0.0] * 60, + }, + "s2": { + "service_time": [0.02] * 5, + "waiting_time": [0.004] * 5, + "finish_times": [0.0] * 40, + }, + }, + ) + + sa = SweepAnalyzer([_cast_pair(10, ra)]) + + # Utilization overlay + fig1, ax1 = plt.subplots(1, 1) + sa.plot_server_utilization_overlay(ax1, server_ids=["s1", "s2"]) + lines = {line.get_label(): line for line in ax1.get_lines()} + assert "s1" in lines + assert "s2" in lines + y_rho_s1 = next(iter(cast("Iterable[float]", lines["s1"].get_ydata()))) + y_rho_s2 = next(iter(cast("Iterable[float]", lines["s2"].get_ydata()))) + assert pytest.approx(y_rho_s1) == 0.6 + assert pytest.approx(y_rho_s2) == 0.8 + plt.close(fig1) + + # Service rate overlay + fig2, ax2 = plt.subplots(1, 1) + sa.plot_server_service_rate_overlay(ax2, server_ids=["s1", "s2"]) + lines2 = {line.get_label(): line for line in ax2.get_lines()} + mu_s1 = next(iter(cast("Iterable[float]", lines2["s1"].get_ydata()))) + mu_s2 = next(iter(cast("Iterable[float]", lines2["s2"].get_ydata()))) + assert pytest.approx(mu_s1) == 100.0 + assert pytest.approx(mu_s2) == 50.0 + plt.close(fig2) + + # Waiting time overlay + fig3, ax3 = plt.subplots(1, 1) + sa.plot_server_waiting_time_overlay(ax3, server_ids=["s1", "s2"]) + lines3 = {line.get_label(): line for line in ax3.get_lines()} + wq_s1 = next(iter(cast("Iterable[float]", lines3["s1"].get_ydata()))) + wq_s2 = next(iter(cast("Iterable[float]", lines3["s2"].get_ydata()))) + assert pytest.approx(wq_s1) == 0.005 + assert pytest.approx(wq_s2) == 0.004 + plt.close(fig3) + + # Throughput overlay + fig4, ax4 = plt.subplots(1, 1) + sa.plot_server_throughput_overlay(ax4, server_ids=["s1", "s2"]) + lines4 = {line.get_label(): line for line in ax4.get_lines()} + lam_s1 = next(iter(cast("Iterable[float]", lines4["s1"].get_ydata()))) + lam_s2 = next(iter(cast("Iterable[float]", lines4["s2"].get_ydata()))) + assert pytest.approx(lam_s1) == 60.0 + assert pytest.approx(lam_s2) == 40.0 + plt.close(fig4) + + +# --------------------------------------------------------------------------- +# Tests — Caching behavior +# --------------------------------------------------------------------------- + + +def test_precollect_runs_each_analyzer_once_per_collector() -> None: + """precollect() plus plots should call process_all_metrics exactly twice.""" + # Two pairs to make sure iteration is covered. + ra1 = _FakeResultsAnalyzer( + rps_series=[10.0, 20.0], + latency_stats={LatencyKey.MEAN: 0.1}, + server_ids=["s1"], + server_arrays={"s1": {"service_time": [0.05], "waiting_time": [0.01]}}, + ) + ra2 = _FakeResultsAnalyzer( + rps_series=[30.0, 40.0], + latency_stats={LatencyKey.MEAN: 0.2}, + server_ids=["s2"], + server_arrays={"s2": {"service_time": [0.02], "waiting_time": [0.02]}}, + ) + + sa = SweepAnalyzer([_cast_pair(5, ra1), _cast_pair(15, ra2)]) + + # Warm caches (global + servers) + sa.precollect() + + # Plot both dashboards (should NOT trigger recomputation) + fig_g = sa.plot_global_dashboard() + fig_s = sa.plot_server_dashboard() + plt.close(fig_g) + plt.close(fig_s) + + # Each analyzer should have been processed exactly twice: + # once for global collection + once for server collection. + assert ra1.process_calls == 2 + assert ra2.process_calls == 2 diff --git a/tests/unit/queue_theory_analysis/test_mm1.py b/tests/unit/queue_theory_analysis/test_mm1.py new file mode 100644 index 0000000..0b69dc7 --- /dev/null +++ b/tests/unit/queue_theory_analysis/test_mm1.py @@ -0,0 +1,239 @@ +"""Unit tests for the MM1 queue-theory analyzer.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, cast + +import pytest + +from asyncflow.config.constants import LatencyKey +from asyncflow.queue_theory_analysis.mm1 import MM1 +from asyncflow.schemas.payload import SimulationPayload + +if TYPE_CHECKING: + from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- +def _make_mm1_payload( + *, + users_mean: float = 30.0, + rpm_per_user: float = 2.0, + service_mean_s: float = 0.4, + edge_latency_s: float | None = 0.0005, + total_time_s: int = 10, +) -> SimulationPayload: + """Build a minimal payload compatible with MM1 assumptions.""" + step = { + "kind": "cpu_bound_operation", + "step_operation": { + "cpu_time": {"mean": service_mean_s, "distribution": "exponential"}, + }, + } + + payload_dict = { + "rqs_input": { + "id": "gen-1", + "avg_active_users": {"mean": users_mean, "distribution": "poisson"}, + "avg_request_per_minute_per_user": { + "mean": rpm_per_user, + "distribution": "poisson", + }, + "user_sampling_window": 10, + }, + "topology_graph": { + "nodes": { + "client": {"id": "client-1"}, + "servers": [ + { + "id": "srv-1", + "server_resources": {"cpu_cores": 1, "ram_mb": 1024}, + "endpoints": [{"endpoint_name": "echo", "steps": [step]}], + }, + ], + "load_balancer": None, + }, + "edges": [ + { + "id": "gen-cli", + "source": "gen-1", + "target": "client-1", + "latency": 0.0004 if edge_latency_s is None else edge_latency_s, + }, + { + "id": "gen-srv", + "source": "gen-1", + "target": "srv-1", + "latency": 0.0004 if edge_latency_s is None else edge_latency_s, + }, + { + "id": "srv-cli", + "source": "srv-1", + "target": "client-1", + "latency": 0.0004 if edge_latency_s is None else edge_latency_s, + }, + ], + }, + "sim_settings": {"total_simulation_time": total_time_s}, + "events": None, + } + + return SimulationPayload.model_validate(payload_dict) + + +class _FakeResultsAnalyzer: + """Minimal fake ResultsAnalyzer for compare_against_run().""" + + def __init__( + self, + *, + total_time_s: float, + n_completed: int, + latencies_s: list[float], + service_times_s: list[float], + waiting_times_s: list[float], + ) -> None: + self._total_time_s = float(total_time_s) + self._n_completed = int(n_completed) + self._latencies = latencies_s + self._service = service_times_s + self._waiting = waiting_times_s + + def process_all_metrics(self) -> None: + """No-op for the fake analyzer.""" + + def get_throughput_series( + self, + _window_s: float | None = None, + ) -> tuple[list[float], list[float]]: + """Return evenly spaced windows with constant RPS from totals.""" + # Build 1-second windows; constant RPS = n / T. + n = float(self._n_completed) + t = float(self._total_time_s) if self._total_time_s > 0 else 1.0 + rps = n / t + steps = int(t) + timestamps = [float(i + 1) for i in range(steps)] + values = [rps for _ in range(steps)] + return timestamps, values + + def get_latency_stats(self) -> dict[LatencyKey, float]: + """Return only the mean latency (keyed by LatencyKey.MEAN).""" + if not self._latencies: + return {} + mean = float(sum(self._latencies)) / float(len(self._latencies)) + return {LatencyKey.MEAN: mean} + + def list_server_ids(self) -> list[str]: + """Report exactly one server id.""" + return ["srv-1"] + + def get_server_event_arrays(self) -> dict[str, dict[str, list[float]]]: + """Expose arrays with service and waiting times.""" + return { + "srv-1": { + "latencies": [], + "service_time": list(self._service), + "io_time": [], + "waiting_time": list(self._waiting), + "finish_times": [], + }, + } + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- +def test_mm1_is_compatible_and_evaluate_closed_form() -> None: + """A valid MM1 payload should be compatible and produce correct KPIs.""" + # lambda = 30 * 2 / 60 = 1.0; mu = 1/0.4 = 2.5; rho = 0.4 + payload = _make_mm1_payload( + users_mean=30.0, + rpm_per_user=2.0, + service_mean_s=0.4, + total_time_s=10, + ) + mm1 = MM1() + + assert mm1.is_compatible(payload) is True + assert mm1.explain_incompatibilities(payload) == [] + + out = mm1.evaluate(payload) + assert out["lambda_rate"] == pytest.approx(1.0, rel=1e-9, abs=1e-9) + assert out["mu_rate"] == pytest.approx(2.5, rel=1e-9, abs=1e-9) + assert out["rho"] == pytest.approx(0.4, rel=1e-9, abs=1e-9) + # W = 1/(mu-lambda) = 2/3; Wq = rho/(mu-lambda) = 0.2666... + assert out["W"] == pytest.approx(2.0 / 3.0, rel=1e-9, abs=1e-9) + assert out["Wq"] == pytest.approx(0.2666666667, rel=1e-9, abs=1e-9) + # L = lambda*W; Lq = lambda*Wq + assert out["L"] == pytest.approx(1.0 * (2.0 / 3.0), rel=1e-9, abs=1e-9) + assert out["Lq"] == pytest.approx(1.0 * 0.2666666667, rel=1e-9, abs=1e-9) + + +def test_mm1_compare_against_run_produces_rows_with_small_deltas() -> None: + """compare_against_run() should produce rows with tiny diffs for ideal data.""" + payload = _make_mm1_payload( + users_mean=30.0, + rpm_per_user=2.0, + service_mean_s=0.4, + total_time_s=10, + ) + # Theory: lambda=1, mu=2.5, rho=0.4, W=2/3, Wq≈0.2667 + n = 10 + t = 10.0 + w = 2.0 / 3.0 + wq = 0.2666666667 + + ra = _FakeResultsAnalyzer( + total_time_s=t, + n_completed=n, + latencies_s=[w] * n, + service_times_s=[0.4] * n, + waiting_times_s=[wq] * n, + ) + + mm1 = MM1() + rows = mm1.compare_against_run(payload, cast("ResultsAnalyzer", ra)) + + assert len(rows) == 7 + by_sym = {r["symbol"]: r for r in rows} + + lam = by_sym["λ"] + mu = by_sym["μ"] + rho = by_sym["rho"] + w_row = by_sym["W"] + wq_row = by_sym["Wq"] + l_row = by_sym["L"] + lq_row = by_sym["Lq"] + + # Observed equals theory in our synthetic scenario + assert float(lam["abs_diff"]) == pytest.approx(0.0, abs=1e-6) + assert float(mu["abs_diff"]) == pytest.approx(0.0, abs=1e-6) + assert float(rho["abs_diff"]) == pytest.approx(0.0, abs=1e-6) + assert float(w_row["abs_diff"]) == pytest.approx(0.0, abs=1e-6) + assert float(wq_row["abs_diff"]) == pytest.approx(0.0, abs=1e-6) + assert float(l_row["abs_diff"]) == pytest.approx(0.0, abs=1e-6) + assert float(lq_row["abs_diff"]) == pytest.approx(0.0, abs=1e-6) + + +@pytest.mark.parametrize( + ("edge_latency_s", "msg"), + [ + (0.01, "deterministic latency must be < 1 ms"), + (None, ""), + ], +) +def test_mm1_validate_or_raise_incompatibilities( + edge_latency_s: float | None, + msg: str, +) -> None: + """Invalid payloads must raise with a readable message.""" + payload = _make_mm1_payload(edge_latency_s=edge_latency_s) + mm1 = MM1() + + if msg: + with pytest.raises(ValueError, match="Payload is not compatible"): + mm1.validate_or_raise(payload) + else: + mm1.validate_or_raise(payload) diff --git a/tests/unit/runtime/test_simulation_runner.py b/tests/unit/runner/test_simulation.py similarity index 99% rename from tests/unit/runtime/test_simulation_runner.py rename to tests/unit/runner/test_simulation.py index 66c25cb..34f1c44 100644 --- a/tests/unit/runtime/test_simulation_runner.py +++ b/tests/unit/runner/test_simulation.py @@ -16,7 +16,7 @@ from tests.unit.helpers import make_min_ep from asyncflow.config.constants import Distribution, EventDescription -from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.runner.simulation import SimulationRunner from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.events.injection import EventInjection from asyncflow.schemas.payload import SimulationPayload diff --git a/tests/unit/runner/test_sweep.py b/tests/unit/runner/test_sweep.py new file mode 100644 index 0000000..39fa45a --- /dev/null +++ b/tests/unit/runner/test_sweep.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, ClassVar, cast + +import pytest + +from asyncflow.config.constants import Distribution, TimeDefaults +from asyncflow.runner.sweep import Sweep +from asyncflow.schemas.common.random_variables import RVConfig +from asyncflow.schemas.payload import SimulationPayload +from asyncflow.schemas.settings.simulation import SimulationSettings +from asyncflow.schemas.topology.graph import TopologyGraph +from asyncflow.schemas.topology.nodes import Client, TopologyNodes +from asyncflow.schemas.workload.rqs_generator import RqsGenerator + +if TYPE_CHECKING: + import simpy + + from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer + from asyncflow.runner.simulation import SimulationRunner + + +# --------------------------------------------------------------------------- # +# Helpers # +# --------------------------------------------------------------------------- # +def _make_min_payload( + *, + users_mean: int = 1, + rpm_mean: int = 2, + sim_time: int = TimeDefaults.MIN_SIMULATION_TIME, +) -> SimulationPayload: + """Return a minimal, validated payload (client only, no servers).""" + rqs = RqsGenerator( + id="gen", + avg_active_users=RVConfig( + mean=users_mean, distribution=Distribution.POISSON, + ), + avg_request_per_minute_per_user=RVConfig( + mean=rpm_mean, distribution=Distribution.POISSON, + ), + ) + client = Client(id="cli") + nodes = TopologyNodes(servers=[], client=client, load_balancer=None) + graph = TopologyGraph(nodes=nodes, edges=[]) + settings = SimulationSettings(total_simulation_time=sim_time) + return SimulationPayload( + rqs_input=rqs, topology_graph=graph, sim_settings=settings, + ) + + +class _DummyAnalyzer: + """Trivial object we return as analyzer surrogate in the fake runner.""" + + def __init__(self, tag: int) -> None: + self.tag = tag + + +class FakeSimulationRunner: + """ + Test double for SimulationRunner: + - records every (env, payload) received + - returns a dummy analyzer-like object + """ + + run_calls: ClassVar[list[tuple[simpy.Environment, SimulationPayload]]] = [] + + def __init__( + self, + *, + env: simpy.Environment, + simulation_input: SimulationPayload, + ) -> None: + """Store args for inspection; does not start any real process.""" + self.env = env + self.payload = simulation_input + + def run(self) -> ResultsAnalyzer: + """Record call and return a dummy analyzer marked with users mean.""" + FakeSimulationRunner.run_calls.append((self.env, self.payload)) + tag = int(self.payload.rqs_input.avg_active_users.mean) + return cast("ResultsAnalyzer", _DummyAnalyzer(tag)) + + +@pytest.fixture(autouse=True) +def _reset_fake_runner() -> None: + """Ensure fake runner call log is clean before each test.""" + FakeSimulationRunner.run_calls.clear() + + +# --------------------------------------------------------------------------- # +# Tests # +# --------------------------------------------------------------------------- # +def test_sweep_on_user_inclusive_grid_and_preserves_payload() -> None: + payload = _make_min_payload(users_mean=7) + sweeper = Sweep( + simulation_cls=cast("type[SimulationRunner]", FakeSimulationRunner), + ) + + res = sweeper.sweep_on_user( + payload=payload, user_lower_bound=2, user_upper_bound=6, step=2, + ) + + # Inclusive grid [2, 4, 6] + assert [u for (u, _a) in res] == [2, 4, 6] + assert sweeper._last_users_grid == [2, 4, 6] # noqa: SLF001 + + # Underlying payload not mutated by the sweep + assert payload.rqs_input.avg_active_users.mean == 7 + + # Fake runner saw three runs with the expected users injected + seen = [ + int(p.rqs_input.avg_active_users.mean) + for (_e, p) in FakeSimulationRunner.run_calls + ] + assert seen == [2, 4, 6] + + # Each run got a fresh copy (not the same object) + for (_e, p) in FakeSimulationRunner.run_calls: + assert p is not payload + + +def test_sweep_on_user_creates_fresh_env_per_run() -> None: + payload = _make_min_payload() + sweeper = Sweep( + simulation_cls=cast("type[SimulationRunner]", FakeSimulationRunner), + ) + + res = sweeper.sweep_on_user( + payload=payload, user_lower_bound=1, user_upper_bound=3, step=1, + ) + assert len(res) == 3 + + env_ids = [id(e) for (e, _p) in FakeSimulationRunner.run_calls] + assert len(set(env_ids)) == 3 # all distinct envs + # brand-new SimPy environments start at t=0 + assert all(e.now == 0 for (e, _p) in FakeSimulationRunner.run_calls) + + +@pytest.mark.parametrize( + ("lo", "hi", "step", "msg_substr"), + [ + (1, 5, 0, "step must be > 0"), + (0, 5, 1, "strictly bigger than 0"), + (1, 0, 1, "strictly bigger than 0"), + (5, 1, 1, "user_upper_bound must be >= user_lower_bound"), + ], +) +def test_sweep_on_user_invalid_inputs_raise( + lo: int, hi: int, step: int, msg_substr: str, +) -> None: + payload = _make_min_payload() + sweeper = Sweep( + simulation_cls=cast("type[SimulationRunner]", FakeSimulationRunner), + ) + + with pytest.raises(ValueError, match=msg_substr): + sweeper.sweep_on_user( + payload=payload, + user_lower_bound=lo, + user_upper_bound=hi, + step=step, + ) + + +def test_sweep_on_user_returns_pairs_with_analyzers() -> None: + payload = _make_min_payload() + sweeper = Sweep( + simulation_cls=cast("type[SimulationRunner]", FakeSimulationRunner), + ) + + res = sweeper.sweep_on_user( + payload=payload, user_lower_bound=2, user_upper_bound=4, step=1, + ) + + # Tuple shape: (users, analyzer) + users_list = [u for (u, _a) in res] + assert users_list == [2, 3, 4] + + # Analyzer is the dummy object we returned (check runtime marker) + tags = [getattr(a, "tag", None) for (_u, a) in res] + assert tags == [2, 3, 4] diff --git a/tests/unit/runtime/actors/test_server_rt.py b/tests/unit/runtime/actors/test_server_rt.py index 53da3ef..9714532 100644 --- a/tests/unit/runtime/actors/test_server_rt.py +++ b/tests/unit/runtime/actors/test_server_rt.py @@ -28,9 +28,11 @@ EndpointStepCPU, EndpointStepIO, EndpointStepRAM, + EventMetricName, SampledMetricName, StepOperation, ) +from asyncflow.metrics.server import ServerClock from asyncflow.resources.server_containers import build_containers from asyncflow.runtime.actors import server as server_mod from asyncflow.runtime.actors.server import ServerRuntime @@ -479,3 +481,135 @@ def fake_sampler(cfg: RVConfig, rng: NpGenerator) -> float: # Deterministic int/float paths assert server._compute_latency_cpu(2) == pytest.approx(2.0) # noqa: SLF001 assert server._compute_latency_io(0.5) == pytest.approx(0.5) # noqa: SLF001 + + +def test_server_clock_and_cumulative_metrics_default_pipeline() -> None: + """ + Single request on default pipeline: + - SERVICE_TIME should equal CPU(5ms) + - IO_TIME should equal I/O(20ms) + - WAITING_TIME should be ~0 (2 cores, no contention) + - RQS_SERVER_CLOCK has start/finish with finish > start + """ + env = simpy.Environment() + server, _ = _make_server_runtime(env) # default: 2 cores, default steps + + req_id = 301 + server.server_box.put(RequestState(id=req_id, initial_time=0.0)) + server.start() + env.run() + + bucket = server.server_rqs_clock[req_id] + # Clock present and well-formed + assert EventMetricName.RQS_SERVER_CLOCK in bucket + clock = bucket[EventMetricName.RQS_SERVER_CLOCK] + assert isinstance(clock, ServerClock) + assert clock.finish is not None + assert clock.finish >= clock.start + + # Accumulators + assert bucket[EventMetricName.SERVICE_TIME] == pytest.approx(0.005, abs=1e-9) + assert bucket[EventMetricName.IO_TIME] == pytest.approx(0.020, abs=1e-9) + assert bucket[EventMetricName.WAITING_TIME] == pytest.approx(0.0, abs=1e-12) + + # Server-side elapsed time should be at least the sum (avoid approx on RHS of >=) + elapsed = clock.finish - clock.start + assert elapsed >= (0.005 + 0.020) - 1e-9 + +def test_waiting_time_accumulates_under_contention() -> None: + """ + With 1 core and two overlapping requests on a CPU-only endpoint: + - The second request's WAITING_TIME ~= (first CPU time - overlap). + """ + env = simpy.Environment() + + steps = ( + Step( + kind=EndpointStepRAM.RAM, + step_operation={StepOperation.NECESSARY_RAM: 64}, + ), + Step( + kind=EndpointStepCPU.CPU_BOUND_OPERATION, + step_operation={StepOperation.CPU_TIME: 0.008}, + ), + ) + server, _ = _make_server_runtime(env, steps=steps, cpu_cores=1) + + first_id, second_id = 401, 402 + + # First arrives at t=0.0 + server.server_box.put(RequestState(id=first_id, initial_time=0.0)) + + # Schedule the second to actually arrive at t=0.001 (creates 1ms overlap) + def _arrive_later() -> Generator[simpy.Event, None, None]: + yield env.timeout(0.001) + yield server.server_box.put(RequestState(id=second_id, initial_time=0.001)) + + env.process(_arrive_later()) + + server.start() + env.run() + + b1 = server.server_rqs_clock[first_id] + b2 = server.server_rqs_clock[second_id] + + # First: no waiting, service time = 8ms, no IO + assert b1[EventMetricName.WAITING_TIME] == pytest.approx(0.0, abs=1e-9) + assert b1[EventMetricName.SERVICE_TIME] == pytest.approx(0.008, abs=1e-9) + assert b1[EventMetricName.IO_TIME] == pytest.approx(0.0, abs=1e-12) + + # Second: expected wait ≈ 0.007 (first CPU 8ms - 1ms overlap) + assert b2[EventMetricName.WAITING_TIME] == pytest.approx(0.007, abs=2e-4) + assert b2[EventMetricName.SERVICE_TIME] == pytest.approx(0.008, abs=1e-9) + assert b2[EventMetricName.IO_TIME] == pytest.approx(0.0, abs=1e-12) + + +def test_metrics_follow_rv_samples(monkeypatch: pytest.MonkeyPatch) -> None: + """ + With RVConfig on CPU and IO, SERVICE_TIME and IO_TIME must match + the (patched) sampler outcomes. + """ + # 6ms for CPU sentinel, 13ms for IO sentinel + def fake_sampler(cfg: RVConfig, rng: NpGenerator) -> float: + if cfg.mean == 0.321: + return 0.006 + if cfg.mean == 0.654: + return 0.013 + return 0.0 + + monkeypatch.setattr(server_mod, "general_sampler", fake_sampler) + + env = simpy.Environment() + steps = ( + Step( + kind=EndpointStepRAM.RAM, + step_operation={StepOperation.NECESSARY_RAM: 64}, + ), + Step( + kind=EndpointStepCPU.CPU_BOUND_OPERATION, + step_operation={StepOperation.CPU_TIME: RVConfig(mean=0.321)}, + ), + Step( + kind=EndpointStepIO.DB, + step_operation={StepOperation.IO_WAITING_TIME: RVConfig(mean=0.654)}, + ), + ) + server, _ = _make_server_runtime(env, steps=steps, cpu_cores=1) + + req_id = 501 + server.server_box.put(RequestState(id=req_id, initial_time=0.0)) + server.start() + env.run() + + bucket = server.server_rqs_clock[req_id] + assert bucket[EventMetricName.SERVICE_TIME] == pytest.approx(0.006, abs=1e-9) + assert bucket[EventMetricName.IO_TIME] == pytest.approx(0.013, abs=1e-9) + assert bucket[EventMetricName.WAITING_TIME] == pytest.approx(0.0, abs=1e-12) + + clock = bucket[EventMetricName.RQS_SERVER_CLOCK] + assert isinstance(clock, ServerClock) + assert clock.finish is not None + + # Elapsed server-side time must be at least the sum of CPU+IO + elapsed = clock.finish - clock.start + assert elapsed >= (0.006 + 0.013) - 1e-9