58 changes: 27 additions & 31 deletions asyncflow_queue_limit/asyncflow_mm1.ipynb

Large diffs are not rendered by default.

399 changes: 399 additions & 0 deletions asyncflow_queue_limit/asyncflow_mmc.ipynb
@@ -0,0 +1,399 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "9428ca92",
"metadata": {},
"source": [
"# AsyncFlow — M/M/c Theory vs Simulation (Guided Notebook)\n",
"\n",
"This notebook shows how to:\n",
"\n",
"1. Make imports work inside a notebook (src-layout or package install)\n",
"2. Build a **multi-server** scenario compatible with **M/M/c** assumptions\n",
"3. Run the simulation and collect results\n",
"4. Compare theory vs observed KPIs (pretty-printed table)\n",
"5. Plot the standard dashboards (latency, throughput, server time series)\n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 96,
"id": "3e168d4a",
"metadata": {},
"outputs": [],
"source": [
"import sys\n",
"\n",
"# Drop any cached asyncflow modules so that code changes in the package\n",
"# are picked up on re-import without restarting the kernel.\n",
"for m in list(sys.modules):\n",
"    if m.startswith(\"asyncflow\"):\n",
"        del sys.modules[m]\n",
"\n",
"\n",
"from asyncflow import AsyncFlow, SimulationRunner\n",
"from asyncflow.analysis import MMc, ResultsAnalyzer\n",
"from asyncflow.components import (\n",
" Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
")\n",
"from asyncflow.settings import SimulationSettings\n",
"\n",
"import simpy"
]
},
{
"cell_type": "code",
"execution_count": 97,
"id": "dd39a8e3",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Imports OK.\n"
]
}
],
"source": [
"import matplotlib.pyplot as plt\n",
"import simpy\n",
"\n",
"# Public AsyncFlow API\n",
"from asyncflow import AsyncFlow, SimulationRunner, Sweep\n",
"from asyncflow.components import Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
"from asyncflow.settings import SimulationSettings\n",
"from asyncflow.analysis import ResultsAnalyzer, SweepAnalyzer, MMc\n",
"from asyncflow.enums import Distribution\n",
"\n",
"print(\"Imports OK.\")"
]
},
{
"cell_type": "markdown",
"id": "48fbf4f3",
"metadata": {},
"source": [
"## 1) Build an M/M/c-compatible scenario\n",
"\n",
"* **Multiple identical servers with exponential CPU service**\n",
"  Topology includes **$c \\geq 2$ identical servers**, each exposing exactly **one endpoint** with exactly **one CPU-bound step**.\n",
"  Service times follow an **Exponential** distribution with mean $\\mathbb{E}[S]$ (service rate $\\mu = 1/\\mathbb{E}[S]$). No RAM/IO steps are included in the pipeline.\n",
"\n",
"* **Load balancer with FCFS dispatch**\n",
"  The balancer keeps a single pooled queue and hands each request to the first free server, first-come first-served, matching the Erlang-C service discipline.\n",
"\n",
"* **Poisson arrivals via the generator**\n",
"  The generator draws exponential inter-arrival times at rate $\\lambda$, so requests form a Poisson process.\n",
"\n",
"---\n",
"\n",
"```mermaid\n",
"graph LR;\n",
"    rqs1[\"<b>RqsGenerator</b><br/>id: rqs-1\"]\n",
"    client1[\"<b>Client</b><br/>id: client-1\"]\n",
"    lb1[\"<b>LoadBalancer</b><br/>id: lb-1<br/>Policy: fcfs\"]\n",
"    srv1[\"<b>Server</b><br/>id: srv-1<br/>Endpoint: /api\"]\n",
"    srv2[\"<b>Server</b><br/>id: srv-2<br/>Endpoint: /api\"]\n",
"    srv3[\"<b>Server</b><br/>id: srv-3<br/>Endpoint: /api\"]\n",
"\n",
"    rqs1 -- \"Edge: gen-client\" --> client1;\n",
"    client1 -- \"Request<br/>Edge: client-lb\" --> lb1;\n",
"    lb1 -- \"Dispatch<br/>Edge: lb-srv1\" --> srv1;\n",
"    lb1 -- \"Dispatch<br/>Edge: lb-srv2\" --> srv2;\n",
"    lb1 -- \"Dispatch<br/>Edge: lb-srv3\" --> srv3;\n",
"    srv1 -- \"Response<br/>Edge: srv1-client\" --> client1;\n",
"    srv2 -- \"Response<br/>Edge: srv2-client\" --> client1;\n",
"    srv3 -- \"Response<br/>Edge: srv3-client\" --> client1;\n",
"```\n",
"\n",
"---\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 98,
"id": "d2937e5e",
"metadata": {},
"outputs": [],
"source": [
"def build_payload():\n",
" generator = ArrivalsGenerator(\n",
" id=\"rqs-1\",\n",
" lambda_rps=270,\n",
" model=Distribution.POISSON\n",
" )\n",
"\n",
" client = Client(id=\"client-1\")\n",
"\n",
" endpoint = Endpoint(\n",
" endpoint_name=\"/api\",\n",
" probability=1.0,\n",
" steps=[\n",
" {\n",
" \"kind\": \"initial_parsing\",\n",
" \"step_operation\": {\n",
" \"cpu_time\": {\"mean\": 0.01, \"distribution\": \"exponential\"},\n",
" },\n",
" },\n",
" ],\n",
" )\n",
"\n",
" srv1 = Server(\n",
" id=\"srv-1\",\n",
" server_resources={\"cpu_cores\": 1, \"ram_mb\": 2048},\n",
" endpoints=[endpoint],\n",
" )\n",
" srv2 = Server(\n",
" id=\"srv-2\",\n",
" server_resources={\"cpu_cores\": 1, \"ram_mb\": 2048},\n",
" endpoints=[endpoint],\n",
" )\n",
" \n",
" srv3 = Server(\n",
" id=\"srv-3\",\n",
" server_resources={\"cpu_cores\": 1, \"ram_mb\": 2048},\n",
" endpoints=[endpoint],\n",
" )\n",
"\n",
" lb = LoadBalancer(\n",
" id=\"lb-1\",\n",
" algorithms=\"fcfs\", \n",
" server_covered={\"srv-1\", \"srv-2\", \"srv-3\"},\n",
" )\n",
"\n",
" edges = [\n",
"        LinkEdge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\"),\n",
"        LinkEdge(id=\"client-lb\", source=\"client-1\", target=\"lb-1\"),\n",
"        LinkEdge(id=\"lb-srv1\", source=\"lb-1\", target=\"srv-1\"),\n",
"        LinkEdge(id=\"lb-srv2\", source=\"lb-1\", target=\"srv-2\"),\n",
"        LinkEdge(id=\"lb-srv3\", source=\"lb-1\", target=\"srv-3\"),\n",
"        LinkEdge(id=\"srv1-client\", source=\"srv-1\", target=\"client-1\"),\n",
"        LinkEdge(id=\"srv2-client\", source=\"srv-2\", target=\"client-1\"),\n",
"        LinkEdge(id=\"srv3-client\", source=\"srv-3\", target=\"client-1\"),\n",
" ]\n",
"\n",
" settings = SimulationSettings(\n",
" total_simulation_time=3600,\n",
" sample_period_s=0.05,\n",
" )\n",
"\n",
" payload = (\n",
" AsyncFlow()\n",
" .add_arrivals_generator(generator)\n",
" .add_client(client)\n",
" .add_servers(srv1, srv2, srv3)\n",
" .add_load_balancer(lb)\n",
" .add_edges(*edges)\n",
" .add_simulation_settings(settings)\n",
" ).build_payload()\n",
"\n",
" return payload\n"
]
},
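{
"cell_type": "markdown",
"id": "f3a7c511",
"metadata": {},
"source": [
"Before running, it is worth confirming the scenario is stable. A minimal sketch in plain Python (the constants below are hypothetical duplicates of the `build_payload()` values, not read back from the payload):\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f3a7c512",
"metadata": {},
"outputs": [],
"source": [
"# Stability check: rho = lambda / (c * mu) must stay below 1,\n",
"# otherwise the queue grows without bound and all M/M/c KPIs diverge.\n",
"lam = 270.0           # generator rate (req/s), as in build_payload()\n",
"mean_service = 0.01   # mean CPU time per request (s)\n",
"c = 3                 # identical single-core servers behind the balancer\n",
"\n",
"mu = 1.0 / mean_service   # per-server service rate (req/s)\n",
"rho = lam / (c * mu)      # global utilization\n",
"assert rho < 1.0, \"unstable scenario: reduce lambda or add servers\"\n",
"print(f\"mu = {mu:.1f} req/s, rho = {rho:.3f}\")\n"
]
},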
{
"cell_type": "markdown",
"id": "7682861f",
"metadata": {},
"source": [
"## 2) Run the simulation"
]
},
{
"cell_type": "code",
"execution_count": 99,
"id": "d0634bc8",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Done.\n"
]
}
],
"source": [
"payload = build_payload()\n",
"env = simpy.Environment()\n",
"runner = SimulationRunner(env=env, simulation_input=payload)\n",
"results: ResultsAnalyzer = runner.run()\n",
"print(\"Done.\")\n"
]
},
{
"cell_type": "markdown",
"id": "e5fe2a4a",
"metadata": {},
"source": [
"## 3) M/M/c (FCFS) — theory vs observed comparison\n",
"\n",
"This section shows how we compute the **theoretical Erlang-C KPIs** (pooled queue, FCFS) and compare them against **simulation estimates**.\n",
"\n",
"---\n",
"\n",
"## Variables\n",
"\n",
"* **$c$**: number of identical servers.\n",
"* **$\\lambda$**: global arrival rate (req/s).\n",
"* **$\\mu$**: per-server service rate (req/s), $\\mu = 1/\\mathbb{E}[S]$.\n",
"* **$\\rho$**: global utilization, $\\rho = \\lambda/(c\\mu)$.\n",
"* **$W$**: mean time in system (queue + service).\n",
"* **$W_q$**: mean waiting time in queue.\n",
"* **$L$**: mean number in system.\n",
"* **$L_q$**: mean number in queue.\n",
"\n",
"---\n",
"\n",
"## Theory (Erlang-C formulas)\n",
"\n",
"We assume **Poisson arrivals** for $\\lambda$ (taken directly from the payload).\n",
"\n",
"1. Offered load:\n",
"\n",
"$$\n",
"a = \\frac{\\lambda}{\\mu}\n",
"$$\n",
"\n",
"2. Probability system is empty:\n",
"\n",
"$$\n",
"P_0 = \\left[\\sum_{n=0}^{c-1}\\frac{a^n}{n!} + \\frac{a^c}{c!\\,(1-\\rho)}\\right]^{-1}\n",
"$$\n",
"\n",
"3. Probability of waiting (Erlang-C):\n",
"\n",
"$$\n",
"P_w = \\frac{a^c}{c!\\,(1-\\rho)} \\, P_0\n",
"$$\n",
"\n",
"4. Queue length and waiting:\n",
"\n",
"$$\n",
"L_q = P_w \\cdot \\frac{\\rho}{1-\\rho}, \\qquad\n",
"W_q = \\frac{L_q}{\\lambda}\n",
"$$\n",
"\n",
"5. Total response time and system size:\n",
"\n",
"$$\n",
"W = W_q + \\frac{1}{\\mu}, \\qquad\n",
"L = \\lambda W\n",
"$$\n",
"\n",
"If $\\rho \\ge 1$, the system is unstable and all metrics diverge to $+\\infty$.\n",
"\n",
"---\n",
"\n",
"## Observed (from simulation)\n",
"\n",
"After processing metrics:\n",
"\n",
"1. **Arrival rate**:\n",
"\n",
"$$\n",
"\\lambda_{\\text{Observed}} = \\text{mean throughput (client completions)}\n",
"$$\n",
"\n",
"2. **Service rate**:\n",
"\n",
"$$\n",
"\\mu_{\\text{Observed}} = 1 / \\overline{S}, \\quad \\overline{S} = \\text{mean(service\\_time)}\n",
"$$\n",
"\n",
"3. **End-to-end latency**:\n",
"\n",
"$$\n",
"W_{\\text{Observed}} = \\text{mean(client latencies)}\n",
"$$\n",
"\n",
"4. **Waiting time**:\n",
"\n",
"$$\n",
"W_{q,\\text{Observed}} = \\text{mean(waiting\\_time)} \n",
"$$\n",
"\n",
"5. **Little’s law check**:\n",
"\n",
"$$\n",
"L_{\\text{Observed}} = \\lambda_{\\text{Observed}} W_{\\text{Observed}}, \\qquad\n",
"L_{q,\\text{Observed}} = \\lambda_{\\text{Observed}} W_{q,\\text{Observed}}\n",
"$$\n",
"\n",
"6. **Utilization**:\n",
"\n",
"$$\n",
"\\rho_{\\text{Observed}} = \\lambda_{\\text{Observed}}/(c\\,\\mu_{\\text{Observed}})\n",
"$$\n",
"\n",
"---\n",
"\n",
"## Comparison\n",
"\n",
"The analyzer builds a table with two columns — **Theory** (Erlang-C closed forms) and **Observed** (empirical estimates) — and reports absolute and relative deltas.\n",
"\n",
"This allows us to verify whether AsyncFlow reproduces the textbook M/M/c (FCFS) predictions under Poisson arrivals and exponential service.\n",
"\n",
"\n"
]
},
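{
"cell_type": "markdown",
"id": "f3a7c513",
"metadata": {},
"source": [
"The Erlang-C closed forms above can also be evaluated by hand as a cross-check. A minimal pure-Python sketch (standard library only; lambda, mu and c are copied from the payload rather than extracted through the AsyncFlow API):\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f3a7c514",
"metadata": {},
"outputs": [],
"source": [
"from math import factorial\n",
"\n",
"lam, mu, c = 270.0, 100.0, 3   # arrival rate, per-server rate, servers\n",
"a = lam / mu                   # offered load\n",
"rho = lam / (c * mu)           # utilization, must be < 1\n",
"\n",
"# P0: probability the system is empty\n",
"p0 = 1.0 / (sum(a**n / factorial(n) for n in range(c))\n",
"            + a**c / (factorial(c) * (1.0 - rho)))\n",
"pw = a**c / (factorial(c) * (1.0 - rho)) * p0   # Erlang-C P(wait)\n",
"lq = pw * rho / (1.0 - rho)    # mean queue length\n",
"wq = lq / lam                  # mean waiting time (Little's law on the queue)\n",
"w = wq + 1.0 / mu              # mean time in system\n",
"l = lam * w                    # mean number in system\n",
"\n",
"print(f\"Pw={pw:.4f}  Lq={lq:.4f}  Wq={wq:.6f}  W={w:.6f}  L={l:.4f}\")\n"
]
},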
{
"cell_type": "code",
"execution_count": 100,
"id": "ccd7379b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"=================================================================\n",
"MMc (FCFS/Erlang-C) — Theory vs Observed\n",
"-----------------------------------------------------------------\n",
"sym metric theory observed abs rel%\n",
"-----------------------------------------------------------------\n",
"λ Arrival rate (1/s) 270.000000 270.258333 0.258333 0.10\n",
"μ Service rate (1/s) 100.000000 100.036707 0.036707 0.04\n",
"rho Utilization 0.900000 0.900531 0.000531 0.06\n",
"L Mean items in sys 10.053549 10.073544 0.019994 0.20\n",
"Lq Mean items in queue 7.353549 7.371934 0.018385 0.25\n",
"W Mean time in sys (s) 0.037235 0.037274 0.000038 0.10\n",
"Wq Mean waiting (s) 0.027235 0.027277 0.000042 0.15\n",
"=================================================================\n"
]
}
],
"source": [
"mmc = MMc()\n",
"if mmc.is_compatible(payload):\n",
" mmc.print_comparison(payload, results) \n",
"else:\n",
" print(\"Payload is not compatible with M/M/c:\")\n",
" for reason in mmc.explain_incompatibilities(payload):\n",
" print(\" -\", reason)\n",
" \n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "asyncflow-sim-py3.12 (3.12.3)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}