Commit a726a2a

GioeleB00 and Copilot authored
FCFS load balancer (central queue) with Erlang-C validation, proper Wq measurement, and metrics plumbing (#29)
* setting up the branch * minor correction * minor changes * improved script for linux * minor change * Delete docker_fs/.env.dev * Delete docker_fs/.env.test * Features/event generator (#1) * setting up the generator * poisson-poisson requests sampling * Delete docker_fs/.env.dev * Delete docker_fs/.env.test * changes * refactor for event generator * added gaussian truncated generator and unit tests for helpers * imprved documentantion and simulation time handling * simulation start with the vent generator * small refactor + tests for the generation of events * minor changes * Update src/app/schemas/simulation_input.py Co-authored-by: Copilot <[email protected]> * Update tests/unit/sampler/test_sampler_helper.py Co-authored-by: Copilot <[email protected]> * Update scripts/quality-check.sh Co-authored-by: Copilot <[email protected]> * Update tests/integration/db_initialization/test_init_models.py Co-authored-by: Copilot <[email protected]> * minor changes after PR review --------- Co-authored-by: Copilot <[email protected]> * minor changes * Features/event generator documentation test improvements (#2) * moving file to correct folder * Documentation added * Update requests_generator.md * test Added, introduced constants for the sampling window * Update tests/unit/sampler/test_poisson_posson.py Co-authored-by: Copilot <[email protected]> --------- Co-authored-by: Copilot <[email protected]> * README update * Improved constants management * Clean and refactor * Features/request handler endpoint input (#3) * Introduction of the input structure for endpoints * Documentation + foundation of topology input * Improve Step validation, added unit test for the input * improved documentation for the input * Features/definition full payload simulation (#4) * Improved input structure and pytest * Improved pytest structure accordingly to the new schema * definition of the metrics to be measured and update of the simulation input * improved documentations added rationale behind 
metrics * improved pytest logic and code coherence * Update src/app/core/helpers.py Co-authored-by: Copilot <[email protected]> --------- Co-authored-by: Copilot <[email protected]> * Features/rqs generator runtime (#5) * definition of state and RqsGeneratorRuntime * defined edge runtime and more central logic for sampler * minor changes * minor changes * minor bug fixed * pytest adapted to the new structure, added pytest for rqs_state * Update src/app/core/runtime/rqs_generator.py Co-authored-by: Copilot <[email protected]> * Update src/app/core/runtime/rqs_generator.py Co-authored-by: Copilot <[email protected]> * Update src/app/core/runtime/edge.py Co-authored-by: Copilot <[email protected]> * Update src/app/core/runtime/edge.py Co-authored-by: Copilot <[email protected]> * Update tests/unit/runtime/test_requests_generator.py Co-authored-by: Copilot <[email protected]> * Update src/app/core/event_samplers/common_helpers.py Co-authored-by: Copilot <[email protected]> * Update src/app/core/event_samplers/common_helpers.py Co-authored-by: Copilot <[email protected]> * Update src/app/core/event_samplers/common_helpers.py Co-authored-by: Copilot <[email protected]> * minor changes --------- Co-authored-by: Copilot <[email protected]> * Features/client server runtime (#6) * defined client runtime + tests * initiated the server runtime plus correction of tghe structure * code refactor + test added for runtime + documentation * Update tests/conftest.py Co-authored-by: Copilot <[email protected]> --------- Co-authored-by: Copilot <[email protected]> * Features/metric sampler and collection (#7) * defined architecture for the central collector + documentation * server modification to collect metrics and updated docs * Completed metric collector for server plus test * remove web app dependencies and added metrics for rqs latency * improved docs and improved metric collection * changes to make the code compatible with new changes * improved ci against toml changes * update 
lock file * minor change * Features/metrics elaboration (#8) * introduction to the analyzer class + tests * refactor of the analyzer * Features/load balancer node (#9) * Added LB structure in the input for the simulation + tests * introduced LB runtime + documentation * Added test for LB and small refactor to define the SimRunner * Features/simulation runner (#10) * added the first method to the simulation runner * added methods to handle nodes * completed simulation runtime + integration test * Features/integration tests unit tests (#11) * new integration test + bug fixing * added integration and unit test * first tests working * minor change * first working example, upgrade docs, upgrade docstrings * improvements * path bug fixed * new readme and guide to build yaml * Update README.md * Added pybuilder and unit tests (#12) * Refactor/change project name plus docs improvement (#13) * Changed name from fastsim to asyncflow app folder renamed in asyncflow * improved docs * minor changes * Refactor/pypi preparation (#14) * Reafctor for the folder schemas + defined import for public api * Defined public api, improved docs * minor changes * minor changes * Features/lb example and docs tutorial (#15) * minor docs changes * improved pytoml * improved docs + LB examples * sanity ci check * Ci for main (#16) * Readme final, docs improved * Docs improvement, refactor analyzer, example added * Added scripts for setup, added pytest, improved readme * Example LB improvements * CI for main + system tests * fixing a bug * bug fixed * version bump * version bump * Feature/event injection input (#19) * Structure defined for the pydantic model representing Event to be injected * Pydantic validation in the payload for events part 1 * Definition of the input for the events injection part 2 * minor fixes ruff and mypy compliant * full tests and docs added for input event injection * fixed minor bugs * Feature/event injection runtime (#20) * Events injection for edges part 1 * Logic + 
docs for the event injection to simulate a server down * Added pydantic validation + unit + int tests for eventinjection * added exaple for event inj yaml + builder added int tests * improved server model + system tests for eventinjection * ruff small fix * Refactor/fixing readme small fixes (#21) * new readme, ci to have pycov badge, comment on server runtime * small changes * bump version * Structure input for multiprocessing, improved tests, improved description (#23) * better test coverage, edges deterministic, step server random (#24) * new sim value * Feature/multi processing event loop 3 (#25) * added metric for server, added logic to collect them + tests * add kpi to collect in settings default * added metric plot for server and tests * Sweep introduction + MM1 heory vs observation + tests * analysis for sweep results + tests * mm1 notebook complete + server latency * Add M/M/c (split) queue-theory analyzer, RANDOM load-balancer policy, and end-to-end comparison reports (#26) * mmc split + notebook mmc split + random lb algo * tests for mmc * New Api RqsGenerator ---> ArrivalsGenerator (#27) * changing api per ArrivalsGenerator and implementing distrib for ArrivalRuntime * Reafctoring API Arrival generator, tests integrations now working * Api Arrival generator migration complete with tests * Updated plots * Added edges without network latency plus tests (#28) * Implemented fcfs with tests * Implemented fcfs algo and validation of mmc models Erlang C * small changes --------- Co-authored-by: Copilot <[email protected]>
1 parent 6aa3b85 commit a726a2a

File tree

156 files changed

+8845
-3994
lines changed

asyncflow_queue_limit/asyncflow_mm1.ipynb

Lines changed: 646 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 399 additions & 0 deletions
@@ -0,0 +1,399 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"id": "9428ca92",
6+
"metadata": {},
7+
"source": [
8+
"# AsyncFlow — MMc Theory vs Simulation (Guided Notebook)\n",
9+
"\n",
10+
"This notebook shows how to:\n",
11+
"\n",
12+
"1. Make imports work inside a notebook (src-layout or package install)\n",
13+
"2. Build a **multi-server** scenario compatible with **M/M/c** assumptions\n",
14+
"3. Run the simulation and collect results\n",
15+
"4. Compare theory vs observed KPIs (pretty-printed table)\n",
16+
"5. Plot the standard dashboards (latency, throughput, server time series)\n",
17+
"\n",
18+
"\n"
19+
]
20+
},
21+
{
22+
"cell_type": "code",
23+
"execution_count": 96,
24+
"id": "3e168d4a",
25+
"metadata": {},
26+
"outputs": [],
27+
"source": [
28+
"import sys, importlib\n",
29+
"\n",
30+
"\n",
31+
"for m in list(sys.modules):\n",
32+
" if m.startswith(\"asyncflow\"):\n",
33+
" del sys.modules[m]\n",
34+
"\n",
35+
"\n",
36+
"from asyncflow import AsyncFlow, SimulationRunner\n",
37+
"from asyncflow.analysis import MMc, ResultsAnalyzer\n",
38+
"from asyncflow.components import (\n",
39+
" Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
40+
")\n",
41+
"from asyncflow.settings import SimulationSettings\n",
42+
"\n",
43+
"import simpy"
44+
]
45+
},
46+
{
47+
"cell_type": "code",
48+
"execution_count": 97,
49+
"id": "dd39a8e3",
50+
"metadata": {},
51+
"outputs": [
52+
{
53+
"name": "stdout",
54+
"output_type": "stream",
55+
"text": [
56+
"Imports OK.\n"
57+
]
58+
}
59+
],
60+
"source": [
61+
"import matplotlib.pyplot as plt\n",
62+
"import simpy\n",
63+
"\n",
64+
"# Public AsyncFlow API\n",
65+
"from asyncflow import AsyncFlow, SimulationRunner, Sweep\n",
66+
"from asyncflow.components import Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
67+
"from asyncflow.settings import SimulationSettings\n",
68+
"from asyncflow.analysis import ResultsAnalyzer, SweepAnalyzer, MMc\n",
69+
"from asyncflow.enums import Distribution\n",
70+
"\n",
71+
"print(\"Imports OK.\")"
72+
]
73+
},
74+
{
75+
"cell_type": "markdown",
76+
"id": "48fbf4f3",
77+
"metadata": {},
78+
"source": [
79+
"## 1) Build an M/M/c split-friendly scenario\n",
80+
"\n",
81+
"* **Multiple identical servers with exponential CPU service**\n",
82+
"  Topology includes **$c \\geq 2$ identical servers**, each exposing exactly **one endpoint** with exactly **one CPU-bound step**.\n",
83+
"  Service times follow an **Exponential** distribution with mean $E[S]$ (service rate $\\mu = 1/E[S]$). No RAM/IO steps are included in the pipeline.\n",
84+
"\n",
85+
"* **Load balancer with FCFS dispatch**\n",
86+
"\n",
87+
"* **“Poisson arrivals” via the generator**\n",
88+
" \n",
89+
" \n",
90+
"\n",
91+
"---\n",
92+
"\n",
93+
"```mermaid\n",
94+
"graph LR;\n",
95+
"    rqs1[\"<b>ArrivalsGenerator</b><br/>id: rqs-1\"]\n",
96+
" client1[\"<b>Client</b><br/>id: client-1\"]\n",
97+
"    lb1[\"<b>LoadBalancer</b><br/>id: lb-1<br/>Policy: fcfs\"]\n",
98+
" app1[\"<b>Server</b><br/>id: app-1<br/>Endpoint: /api\"]\n",
99+
" app2[\"<b>Server</b><br/>id: app-2<br/>Endpoint: /api\"]\n",
100+
"\n",
101+
" rqs1 -- \"Edge: gen-client<br/>Latency: 0.0001\" --> client1;\n",
102+
" client1 -- \"Request<br/>Edge: client-lb<br/>Latency: 0.0001\" --> lb1;\n",
103+
" lb1 -- \"Dispatch<br/>Edge: lb-app1<br/>Latency: 0.0001\" --> app1;\n",
104+
" lb1 -- \"Dispatch<br/>Edge: lb-app2<br/>Latency: 0.0001\" --> app2;\n",
105+
" app1 -- \"Response<br/>Edge: app1-client<br/>Latency: 0.0001\" --> client1;\n",
106+
" app2 -- \"Response<br/>Edge: app2-client<br/>Latency: 0.0001\" --> client1;\n",
107+
"```\n",
108+
"\n",
109+
"---\n",
110+
"\n"
111+
]
112+
},
113+
{
114+
"cell_type": "code",
115+
"execution_count": 98,
116+
"id": "d2937e5e",
117+
"metadata": {},
118+
"outputs": [],
119+
"source": [
120+
"def build_payload():\n",
121+
" generator = ArrivalsGenerator(\n",
122+
" id=\"rqs-1\",\n",
123+
" lambda_rps=270,\n",
124+
" model=Distribution.POISSON\n",
125+
" )\n",
126+
"\n",
127+
" client = Client(id=\"client-1\")\n",
128+
"\n",
129+
" endpoint = Endpoint(\n",
130+
" endpoint_name=\"/api\",\n",
131+
" probability=1.0,\n",
132+
" steps=[\n",
133+
" {\n",
134+
" \"kind\": \"initial_parsing\",\n",
135+
" \"step_operation\": {\n",
136+
" \"cpu_time\": {\"mean\": 0.01, \"distribution\": \"exponential\"},\n",
137+
" },\n",
138+
" },\n",
139+
" ],\n",
140+
" )\n",
141+
"\n",
142+
" srv1 = Server(\n",
143+
" id=\"srv-1\",\n",
144+
" server_resources={\"cpu_cores\": 1, \"ram_mb\": 2048},\n",
145+
" endpoints=[endpoint],\n",
146+
" )\n",
147+
" srv2 = Server(\n",
148+
" id=\"srv-2\",\n",
149+
" server_resources={\"cpu_cores\": 1, \"ram_mb\": 2048},\n",
150+
" endpoints=[endpoint],\n",
151+
" )\n",
152+
" \n",
153+
" srv3 = Server(\n",
154+
" id=\"srv-3\",\n",
155+
" server_resources={\"cpu_cores\": 1, \"ram_mb\": 2048},\n",
156+
" endpoints=[endpoint],\n",
157+
" )\n",
158+
"\n",
159+
" lb = LoadBalancer(\n",
160+
" id=\"lb-1\",\n",
161+
" algorithms=\"fcfs\", \n",
162+
" server_covered={\"srv-1\", \"srv-2\", \"srv-3\"},\n",
163+
" )\n",
164+
"\n",
165+
" edges = [\n",
166+
" LinkEdge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\",),\n",
167+
" LinkEdge(id=\"client-lb\", source=\"client-1\", target=\"lb-1\", ),\n",
168+
" LinkEdge(id=\"lb-srv1\", source=\"lb-1\", target=\"srv-1\", ),\n",
169+
" LinkEdge(id=\"lb-srv2\", source=\"lb-1\", target=\"srv-2\", ),\n",
170+
" LinkEdge(id=\"lb-srv3\", source=\"lb-1\", target=\"srv-3\", ),\n",
171+
" LinkEdge(id=\"srv1-client\", source=\"srv-1\", target=\"client-1\",),\n",
172+
" LinkEdge(id=\"srv2-client\", source=\"srv-2\", target=\"client-1\",),\n",
173+
" LinkEdge(id=\"srv3-client\", source=\"srv-3\", target=\"client-1\",),\n",
174+
" ]\n",
175+
"\n",
176+
" settings = SimulationSettings(\n",
177+
" total_simulation_time=3600,\n",
178+
" sample_period_s=0.05,\n",
179+
" )\n",
180+
"\n",
181+
" payload = (\n",
182+
" AsyncFlow()\n",
183+
" .add_arrivals_generator(generator)\n",
184+
" .add_client(client)\n",
185+
" .add_servers(srv1, srv2, srv3)\n",
186+
" .add_load_balancer(lb)\n",
187+
" .add_edges(*edges)\n",
188+
" .add_simulation_settings(settings)\n",
189+
" ).build_payload()\n",
190+
"\n",
191+
" return payload\n"
192+
]
193+
},
194+
{
195+
"cell_type": "markdown",
196+
"id": "7682861f",
197+
"metadata": {},
198+
"source": [
199+
"## 2) Run the simulation"
200+
]
201+
},
202+
{
203+
"cell_type": "code",
204+
"execution_count": 99,
205+
"id": "d0634bc8",
206+
"metadata": {},
207+
"outputs": [
208+
{
209+
"name": "stdout",
210+
"output_type": "stream",
211+
"text": [
212+
"Done.\n"
213+
]
214+
}
215+
],
216+
"source": [
217+
"payload = build_payload()\n",
218+
"env = simpy.Environment()\n",
219+
"runner = SimulationRunner(env=env, simulation_input=payload)\n",
220+
"results: ResultsAnalyzer = runner.run()\n",
221+
"print(\"Done.\")\n"
222+
]
223+
},
224+
{
225+
"cell_type": "markdown",
226+
"id": "e5fe2a4a",
227+
"metadata": {},
228+
"source": [
229+
"## 3) M/M/c (FCFS): theory vs observed comparison\n",
230+
"\n",
231+
"This section shows how we compute the **theoretical Erlang-C KPIs** (pooled queue, FCFS) and compare them against **simulation estimates**.\n",
232+
"\n",
233+
"---\n",
234+
"\n",
235+
"## Variables\n",
236+
"\n",
237+
"* **$c$**: number of identical servers.\n",
238+
"* **$\\lambda$**: global arrival rate (req/s).\n",
239+
"* **$\\mu$**: per-server service rate (req/s), $\\mu = 1/\\mathbb{E}[S]$.\n",
240+
"* **$\\rho$**: global utilization, $\\rho = \\lambda/(c\\mu)$.\n",
241+
"* **$W$**: mean time in system (queue + service).\n",
242+
"* **$W_q$**: mean waiting time in queue.\n",
243+
"* **$L$**: mean number in system.\n",
244+
"* **$L_q$**: mean number in queue.\n",
245+
"\n",
246+
"---\n",
247+
"\n",
248+
"## Theory (Erlang-C formulas)\n",
249+
"\n",
250+
"We assume **Poisson arrivals** at rate $\\lambda$ (taken directly from the payload) and **exponential service** at per-server rate $\\mu$.\n",
251+
"\n",
252+
"1. Offered load:\n",
253+
"\n",
254+
"$$\n",
255+
"a = \\frac{\\lambda}{\\mu}\n",
256+
"$$\n",
257+
"\n",
258+
"2. Probability system is empty:\n",
259+
"\n",
260+
"$$\n",
261+
"P_0 = \\left[\\sum_{n=0}^{c-1}\\frac{a^n}{n!} + \\frac{a^c}{c!\\,(1-\\rho)}\\right]^{-1}\n",
262+
"$$\n",
263+
"\n",
264+
"3. Probability of waiting (Erlang-C):\n",
265+
"\n",
266+
"$$\n",
267+
"P_w = \\frac{a^c}{c!\\,(1-\\rho)} \\, P_0\n",
268+
"$$\n",
269+
"\n",
270+
"4. Queue length and waiting:\n",
271+
"\n",
272+
"$$\n",
273+
"L_q = P_w \\cdot \\frac{\\rho}{1-\\rho}, \\qquad\n",
274+
"W_q = \\frac{L_q}{\\lambda}\n",
275+
"$$\n",
276+
"\n",
277+
"5. Total response time and system size:\n",
278+
"\n",
279+
"$$\n",
280+
"W = W_q + \\frac{1}{\\mu}, \\qquad\n",
281+
"L = \\lambda W\n",
282+
"$$\n",
283+
"\n",
284+
"If $\\rho \\ge 1$, the system is unstable: the queue grows without bound, so $L_q$, $W_q$, $W$, and $L$ all diverge to $+\\infty$.\n",
285+
"\n",
286+
"---\n",
287+
"\n",
288+
"## Observed (from simulation)\n",
289+
"\n",
290+
"After processing metrics:\n",
291+
"\n",
292+
"1. **Arrival rate**:\n",
293+
"\n",
294+
"$$\n",
295+
"\\lambda_{\\text{Observed}} = \\text{mean throughput (client completions)}\n",
296+
"$$\n",
297+
"\n",
298+
"2. **Service rate**:\n",
299+
"\n",
300+
"$$\n",
301+
"\\mu_{\\text{Observed}} = 1 / \\overline{S}, \\quad \\overline{S} = \\text{mean(service\\_time)}\n",
302+
"$$\n",
303+
"\n",
304+
"3. **End-to-end latency**:\n",
305+
"\n",
306+
"$$\n",
307+
"W_{\\text{Observed}} = \\text{mean(client latencies)}\n",
308+
"$$\n",
309+
"\n",
310+
"4. **Waiting time**:\n",
311+
"\n",
312+
"$$\n",
313+
"W_{q,\\text{Observed}} = \\text{mean(waiting\\_time)} \n",
314+
"$$\n",
315+
"\n",
316+
"5. **Little’s law check**:\n",
317+
"\n",
318+
"$$\n",
319+
"L_{\\text{Observed}} = \\lambda_{\\text{Observed}} W_{\\text{Observed}}, \\qquad\n",
320+
"L_{q,\\text{Observed}} = \\lambda_{\\text{Observed}} W_{q,\\text{Observed}}\n",
321+
"$$\n",
322+
"\n",
323+
"6. **Utilization**:\n",
324+
"\n",
325+
"$$\n",
326+
"\\rho_{\\text{Observed}} = \\lambda_{\\text{Observed}}/(c\\,\\mu_{\\text{Observed}})\n",
327+
"$$\n",
328+
"\n",
329+
"---\n",
330+
"\n",
331+
"## Comparison\n",
332+
"\n",
333+
"The analyzer builds a table with two columns — **Theory** (Erlang-C closed forms) and **Observed** (empirical estimates) — and reports absolute and relative deltas.\n",
334+
"\n",
335+
"This allows us to verify whether AsyncFlow reproduces the textbook M/M/c (FCFS) predictions under Poisson arrivals and exponential service.\n",
336+
"\n",
337+
"\n"
338+
]
339+
},
340+
{
341+
"cell_type": "code",
342+
"execution_count": 100,
343+
"id": "ccd7379b",
344+
"metadata": {},
345+
"outputs": [
346+
{
347+
"name": "stdout",
348+
"output_type": "stream",
349+
"text": [
350+
"=================================================================\n",
351+
"MMc (FCFS/Erlang-C) — Theory vs Observed\n",
352+
"-----------------------------------------------------------------\n",
353+
"sym metric theory observed abs rel%\n",
354+
"-----------------------------------------------------------------\n",
355+
"λ Arrival rate (1/s) 270.000000 270.258333 0.258333 0.10\n",
356+
"μ Service rate (1/s) 100.000000 100.036707 0.036707 0.04\n",
357+
"rho Utilization 0.900000 0.900531 0.000531 0.06\n",
358+
"L Mean items in sys 10.053549 10.073544 0.019994 0.20\n",
359+
"Lq Mean items in queue 7.353549 7.371934 0.018385 0.25\n",
360+
"W Mean time in sys (s) 0.037235 0.037274 0.000038 0.10\n",
361+
"Wq Mean waiting (s) 0.027235 0.027277 0.000042 0.15\n",
362+
"=================================================================\n"
363+
]
364+
}
365+
],
366+
"source": [
367+
"mmc = MMc()\n",
368+
"if mmc.is_compatible(payload):\n",
369+
" mmc.print_comparison(payload, results) \n",
370+
"else:\n",
371+
" print(\"Payload is not compatible with M/M/c:\")\n",
372+
" for reason in mmc.explain_incompatibilities(payload):\n",
373+
" print(\" -\", reason)\n",
374+
" \n"
375+
]
376+
}
377+
],
378+
"metadata": {
379+
"kernelspec": {
380+
"display_name": "asyncflow-sim-py3.12 (3.12.3)",
381+
"language": "python",
382+
"name": "python3"
383+
},
384+
"language_info": {
385+
"codemirror_mode": {
386+
"name": "ipython",
387+
"version": 3
388+
},
389+
"file_extension": ".py",
390+
"mimetype": "text/x-python",
391+
"name": "python",
392+
"nbconvert_exporter": "python",
393+
"pygments_lexer": "ipython3",
394+
"version": "3.12.3"
395+
}
396+
},
397+
"nbformat": 4,
398+
"nbformat_minor": 5
399+
}
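
The Theory column printed by the notebook can be cross-checked by hand. The following is a minimal standalone sketch of the Erlang-C formulas from the notebook's theory section, not the code behind `MMc.print_comparison`. The function name `erlang_c_kpis` and its return keys are hypothetical and chosen only for illustration. The inputs are the notebook's own scenario: λ = 270 req/s, mean CPU time 0.01 s (so μ = 100 req/s), and c = 3 servers.

```python
from math import factorial


def erlang_c_kpis(lam: float, mu: float, c: int) -> dict[str, float]:
    """Closed-form M/M/c (pooled queue, FCFS) KPIs.

    Hypothetical helper for illustration; it is not part of AsyncFlow.
    lam: global arrival rate (req/s)
    mu:  per-server service rate (req/s)
    c:   number of identical servers
    """
    if lam <= 0 or mu <= 0 or c < 1:
        raise ValueError("lam and mu must be positive and c >= 1")

    a = lam / mu   # offered load
    rho = a / c    # global utilization

    if rho >= 1.0:
        # Unstable regime: queue length and delays grow without bound.
        # Pw is set to 1.0 here by convention (every arrival waits).
        inf = float("inf")
        return {"rho": rho, "Pw": 1.0, "Lq": inf, "Wq": inf, "W": inf, "L": inf}

    # Shared term a^c / (c! (1 - rho)) used by both P0 and Pw.
    tail = a**c / (factorial(c) * (1.0 - rho))
    p0 = 1.0 / (sum(a**n / factorial(n) for n in range(c)) + tail)

    pw = tail * p0                 # probability of waiting (Erlang-C)
    lq = pw * rho / (1.0 - rho)    # mean number in queue
    wq = lq / lam                  # mean waiting time (Little's law)
    w = wq + 1.0 / mu              # mean time in system
    l_sys = lam * w                # mean number in system (Little's law)

    return {"rho": rho, "Pw": pw, "Lq": lq, "Wq": wq, "W": w, "L": l_sys}


# Scenario from the notebook: lambda = 270 req/s, E[S] = 0.01 s, c = 3.
kpis = erlang_c_kpis(lam=270.0, mu=100.0, c=3)
for name in ("rho", "Lq", "Wq", "W", "L"):
    print(f"{name:>3} = {kpis[name]:.6f}")
```

With these inputs the values agree with the Theory column in the notebook output (Lq ≈ 7.3535, Wq ≈ 0.027235 s, W ≈ 0.037235 s, L ≈ 10.0535). For c = 1 the same function reduces to the M/M/1 results, where the probability of waiting equals ρ.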
