Commit c5a8a52

FCFS load balancer (central queue) with Erlang-C validation, proper Wq measurement, and metrics plumbing (#31)
* Implemented FCFS with tests * Implemented the FCFS algorithm and Erlang-C validation of the M/M/c model * Small changes
1 parent c66eeb9 commit c5a8a52

File tree

14 files changed (+1055 -120 lines)
asyncflow_queue_limit/asyncflow_mm1.ipynb

Lines changed: 27 additions & 31 deletions
Large diffs are not rendered by default.
Lines changed: 399 additions & 0 deletions
@@ -0,0 +1,399 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "9428ca92",
"metadata": {},
"source": [
"# AsyncFlow — MMc Theory vs Simulation (Guided Notebook)\n",
"\n",
"This notebook shows how to:\n",
"\n",
"1. Make imports work inside a notebook (src-layout or package install)\n",
"2. Build a **multi-server** scenario compatible with **M/M/c** assumptions\n",
"3. Run the simulation and collect results\n",
"4. Compare theory vs observed KPIs (pretty-printed table)\n",
"5. Plot the standard dashboards (latency, throughput, server time series)\n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 96,
"id": "3e168d4a",
"metadata": {},
"outputs": [],
"source": [
"import sys, importlib\n",
"\n",
"# Drop any cached asyncflow modules so the latest source is re-imported\n",
"for m in list(sys.modules):\n",
"    if m.startswith(\"asyncflow\"):\n",
"        del sys.modules[m]\n",
"\n",
"\n",
"from asyncflow import AsyncFlow, SimulationRunner\n",
"from asyncflow.analysis import MMc, ResultsAnalyzer\n",
"from asyncflow.components import (\n",
"    Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
")\n",
"from asyncflow.settings import SimulationSettings\n",
"\n",
"import simpy"
]
},
{
"cell_type": "code",
"execution_count": 97,
"id": "dd39a8e3",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Imports OK.\n"
]
}
],
"source": [
"import matplotlib.pyplot as plt\n",
"import simpy\n",
"\n",
"# Public AsyncFlow API\n",
"from asyncflow import AsyncFlow, SimulationRunner, Sweep\n",
"from asyncflow.components import Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
"from asyncflow.settings import SimulationSettings\n",
"from asyncflow.analysis import ResultsAnalyzer, SweepAnalyzer, MMc\n",
"from asyncflow.enums import Distribution\n",
"\n",
"print(\"Imports OK.\")"
]
},
{
"cell_type": "markdown",
"id": "48fbf4f3",
"metadata": {},
"source": [
"## 1) Build an M/M/c split-friendly scenario\n",
"\n",
"* **Multiple identical servers with exponential CPU service**\n",
"  The topology includes **$c \\geq 2$ identical servers**, each exposing exactly **one endpoint** with exactly **one CPU-bound step**.\n",
"  Service times follow an **Exponential** distribution with mean $E[S]$ (service rate $\\mu = 1/E[S]$). No RAM/IO steps are included in the pipeline.\n",
"\n",
"* **Load balancer with FCFS dispatch**\n",
"  The load balancer keeps a single central queue and hands each request to the first server that becomes free, matching the pooled-queue assumption of M/M/c.\n",
"\n",
"* **Poisson arrivals via the generator**\n",
"  The arrivals generator produces Poisson arrivals at rate $\\lambda$ (set by `lambda_rps`), i.e. exponential inter-arrival times.\n",
"\n",
"---\n",
"\n",
"```mermaid\n",
"graph LR;\n",
"    rqs1[\"<b>RqsGenerator</b><br/>id: rqs-1\"]\n",
"    client1[\"<b>Client</b><br/>id: client-1\"]\n",
"    lb1[\"<b>LoadBalancer</b><br/>id: lb-1<br/>Policy: fcfs\"]\n",
"    srv1[\"<b>Server</b><br/>id: srv-1<br/>Endpoint: /api\"]\n",
"    srv2[\"<b>Server</b><br/>id: srv-2<br/>Endpoint: /api\"]\n",
"    srv3[\"<b>Server</b><br/>id: srv-3<br/>Endpoint: /api\"]\n",
"\n",
"    rqs1 -- \"Edge: gen-client<br/>Latency: 0.0001\" --> client1;\n",
"    client1 -- \"Request<br/>Edge: client-lb<br/>Latency: 0.0001\" --> lb1;\n",
"    lb1 -- \"Dispatch<br/>Edge: lb-srv1<br/>Latency: 0.0001\" --> srv1;\n",
"    lb1 -- \"Dispatch<br/>Edge: lb-srv2<br/>Latency: 0.0001\" --> srv2;\n",
"    lb1 -- \"Dispatch<br/>Edge: lb-srv3<br/>Latency: 0.0001\" --> srv3;\n",
"    srv1 -- \"Response<br/>Edge: srv1-client<br/>Latency: 0.0001\" --> client1;\n",
"    srv2 -- \"Response<br/>Edge: srv2-client<br/>Latency: 0.0001\" --> client1;\n",
"    srv3 -- \"Response<br/>Edge: srv3-client<br/>Latency: 0.0001\" --> client1;\n",
"```\n",
"\n",
"---\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 98,
"id": "d2937e5e",
"metadata": {},
"outputs": [],
"source": [
"def build_payload():\n",
"    generator = ArrivalsGenerator(\n",
"        id=\"rqs-1\",\n",
"        lambda_rps=270,\n",
"        model=Distribution.POISSON\n",
"    )\n",
"\n",
"    client = Client(id=\"client-1\")\n",
"\n",
"    endpoint = Endpoint(\n",
"        endpoint_name=\"/api\",\n",
"        probability=1.0,\n",
"        steps=[\n",
"            {\n",
"                \"kind\": \"initial_parsing\",\n",
"                \"step_operation\": {\n",
"                    \"cpu_time\": {\"mean\": 0.01, \"distribution\": \"exponential\"},\n",
"                },\n",
"            },\n",
"        ],\n",
"    )\n",
"\n",
"    srv1 = Server(\n",
"        id=\"srv-1\",\n",
"        server_resources={\"cpu_cores\": 1, \"ram_mb\": 2048},\n",
"        endpoints=[endpoint],\n",
"    )\n",
"    srv2 = Server(\n",
"        id=\"srv-2\",\n",
"        server_resources={\"cpu_cores\": 1, \"ram_mb\": 2048},\n",
"        endpoints=[endpoint],\n",
"    )\n",
"    srv3 = Server(\n",
"        id=\"srv-3\",\n",
"        server_resources={\"cpu_cores\": 1, \"ram_mb\": 2048},\n",
"        endpoints=[endpoint],\n",
"    )\n",
"\n",
"    lb = LoadBalancer(\n",
"        id=\"lb-1\",\n",
"        algorithms=\"fcfs\",\n",
"        server_covered={\"srv-1\", \"srv-2\", \"srv-3\"},\n",
"    )\n",
"\n",
"    edges = [\n",
"        LinkEdge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\"),\n",
"        LinkEdge(id=\"client-lb\", source=\"client-1\", target=\"lb-1\"),\n",
"        LinkEdge(id=\"lb-srv1\", source=\"lb-1\", target=\"srv-1\"),\n",
"        LinkEdge(id=\"lb-srv2\", source=\"lb-1\", target=\"srv-2\"),\n",
"        LinkEdge(id=\"lb-srv3\", source=\"lb-1\", target=\"srv-3\"),\n",
"        LinkEdge(id=\"srv1-client\", source=\"srv-1\", target=\"client-1\"),\n",
"        LinkEdge(id=\"srv2-client\", source=\"srv-2\", target=\"client-1\"),\n",
"        LinkEdge(id=\"srv3-client\", source=\"srv-3\", target=\"client-1\"),\n",
"    ]\n",
"\n",
"    settings = SimulationSettings(\n",
"        total_simulation_time=3600,\n",
"        sample_period_s=0.05,\n",
"    )\n",
"\n",
"    payload = (\n",
"        AsyncFlow()\n",
"        .add_arrivals_generator(generator)\n",
"        .add_client(client)\n",
"        .add_servers(srv1, srv2, srv3)\n",
"        .add_load_balancer(lb)\n",
"        .add_edges(*edges)\n",
"        .add_simulation_settings(settings)\n",
"    ).build_payload()\n",
"\n",
"    return payload\n"
]
},
{
"cell_type": "markdown",
"id": "7682861f",
"metadata": {},
"source": [
"## 2) Run the simulation"
]
},
{
"cell_type": "code",
"execution_count": 99,
"id": "d0634bc8",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Done.\n"
]
}
],
"source": [
"payload = build_payload()\n",
"env = simpy.Environment()\n",
"runner = SimulationRunner(env=env, simulation_input=payload)\n",
"results: ResultsAnalyzer = runner.run()\n",
"print(\"Done.\")\n"
]
},
{
"cell_type": "markdown",
"id": "e5fe2a4a",
"metadata": {},
"source": [
"# 3) M/M/c (FCFS) — theory vs observed comparison\n",
"\n",
"This section shows how we compute the **theoretical Erlang-C KPIs** (pooled queue, FCFS) and compare them against **simulation estimates**.\n",
"\n",
"---\n",
"\n",
"## Variables\n",
"\n",
"* **$c$**: number of identical servers.\n",
"* **$\\lambda$**: global arrival rate (req/s).\n",
"* **$\\mu$**: per-server service rate (req/s), $\\mu = 1/\\mathbb{E}[S]$.\n",
"* **$\\rho$**: global utilization, $\\rho = \\lambda/(c\\mu)$.\n",
"* **$W$**: mean time in system (queue + service).\n",
"* **$W_q$**: mean waiting time in queue.\n",
"* **$L$**: mean number in system.\n",
"* **$L_q$**: mean number in queue.\n",
"\n",
"---\n",
"\n",
"## Theory (Erlang-C formulas)\n",
"\n",
"We assume **Poisson arrivals** for $\\lambda$ (taken directly from the payload).\n",
"\n",
"1. Offered load:\n",
"\n",
"$$\n",
"a = \\frac{\\lambda}{\\mu}\n",
"$$\n",
"\n",
"2. Probability system is empty:\n",
"\n",
"$$\n",
"P_0 = \\left[\\sum_{n=0}^{c-1}\\frac{a^n}{n!} + \\frac{a^c}{c!\\,(1-\\rho)}\\right]^{-1}\n",
"$$\n",
"\n",
"3. Probability of waiting (Erlang-C):\n",
"\n",
"$$\n",
"P_w = \\frac{a^c}{c!\\,(1-\\rho)} \\, P_0\n",
"$$\n",
"\n",
"4. Queue length and waiting:\n",
"\n",
"$$\n",
"L_q = P_w \\cdot \\frac{\\rho}{1-\\rho}, \\qquad\n",
"W_q = \\frac{L_q}{\\lambda}\n",
"$$\n",
"\n",
"5. Total response time and system size:\n",
"\n",
"$$\n",
"W = W_q + \\frac{1}{\\mu}, \\qquad\n",
"L = \\lambda W\n",
"$$\n",
"\n",
"If $\\rho \\ge 1$, the system is unstable and all metrics diverge to $+\\infty$.\n",
"\n",
"---\n",
"\n",
"## Observed (from simulation)\n",
"\n",
"After processing metrics:\n",
"\n",
"1. **Arrival rate**:\n",
"\n",
"$$\n",
"\\lambda_{\\text{Observed}} = \\text{mean throughput (client completions)}\n",
"$$\n",
"\n",
"2. **Service rate**:\n",
"\n",
"$$\n",
"\\mu_{\\text{Observed}} = 1 / \\overline{S}, \\quad \\overline{S} = \\text{mean(service\\_time)}\n",
"$$\n",
"\n",
"3. **End-to-end latency**:\n",
"\n",
"$$\n",
"W_{\\text{Observed}} = \\text{mean(client latencies)}\n",
"$$\n",
"\n",
"4. **Waiting time**:\n",
"\n",
"$$\n",
"W_{q,\\text{Observed}} = \\text{mean(waiting\\_time)}\n",
"$$\n",
"\n",
"5. **Little’s law check**:\n",
"\n",
"$$\n",
"L_{\\text{Observed}} = \\lambda_{\\text{Observed}} W_{\\text{Observed}}, \\qquad\n",
"L_{q,\\text{Observed}} = \\lambda_{\\text{Observed}} W_{q,\\text{Observed}}\n",
"$$\n",
"\n",
"6. **Utilization**:\n",
"\n",
"$$\n",
"\\rho_{\\text{Observed}} = \\lambda_{\\text{Observed}}/(c\\,\\mu_{\\text{Observed}})\n",
"$$\n",
"\n",
"---\n",
"\n",
"## Comparison\n",
"\n",
"The analyzer builds a table with two columns — **Theory** (Erlang-C closed forms) and **Observed** (empirical estimates) — and reports absolute and relative deltas.\n",
"\n",
"This allows us to verify whether AsyncFlow reproduces the textbook M/M/c (FCFS) predictions under Poisson arrivals and exponential service.\n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 100,
"id": "ccd7379b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"=================================================================\n",
"MMc (FCFS/Erlang-C) — Theory vs Observed\n",
"-----------------------------------------------------------------\n",
"sym metric theory observed abs rel%\n",
"-----------------------------------------------------------------\n",
"λ Arrival rate (1/s) 270.000000 270.258333 0.258333 0.10\n",
"μ Service rate (1/s) 100.000000 100.036707 0.036707 0.04\n",
"rho Utilization 0.900000 0.900531 0.000531 0.06\n",
"L Mean items in sys 10.053549 10.073544 0.019994 0.20\n",
"Lq Mean items in queue 7.353549 7.371934 0.018385 0.25\n",
"W Mean time in sys (s) 0.037235 0.037274 0.000038 0.10\n",
"Wq Mean waiting (s) 0.027235 0.027277 0.000042 0.15\n",
"=================================================================\n"
]
}
],
"source": [
"mmc = MMc()\n",
"if mmc.is_compatible(payload):\n",
"    mmc.print_comparison(payload, results)\n",
"else:\n",
"    print(\"Payload is not compatible with M/M/c:\")\n",
"    for reason in mmc.explain_incompatibilities(payload):\n",
"        print(\" -\", reason)\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "asyncflow-sim-py3.12 (3.12.3)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
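
For quick sanity checks outside the notebook, here is a minimal standalone sketch of the Erlang-C closed forms from section 3, evaluated at the notebook's operating point (λ = 270 req/s, mean service time 0.01 s so μ = 100 req/s, c = 3). The helper name `erlang_c_kpis` is illustrative only and is not part of the AsyncFlow API.

```python
import math

def erlang_c_kpis(lam: float, mu: float, c: int) -> dict:
    """Closed-form M/M/c (FCFS, Erlang-C) KPIs for a pooled queue."""
    rho = lam / (c * mu)                        # global utilization
    if rho >= 1:                                # unstable: everything diverges
        return {"rho": rho, "Pw": math.inf, "Lq": math.inf,
                "Wq": math.inf, "W": math.inf, "L": math.inf}
    a = lam / mu                                # offered load (Erlangs)
    tail = a**c / (math.factorial(c) * (1 - rho))
    p0 = 1.0 / (sum(a**n / math.factorial(n) for n in range(c)) + tail)
    pw = tail * p0                              # Erlang-C probability of waiting
    lq = pw * rho / (1 - rho)                   # mean number in queue
    wq = lq / lam                               # mean waiting time in queue
    w = wq + 1.0 / mu                           # mean time in system
    return {"rho": rho, "Pw": pw, "Lq": lq, "Wq": wq, "W": w, "L": lam * w}

# Notebook values: lambda_rps=270, mean cpu_time 0.01 s (mu=100), c=3 servers
print(erlang_c_kpis(270.0, 100.0, 3))
# rho=0.9, Lq≈7.3535, Wq≈0.027235, W≈0.037235, L≈10.0535 (theory column above)
```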
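The Little's-law cross-check from the "Observed (from simulation)" section is plain arithmetic and can be reproduced directly from the printed comparison table; the values below are copied from the notebook output above.

```python
# Observed values copied from the comparison table above
lam_obs = 270.258333    # mean client completion rate (1/s)
mu_obs  = 100.036707    # 1 / mean service time (1/s)
w_obs   = 0.037274      # mean end-to-end latency (s)
wq_obs  = 0.027277      # mean waiting time in the central queue (s)
c = 3

# Little's law: L = lambda * W and Lq = lambda * Wq
print("L_obs   ≈", lam_obs * w_obs)           # ≈ 10.074  (table: 10.073544)
print("Lq_obs  ≈", lam_obs * wq_obs)          # ≈ 7.372   (table: 7.371934)
print("rho_obs ≈", lam_obs / (c * mu_obs))    # ≈ 0.9005  (table: 0.900531)
```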
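As an independent cross-check of the pooled FCFS queue the notebook assumes (one central queue feeding c identical exponential servers), a generic SimPy sketch such as the one below reproduces roughly the same mean waiting time. It uses only plain SimPy primitives and is not AsyncFlow internals; the run length is shortened to keep it quick.

```python
import random
import simpy

LAM, MU, C, SIM_TIME = 270.0, 100.0, 3, 600.0   # notebook runs 3600 s; 600 s is enough here

def run_mmc(seed: int = 0) -> float:
    """Simulate an M/M/c pooled FCFS queue and return the observed mean Wq."""
    random.seed(seed)
    env = simpy.Environment()
    servers = simpy.Resource(env, capacity=C)    # one shared FCFS queue, C servers
    waits = []

    def handle_request():
        arrived = env.now
        with servers.request() as req:           # join the single central queue
            yield req                            # proceed when a server frees up
            waits.append(env.now - arrived)      # waiting-time sample (Wq)
            yield env.timeout(random.expovariate(MU))   # exponential service

    def arrivals():
        while True:
            yield env.timeout(random.expovariate(LAM))  # Poisson arrivals
            env.process(handle_request())

    env.process(arrivals())
    env.run(until=SIM_TIME)
    return sum(waits) / len(waits)

print("observed mean Wq ≈", run_mmc())   # Erlang-C predicts Wq ≈ 0.0272 s here
```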
