diff --git a/docs/fastsim-docs/runtime_and_resources.md b/docs/fastsim-docs/runtime_and_resources.md index 763e0a9..a5dca9c 100644 --- a/docs/fastsim-docs/runtime_and_resources.md +++ b/docs/fastsim-docs/runtime_and_resources.md @@ -358,4 +358,94 @@ The client pulls requests from its `client_box`. It then makes a critical decisi **Design Note & Real-World Analogy:** The current logic for this decisionβ€”`if state.history[-2].component_type != SystemNodes.GENERATOR`β€”is **fragile**. While it works, it's not robust. A future improvement would be to add a more explicit routing mechanism. -In the real world, the `ClientRuntime` could be a user's **web browser**, a **mobile application**, or even a **Backend-For-Frontend (BFF)** service that both initiates requests and receives the final aggregated responses. \ No newline at end of file +In the real world, the `ClientRuntime` could be a user's **web browser**, a **mobile application**, or even a **Backend-For-Frontend (BFF)** service that both initiates requests and receives the final aggregated responses. + +## **5.5 `LoadBalancerRuntime` β€” The Traffic Cop 🚦** + +The **Load Balancer** actor lives in `app/runtime/actors/load_balancer_runtime.py`. +It receives a `RequestState` from the client side, decides **which outbound +edge** should carry it to a server, and immediately forwards the request. + +```text +lb_box.get() choose edge transport(state) +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” ───────────────────────────► β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ LoadBalancer β”‚ β”‚ EdgeRuntime n β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β—„β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +### **Constructor Parameters** + +| Parameter | Meaning | +| ------------- | ----------------------------------------------------------------------- | +| `env` | Shared `simpy.Environment` | +| `lb_config` | Validated `LoadBalancer` schema (ID, chosen algorithm enum) | +| `outer_edges` | Immutable list `list[EdgeRuntime]`, one per target server | +| `lb_box` | `simpy.Store` acting as the inbox for requests arriving from the client | + +```python +self._round_robin_index: int = 0 # round-robin pointer (private state) +``` + +### **Supported Algorithms** + +| Enum value (`LbAlgorithmsName`) | Function used | Signature | +| ------------------------------- | --------------------------------------------- | --------- | +| `ROUND_ROBIN` | `round_robin(edges, idx)` β†’ `(edge, new_idx)` | O(1) | +| `LEAST_CONNECTIONS` | `least_connections(edges)` β†’ `edge` | O(N) scan | + +*Both helpers live in* `app/runtime/actors/helpers/lb_algorithms.py`. + +#### **Why an index and not list rotation?** + +`outer_edges` is **shared** with other components (e.g. metrics collector, +tests). Rotating it in-place would mutate a shared object and create +hard-to-trace bugs (aliasing). +Keeping `_round_robin_index` inside the LB runtime preserves immutability while +still advancing the pointer on every request. + +### **Process Workflow** + +```python +def _forwarder(self) -> Generator[simpy.Event, None, None]: + while True: + state: RequestState = yield self.lb_box.get() # β‘  wait for a request + + state.record_hop(SystemNodes.LOAD_BALANCER, + self.lb_config.id, + self.env.now) # β‘‘ trace + + if self.lb_config.algorithms is LbAlgorithmsName.ROUND_ROBIN: + edge, self._round_robin_index = round_robin( + self.outer_edges, + self._round_robin_index, + ) # β‘’a choose RR edge + else: # LEAST_CONNECTIONS + edge = least_connections(self.outer_edges) # β‘’b choose LC edge + + edge.transport(state) # β‘£ forward +``` + +| Step | What happens | Real-world analogue | +| ---- | ------------------------------------------------------------------------ | ---------------------------------------- | +| β‘  | Pull next `RequestState` out of `lb_box`. | Socket `accept()` on the LB front-end. | +| β‘‘ | Add a `Hop` stamped `LOAD_BALANCER`. | Trace span in NGINX/Envoy access log. | +| β‘’a | **Round-Robin** – pick `outer_edges[_round_robin_index]`, advance index. | Classic DNS-RR or NGINX default. | +| β‘’b | **Least-Connections** – `min(edges, key=concurrent_connections)`. | HAProxy `leastconn`, NGINX `least_conn`. | +| β‘£ | Spawn network transit via `EdgeRuntime.transport()`. | LB writes request to backend socket. | + +### **Edge-Case Safety** + +* **Empty `outer_edges`** β†’ impossible by schema validation (LB must cover >1 server). +* **Single server** β†’ RR degenerates to index 0; LC always returns that edge. +* **Concurrency metric** (`edge.concurrent_connections`) is updated inside + `EdgeRuntime` in real time, so `least_connections` adapts instantly to load spikes. + +### **Key Takeaways** + +1. **Stateful but side-effect-free** – `_round_robin_index` keeps per-LB state without mutating the shared edge list. +2. **Uniform API** – both algorithms integrate through a simple `if/else`; additional strategies can be added with negligible changes. +3. **Deterministic & reproducible** – no randomness inside the LB, ensuring repeatable simulations. + +With these mechanics the `LoadBalancerRuntime` faithfully emulates behaviour of +production LBs (NGINX, HAProxy, AWS ALB) while remaining lightweight and +fully deterministic inside the FastSim event loop. diff --git a/src/app/config/constants.py b/src/app/config/constants.py index 74cf0e2..07e383c 100644 --- a/src/app/config/constants.py +++ b/src/app/config/constants.py @@ -161,6 +161,16 @@ class NetworkParameters: DROPOUT_RATE = 0.01 MAX_DROPOUT_RATE = 1.0 +# ====================================================================== +# NAME FOR LOAD BALANCER ALGORITHMS +# ====================================================================== + +class LbAlgorithmsName(StrEnum): + """definition of the available algortithms for the Load Balancer""" + + ROUND_ROBIN = "round_robin" + LEAST_CONNECTIONS = "least_connection" + # ====================================================================== # CONSTANTS FOR THE MACRO-TOPOLOGY GRAPH @@ -178,10 +188,6 @@ class SystemNodes(StrEnum): SERVER = "server" CLIENT = "client" LOAD_BALANCER = "load_balancer" - API_GATEWAY = "api_gateway" - DATABASE = "database" - CACHE = "cache" - class SystemEdges(StrEnum): """ diff --git a/src/app/metrics/analyzer.py b/src/app/metrics/analyzer.py index 00bbd35..07a021e 100644 --- a/src/app/metrics/analyzer.py +++ b/src/app/metrics/analyzer.py @@ -13,11 +13,15 @@ RAM_PLOT, SERVER_QUEUES_PLOT, THROUGHPUT_PLOT, + PlotCfg, ) if TYPE_CHECKING: + from collections.abc import Iterable + from matplotlib.axes import Axes + from matplotlib.lines import Line2D from app.runtime.actors.client import ClientRuntime from app.runtime.actors.edge import EdgeRuntime @@ -35,7 +39,7 @@ class ResultsAnalyzer: """ # Class attribute to define the period to calculate the throughput in s - _WINDOW_SIZE_S: float = 1 + _WINDOW_SIZE_S: float = 1.0 def __init__( self, @@ -64,6 +68,22 @@ def __init__( self.throughput_series: tuple[list[float], list[float]] | None = None self.sampled_metrics: dict[str, dict[str, list[float]]] | None = None + @staticmethod + def _apply_plot_cfg( + ax: Axes, + cfg: PlotCfg, + *, + legend_handles: Iterable[Line2D] | None = None, + ) -> None: + """Apply title / axis labels / grid and (optionally) legend to ax.""" + ax.set_title(cfg.title) + ax.set_xlabel(cfg.x_label) + ax.set_ylabel(cfg.y_label) + ax.grid(visible=True) + + if legend_handles: + ax.legend(handles=legend_handles) + def process_all_metrics(self) -> None: """Compute all aggregated and sampled metrics if not already done.""" if self.latency_stats is None and self._client.rqs_clock: @@ -150,32 +170,26 @@ def get_sampled_metrics(self) -> dict[str, dict[str, list[float]]]: return self.sampled_metrics def plot_latency_distribution(self, ax: Axes) -> None: - """Draw a histogram of request latencies onto the given Axes.""" + """Plot the distribution of the latency""" if not self.latencies: ax.text(0.5, 0.5, LATENCY_PLOT.no_data, ha="center", va="center") return ax.hist(self.latencies, bins=50) - ax.set_title(LATENCY_PLOT.title) - ax.set_xlabel(LATENCY_PLOT.x_label) - ax.set_ylabel(LATENCY_PLOT.y_label) - ax.grid(visible=True) + self._apply_plot_cfg(ax, LATENCY_PLOT) def plot_throughput(self, ax: Axes) -> None: - """Draw throughput (RPS) over time onto the given Axes.""" + """Plot the distribution of the throughput""" timestamps, values = self.get_throughput_series() if not timestamps: ax.text(0.5, 0.5, THROUGHPUT_PLOT.no_data, ha="center", va="center") return ax.plot(timestamps, values, marker="o", linestyle="-") - ax.set_title(THROUGHPUT_PLOT.title) - ax.set_xlabel(THROUGHPUT_PLOT.x_label) - ax.set_ylabel(THROUGHPUT_PLOT.y_label) - ax.grid(visible=True) + self._apply_plot_cfg(ax, THROUGHPUT_PLOT) def plot_server_queues(self, ax: Axes) -> None: - """Draw server queue lengths over time onto the given Axes.""" + """Plot the server queues""" metrics = self.get_sampled_metrics() ready = metrics.get(SampledMetricName.READY_QUEUE_LEN, {}) io_q = metrics.get(SampledMetricName.EVENT_LOOP_IO_SLEEP, {}) @@ -197,14 +211,11 @@ def plot_server_queues(self, ax: Axes) -> None: linestyle="--", ) - ax.set_title(SERVER_QUEUES_PLOT.title) - ax.set_xlabel(SERVER_QUEUES_PLOT.x_label) - ax.set_ylabel(SERVER_QUEUES_PLOT.y_label) - ax.legend() - ax.grid(visible=True) + self._apply_plot_cfg(ax, SERVER_QUEUES_PLOT, legend_handles=ax.lines) + def plot_ram_usage(self, ax: Axes) -> None: - """Draw RAM usage over time onto the given Axes.""" + """Plot the ram usage""" metrics = self.get_sampled_metrics() ram = metrics.get(SampledMetricName.RAM_IN_USE, {}) @@ -218,8 +229,4 @@ def plot_ram_usage(self, ax: Axes) -> None: for sid, vals in ram.items(): ax.plot(times, vals, label=f"{sid} {RAM_PLOT.legend_label}") - ax.set_title(RAM_PLOT.title) - ax.set_xlabel(RAM_PLOT.x_label) - ax.set_ylabel(RAM_PLOT.y_label) - ax.legend() - ax.grid(visible=True) + self._apply_plot_cfg(ax, RAM_PLOT, legend_handles=ax.lines) diff --git a/src/app/metrics/collector.py b/src/app/metrics/collector.py index ed85126..17339fa 100644 --- a/src/app/metrics/collector.py +++ b/src/app/metrics/collector.py @@ -61,7 +61,9 @@ def _build_time_series(self) -> Generator[simpy.Event, None, None]: server.enabled_metrics[self._io_key].append(server.io_queue_len) server.enabled_metrics[self._ready_key].append(server.ready_queue_len) - + def start(self) -> simpy.Process: + """Definition of the process to collect sampled metrics""" + return self.env.process(self._build_time_series()) diff --git a/src/app/resources/registry.py b/src/app/resources/registry.py index c768cef..02b7585 100644 --- a/src/app/resources/registry.py +++ b/src/app/resources/registry.py @@ -18,6 +18,7 @@ class ResourcesRuntime: def __init__( self, + *, env: simpy.Environment, data: TopologyGraph, diff --git a/src/app/runtime/actors/client.py b/src/app/runtime/actors/client.py index f5d63ac..42b0c3f 100644 --- a/src/app/runtime/actors/client.py +++ b/src/app/runtime/actors/client.py @@ -20,8 +20,9 @@ class ClientRuntime: def __init__( self, + *, env: simpy.Environment, - out_edge: EdgeRuntime, + out_edge: EdgeRuntime | None, client_box: simpy.Store, completed_box: simpy.Store, client_config: Client, @@ -41,6 +42,7 @@ def __init__( def _forwarder(self) -> Generator[simpy.Event, None, None]: """Updtate the state before passing it to another node""" + assert self.out_edge is not None while True: state: RequestState = yield self.client_box.get() # type: ignore[assignment] diff --git a/src/app/runtime/actors/helpers/lb_algorithms.py b/src/app/runtime/actors/helpers/lb_algorithms.py new file mode 100644 index 0000000..90c325e --- /dev/null +++ b/src/app/runtime/actors/helpers/lb_algorithms.py @@ -0,0 +1,30 @@ +"""algorithms to simulate the load balancer during the simulation""" + + + +from app.runtime.actors.edge import EdgeRuntime + + +def least_connections(list_edges: list[EdgeRuntime]) -> EdgeRuntime: + """We send the state to the edge with less concurrent connections""" + concurrent_connections = [edge.concurrent_connections for edge in list_edges] + + idx_min = concurrent_connections.index(min(concurrent_connections)) + + return list_edges[idx_min] + +def round_robin(edges: list[EdgeRuntime], idx: int) -> tuple[EdgeRuntime, int]: + """ + We send states to different server in uniform way by + rotating the list of edges that should transport the state + to the correct server, we rotate the index and not the list + to avoid aliasing since the list is shared by many components + """ + idx %= len(edges) + chosen = edges[idx] + idx = (idx + 1) % len(edges) + return chosen, idx + + + + diff --git a/src/app/runtime/actors/load_balancer.py b/src/app/runtime/actors/load_balancer.py new file mode 100644 index 0000000..b370ff3 --- /dev/null +++ b/src/app/runtime/actors/load_balancer.py @@ -0,0 +1,73 @@ +"""Definition of the node represented by the LB in the simulation""" + +from collections.abc import Generator +from typing import TYPE_CHECKING + +import simpy + +from app.config.constants import LbAlgorithmsName, SystemNodes +from app.runtime.actors.edge import EdgeRuntime +from app.runtime.actors.helpers.lb_algorithms import ( + least_connections, + round_robin, +) +from app.schemas.system_topology.full_system_topology import LoadBalancer + +if TYPE_CHECKING: + from app.runtime.rqs_state import RequestState + + + +class LoadBalancerRuntime: + """class to define the behaviour of the LB in the simulation""" + + def __init__( + self, + *, + env: simpy.Environment, + lb_config: LoadBalancer, + out_edges: list[EdgeRuntime] | None, + lb_box: simpy.Store, + ) -> None: + """ + Descriprion of the instance attributes for the class + Args: + env (simpy.Environment): env of the simulation + lb_config (LoadBalancer): input to define the lb in the runtime + rqs_state (RequestState): state of the simulation + out_edges (list[EdgeRuntime]): list of edges that connects lb with servers + lb_box (simpy.Store): store to add the state + + """ + self.env = env + self.lb_config = lb_config + self.out_edges = out_edges + self.lb_box = lb_box + self._round_robin_index: int = 0 + + + def _forwarder(self) -> Generator[simpy.Event, None, None]: + """Updtate the state before passing it to another node""" + assert self.out_edges is not None + while True: + state: RequestState = yield self.lb_box.get() # type: ignore[assignment] + + state.record_hop( + SystemNodes.LOAD_BALANCER, + self.lb_config.id, + self.env.now, + ) + + if self.lb_config.algorithms == LbAlgorithmsName.ROUND_ROBIN: + out_edge, self._round_robin_index = round_robin( + self.out_edges, + self._round_robin_index, + ) + else: + out_edge = least_connections(self.out_edges) + + out_edge.transport(state) + + def start(self) -> simpy.Process: + """Initialization of the simpy process for the LB""" + return self.env.process(self._forwarder()) diff --git a/src/app/runtime/actors/rqs_generator.py b/src/app/runtime/actors/rqs_generator.py index bc114ba..b666d5d 100644 --- a/src/app/runtime/actors/rqs_generator.py +++ b/src/app/runtime/actors/rqs_generator.py @@ -33,11 +33,11 @@ class RqsGeneratorRuntime: def __init__( self, + *, env: simpy.Environment, out_edge: EdgeRuntime, rqs_generator_data: RqsGeneratorInput, sim_settings: SimulationSettings, - *, rng: np.random.Generator | None = None, ) -> None: """ @@ -96,6 +96,8 @@ def _requests_generator(self) -> Generator[float, None, None]: def _event_arrival(self) -> Generator[simpy.Event, None, None]: """Simulating the process of event generation""" + assert self.out_edge is not None + time_gaps = self._requests_generator() for gap in time_gaps: diff --git a/src/app/runtime/actors/server.py b/src/app/runtime/actors/server.py index 94d6b24..bf6b0b6 100644 --- a/src/app/runtime/actors/server.py +++ b/src/app/runtime/actors/server.py @@ -31,6 +31,7 @@ class ServerRuntime: def __init__( # noqa: PLR0913 self, + *, env: simpy.Environment, server_resources: ServerContainers, server_config: Server, @@ -262,6 +263,7 @@ def _dispatcher(self) -> Generator[simpy.Event, None, None]: The main dispatcher loop. It pulls requests from the inbox and spawns a new '_handle_request' process for each one. """ + assert self.out_edge is not None while True: # Wait for a request to arrive in the server's inbox raw_state = yield self.server_box.get() diff --git a/src/app/schemas/simulation_settings_input.py b/src/app/schemas/simulation_settings_input.py index 80828b5..a9a90a6 100644 --- a/src/app/schemas/simulation_settings_input.py +++ b/src/app/schemas/simulation_settings_input.py @@ -43,3 +43,4 @@ class SimulationSettings(BaseModel): description="constant interval of time to build time series for metrics", ) + diff --git a/src/app/schemas/system_topology/full_system_topology.py b/src/app/schemas/system_topology/full_system_topology.py index 2f29f2a..6b49b21 100644 --- a/src/app/schemas/system_topology/full_system_topology.py +++ b/src/app/schemas/system_topology/full_system_topology.py @@ -18,6 +18,7 @@ ) from app.config.constants import ( + LbAlgorithmsName, NetworkParameters, ServerResourcesDefaults, SystemEdges, @@ -50,7 +51,7 @@ def ensure_type_is_standard(cls, v: SystemNodes) -> SystemNodes: # noqa: N805 return v # ------------------------------------------------------------- -# SERVER RESOURCES EXAMPLE +# SERVER RESOURCES # ------------------------------------------------------------- class ServerResources(BaseModel): @@ -107,6 +108,29 @@ def ensure_type_is_standard(cls, v: SystemNodes) -> SystemNodes: # noqa: N805 raise ValueError(msg) return v +class LoadBalancer(BaseModel): + """ + basemodel for the load balancer + - id: unique name associated to the lb + - type: type of the node in the structure + - server_covered: list of server id connected to the lb + """ + + id: str + type: SystemNodes = SystemNodes.LOAD_BALANCER + algorithms: LbAlgorithmsName = LbAlgorithmsName.ROUND_ROBIN + server_covered: set[str] = Field(default_factory=set) + + + + @field_validator("type", mode="after") + def ensure_type_is_standard(cls, v: SystemNodes) -> SystemNodes: # noqa: N805 + """Ensure the type of the server is standard""" + if v != SystemNodes.LOAD_BALANCER: + msg = f"The type should have a standard value: {SystemNodes.LOAD_BALANCER}" + raise ValueError(msg) + return v + # ------------------------------------------------------------- # NODES CLASS WITH ALL POSSIBLE OBJECTS REPRESENTED BY A NODE # ------------------------------------------------------------- @@ -121,6 +145,7 @@ class TopologyNodes(BaseModel): servers: list[Server] client: Client + load_balancer: LoadBalancer | None = None @model_validator(mode="after") # type: ignore[arg-type] def unique_ids( @@ -236,10 +261,48 @@ def edge_refs_valid(cls, model: "TopologyGraph") -> "TopologyGraph": # noqa: N80 Returning the (unchanged) model signals that the integrity check passed. """ valid_ids = {s.id for s in model.nodes.servers} | {model.nodes.client.id} + if model.nodes.load_balancer is not None: + valid_ids.add(model.nodes.load_balancer.id) + for e in model.edges: if e.source not in valid_ids or e.target not in valid_ids: msg = f"Edge {e.source}->{e.target} references unknown node" raise ValueError(msg) return model + @model_validator(mode="after") # type: ignore[arg-type] + def valid_load_balancer(cls, model: "TopologyGraph") -> "TopologyGraph": # noqa: N805 + """ + Check de validity of the load balancer: first we check + if is present in the simulation, second we check if the LB list + is a proper subset of the server sets of ids, then we check if + edge from LB to the servers are well defined + """ + lb = model.nodes.load_balancer + if lb is None: + return model + + server_ids = {s.id for s in model.nodes.servers} + + # 1) LB list βŠ† server_ids + missing = lb.server_covered - server_ids + if missing: + + msg = (f"Load balancer '{lb.id}'" + "references unknown servers: {sorted(missing)}") + raise ValueError(msg) + + # edge are well defined + targets_from_lb = {e.target for e in model.edges if e.source == lb.id} + not_linked = lb.server_covered - targets_from_lb + if not_linked: + msg = ( + f"Servers {sorted(not_linked)} are covered by LB '{lb.id}' " + "but have no outgoing edge from it." + ) + + raise ValueError(msg) + + return model + diff --git a/tests/unit/resources/test_registry.py b/tests/unit/resources/test_registry.py index fe5a693..6aef76a 100644 --- a/tests/unit/resources/test_registry.py +++ b/tests/unit/resources/test_registry.py @@ -39,7 +39,7 @@ def test_registry_initialises_filled_containers() -> None: """CPU and RAM containers must start full for every server.""" env = simpy.Environment() topo = _build_topology() - registry = ResourcesRuntime(env, topo) + registry = ResourcesRuntime(env=env, data=topo) for srv in topo.nodes.servers: containers = registry[srv.id] @@ -54,7 +54,7 @@ def test_registry_initialises_filled_containers() -> None: def test_getitem_unknown_server_raises_keyerror() -> None: """Accessing an undefined server ID should raise KeyError.""" env = simpy.Environment() - registry = ResourcesRuntime(env, _build_topology()) + registry = ResourcesRuntime(env=env, data=_build_topology()) with pytest.raises(KeyError): _ = registry["non-existent-server"] diff --git a/tests/unit/runtime/engine/test_client.py b/tests/unit/runtime/actors/test_client.py similarity index 100% rename from tests/unit/runtime/engine/test_client.py rename to tests/unit/runtime/actors/test_client.py diff --git a/tests/unit/runtime/engine/test_edge.py b/tests/unit/runtime/actors/test_edge.py similarity index 100% rename from tests/unit/runtime/engine/test_edge.py rename to tests/unit/runtime/actors/test_edge.py diff --git a/tests/unit/runtime/actors/test_load_balancer.py b/tests/unit/runtime/actors/test_load_balancer.py new file mode 100644 index 0000000..35e622a --- /dev/null +++ b/tests/unit/runtime/actors/test_load_balancer.py @@ -0,0 +1,131 @@ +"""Unit tests for ``LoadBalancerRuntime`` (round-robin & least-connections).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, cast + +import pytest +import simpy + +from app.config.constants import LbAlgorithmsName, SystemNodes +from app.runtime.actors.load_balancer import LoadBalancerRuntime +from app.schemas.system_topology.full_system_topology import LoadBalancer + +if TYPE_CHECKING: + from app.runtime.actors.edge import EdgeRuntime + + + +# --------------------------------------------------------------------------- # +# Dummy objects (lightweight test doubles) # +# --------------------------------------------------------------------------- # +class DummyState: + """Tiny substitute for ``RequestState`` - only ``history`` is needed.""" + + def __init__(self) -> None: + """Instance of the state history""" + self.history: list[str] = [] + + def record_hop(self, comp_type: SystemNodes, comp_id: str, _: float) -> None: + """Append the hop as ``":"``.""" + self.history.append(f"{comp_type.value}:{comp_id}") + + +class DummyEdge: + """Stub that mimics just the pieces `LoadBalancerRuntime` relies on.""" + + def __init__(self, edge_id: str, concurrent: int = 0) -> None: + """Instance for the dummy edge""" + self.edge_config = type("Cfg", (), {"id": edge_id}) + self.concurrent_connections = concurrent + self.received: list[DummyState] = [] + + # Signature compatible with EdgeRuntime.transport + def transport(self, state: DummyState) -> None: + """Collect the state for later assertions.""" + self.received.append(state) + + +# --------------------------------------------------------------------------- # +# Fixtures # +# --------------------------------------------------------------------------- # +@pytest.fixture +def env() -> simpy.Environment: + """Return a fresh SimPy environment per test.""" + return simpy.Environment() + + +def _make_lb_runtime( + env: simpy.Environment, + algorithm: LbAlgorithmsName, + edges: list[DummyEdge], +) -> LoadBalancerRuntime: + """Wire LB, its inbox store and the supplied dummy edges.""" + lb_cfg = LoadBalancer( + id="lb-1", + algorithms=algorithm, + server_covered={e.edge_config.id for e in edges}, # type: ignore[attr-defined] + ) + inbox: simpy.Store = simpy.Store(env) + lb = LoadBalancerRuntime( + env=env, + lb_config=lb_cfg, + # β‘‘ cast DummyEdge list to the expected interface type + out_edges=cast("list[EdgeRuntime]", edges), + lb_box=inbox, + ) + lb.start() + return lb + +# --------------------------------------------------------------------------- # +# Tests # +# --------------------------------------------------------------------------- # +def test_round_robin_rotation(env: simpy.Environment) -> None: + """Three requests, two edges β‡’ order must be edge-0, edge-1, edge-0.""" + edge0, edge1 = DummyEdge("srv-A"), DummyEdge("srv-B") + lb = _make_lb_runtime(env, LbAlgorithmsName.ROUND_ROBIN, [edge0, edge1]) + + for _ in range(3): + lb.lb_box.put(DummyState()) + + env.run() + + assert len(edge0.received) == 2 + assert len(edge1.received) == 1 + + tag = SystemNodes.LOAD_BALANCER.value + assert edge0.received[0].history[0].startswith(f"{tag}:") + assert edge0.received[1].history[0].startswith(f"{tag}:") + + +def test_least_connections_picks_lowest(env: simpy.Environment) -> None: + """Edge with fewer concurrent connections must be selected.""" + busy = DummyEdge("busy", concurrent=10) + idle = DummyEdge("idle", concurrent=1) + + lb = _make_lb_runtime(env, LbAlgorithmsName.LEAST_CONNECTIONS, [busy, idle]) + lb.lb_box.put(DummyState()) + + env.run() + + assert idle.received + assert not busy.received + + +def test_start_raises_if_no_edges(env: simpy.Environment) -> None: + """`start()` followed by `env.run()` with `out_edges=None` must assert.""" + lb_cfg = LoadBalancer( + id="lb-bad", + algorithms=LbAlgorithmsName.ROUND_ROBIN, + server_covered=set(), + ) + lb = LoadBalancerRuntime( + env=env, + lb_config=lb_cfg, + out_edges=None, + lb_box=simpy.Store(env), + ) + + lb.start() + with pytest.raises(AssertionError): + env.run() diff --git a/tests/unit/runtime/engine/test_rqs_generator.py b/tests/unit/runtime/actors/test_rqs_generator.py similarity index 100% rename from tests/unit/runtime/engine/test_rqs_generator.py rename to tests/unit/runtime/actors/test_rqs_generator.py diff --git a/tests/unit/runtime/engine/test_server.py b/tests/unit/runtime/actors/test_server.py similarity index 100% rename from tests/unit/runtime/engine/test_server.py rename to tests/unit/runtime/actors/test_server.py diff --git a/tests/unit/schemas/test_full_topology_input.py b/tests/unit/schemas/test_full_topology_input.py index 8382ee6..f6bf1f9 100644 --- a/tests/unit/schemas/test_full_topology_input.py +++ b/tests/unit/schemas/test_full_topology_input.py @@ -18,6 +18,7 @@ from app.schemas.system_topology.full_system_topology import ( Client, Edge, + LoadBalancer, Server, ServerResources, TopologyGraph, @@ -95,6 +96,18 @@ def test_invalid_server_type() -> None: endpoints=[_dummy_endpoint()], ) +# --------------------------------------------------------------------------- # +# Load Balancer # +# --------------------------------------------------------------------------- # + +def test_valid_lb() -> None: + """A LB with correct ``type`` validates.""" + cli = LoadBalancer( + id="LB", + type=SystemNodes.LOAD_BALANCER, + server_covered=["s1", "s2"], + ) + assert cli.type is SystemNodes.LOAD_BALANCER # --------------------------------------------------------------------------- # # TopologyNodes # @@ -174,6 +187,37 @@ def _latency() -> RVConfig: """Tiny helper for latency objects.""" return RVConfig(mean=0.02) +def _topology_with_lb( + cover: set[str], + extra_edges: list[Edge] | None = None, +) -> TopologyGraph: + """Build a minimal graph with 1 client, 1 server and a load balancer.""" + nodes = _single_node_topology() + lb = LoadBalancer(id="lb-1", server_covered=cover) + nodes = TopologyNodes( + servers=nodes.servers, + client=nodes.client, + load_balancer=lb, + ) + + edges: list[Edge] = [ + Edge( # client -> LB + id="cli-lb", + source="browser", + target="lb-1", + latency=_latency(), + ), + Edge( # LB -> server (may be removed in invalid tests) + id="lb-srv", + source="lb-1", + target="svc-A", + latency=_latency(), + ), + ] + if extra_edges: + edges.extend(extra_edges) + return TopologyGraph(nodes=nodes, edges=edges) + def test_valid_topology_graph() -> None: """Happy-path graph passes validation.""" @@ -188,6 +232,19 @@ def test_valid_topology_graph() -> None: graph = TopologyGraph(nodes=nodes, edges=[edge]) assert len(graph.edges) == 1 +def test_topology_graph_without_lb_still_valid() -> None: + """Graph without load balancer validates just like before.""" + nodes = _single_node_topology() + edge = Edge( + id="edge-1", + source="browser", + target="svc-A", + latency=_latency(), + ) + graph = TopologyGraph(nodes=nodes, edges=[edge]) + assert graph.nodes.load_balancer is None + + def test_edge_refers_unknown_node() -> None: """Edge pointing to a non-existent node fails validation.""" @@ -200,3 +257,49 @@ def test_edge_refers_unknown_node() -> None: ) with pytest.raises(ValidationError): TopologyGraph(nodes=nodes, edges=[bad_edge]) + + +# --------------------------------------------------------------------------- # +# 2) LB is valid # +# --------------------------------------------------------------------------- # +def test_load_balancer_valid_graph() -> None: + """LB covering a server with proper edges passes validation.""" + graph = _topology_with_lb({"svc-A"}) + assert graph.nodes.load_balancer is not None + assert graph.nodes.load_balancer.server_covered == {"svc-A"} + + +# --------------------------------------------------------------------------- # +# 3) LB con server inesistente # +# --------------------------------------------------------------------------- # +def test_lb_references_unknown_server() -> None: + """LB that lists a non-existent server triggers ValidationError.""" + with pytest.raises(ValidationError): + _topology_with_lb({"ghost-srv"}) + + +# --------------------------------------------------------------------------- # +# 4) LB no edge with a server covered # +# --------------------------------------------------------------------------- # +def test_lb_missing_edge_to_covered_server() -> None: + """LB covers svc-A but edge LBβ†’svc-A is missing β†’ ValidationError.""" + # costruiamo il grafo senza l'edge lb-srv + nodes = _single_node_topology() + lb = LoadBalancer(id="lb-1", server_covered={"svc-A"}) + nodes = TopologyNodes( + servers=nodes.servers, + client=nodes.client, + load_balancer=lb, + ) + edges = [ + Edge( + id="cli-lb", + source="browser", + target="lb-1", + latency=_latency(), + ), + ] + with pytest.raises(ValidationError): + TopologyGraph(nodes=nodes, edges=edges) + +