Skip to content

Commit 84f9897

Browse files
authored
Features/load balancer node (#9)
* Added LB structure in the input for the simulation + tests * introduced LB runtime + documentation * Added test for LB and small refactor to define the SimRunner
1 parent 7f1f7de commit 84f9897

File tree

19 files changed

+547
-34
lines changed

19 files changed

+547
-34
lines changed

docs/fastsim-docs/runtime_and_resources.md

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,4 +358,94 @@ The client pulls requests from its `client_box`. It then makes a critical decisi
358358

359359
**Design Note & Real-World Analogy:**
360360
The current logic for this decision—`if state.history[-2].component_type != SystemNodes.GENERATOR`—is **fragile**. While it works, it's not robust. A future improvement would be to add a more explicit routing mechanism.
361-
In the real world, the `ClientRuntime` could be a user's **web browser**, a **mobile application**, or even a **Backend-For-Frontend (BFF)** service that both initiates requests and receives the final aggregated responses.
361+
In the real world, the `ClientRuntime` could be a user's **web browser**, a **mobile application**, or even a **Backend-For-Frontend (BFF)** service that both initiates requests and receives the final aggregated responses.
362+
363+
## **5.5 `LoadBalancerRuntime` — The Traffic Cop 🚦**
364+
365+
The **Load Balancer** actor lives in `app/runtime/actors/load_balancer_runtime.py`.
366+
It receives a `RequestState` from the client side, decides **which outbound
367+
edge** should carry it to a server, and immediately forwards the request.
368+
369+
```text
370+
lb_box.get() choose edge transport(state)
371+
┌───────────────┐ ───────────────────────────► ┌────────────────┐
372+
│ LoadBalancer │ │ EdgeRuntime n │
373+
└───────────────┘ ◄───────────────────────────┘ └────────────────┘
374+
```
375+
376+
### **Constructor Parameters**
377+
378+
| Parameter | Meaning |
379+
| ------------- | ----------------------------------------------------------------------- |
380+
| `env` | Shared `simpy.Environment` |
381+
| `lb_config` | Validated `LoadBalancer` schema (ID, chosen algorithm enum) |
382+
| `outer_edges` | Immutable list `list[EdgeRuntime]`, one per target server |
383+
| `lb_box` | `simpy.Store` acting as the inbox for requests arriving from the client |
384+
385+
```python
386+
self._round_robin_index: int = 0 # round-robin pointer (private state)
387+
```
388+
389+
### **Supported Algorithms**
390+
391+
| Enum value (`LbAlgorithmsName`) | Function used | Signature |
392+
| ------------------------------- | --------------------------------------------- | --------- |
393+
| `ROUND_ROBIN` | `round_robin(edges, idx)``(edge, new_idx)` | O(1) |
394+
| `LEAST_CONNECTIONS` | `least_connections(edges)``edge` | O(N) scan |
395+
396+
*Both helpers live in* `app/runtime/actors/helpers/lb_algorithms.py`.
397+
398+
#### **Why an index and not list rotation?**
399+
400+
`outer_edges` is **shared** with other components (e.g. metrics collector,
401+
tests). Rotating it in-place would mutate a shared object and create
402+
hard-to-trace bugs (aliasing).
403+
Keeping `_round_robin_index` inside the LB runtime preserves immutability while
404+
still advancing the pointer on every request.
405+
406+
### **Process Workflow**
407+
408+
```python
409+
def _forwarder(self) -> Generator[simpy.Event, None, None]:
410+
while True:
411+
state: RequestState = yield self.lb_box.get() # ① wait for a request
412+
413+
state.record_hop(SystemNodes.LOAD_BALANCER,
414+
self.lb_config.id,
415+
self.env.now) # ② trace
416+
417+
if self.lb_config.algorithms is LbAlgorithmsName.ROUND_ROBIN:
418+
edge, self._round_robin_index = round_robin(
419+
self.outer_edges,
420+
self._round_robin_index,
421+
) # ③a choose RR edge
422+
else: # LEAST_CONNECTIONS
423+
edge = least_connections(self.outer_edges) # ③b choose LC edge
424+
425+
edge.transport(state) # ④ forward
426+
```
427+
428+
| Step | What happens | Real-world analogue |
429+
| ---- | ------------------------------------------------------------------------ | ---------------------------------------- |
430+
|| Pull next `RequestState` out of `lb_box`. | Socket `accept()` on the LB front-end. |
431+
|| Add a `Hop` stamped `LOAD_BALANCER`. | Trace span in NGINX/Envoy access log. |
432+
| ③a | **Round-Robin** – pick `outer_edges[_round_robin_index]`, advance index. | Classic DNS-RR or NGINX default. |
433+
| ③b | **Least-Connections**`min(edges, key=concurrent_connections)`. | HAProxy `leastconn`, NGINX `least_conn`. |
434+
|| Spawn network transit via `EdgeRuntime.transport()`. | LB writes request to backend socket. |
435+
436+
### **Edge-Case Safety**
437+
438+
* **Empty `outer_edges`** → impossible by schema validation (LB must cover >1 server).
439+
* **Single server** → RR degenerates to index 0; LC always returns that edge.
440+
* **Concurrency metric** (`edge.concurrent_connections`) is updated inside
441+
`EdgeRuntime` in real time, so `least_connections` adapts instantly to load spikes.
442+
443+
### **Key Takeaways**
444+
445+
1. **Stateful but side-effect-free**`_round_robin_index` keeps per-LB state without mutating the shared edge list.
446+
2. **Uniform API** – both algorithms integrate through a simple `if/else`; additional strategies can be added with negligible changes.
447+
3. **Deterministic & reproducible** – no randomness inside the LB, ensuring repeatable simulations.
448+
449+
With these mechanics the `LoadBalancerRuntime` faithfully emulates behaviour of
450+
production LBs (NGINX, HAProxy, AWS ALB) while remaining lightweight and
451+
fully deterministic inside the FastSim event loop.

src/app/config/constants.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,16 @@ class NetworkParameters:
161161
DROPOUT_RATE = 0.01
162162
MAX_DROPOUT_RATE = 1.0
163163

164+
# ======================================================================
165+
# NAME FOR LOAD BALANCER ALGORITHMS
166+
# ======================================================================
167+
168+
class LbAlgorithmsName(StrEnum):
169+
"""definition of the available algortithms for the Load Balancer"""
170+
171+
ROUND_ROBIN = "round_robin"
172+
LEAST_CONNECTIONS = "least_connection"
173+
164174

165175
# ======================================================================
166176
# CONSTANTS FOR THE MACRO-TOPOLOGY GRAPH
@@ -178,10 +188,6 @@ class SystemNodes(StrEnum):
178188
SERVER = "server"
179189
CLIENT = "client"
180190
LOAD_BALANCER = "load_balancer"
181-
API_GATEWAY = "api_gateway"
182-
DATABASE = "database"
183-
CACHE = "cache"
184-
185191

186192
class SystemEdges(StrEnum):
187193
"""

src/app/metrics/analyzer.py

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,15 @@
1313
RAM_PLOT,
1414
SERVER_QUEUES_PLOT,
1515
THROUGHPUT_PLOT,
16+
PlotCfg,
1617
)
1718

1819
if TYPE_CHECKING:
1920

21+
from collections.abc import Iterable
22+
2023
from matplotlib.axes import Axes
24+
from matplotlib.lines import Line2D
2125

2226
from app.runtime.actors.client import ClientRuntime
2327
from app.runtime.actors.edge import EdgeRuntime
@@ -35,7 +39,7 @@ class ResultsAnalyzer:
3539
"""
3640

3741
# Class attribute to define the period to calculate the throughput in s
38-
_WINDOW_SIZE_S: float = 1
42+
_WINDOW_SIZE_S: float = 1.0
3943

4044
def __init__(
4145
self,
@@ -64,6 +68,22 @@ def __init__(
6468
self.throughput_series: tuple[list[float], list[float]] | None = None
6569
self.sampled_metrics: dict[str, dict[str, list[float]]] | None = None
6670

71+
@staticmethod
72+
def _apply_plot_cfg(
73+
ax: Axes,
74+
cfg: PlotCfg,
75+
*,
76+
legend_handles: Iterable[Line2D] | None = None,
77+
) -> None:
78+
"""Apply title / axis labels / grid and (optionally) legend to ax."""
79+
ax.set_title(cfg.title)
80+
ax.set_xlabel(cfg.x_label)
81+
ax.set_ylabel(cfg.y_label)
82+
ax.grid(visible=True)
83+
84+
if legend_handles:
85+
ax.legend(handles=legend_handles)
86+
6787
def process_all_metrics(self) -> None:
6888
"""Compute all aggregated and sampled metrics if not already done."""
6989
if self.latency_stats is None and self._client.rqs_clock:
@@ -150,32 +170,26 @@ def get_sampled_metrics(self) -> dict[str, dict[str, list[float]]]:
150170
return self.sampled_metrics
151171

152172
def plot_latency_distribution(self, ax: Axes) -> None:
153-
"""Draw a histogram of request latencies onto the given Axes."""
173+
"""Plot the distribution of the latency"""
154174
if not self.latencies:
155175
ax.text(0.5, 0.5, LATENCY_PLOT.no_data, ha="center", va="center")
156176
return
157177

158178
ax.hist(self.latencies, bins=50)
159-
ax.set_title(LATENCY_PLOT.title)
160-
ax.set_xlabel(LATENCY_PLOT.x_label)
161-
ax.set_ylabel(LATENCY_PLOT.y_label)
162-
ax.grid(visible=True)
179+
self._apply_plot_cfg(ax, LATENCY_PLOT)
163180

164181
def plot_throughput(self, ax: Axes) -> None:
165-
"""Draw throughput (RPS) over time onto the given Axes."""
182+
"""Plot the distribution of the throughput"""
166183
timestamps, values = self.get_throughput_series()
167184
if not timestamps:
168185
ax.text(0.5, 0.5, THROUGHPUT_PLOT.no_data, ha="center", va="center")
169186
return
170187

171188
ax.plot(timestamps, values, marker="o", linestyle="-")
172-
ax.set_title(THROUGHPUT_PLOT.title)
173-
ax.set_xlabel(THROUGHPUT_PLOT.x_label)
174-
ax.set_ylabel(THROUGHPUT_PLOT.y_label)
175-
ax.grid(visible=True)
189+
self._apply_plot_cfg(ax, THROUGHPUT_PLOT)
176190

177191
def plot_server_queues(self, ax: Axes) -> None:
178-
"""Draw server queue lengths over time onto the given Axes."""
192+
"""Plot the server queues"""
179193
metrics = self.get_sampled_metrics()
180194
ready = metrics.get(SampledMetricName.READY_QUEUE_LEN, {})
181195
io_q = metrics.get(SampledMetricName.EVENT_LOOP_IO_SLEEP, {})
@@ -197,14 +211,11 @@ def plot_server_queues(self, ax: Axes) -> None:
197211
linestyle="--",
198212
)
199213

200-
ax.set_title(SERVER_QUEUES_PLOT.title)
201-
ax.set_xlabel(SERVER_QUEUES_PLOT.x_label)
202-
ax.set_ylabel(SERVER_QUEUES_PLOT.y_label)
203-
ax.legend()
204-
ax.grid(visible=True)
214+
self._apply_plot_cfg(ax, SERVER_QUEUES_PLOT, legend_handles=ax.lines)
215+
205216

206217
def plot_ram_usage(self, ax: Axes) -> None:
207-
"""Draw RAM usage over time onto the given Axes."""
218+
"""Plot the ram usage"""
208219
metrics = self.get_sampled_metrics()
209220
ram = metrics.get(SampledMetricName.RAM_IN_USE, {})
210221

@@ -218,8 +229,4 @@ def plot_ram_usage(self, ax: Axes) -> None:
218229
for sid, vals in ram.items():
219230
ax.plot(times, vals, label=f"{sid} {RAM_PLOT.legend_label}")
220231

221-
ax.set_title(RAM_PLOT.title)
222-
ax.set_xlabel(RAM_PLOT.x_label)
223-
ax.set_ylabel(RAM_PLOT.y_label)
224-
ax.legend()
225-
ax.grid(visible=True)
232+
self._apply_plot_cfg(ax, RAM_PLOT, legend_handles=ax.lines)

src/app/metrics/collector.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ def _build_time_series(self) -> Generator[simpy.Event, None, None]:
6161
server.enabled_metrics[self._io_key].append(server.io_queue_len)
6262
server.enabled_metrics[self._ready_key].append(server.ready_queue_len)
6363

64-
64+
def start(self) -> simpy.Process:
65+
"""Definition of the process to collect sampled metrics"""
66+
return self.env.process(self._build_time_series())
6567

6668

6769

src/app/resources/registry.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class ResourcesRuntime:
1818

1919
def __init__(
2020
self,
21+
*,
2122
env: simpy.Environment,
2223
data: TopologyGraph,
2324

src/app/runtime/actors/client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ class ClientRuntime:
2020

2121
def __init__(
2222
self,
23+
*,
2324
env: simpy.Environment,
24-
out_edge: EdgeRuntime,
25+
out_edge: EdgeRuntime | None,
2526
client_box: simpy.Store,
2627
completed_box: simpy.Store,
2728
client_config: Client,
@@ -41,6 +42,7 @@ def __init__(
4142

4243
def _forwarder(self) -> Generator[simpy.Event, None, None]:
4344
"""Updtate the state before passing it to another node"""
45+
assert self.out_edge is not None
4446
while True:
4547
state: RequestState = yield self.client_box.get() # type: ignore[assignment]
4648

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
"""algorithms to simulate the load balancer during the simulation"""
2+
3+
4+
5+
from app.runtime.actors.edge import EdgeRuntime
6+
7+
8+
def least_connections(list_edges: list[EdgeRuntime]) -> EdgeRuntime:
9+
"""We send the state to the edge with less concurrent connections"""
10+
concurrent_connections = [edge.concurrent_connections for edge in list_edges]
11+
12+
idx_min = concurrent_connections.index(min(concurrent_connections))
13+
14+
return list_edges[idx_min]
15+
16+
def round_robin(edges: list[EdgeRuntime], idx: int) -> tuple[EdgeRuntime, int]:
17+
"""
18+
We send states to different server in uniform way by
19+
rotating the list of edges that should transport the state
20+
to the correct server, we rotate the index and not the list
21+
to avoid aliasing since the list is shared by many components
22+
"""
23+
idx %= len(edges)
24+
chosen = edges[idx]
25+
idx = (idx + 1) % len(edges)
26+
return chosen, idx
27+
28+
29+
30+
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"""Definition of the node represented by the LB in the simulation"""
2+
3+
from collections.abc import Generator
4+
from typing import TYPE_CHECKING
5+
6+
import simpy
7+
8+
from app.config.constants import LbAlgorithmsName, SystemNodes
9+
from app.runtime.actors.edge import EdgeRuntime
10+
from app.runtime.actors.helpers.lb_algorithms import (
11+
least_connections,
12+
round_robin,
13+
)
14+
from app.schemas.system_topology.full_system_topology import LoadBalancer
15+
16+
if TYPE_CHECKING:
17+
from app.runtime.rqs_state import RequestState
18+
19+
20+
21+
class LoadBalancerRuntime:
22+
"""class to define the behaviour of the LB in the simulation"""
23+
24+
def __init__(
25+
self,
26+
*,
27+
env: simpy.Environment,
28+
lb_config: LoadBalancer,
29+
out_edges: list[EdgeRuntime] | None,
30+
lb_box: simpy.Store,
31+
) -> None:
32+
"""
33+
Descriprion of the instance attributes for the class
34+
Args:
35+
env (simpy.Environment): env of the simulation
36+
lb_config (LoadBalancer): input to define the lb in the runtime
37+
rqs_state (RequestState): state of the simulation
38+
out_edges (list[EdgeRuntime]): list of edges that connects lb with servers
39+
lb_box (simpy.Store): store to add the state
40+
41+
"""
42+
self.env = env
43+
self.lb_config = lb_config
44+
self.out_edges = out_edges
45+
self.lb_box = lb_box
46+
self._round_robin_index: int = 0
47+
48+
49+
def _forwarder(self) -> Generator[simpy.Event, None, None]:
50+
"""Updtate the state before passing it to another node"""
51+
assert self.out_edges is not None
52+
while True:
53+
state: RequestState = yield self.lb_box.get() # type: ignore[assignment]
54+
55+
state.record_hop(
56+
SystemNodes.LOAD_BALANCER,
57+
self.lb_config.id,
58+
self.env.now,
59+
)
60+
61+
if self.lb_config.algorithms == LbAlgorithmsName.ROUND_ROBIN:
62+
out_edge, self._round_robin_index = round_robin(
63+
self.out_edges,
64+
self._round_robin_index,
65+
)
66+
else:
67+
out_edge = least_connections(self.out_edges)
68+
69+
out_edge.transport(state)
70+
71+
def start(self) -> simpy.Process:
72+
"""Initialization of the simpy process for the LB"""
73+
return self.env.process(self._forwarder())

0 commit comments

Comments
 (0)