Merged
92 changes: 91 additions & 1 deletion docs/fastsim-docs/runtime_and_resources.md
@@ -358,4 +358,94 @@ The client pulls requests from its `client_box`. It then makes a critical decision

**Design Note & Real-World Analogy:**
The current logic for this decision—`if state.history[-2].component_type != SystemNodes.GENERATOR`—is **fragile**. While it works, it's not robust. A future improvement would be to add a more explicit routing mechanism.
In the real world, the `ClientRuntime` could be a user's **web browser**, a **mobile application**, or even a **Backend-For-Frontend (BFF)** service that both initiates requests and receives the final aggregated responses.

## **5.5 `LoadBalancerRuntime` — The Traffic Cop 🚦**

The **Load Balancer** actor lives in `app/runtime/actors/load_balancer_runtime.py`.
It receives a `RequestState` from the client side, decides **which outbound
edge** should carry it to a server, and immediately forwards the request.

```text
lb_box.get() choose edge transport(state)
┌───────────────┐ ───────────────────────────► ┌────────────────┐
│ LoadBalancer │ │ EdgeRuntime n │
└───────────────┘ ◄───────────────────────────┘ └────────────────┘
```

### **Constructor Parameters**

| Parameter | Meaning |
| ------------- | ----------------------------------------------------------------------- |
| `env` | Shared `simpy.Environment` |
| `lb_config` | Validated `LoadBalancer` schema (ID, chosen algorithm enum) |
| `outer_edges` | Immutable list `list[EdgeRuntime]`, one per target server |
| `lb_box` | `simpy.Store` acting as the inbox for requests arriving from the client |

```python
self._round_robin_index: int = 0 # round-robin pointer (private state)
```

### **Supported Algorithms**

| Enum value (`LbAlgorithmsName`) | Function used | Signature |
| ------------------------------- | --------------------------------------------- | --------- |
| `ROUND_ROBIN` | `round_robin(edges, idx)` → `(edge, new_idx)` | O(1) |
| `LEAST_CONNECTIONS` | `least_connections(edges)` → `edge` | O(N) scan |

*Both helpers live in* `app/runtime/actors/helpers/lb_algorithms.py`.
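
As a concrete illustration, here is a self-contained sketch of the two strategies. The `Edge` dataclass is a hypothetical stand-in for `EdgeRuntime`, exposing only the `concurrent_connections` attribute the algorithms rely on:

```python
from dataclasses import dataclass


@dataclass
class Edge:
    """Hypothetical stand-in for EdgeRuntime with just the needed field."""
    name: str
    concurrent_connections: int = 0


def round_robin(edges: list[Edge], idx: int) -> tuple[Edge, int]:
    """O(1): return edges[idx] together with the advanced index."""
    idx %= len(edges)
    return edges[idx], (idx + 1) % len(edges)


def least_connections(edges: list[Edge]) -> Edge:
    """O(N) scan: return the edge with the fewest in-flight connections."""
    return min(edges, key=lambda e: e.concurrent_connections)


edges = [Edge("a"), Edge("b", 3), Edge("c", 1)]
chosen, nxt = round_robin(edges, 0)
print(chosen.name, nxt)                 # a 1
print(least_connections(edges).name)    # a  (zero connections)
```

Note how `round_robin` returns the new index instead of mutating anything; the caller owns the pointer, which is exactly the design discussed below.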

#### **Why an index and not list rotation?**

`outer_edges` is **shared** with other components (e.g. metrics collector,
tests). Rotating it in-place would mutate a shared object and create
hard-to-trace bugs (aliasing).
Keeping `_round_robin_index` inside the LB runtime preserves immutability while
still advancing the pointer on every request.
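
The aliasing hazard is easy to demonstrate with plain lists (a minimal sketch; the names are illustrative, not FastSim APIs):

```python
# In-place rotation mutates every alias of the shared list:
shared = ["edge0", "edge1", "edge2"]
metrics_view = shared                  # another component holds the SAME list
shared.append(shared.pop(0))           # rotate in place
print(metrics_view)                    # ['edge1', 'edge2', 'edge0'] as seen by metrics too

# A private index leaves the shared list untouched:
edges = ["edge0", "edge1", "edge2"]
rr_index = 0
chosen = edges[rr_index]               # pick the current edge
rr_index = (rr_index + 1) % len(edges)
print(edges, chosen, rr_index)         # list unchanged, pointer advanced
```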

### **Process Workflow**

```python
def _forwarder(self) -> Generator[simpy.Event, None, None]:
    while True:
        state: RequestState = yield self.lb_box.get()      # ① wait for a request

        state.record_hop(SystemNodes.LOAD_BALANCER,
                         self.lb_config.id,
                         self.env.now)                     # ② trace

        if self.lb_config.algorithms is LbAlgorithmsName.ROUND_ROBIN:
            edge, self._round_robin_index = round_robin(
                self.outer_edges,
                self._round_robin_index,
            )                                              # ③a choose RR edge
        else:                                              # LEAST_CONNECTIONS
            edge = least_connections(self.outer_edges)     # ③b choose LC edge

        edge.transport(state)                              # ④ forward
```

| Step | What happens | Real-world analogue |
| ---- | ------------------------------------------------------------------------ | ---------------------------------------- |
| ① | Pull next `RequestState` out of `lb_box`. | Socket `accept()` on the LB front-end. |
| ② | Add a `Hop` stamped `LOAD_BALANCER`. | Trace span in NGINX/Envoy access log. |
| ③a | **Round-Robin** – pick `outer_edges[_round_robin_index]`, advance index. | Classic DNS-RR or NGINX default. |
| ③b | **Least-Connections** – `min(edges, key=concurrent_connections)`. | HAProxy `leastconn`, NGINX `least_conn`. |
| ④ | Spawn network transit via `EdgeRuntime.transport()`. | LB writes request to backend socket. |

### **Edge-Case Safety**

* **Empty `outer_edges`** → impossible by schema validation (the LB must cover at least one server).
* **Single server** → RR degenerates to index 0; LC always returns that edge.
* **Concurrency metric** (`edge.concurrent_connections`) is updated inside
`EdgeRuntime` in real time, so `least_connections` adapts instantly to load spikes.
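
The single-server degeneracy can be checked with a standalone copy of the round-robin helper (a sketch only; the real function lives in `lb_algorithms.py`):

```python
def round_robin(edges, idx):
    """Pick edges[idx] and advance the pointer, wrapping around."""
    idx %= len(edges)
    return edges[idx], (idx + 1) % len(edges)

# With one edge, any starting index collapses to 0 and stays there.
edges = ["only-edge"]
for start in (0, 1, 7):
    chosen, nxt = round_robin(edges, start)
    assert chosen == "only-edge" and nxt == 0
print("single-server round-robin always returns index 0")
```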

### **Key Takeaways**

1. **Stateful but side-effect-free** – `_round_robin_index` keeps per-LB state without mutating the shared edge list.
2. **Uniform API** – both algorithms integrate through a simple `if/else`; additional strategies can be added with negligible changes.
3. **Deterministic & reproducible** – no randomness inside the LB, ensuring repeatable simulations.

With these mechanics the `LoadBalancerRuntime` faithfully emulates behaviour of
production LBs (NGINX, HAProxy, AWS ALB) while remaining lightweight and
fully deterministic inside the FastSim event loop.
14 changes: 10 additions & 4 deletions src/app/config/constants.py
@@ -161,6 +161,16 @@ class NetworkParameters:
DROPOUT_RATE = 0.01
MAX_DROPOUT_RATE = 1.0

# ======================================================================
# NAME FOR LOAD BALANCER ALGORITHMS
# ======================================================================

class LbAlgorithmsName(StrEnum):
    """Definition of the available algorithms for the Load Balancer"""

    ROUND_ROBIN = "round_robin"
    LEAST_CONNECTIONS = "least_connection"


# ======================================================================
# CONSTANTS FOR THE MACRO-TOPOLOGY GRAPH
@@ -178,10 +188,6 @@ class SystemNodes(StrEnum):
SERVER = "server"
CLIENT = "client"
LOAD_BALANCER = "load_balancer"
API_GATEWAY = "api_gateway"
DATABASE = "database"
CACHE = "cache"


class SystemEdges(StrEnum):
"""
53 changes: 30 additions & 23 deletions src/app/metrics/analyzer.py
@@ -13,11 +13,15 @@
RAM_PLOT,
SERVER_QUEUES_PLOT,
THROUGHPUT_PLOT,
PlotCfg,
)

if TYPE_CHECKING:

from collections.abc import Iterable

from matplotlib.axes import Axes
from matplotlib.lines import Line2D

from app.runtime.actors.client import ClientRuntime
from app.runtime.actors.edge import EdgeRuntime
@@ -35,7 +39,7 @@ class ResultsAnalyzer:
"""

# Class attribute to define the period to calculate the throughput in s
_WINDOW_SIZE_S: float = 1
_WINDOW_SIZE_S: float = 1.0

def __init__(
self,
@@ -64,6 +68,22 @@ def __init__(
self.throughput_series: tuple[list[float], list[float]] | None = None
self.sampled_metrics: dict[str, dict[str, list[float]]] | None = None

    @staticmethod
    def _apply_plot_cfg(
        ax: Axes,
        cfg: PlotCfg,
        *,
        legend_handles: Iterable[Line2D] | None = None,
    ) -> None:
        """Apply title / axis labels / grid and (optionally) legend to ax."""
        ax.set_title(cfg.title)
        ax.set_xlabel(cfg.x_label)
        ax.set_ylabel(cfg.y_label)
        ax.grid(visible=True)

        if legend_handles:
            ax.legend(handles=legend_handles)

def process_all_metrics(self) -> None:
"""Compute all aggregated and sampled metrics if not already done."""
if self.latency_stats is None and self._client.rqs_clock:
@@ -150,32 +170,26 @@ def get_sampled_metrics(self) -> dict[str, dict[str, list[float]]]:
return self.sampled_metrics

def plot_latency_distribution(self, ax: Axes) -> None:
"""Draw a histogram of request latencies onto the given Axes."""
"""Plot the latency distribution."""
if not self.latencies:
ax.text(0.5, 0.5, LATENCY_PLOT.no_data, ha="center", va="center")
return

ax.hist(self.latencies, bins=50)
ax.set_title(LATENCY_PLOT.title)
ax.set_xlabel(LATENCY_PLOT.x_label)
ax.set_ylabel(LATENCY_PLOT.y_label)
ax.grid(visible=True)
self._apply_plot_cfg(ax, LATENCY_PLOT)

def plot_throughput(self, ax: Axes) -> None:
"""Draw throughput (RPS) over time onto the given Axes."""
"""Plot the throughput over time."""
timestamps, values = self.get_throughput_series()
if not timestamps:
ax.text(0.5, 0.5, THROUGHPUT_PLOT.no_data, ha="center", va="center")
return

ax.plot(timestamps, values, marker="o", linestyle="-")
ax.set_title(THROUGHPUT_PLOT.title)
ax.set_xlabel(THROUGHPUT_PLOT.x_label)
ax.set_ylabel(THROUGHPUT_PLOT.y_label)
ax.grid(visible=True)
self._apply_plot_cfg(ax, THROUGHPUT_PLOT)

def plot_server_queues(self, ax: Axes) -> None:
"""Draw server queue lengths over time onto the given Axes."""
"""Plot the server queue lengths."""
metrics = self.get_sampled_metrics()
ready = metrics.get(SampledMetricName.READY_QUEUE_LEN, {})
io_q = metrics.get(SampledMetricName.EVENT_LOOP_IO_SLEEP, {})
@@ -197,14 +211,11 @@ def plot_server_queues(self, ax: Axes) -> None:
linestyle="--",
)

ax.set_title(SERVER_QUEUES_PLOT.title)
ax.set_xlabel(SERVER_QUEUES_PLOT.x_label)
ax.set_ylabel(SERVER_QUEUES_PLOT.y_label)
ax.legend()
ax.grid(visible=True)
self._apply_plot_cfg(ax, SERVER_QUEUES_PLOT, legend_handles=ax.lines)


def plot_ram_usage(self, ax: Axes) -> None:
"""Draw RAM usage over time onto the given Axes."""
"""Plot the RAM usage."""
metrics = self.get_sampled_metrics()
ram = metrics.get(SampledMetricName.RAM_IN_USE, {})

@@ -218,8 +229,4 @@ def plot_ram_usage(self, ax: Axes) -> None:
for sid, vals in ram.items():
ax.plot(times, vals, label=f"{sid} {RAM_PLOT.legend_label}")

ax.set_title(RAM_PLOT.title)
ax.set_xlabel(RAM_PLOT.x_label)
ax.set_ylabel(RAM_PLOT.y_label)
ax.legend()
ax.grid(visible=True)
self._apply_plot_cfg(ax, RAM_PLOT, legend_handles=ax.lines)
4 changes: 3 additions & 1 deletion src/app/metrics/collector.py
@@ -61,7 +61,9 @@ def _build_time_series(self) -> Generator[simpy.Event, None, None]:
server.enabled_metrics[self._io_key].append(server.io_queue_len)
server.enabled_metrics[self._ready_key].append(server.ready_queue_len)


def start(self) -> simpy.Process:
"""Definition of the process that collects sampled metrics"""
return self.env.process(self._build_time_series())



1 change: 1 addition & 0 deletions src/app/resources/registry.py
@@ -18,6 +18,7 @@ class ResourcesRuntime:

def __init__(
self,
*,
env: simpy.Environment,
data: TopologyGraph,

4 changes: 3 additions & 1 deletion src/app/runtime/actors/client.py
@@ -20,8 +20,9 @@ class ClientRuntime:

def __init__(
self,
*,
env: simpy.Environment,
out_edge: EdgeRuntime,
out_edge: EdgeRuntime | None,
client_box: simpy.Store,
completed_box: simpy.Store,
client_config: Client,
@@ -41,6 +42,7 @@ def __init__(

def _forwarder(self) -> Generator[simpy.Event, None, None]:
"""Update the state before passing it to another node"""
assert self.out_edge is not None
while True:
state: RequestState = yield self.client_box.get() # type: ignore[assignment]

30 changes: 30 additions & 0 deletions src/app/runtime/actors/helpers/lb_algorithms.py
@@ -0,0 +1,30 @@
"""Algorithms used to simulate the load balancer during the simulation"""



from app.runtime.actors.edge import EdgeRuntime


def least_connections(list_edges: list[EdgeRuntime]) -> EdgeRuntime:
    """Send the state to the edge with the fewest concurrent connections."""
    concurrent_connections = [edge.concurrent_connections for edge in list_edges]

    idx_min = concurrent_connections.index(min(concurrent_connections))

    return list_edges[idx_min]


def round_robin(edges: list[EdgeRuntime], idx: int) -> tuple[EdgeRuntime, int]:
    """
    Distribute states across the servers uniformly by advancing an index
    over the list of edges that transport each state to its server.
    We rotate the index rather than the list itself to avoid aliasing,
    since the list is shared by many components.
    """
    idx %= len(edges)
    chosen = edges[idx]
    idx = (idx + 1) % len(edges)
    return chosen, idx




73 changes: 73 additions & 0 deletions src/app/runtime/actors/load_balancer.py
@@ -0,0 +1,73 @@
"""Definition of the node represented by the LB in the simulation"""

from collections.abc import Generator
from typing import TYPE_CHECKING

import simpy

from app.config.constants import LbAlgorithmsName, SystemNodes
from app.runtime.actors.edge import EdgeRuntime
from app.runtime.actors.helpers.lb_algorithms import (
least_connections,
round_robin,
)
from app.schemas.system_topology.full_system_topology import LoadBalancer

if TYPE_CHECKING:
from app.runtime.rqs_state import RequestState



class LoadBalancerRuntime:
    """Class defining the behaviour of the LB in the simulation"""

    def __init__(
        self,
        *,
        env: simpy.Environment,
        lb_config: LoadBalancer,
        out_edges: list[EdgeRuntime] | None,
        lb_box: simpy.Store,
    ) -> None:
        """
        Description of the instance attributes for the class
        Args:
            env (simpy.Environment): env of the simulation
            lb_config (LoadBalancer): input to define the lb in the runtime
            out_edges (list[EdgeRuntime]): list of edges that connect the lb with the servers
            lb_box (simpy.Store): store to add the state

        """
        self.env = env
        self.lb_config = lb_config
        self.out_edges = out_edges
        self.lb_box = lb_box
        self._round_robin_index: int = 0

    def _forwarder(self) -> Generator[simpy.Event, None, None]:
        """Update the state before passing it to another node"""
        assert self.out_edges is not None
        while True:
            state: RequestState = yield self.lb_box.get()  # type: ignore[assignment]

            state.record_hop(
                SystemNodes.LOAD_BALANCER,
                self.lb_config.id,
                self.env.now,
            )

            if self.lb_config.algorithms == LbAlgorithmsName.ROUND_ROBIN:
                out_edge, self._round_robin_index = round_robin(
                    self.out_edges,
                    self._round_robin_index,
                )
            else:
                out_edge = least_connections(self.out_edges)

            out_edge.transport(state)

    def start(self) -> simpy.Process:
        """Initialization of the simpy process for the LB"""
        return self.env.process(self._forwarder())