diff --git a/asyncflow_queue_limit/asyncflow_mm1.ipynb b/asyncflow_queue_limit/asyncflow_mm1.ipynb index 1cbd4a5..6d95aff 100644 --- a/asyncflow_queue_limit/asyncflow_mm1.ipynb +++ b/asyncflow_queue_limit/asyncflow_mm1.ipynb @@ -19,7 +19,7 @@ }, { "cell_type": "code", - "execution_count": 34, + "execution_count": 1, "id": "c3a69413", "metadata": {}, "outputs": [], @@ -34,7 +34,7 @@ "from asyncflow import AsyncFlow, SimulationRunner\n", "from asyncflow.analysis import MMc, ResultsAnalyzer, SweepAnalyzer\n", "from asyncflow.components import (\n", - " Client, Server, Edge, Endpoint, LoadBalancer, ArrivalsGenerator\n", + " Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n", ")\n", "from asyncflow.settings import SimulationSettings\n", "\n", @@ -43,7 +43,7 @@ }, { "cell_type": "code", - "execution_count": 35, + "execution_count": null, "metadata": { "tags": [ "imports" @@ -51,10 +51,14 @@ }, "outputs": [ { - "name": "stdout", - "output_type": "stream", - "text": [ - "Imports OK.\n" + "ename": "ImportError", + "evalue": "cannot import name 'Edge' from 'asyncflow.components' (/home/gioele/projects/AsyncFlow/src/asyncflow/components/__init__.py)", + "output_type": "error", + "traceback": [ + "\u001b[31m---------------------------------------------------------------------------\u001b[39m", + "\u001b[31mImportError\u001b[39m Traceback (most recent call last)", + "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[2]\u001b[39m\u001b[32m, line 6\u001b[39m\n\u001b[32m 4\u001b[39m \u001b[38;5;66;03m# Public AsyncFlow API\u001b[39;00m\n\u001b[32m 5\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01masyncflow\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m AsyncFlow, SimulationRunner, Sweep\n\u001b[32m----> \u001b[39m\u001b[32m6\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01masyncflow\u001b[39;00m\u001b[34;01m.\u001b[39;00m\u001b[34;01mcomponents\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m Client, Server, Edge, Endpoint, ArrivalsGenerator\n\u001b[32m 7\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01masyncflow\u001b[39;00m\u001b[34;01m.\u001b[39;00m\u001b[34;01msettings\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m SimulationSettings\n\u001b[32m 8\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01masyncflow\u001b[39;00m\u001b[34;01m.\u001b[39;00m\u001b[34;01manalysis\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m MMc, ResultsAnalyzer, SweepAnalyzer\n", + "\u001b[31mImportError\u001b[39m: cannot import name 'Edge' from 'asyncflow.components' (/home/gioele/projects/AsyncFlow/src/asyncflow/components/__init__.py)" ] } ], @@ -64,7 +68,7 @@ "\n", "# Public AsyncFlow API\n", "from asyncflow import AsyncFlow, SimulationRunner, Sweep\n", - "from asyncflow.components import Client, Server, Edge, Endpoint, ArrivalsGenerator\n", + "from asyncflow.components import Client, Server, LinkEdge, Endpoint, ArrivalsGenerator\n", "from asyncflow.settings import SimulationSettings\n", "from asyncflow.analysis import MMc, ResultsAnalyzer, SweepAnalyzer\n", "from asyncflow.enums import Distribution\n", @@ -103,7 +107,7 @@ }, { "cell_type": "code", - "execution_count": 36, + "execution_count": null, "metadata": { "tags": [ "build" @@ -139,9 +143,9 @@ " endpoints=[endpoint],\n", " )\n", "\n", - " e_gen_client = Edge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\", latency=0.0001, dropout_rate=0.0)\n", - " e_client_app = Edge(id=\"client-app\", source=\"client-1\", target=\"app-1\", latency=0.0001, dropout_rate=0.0)\n", - " e_app_client = Edge(id=\"app-client\", source=\"app-1\", target=\"client-1\", latency=0.0001, dropout_rate=0.0)\n", + " e_gen_client = LinkEdge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\")\n", + " e_client_app = LinkEdge(id=\"client-app\", source=\"client-1\", target=\"app-1\")\n", + " e_app_client = LinkEdge(id=\"app-client\", source=\"app-1\", target=\"client-1\")\n", "\n", " settings = SimulationSettings(\n", " total_simulation_time=2400,\n", @@ -168,7 +172,7 @@ }, { "cell_type": "code", - "execution_count": 37, + "execution_count": null, "metadata": { "tags": [ "run" @@ -296,12 +300,12 @@ "\\end{cases}\n", "$$\n", "\n", - "> **Why small deltas appear:** warm-up effects, the user-sampling window (piecewise-constant rate), finite simulation horizon, and a (small) deterministic network latency naturally introduce small Theory vs Observed gaps. Increasing the simulation time and reducing network latency typically shrinks these deltas.\n" + "> **Why small deltas appear:** warm-up effects, the user-sampling window (piecewise-constant rate), finite simulation horizon. Increasing the simulation time typically shrinks these deltas.\n" ] }, { "cell_type": "code", - "execution_count": 38, + "execution_count": null, "metadata": { "tags": [ "mm1" @@ -378,7 +382,7 @@ }, { "cell_type": "code", - "execution_count": 39, + "execution_count": null, "metadata": { "tags": [ "plots" @@ -467,7 +471,7 @@ }, { "cell_type": "code", - "execution_count": 40, + "execution_count": null, "id": "c9063bbe", "metadata": {}, "outputs": [ @@ -514,7 +518,7 @@ }, { "cell_type": "code", - "execution_count": 41, + "execution_count": null, "id": "48716bc8", "metadata": {}, "outputs": [ @@ -564,7 +568,7 @@ }, { "cell_type": "code", - "execution_count": 42, + "execution_count": null, "id": "9b9f0236", "metadata": {}, "outputs": [ diff --git a/asyncflow_queue_limit/asyncflow_mmc_split.ipynb b/asyncflow_queue_limit/asyncflow_mmc_split.ipynb index 94633ef..9d71c7a 100644 --- a/asyncflow_queue_limit/asyncflow_mmc_split.ipynb +++ b/asyncflow_queue_limit/asyncflow_mmc_split.ipynb @@ -20,7 +20,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "id": "b8a94d93", "metadata": {}, "outputs": [], @@ -36,7 +36,7 @@ "from asyncflow import AsyncFlow, SimulationRunner\n", "from asyncflow.analysis import MMc, ResultsAnalyzer\n", "from asyncflow.components import (\n", - " Client, Server, Edge, Endpoint, LoadBalancer, ArrivalsGenerator\n", + " Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n", ")\n", "from asyncflow.settings import SimulationSettings\n", "\n", @@ -46,7 +46,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "id": "d1b7ad7d", "metadata": {}, "outputs": [ @@ -64,7 +64,7 @@ "\n", "# Public AsyncFlow API\n", "from asyncflow import AsyncFlow, SimulationRunner, Sweep\n", - "from asyncflow.components import Client, Server, Edge, Endpoint, LoadBalancer, ArrivalsGenerator\n", + "from asyncflow.components import Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n", "from asyncflow.settings import SimulationSettings\n", "from asyncflow.analysis import ResultsAnalyzer, SweepAnalyzer, MMc\n", "from asyncflow.enums import Distribution\n", @@ -94,7 +94,7 @@ " Arrivals are produced by the same **two-stage, windowed Poisson sampler**: in each user-sampling window \\$\\Delta\\$, we draw the active users \\$U\\$ (Poisson or Normal, per config).\n", " Within the window, arrivals are a **homogeneous Poisson process** with rate \\$\\Lambda = U \\cdot \\lambda\\_r/60\\$.\n", "\n", - " With **small \\$\\Delta\\$**, **Poisson users**, **long runs**, and **tiny edge latency**, the aggregate arrivals seen by the load balancer approximate a global Poisson input, yielding a good empirical match to the M/M/c model.\n", + " \n", "\n", "---\n", "\n", @@ -128,7 +128,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "id": "ba93587a", "metadata": {}, "outputs": [], @@ -173,12 +173,12 @@ " )\n", "\n", " edges = [\n", - " Edge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\", latency=0.00001, dropout_rate=0),\n", - " Edge(id=\"client-lb\", source=\"client-1\", target=\"lb-1\", latency=0.00001, dropout_rate=0),\n", - " Edge(id=\"lb-srv1\", source=\"lb-1\", target=\"srv-1\", latency=0.00001, dropout_rate=0),\n", - " Edge(id=\"lb-srv2\", source=\"lb-1\", target=\"srv-2\", latency=0.00001, dropout_rate=0),\n", - " Edge(id=\"srv1-client\", source=\"srv-1\", target=\"client-1\", latency=0.00001, dropout_rate=0),\n", - " Edge(id=\"srv2-client\", source=\"srv-2\", target=\"client-1\", latency=0.00001, dropout_rate=0),\n", + " LinkEdge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\",),\n", + " LinkEdge(id=\"client-lb\", source=\"client-1\", target=\"lb-1\", ),\n", + " LinkEdge(id=\"lb-srv1\", source=\"lb-1\", target=\"srv-1\", ),\n", + " LinkEdge(id=\"lb-srv2\", source=\"lb-1\", target=\"srv-2\", ),\n", + " LinkEdge(id=\"srv1-client\", source=\"srv-1\", target=\"client-1\",),\n", + " LinkEdge(id=\"srv2-client\", source=\"srv-2\", target=\"client-1\",),\n", " ]\n", "\n", " settings = SimulationSettings(\n", diff --git a/src/asyncflow/builder/asyncflow_builder.py b/src/asyncflow/builder/asyncflow_builder.py index dae2429..7d79bd6 100644 --- a/src/asyncflow/builder/asyncflow_builder.py +++ b/src/asyncflow/builder/asyncflow_builder.py @@ -4,12 +4,12 @@ from typing import Self -from asyncflow.config.enums import EventDescription +from asyncflow.config.enums import EventDescription, SystemEdges from asyncflow.schemas.arrivals.generator import ArrivalsGenerator from asyncflow.schemas.events.injection import End, EventInjection, Start from asyncflow.schemas.payload import SimulationPayload from asyncflow.schemas.settings.simulation import SimulationSettings -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import LinkEdge, NetworkEdge from asyncflow.schemas.topology.graph import TopologyGraph from asyncflow.schemas.topology.nodes import ( Client, @@ -27,10 +27,12 @@ def __init__(self) -> None: self._arrivals: ArrivalsGenerator | None = None self._client: Client | None = None self._servers: list[Server] | None = None - self._edges: list[Edge] | None = None + self._net_edges: list[NetworkEdge] | None = None + self._link_edges: list[LinkEdge] | None = None self._sim_settings: SimulationSettings | None = None self._load_balancer: LoadBalancer | None = None self._events: list[EventInjection] = [] + self._edges_kind: SystemEdges | None = None def add_arrivals_generator( self, @@ -64,18 +66,48 @@ def add_servers(self, *servers: Server) -> Self: self._servers.append(server) return self - def add_edges(self, *edges: Edge) -> Self: - """Method to instantiate the list of edges""" - if self._edges is None: - self._edges = [] + def add_edges(self, *edges: NetworkEdge | LinkEdge) -> Self: + """Add edges; enforces homogeneous type (all NetworkEdge or all LinkEdge).""" + if not edges: + return self + + if self._edges_kind is None: + first = edges[0] + if isinstance(first, NetworkEdge): + self._edges_kind = SystemEdges.NETWORK_CONNECTION + self._net_edges = [] + elif isinstance(first, LinkEdge): + self._edges_kind = SystemEdges.LINK_CONNECTION + self._link_edges = [] + else: + msg = "Edges must be NetworkEdge or LinkEdge." + raise TypeError(msg) + + assert self._edges_kind is not None - for edge in edges: - if not isinstance(edge, Edge): - msg = "All the instances must be of the type Edge" + if self._edges_kind == SystemEdges.NETWORK_CONNECTION: + assert self._net_edges is not None + if any(not isinstance(e, NetworkEdge) for e in edges): + msg = "Cannot mix LinkEdge with NetworkEdge." raise TypeError(msg) - self._edges.append(edge) + # ⬇️ Build a typed batch so mypy is happy + net_batch: list[NetworkEdge] = [ + e for e in edges if isinstance(e, NetworkEdge) + ] + self._net_edges.extend(net_batch) + else: + assert self._link_edges is not None + if any(not isinstance(e, LinkEdge) for e in edges): + msg = "Cannot mix NetworkEdge with LinkEdge." + raise TypeError(msg) + # ⬇️ Typed batch for LinkEdge + link_batch: list[LinkEdge] = [e for e in edges if isinstance(e, LinkEdge)] + self._link_edges.extend(link_batch) + return self + + def add_simulation_settings(self, sim_settings: SimulationSettings) -> Self: """Method to instantiate the settings for the simulation""" if not isinstance(sim_settings, SimulationSettings): @@ -146,7 +178,7 @@ def add_server_outage( def build_payload(self) -> SimulationPayload: """Method to build the payload for the simulation""" if self._arrivals is None: - msg = "The generator input must be instantiated before the simulation" + msg = "The arrivals generator must be instantiated before the simulation" raise ValueError(msg) if self._client is None: msg = "The client input must be instantiated before the simulation" @@ -154,9 +186,23 @@ def build_payload(self) -> SimulationPayload: if not self._servers: msg = "You must instantiate at least one server before the simulation" raise ValueError(msg) - if not self._edges: - msg = "You must instantiate edges before the simulation" + if self._edges_kind is None: + msg = "You must instantiate edges before the simulation." raise ValueError(msg) + + # mypy facilitator + edges_u: list[NetworkEdge] | list[LinkEdge] + if self._edges_kind == SystemEdges.NETWORK_CONNECTION: + if not self._net_edges: + msg = "You must instantiate edges before the simulation." + raise ValueError(msg) + edges_u = self._net_edges + else: + if not self._link_edges: + msg = "You must instantiate edges before the simulation." + raise ValueError(msg) + edges_u = self._link_edges + if self._sim_settings is None: msg = "The simulation settings must be instantiated before the simulation" raise ValueError(msg) @@ -169,7 +215,7 @@ def build_payload(self) -> SimulationPayload: graph = TopologyGraph( nodes = nodes, - edges=self._edges, + edges=edges_u, ) return SimulationPayload.model_validate({ diff --git a/src/asyncflow/components/__init__.py b/src/asyncflow/components/__init__.py index 0c3f5ca..b6c452b 100644 --- a/src/asyncflow/components/__init__.py +++ b/src/asyncflow/components/__init__.py @@ -3,7 +3,7 @@ from asyncflow.schemas.arrivals.generator import ArrivalsGenerator from asyncflow.schemas.events.injection import EventInjection -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import LinkEdge, NetworkEdge from asyncflow.schemas.topology.endpoint import Endpoint from asyncflow.schemas.topology.nodes import ( Client, @@ -15,10 +15,11 @@ __all__ = [ "ArrivalsGenerator", "Client", - "Edge", "Endpoint", "EventInjection", + "LinkEdge", "LoadBalancer", + "NetworkEdge", "NodesResources", "Server", ] diff --git a/src/asyncflow/config/enums.py b/src/asyncflow/config/enums.py index 494780c..53e6a90 100644 --- a/src/asyncflow/config/enums.py +++ b/src/asyncflow/config/enums.py @@ -162,6 +162,7 @@ class SystemEdges(StrEnum): """ NETWORK_CONNECTION = "network_connection" + LINK_CONNECTION = "link_connection" # ====================================================================== # CONSTANTS FOR THE EVENT TO INJECT IN THE SIMULATION diff --git a/src/asyncflow/queue_theory_analysis/mmc.py b/src/asyncflow/queue_theory_analysis/mmc.py index 9bc525a..e2d6181 100644 --- a/src/asyncflow/queue_theory_analysis/mmc.py +++ b/src/asyncflow/queue_theory_analysis/mmc.py @@ -17,6 +17,7 @@ ) from asyncflow.queue_theory_analysis.base import QueueTheoryBase from asyncflow.schemas.common.random_variables import RVConfig +from asyncflow.schemas.topology.edges import LinkEdge if TYPE_CHECKING: @@ -104,9 +105,6 @@ class MMcCompatServerRow(TypedDict): class MMc(QueueTheoryBase): """Analyzer for the M/M/c split model (Round-Robin), c>=1, with strict checks.""" - # Upper bound for "negligible" deterministic network latency - MAX_EDGE_LATENCY_S: float = 1e-3 # 1 ms - def __init__(self) -> None: """Track analyzers we've already processed; weak refs avoid leaks.""" self._processed_ras: WeakSet[ResultsAnalyzer] = WeakSet() @@ -139,24 +137,21 @@ def _check_generator(self, payload: SimulationPayload) -> list[str]: arrivals = payload.arrivals if arrivals.model not in {Distribution.POISSON, Distribution.EXPONENTIAL}: errs.append("arrivals.model must be 'poisson' or 'exponential'.") - - errs.append("avg_active_users must be Poisson or exponential.") return errs def _check_edges(self, payload: SimulationPayload) -> list[str]: errs: list[str] = [] - for edge in payload.topology_graph.edges: - latency = edge.latency - if isinstance(latency, RVConfig): - errs.append( - f"edge '{edge.id}' latency must be deterministic (<=1ms), " - "not a random variable.", - ) - continue - if float(latency) > self.MAX_EDGE_LATENCY_S: - errs.append( - f"edge '{edge.id}' deterministic latency must be <= 1 ms.", - ) + if not payload.topology_graph.edges: + errs.append("topology must include at least one edge.") + return errs + # In pydantic we define edges to be only of type Network or Link + # The types are mutually exclusive hence we have to be sure only + # one edge is of the link type + edge = payload.topology_graph.edges[0] + if not isinstance(edge, LinkEdge): + errs.append( + "In MMc models edges are just connector, use link_edge as a type", + ) return errs def _check_server_model(self, payload: SimulationPayload) -> list[str]: @@ -428,7 +423,7 @@ def _observed_kpis( mu_hat = self._observed_mu_rate(results_analyzer) server_count = self._server_count(payload) - # Ŵ from latency stats (client-side); edges are constrained to ≤1ms. + # Ŵ from latency stats (client-side); lat_stats = results_analyzer.get_latency_stats() w_hat = float(lat_stats.get(LatencyKey.MEAN, 0.0)) diff --git a/src/asyncflow/runner/simulation.py b/src/asyncflow/runner/simulation.py index 3fb449a..834cfce 100644 --- a/src/asyncflow/runner/simulation.py +++ b/src/asyncflow/runner/simulation.py @@ -28,7 +28,7 @@ from asyncflow.schemas.arrivals.generator import ArrivalsGenerator from asyncflow.schemas.events.injection import EventInjection - from asyncflow.schemas.topology.edges import Edge + from asyncflow.schemas.topology.edges import LinkEdge, NetworkEdge from asyncflow.schemas.topology.nodes import ( Client, LoadBalancer, @@ -73,7 +73,10 @@ def __init__( self.arrivals: ArrivalsGenerator = simulation_input.arrivals self.lb: LoadBalancer | None = None self.simulation_settings = simulation_input.sim_settings - self.edges: list[Edge] = simulation_input.topology_graph.edges + # Edges can be NetworkEdge or LinkEdge; TopologyGraph ensures homogeneity. + self.edges: list[NetworkEdge] | list[LinkEdge] = ( + simulation_input.topology_graph.edges + ) self.rng = np.random.default_rng() # Object needed to start the simulation diff --git a/src/asyncflow/runtime/actors/edge.py b/src/asyncflow/runtime/actors/edge.py index dc5adf5..259bca0 100644 --- a/src/asyncflow/runtime/actors/edge.py +++ b/src/asyncflow/runtime/actors/edge.py @@ -20,7 +20,7 @@ from asyncflow.samplers.common_helpers import general_sampler from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.settings.simulation import SimulationSettings -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import LinkEdge, NetworkEdge if TYPE_CHECKING: from pydantic import PositiveFloat @@ -35,7 +35,7 @@ def __init__( # Noqa: PLR0913 self, *, env: simpy.Environment, - edge_config: Edge, + edge_config: NetworkEdge | LinkEdge, # ------------------------------------------------------------ # ATTRIBUTES FROM THE OBJECT EVENTINJECTIONRUNTIME @@ -75,10 +75,15 @@ def __init__( # Noqa: PLR0913 # verify that each optional metric is active. For deafult metric settings # is not needed but as we will scale as explained above we will need it - def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]: + def _deliver_network( + self, + state: RequestState, + ) -> Generator[simpy.Event, None, None]: """Function to deliver the state to the next node""" # extract the random variables defining the latency of the edge + assert isinstance(self.edge_config, NetworkEdge) + uniform_variable = self.rng.uniform() if uniform_variable < self.edge_config.dropout_rate: state.finish_time = self.env.now @@ -125,13 +130,30 @@ def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]: self._concurrent_connections -=1 yield self.target_box.put(state) + def _deliver_link(self, state: RequestState) -> Generator[simpy.Event, None, None]: + """Function to deliver the state to the next node""" + state.record_hop( + SystemEdges.LINK_CONNECTION, + self.edge_config.id, + self.env.now, + ) + + # Advance to the next simulation event tick (zero-time delay) so that link + # deliveries are processed after the current event, preserving causal order + # and avoiding same-tick side effects. + yield self.env.timeout(0) + yield self.target_box.put(state) + def transport(self, state: RequestState) -> simpy.Process: """ Called by the upstream node. Immediately spins off a SimPy process that will handle drop + delay + delivery of `state`. """ - return self.env.process(self._deliver(state)) + if isinstance(self.edge_config, NetworkEdge): + return self.env.process(self._deliver_network(state)) + + return self.env.process(self._deliver_link(state)) @property def enabled_metrics(self) -> dict[SampledMetricName, list[float | int]]: diff --git a/src/asyncflow/runtime/events/injection.py b/src/asyncflow/runtime/events/injection.py index 753c5d3..769ccfe 100644 --- a/src/asyncflow/runtime/events/injection.py +++ b/src/asyncflow/runtime/events/injection.py @@ -11,7 +11,7 @@ from asyncflow.runtime.actors.edge import EdgeRuntime from asyncflow.schemas.events.injection import EventInjection -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import LinkEdge, NetworkEdge from asyncflow.schemas.topology.nodes import Server # Helpers to distinguish when the event start and when the event finish @@ -36,7 +36,7 @@ def __init__( self, *, events: list[EventInjection] | None, - edges: list[Edge], + edges: list[NetworkEdge] | list[LinkEdge], env: simpy.Environment, servers: list[Server], # This is initiated in the simulation runner to understand @@ -116,6 +116,20 @@ def __init__( self._servers_ids = {server.id for server in self.servers} self._edges_ids = {edge.id for edge in self.edges} + # If any event targets an edge, we only need to inspect the first edge: + # the topology type is homogeneous by construction + # (list[NetworkEdge] | list[LinkEdge]), + # so checking one element determines the type of the entire list. + if self.events and self.edges and any( + ev.target_id in self._edges_ids for ev in self.events + ): + first_edge = self.edges[0] + if not isinstance(first_edge, NetworkEdge): + msg=("Edge events are present, but the topology uses LinkEdge. " + "Edge-targeted events require NetworkEdge " + "(network_connection) edges.") + raise ValueError(msg) + for event in self.events: start_event = ( event.start.t_start, event.event_id, event.target_id, START_MARK, diff --git a/src/asyncflow/schemas/topology/edges.py b/src/asyncflow/schemas/topology/edges.py index 8dea31e..a2b57da 100644 --- a/src/asyncflow/schemas/topology/edges.py +++ b/src/asyncflow/schemas/topology/edges.py @@ -12,10 +12,10 @@ #------------------------------------------------------------- # Definition of the edges structure for the graph representing -# the topoogy of the system defined for the simulation +# the topology of the system defined for the simulation #------------------------------------------------------------- -class Edge(BaseModel): +class NetworkEdge(BaseModel): """ A directed connection in the topology graph. @@ -28,9 +28,6 @@ class Edge(BaseModel): latency : RVConfig | PositiveFloat Random-variable configuration for network latency on this link or positive float value. - probability : float - Probability of taking this edge when there are multiple outgoing links. - Must be in [0.0, 1.0]. Defaults to 1.0 (always taken). edge_type : SystemEdges Category of the link (e.g. network, queue, stream). @@ -87,11 +84,51 @@ def ensure_latency_is_non_negative( @model_validator(mode="after") # type: ignore[arg-type] - def check_src_trgt_different(cls, model: "Edge") -> "Edge": # noqa: N805 + def check_src_trgt_different(cls, model: "NetworkEdge") -> "NetworkEdge": # noqa: N805 """Ensure source is different from target""" if model.source == model.target: msg = "source and target must be different nodes" raise ValueError(msg) return model + @field_validator("edge_type", mode="after") + def ensure_edge_type_is_correct(cls, v: SystemEdges) -> SystemEdges: # noqa: N805 + """ + Ensure the type of an edge not representing the network is network_connection + useful for to test model where the network is not negligible + """ + if v != SystemEdges.NETWORK_CONNECTION: + msg=f"The type of the edge must be {SystemEdges.NETWORK_CONNECTION}" + raise ValueError(msg) + return v + + +class LinkEdge(BaseModel): + """ + Edges without latency, they may be useful in situations where + it is not necessary to model the network + """ + + id: str + source: str + target: str + edge_type: SystemEdges = SystemEdges.LINK_CONNECTION + + @field_validator("edge_type", mode="after") + def ensure_edge_type_is_correct(cls, v: SystemEdges) -> SystemEdges: # noqa: N805 + """ + Ensure the type of an edge not representing the network is link_connection + useful for to test model where the network is negligible + """ + if v != SystemEdges.LINK_CONNECTION: + msg=f"The type of the edge must be {SystemEdges.LINK_CONNECTION}" + raise ValueError(msg) + return v + @model_validator(mode="after") # type: ignore[arg-type] + def check_src_trgt_different(cls, model: "LinkEdge") -> "LinkEdge": # noqa: N805 + """Ensure source is different from target""" + if model.source == model.target: + msg = "source and target must be different nodes" + raise ValueError(msg) + return model diff --git a/src/asyncflow/schemas/topology/graph.py b/src/asyncflow/schemas/topology/graph.py index 91cf857..7cb5af5 100644 --- a/src/asyncflow/schemas/topology/graph.py +++ b/src/asyncflow/schemas/topology/graph.py @@ -13,7 +13,7 @@ model_validator, ) -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import LinkEdge, NetworkEdge from asyncflow.schemas.topology.nodes import TopologyNodes #------------------------------------------------------------- @@ -28,7 +28,7 @@ class TopologyGraph(BaseModel): """ nodes: TopologyNodes - edges: list[Edge] + edges: list[NetworkEdge] | list[LinkEdge] @model_validator(mode="after") # type: ignore[arg-type] def unique_ids( diff --git a/tests/conftest.py b/tests/conftest.py index 4f1eb98..80a1768 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,7 +16,7 @@ from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.payload import SimulationPayload from asyncflow.schemas.settings.simulation import SimulationSettings -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import NetworkEdge from asyncflow.schemas.topology.graph import TopologyGraph from asyncflow.schemas.topology.nodes import ( Client, @@ -118,7 +118,7 @@ def topology_minimal() -> TopologyGraph: client = Client(id="client-1") # Stub edge: generator id comes from rqs_input fixture (“rqs-1”) - edge = Edge( + edge = NetworkEdge( id="gen-to-client", source="rqs-1", target="client-1", diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 238a184..273ea06 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -26,7 +26,7 @@ from asyncflow.schemas.events.injection import EventInjection from asyncflow.schemas.payload import SimulationPayload from asyncflow.schemas.settings.simulation import SimulationSettings -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import NetworkEdge from asyncflow.schemas.topology.endpoint import Endpoint, Step from asyncflow.schemas.topology.graph import TopologyGraph from asyncflow.schemas.topology.nodes import ( @@ -144,12 +144,12 @@ def __call__( tgt: str, mean: float, dist: Distribution = ..., - ) -> Edge: + ) -> NetworkEdge: """Return an `Edge` from ids and latency parameters.""" @pytest.fixture -def edge_factory() -> Callable[..., Edge]: +def edge_factory() -> Callable[..., NetworkEdge]: """ Build an edge with a latency RV. Defaults to Poisson(mean=1ms) to keep tests fast; pass another distribution/mean when needed. @@ -161,8 +161,8 @@ def _make( tgt: str, mean: float = 0.001, dist: Distribution = Distribution.POISSON, - ) -> Edge: - return Edge( + ) -> NetworkEdge: + return NetworkEdge( id=eid, source=src, target=tgt, @@ -197,7 +197,7 @@ def __call__( @pytest.fixture def topology_two_servers( server_factory: Callable[[str, float | None], Server], - edge_factory: Callable[..., Edge], + edge_factory: Callable[..., NetworkEdge], ) -> Callable[..., TopologyGraph]: """Factory for a two-server topology with a load balancer""" def _make(*, service_time_s: float | None = 0.001, @@ -225,7 +225,7 @@ def _make(*, service_time_s: float | None = 0.001, @pytest.fixture def topology_single_server( server_factory: Callable[[str, float | None], Server], - edge_factory: Callable[..., Edge], + edge_factory: Callable[..., NetworkEdge], ) -> Callable[..., TopologyGraph]: """Factory for a single-server topology with a load balancer in front""" def _make(*, service_time_s: float | None = 0.001, diff --git a/tests/system/test_sys_ev_inj_lb_two_servers.py b/tests/system/test_sys_ev_inj_lb_two_servers.py index 746e268..4be2cf6 100644 --- a/tests/system/test_sys_ev_inj_lb_two_servers.py +++ b/tests/system/test_sys_ev_inj_lb_two_servers.py @@ -31,7 +31,7 @@ import simpy from asyncflow import AsyncFlow -from asyncflow.components import Client, Edge, Endpoint, LoadBalancer, Server +from asyncflow.components import Client, Endpoint, LoadBalancer, NetworkEdge, Server from asyncflow.config.enums import Distribution, LatencyKey from asyncflow.runner.simulation import SimulationRunner from asyncflow.schemas.arrivals.generator import ArrivalsGenerator @@ -98,37 +98,37 @@ def _build_payload(*, with_events: bool) -> SimulationPayload: # Edges: generator→client, client→lb, lb→srv-{1,2}, srv-{1,2}→client. edges = [ - Edge( + NetworkEdge( id="gen-client", source="rqs-1", target="client-1", latency={"mean": 0.003, "distribution": "exponential"}, ), - Edge( + NetworkEdge( id="client-lb", source="client-1", target="lb-1", latency={"mean": 0.002, "distribution": "exponential"}, ), - Edge( + NetworkEdge( id="lb-srv-1", source="lb-1", target="srv-1", latency={"mean": 0.003, "distribution": "exponential"}, ), - Edge( + NetworkEdge( id="lb-srv-2", source="lb-1", target="srv-2", latency={"mean": 0.003, "distribution": "exponential"}, ), - Edge( + NetworkEdge( id="srv1-client", source="srv-1", target="client-1", latency={"mean": 0.003, "distribution": "exponential"}, ), - Edge( + NetworkEdge( id="srv2-client", source="srv-2", target="client-1", diff --git a/tests/system/test_sys_ev_inj_single_server.py b/tests/system/test_sys_ev_inj_single_server.py index 489e30b..655ebe6 100644 --- a/tests/system/test_sys_ev_inj_single_server.py +++ b/tests/system/test_sys_ev_inj_single_server.py @@ -34,7 +34,7 @@ import simpy from asyncflow import AsyncFlow -from asyncflow.components import Client, Edge, Endpoint, Server +from asyncflow.components import Client, Endpoint, NetworkEdge, Server from asyncflow.config.enums import Distribution, LatencyKey from asyncflow.runner.simulation import SimulationRunner from asyncflow.schemas.arrivals.generator import ArrivalsGenerator @@ -92,19 +92,19 @@ def _build_payload(*, with_spike: bool) -> SimulationPayload: # Edges: baseline exponential latencies around a few milliseconds. edges = [ - Edge( + NetworkEdge( id="gen-client", source="rqs-1", target="client-1", latency={"mean": 0.003, "distribution": "exponential"}, ), - Edge( + NetworkEdge( id="client-srv", source="client-1", target="srv-1", latency={"mean": 0.002, "distribution": "exponential"}, ), - Edge( + NetworkEdge( id="srv-client", source="srv-1", target="client-1", diff --git a/tests/system/test_sys_lb_two_servers.py b/tests/system/test_sys_lb_two_servers.py index 3f24fb7..8eb705d 100644 --- a/tests/system/test_sys_lb_two_servers.py +++ b/tests/system/test_sys_lb_two_servers.py @@ -25,7 +25,7 @@ import simpy from asyncflow import AsyncFlow -from asyncflow.components import Client, Edge, Endpoint, LoadBalancer, Server +from asyncflow.components import Client, Endpoint, LoadBalancer, NetworkEdge, Server from asyncflow.config.enums import Distribution, LatencyKey from asyncflow.runner.simulation import SimulationRunner from asyncflow.schemas.arrivals.generator import ArrivalsGenerator @@ -89,37 +89,37 @@ def _build_payload() -> SimulationPayload: ) edges = [ - Edge( + NetworkEdge( id="gen-client", source="rqs-1", target="client-1", latency={"mean": 0.003, "distribution": "exponential"}, ), - Edge( + NetworkEdge( id="client-lb", source="client-1", target="lb-1", latency={"mean": 0.002, "distribution": "exponential"}, ), - Edge( + NetworkEdge( id="lb-srv1", source="lb-1", target="srv-1", latency={"mean": 0.002, "distribution": "exponential"}, ), - Edge( + NetworkEdge( id="lb-srv2", source="lb-1", target="srv-2", latency={"mean": 0.002, "distribution": "exponential"}, ), - Edge( + NetworkEdge( id="srv1-client", source="srv-1", target="client-1", latency={"mean": 0.003, "distribution": "exponential"}, ), - Edge( + NetworkEdge( id="srv2-client", source="srv-2", target="client-1", diff --git a/tests/system/test_sys_single_server.py b/tests/system/test_sys_single_server.py index f3fc703..9b4c081 100644 --- a/tests/system/test_sys_single_server.py +++ b/tests/system/test_sys_single_server.py @@ -24,7 +24,7 @@ import simpy from asyncflow import AsyncFlow -from asyncflow.components import Client, Edge, Endpoint, Server +from asyncflow.components import Client, Endpoint, NetworkEdge, Server from asyncflow.config.enums import Distribution, LatencyKey from asyncflow.runner.simulation import SimulationRunner from asyncflow.schemas.arrivals.generator import ArrivalsGenerator @@ -77,19 +77,19 @@ def _build_payload() -> SimulationPayload: ) edges = [ - Edge( + NetworkEdge( id="gen-client", source="rqs-1", target="client-1", latency={"mean": 0.003, "distribution": "exponential"}, ), - Edge( + NetworkEdge( id="client-srv", source="client-1", target="srv-1", latency={"mean": 0.002, "distribution": "exponential"}, ), - Edge( + NetworkEdge( id="srv-client", source="srv-1", target="client-1", diff --git a/tests/unit/public_api/test_import.py b/tests/unit/public_api/test_import.py index 8ddaec2..5eba664 100644 --- a/tests/unit/public_api/test_import.py +++ b/tests/unit/public_api/test_import.py @@ -20,10 +20,11 @@ from asyncflow.components import ( ArrivalsGenerator, Client, - Edge, Endpoint, EventInjection, + LinkEdge, LoadBalancer, + NetworkEdge, NodesResources, Server, ) @@ -144,10 +145,11 @@ def test_components_public_symbols() -> None: expected = [ "ArrivalsGenerator", "Client", - "Edge", "Endpoint", "EventInjection", + "LinkEdge", "LoadBalancer", + "NetworkEdge", "NodesResources", "Server", ] @@ -159,10 +161,11 @@ def test_components_symbols_are_importable_classes() -> None: for cls, name in [ (ArrivalsGenerator, "ArrivalsGenerator"), (Client, "Client"), - (Edge, "Edge"), (Endpoint, "Endpoint"), (EventInjection, "EventInjection"), + (LinkEdge, "LinkEdge"), (LoadBalancer, "LoadBalancer"), + (NetworkEdge, "NetworkEdge"), (NodesResources, "NodesResources"), (Server, "Server"), ]: diff --git a/tests/unit/pybuilder/test_input_builder.py b/tests/unit/pybuilder/test_input_builder.py index 58e0d44..68ad909 100644 --- a/tests/unit/pybuilder/test_input_builder.py +++ b/tests/unit/pybuilder/test_input_builder.py @@ -1,12 +1,13 @@ """ Unit tests for the AsyncFlow builder. -The goal is to verify that: -- The builder enforces types on each `add_*` method. -- Missing components produce clear ValueError exceptions on `build_payload()`. -- A valid, minimal scenario builds a `SimulationPayload` successfully. +Goals: +- Enforce types on each `add_*` method. +- Missing parts raise clear ValueErrors on `build_payload()`. +- Minimal valid scenario builds a SimulationPayload. - Methods return `self` to support fluent chaining. - Servers and edges can be added in multiples and preserve order. +- New features: LinkEdge-only topologies and homogeneous edge enforcement. """ from __future__ import annotations @@ -14,33 +15,32 @@ import pytest from asyncflow.builder.asyncflow_builder import AsyncFlow +from asyncflow.config.enums import EventDescription, SystemEdges from asyncflow.schemas.arrivals.generator import ArrivalsGenerator +from asyncflow.schemas.events.injection import EventInjection from asyncflow.schemas.payload import SimulationPayload from asyncflow.schemas.settings.simulation import SimulationSettings -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import LinkEdge, NetworkEdge from asyncflow.schemas.topology.endpoint import Endpoint from asyncflow.schemas.topology.nodes import Client, Server - # --------------------------------------------------------------------------- # -# Helpers: build minimal, valid components # +# Helpers: minimal, valid components # # --------------------------------------------------------------------------- # + + def make_generator() -> ArrivalsGenerator: - """Return a minimal valid request generator.""" - return ArrivalsGenerator( - id="rqs-1", - lambda_rps=10, - model="poisson", - ) + """Minimal valid request generator.""" + return ArrivalsGenerator(id="rqs-1", lambda_rps=10, model="poisson") def make_client() -> Client: - """Return a minimal valid client.""" + """Minimal valid client.""" return Client(id="client-1") def make_endpoint() -> Endpoint: - """Return a minimal endpoint with CPU and IO steps.""" + """Endpoint with CPU and IO steps.""" return Endpoint( endpoint_name="ep-1", probability=1.0, @@ -52,7 +52,7 @@ def make_endpoint() -> Endpoint: def make_server(server_id: str = "srv-1") -> Server: - """Return a minimal valid server with 1 core, 2GB RAM, and one endpoint.""" + """Server with 1 core, 2GB RAM, and one endpoint.""" return Server( id=server_id, server_resources={"cpu_cores": 1, "ram_mb": 2048}, @@ -60,21 +60,21 @@ def make_server(server_id: str = "srv-1") -> Server: ) -def make_edges() -> list[Edge]: - """Return a valid edge triplet for the minimal single-server scenario.""" - e1 = Edge( +def make_net_edges() -> list[NetworkEdge]: + """Network edges for a single-server scenario.""" + e1 = NetworkEdge( id="gen-to-client", source="rqs-1", target="client-1", latency={"mean": 0.003, "distribution": "exponential"}, ) - e2 = Edge( + e2 = NetworkEdge( id="client-to-server", source="client-1", target="srv-1", latency={"mean": 0.003, "distribution": "exponential"}, ) - e3 = Edge( + e3 = NetworkEdge( id="server-to-client", source="srv-1", target="client-1", @@ -83,10 +83,18 @@ def make_edges() -> list[Edge]: return [e1, e2, e3] +def make_link_edges() -> list[LinkEdge]: + """Link-only edges for a single-server scenario.""" + e1 = LinkEdge(id="gen-to-client", source="rqs-1", target="client-1") + e2 = LinkEdge(id="client-to-server", source="client-1", target="srv-1") + e3 = LinkEdge(id="server-to-client", source="srv-1", target="client-1") + return [e1, e2, e3] + + def make_settings() -> SimulationSettings: - """Return minimal simulation settings within validation bounds.""" + """Minimal simulation settings within validation bounds.""" return SimulationSettings( - total_simulation_time=5.0, # lower bound is 5 seconds + total_simulation_time=5.0, sample_period_s=0.1, enabled_sample_metrics=[ "ready_queue_len", @@ -99,15 +107,17 @@ def make_settings() -> SimulationSettings: # --------------------------------------------------------------------------- # -# Positive / “happy path” # +# Positive / happy path # # --------------------------------------------------------------------------- # + + def test_builder_happy_path_returns_payload() -> None: - """Building a minimal scenario returns a validated SimulationPayload.""" + """Minimal scenario builds a validated SimulationPayload.""" flow = AsyncFlow() generator = make_generator() client = make_client() server = make_server() - e1, e2, e3 = make_edges() + e1, e2, e3 = make_net_edges() settings = make_settings() payload = ( @@ -130,13 +140,13 @@ def test_builder_happy_path_returns_payload() -> None: def test_add_methods_return_self_for_chaining() -> None: - """Every add_* method returns `self` to support fluent chaining.""" + """Every add_* method returns `self` for fluent chaining.""" flow = AsyncFlow() ret = ( flow.add_arrivals_generator(make_generator()) .add_client(make_client()) .add_servers(make_server()) - .add_edges(*make_edges()) + .add_edges(*make_net_edges()) .add_simulation_settings(make_settings()) ) assert ret is flow @@ -145,20 +155,16 @@ def test_add_methods_return_self_for_chaining() -> None: def test_add_servers_accepts_multiple_and_keeps_order() -> None: """Adding multiple servers keeps insertion order.""" flow = AsyncFlow() - (flow.add_arrivals_generator(make_generator()) - .add_client(make_client()) - ) + flow.add_arrivals_generator(make_generator()).add_client(make_client()) s1 = make_server("srv-1") s2 = make_server("srv-2") s3 = make_server("srv-3") flow.add_servers(s1, s2).add_servers(s3) - e1, e2, e3 = make_edges() + e1, e2, e3 = make_net_edges() settings = make_settings() payload = ( - flow.add_edges(e1, e2, e3) - .add_simulation_settings(settings) - .build_payload() + flow.add_edges(e1, e2, e3).add_simulation_settings(settings).build_payload() ) ids = [srv.id for srv in payload.topology_graph.nodes.servers] @@ -166,19 +172,21 @@ def test_add_servers_accepts_multiple_and_keeps_order() -> None: # --------------------------------------------------------------------------- # -# Negative cases: missing components # +# Negative: missing components # # --------------------------------------------------------------------------- # + + def test_build_without_generator_raises() -> None: """Building without a generator fails with a clear error.""" flow = AsyncFlow() flow.add_client(make_client()) flow.add_servers(make_server()) - flow.add_edges(*make_edges()) + flow.add_edges(*make_net_edges()) flow.add_simulation_settings(make_settings()) with pytest.raises( ValueError, - match="The generator input must be instantiated before the simulation", + match="The arrivals generator must be instantiated before the simulation", ): flow.build_payload() @@ -188,7 +196,7 @@ def test_build_without_client_raises() -> None: flow = AsyncFlow() flow.add_arrivals_generator(make_generator()) flow.add_servers(make_server()) - flow.add_edges(*make_edges()) + flow.add_edges(*make_net_edges()) flow.add_simulation_settings(make_settings()) with pytest.raises( @@ -203,7 +211,7 @@ def test_build_without_servers_raises() -> None: flow = AsyncFlow() flow.add_arrivals_generator(make_generator()) flow.add_client(make_client()) - flow.add_edges(*make_edges()) + flow.add_edges(*make_net_edges()) flow.add_simulation_settings(make_settings()) with pytest.raises( @@ -234,7 +242,7 @@ def test_build_without_settings_raises() -> None: flow.add_arrivals_generator(make_generator()) flow.add_client(make_client()) flow.add_servers(make_server()) - flow.add_edges(*make_edges()) + flow.add_edges(*make_net_edges()) with pytest.raises( ValueError, @@ -244,20 +252,22 @@ def test_build_without_settings_raises() -> None: # --------------------------------------------------------------------------- # -# Negative cases: type enforcement in add_* methods # +# Negative: type enforcement in add_* methods # # --------------------------------------------------------------------------- # + + def test_add_generator_rejects_wrong_type() -> None: - """`add_generator` rejects non-ArrivalsGenerator instances.""" + """`add_arrivals_generator` rejects non-ArrivalsGenerator instances.""" flow = AsyncFlow() with pytest.raises(TypeError): - flow.add_arrivals_generator("not-a-generator") # type: ignore[arg-type] + flow.add_arrivals_generator("not-a-generator") # type: ignore[arg-type] def test_add_client_rejects_wrong_type() -> None: """`add_client` rejects non-Client instances.""" flow = AsyncFlow() with pytest.raises(TypeError): - flow.add_client(1234) # type: ignore[arg-type] + flow.add_client(1234) # type: ignore[arg-type] def test_add_servers_rejects_wrong_type() -> None: @@ -265,19 +275,135 @@ def test_add_servers_rejects_wrong_type() -> None: flow = AsyncFlow() good = make_server() with pytest.raises(TypeError): - flow.add_servers(good, "not-a-server") # type: ignore[arg-type] + flow.add_servers(good, "not-a-server") # type: ignore[arg-type] def test_add_edges_rejects_wrong_type() -> None: """`add_edges` rejects any non-Edge in the varargs.""" flow = AsyncFlow() - good = make_edges()[0] + good = make_net_edges()[0] with pytest.raises(TypeError): - flow.add_edges(good, 3.14) # type: ignore[arg-type] + flow.add_edges(good, 3.14) # type: ignore[arg-type] def test_add_settings_rejects_wrong_type() -> None: """`add_simulation_settings` rejects non-SimulationSettings instances.""" flow = AsyncFlow() with pytest.raises(TypeError): - flow.add_simulation_settings({"total_simulation_time": 1.0}) # type: ignore[arg-type] + flow.add_simulation_settings({"total_simulation_time": 1.0}) # type: ignore[arg-type] + + +# --------------------------------------------------------------------------- # +# New features: LinkEdge support and homogeneous enforcement # +# --------------------------------------------------------------------------- # + + +def test_linkedge_topology_builds_payload() -> None: + """A LinkEdge-only topology is accepted and preserved.""" + flow = AsyncFlow() + payload = ( + flow.add_arrivals_generator(make_generator()) + .add_client(make_client()) + .add_servers(make_server()) + .add_edges(*make_link_edges()) + .add_simulation_settings(make_settings()) + .build_payload() + ) + assert all( + e.edge_type is SystemEdges.LINK_CONNECTION + for e in payload.topology_graph.edges + ) + assert {e.id for e in payload.topology_graph.edges} == { + "gen-to-client", + "client-to-server", + "server-to-client", + } + + +def test_add_edges_rejects_mixed_types_in_single_call() -> None: + """Mixing NetworkEdge and LinkEdge in the same call is rejected.""" + flow = ( + AsyncFlow() + .add_arrivals_generator(make_generator()) + .add_client(make_client()) + .add_servers(make_server()) + ) + n1 = make_net_edges()[0] + l1 = make_link_edges()[0] + with pytest.raises(TypeError, match="Cannot mix (LinkEdge|NetworkEdge)"): + flow.add_edges(n1, l1) + + +def test_add_edges_rejects_mixed_types_across_calls() -> None: + """Once kind is fixed, subsequent calls with other kind fail.""" + flow = ( + AsyncFlow() + .add_arrivals_generator(make_generator()) + .add_client(make_client()) + .add_servers(make_server()) + ) + n1, _, _ = make_net_edges() + flow.add_edges(n1) + with pytest.raises(TypeError, match="Cannot mix LinkEdge with NetworkEdge."): + flow.add_edges(make_link_edges()[0]) + + flow2 = ( + AsyncFlow() + .add_arrivals_generator(make_generator()) + .add_client(make_client()) + .add_servers(make_server()) + ) + l1, _, _ = make_link_edges() + flow2.add_edges(l1) + with pytest.raises(TypeError, match="Cannot mix NetworkEdge with LinkEdge."): + flow2.add_edges(make_net_edges()[0]) + + +def test_add_edges_noop_on_empty_call() -> None: + """Calling add_edges() with no args is a no-op and does not fix kind.""" + flow = AsyncFlow() + flow.add_arrivals_generator(make_generator()) + flow.add_client(make_client()) + flow.add_servers(make_server()) + flow.add_simulation_settings(make_settings()) + + ret = flow.add_edges() # no edges + assert ret is flow + + with pytest.raises(ValueError, match="You must instantiate edges"): + flow.build_payload() + + +# --------------------------------------------------------------------------- # +# Events helpers # +# --------------------------------------------------------------------------- # + + +def test_add_network_spike_is_in_payload() -> None: + """add_network_spike wires a NETWORK_SPIKE event into the payload.""" + n1, n2, n3 = make_net_edges() + flow = ( + AsyncFlow() + .add_arrivals_generator(make_generator()) + .add_client(make_client()) + .add_servers(make_server()) + .add_edges(n1, n2, n3) + .add_network_spike( + event_id="ev1", + edge_id=n2.id, + t_start=1.0, + t_end=2.0, + spike_s=0.25, + ) + .add_simulation_settings(make_settings()) + ) + payload = flow.build_payload() + assert payload.events is not None + assert len(payload.events) == 1 + ev = payload.events[0] + assert isinstance(ev, EventInjection) + assert ev.event_id == "ev1" + assert ev.target_id == n2.id + assert ev.start.kind is EventDescription.NETWORK_SPIKE_START + assert ev.end.kind is EventDescription.NETWORK_SPIKE_END + assert ev.start.spike_s == 0.25 diff --git a/tests/unit/queue_theory_analysis/test_mmc.py b/tests/unit/queue_theory_analysis/test_mmc.py index 1b3dc10..de725ff 100644 --- a/tests/unit/queue_theory_analysis/test_mmc.py +++ b/tests/unit/queue_theory_analysis/test_mmc.py @@ -19,8 +19,8 @@ from asyncflow.components import ( ArrivalsGenerator, Client, - Edge, Endpoint, + LinkEdge, LoadBalancer, Server, ) @@ -144,39 +144,33 @@ def _build_payload_mmc_split( ) edges = [ - Edge( + LinkEdge( id="gen-client", source="rqs-1", target="client-1", - latency=0.00001, - dropout_rate=0, ), - Edge( + LinkEdge( id="client-lb", source="client-1", target="lb-1", - latency=0.00001, - dropout_rate=0, + ), ] for s in servers: edges.append( - Edge( + LinkEdge( id=f"lb-{s.id}", source="lb-1", target=s.id, - latency=0.00001, - dropout_rate=0, + ), ) edges.append( - Edge( + LinkEdge( id=f"{s.id}-client", source=s.id, target="client-1", - latency=0.00001, - dropout_rate=0, ), ) @@ -241,8 +235,8 @@ def test_mmc_compare_matches_theory() -> None: def test_mmc_instability_returns_infinities() -> None: """If rho >= 1, closed-form KPIs must be +inf (W, Wq, L, Lq).""" - # c=2, mu=100 rps (service=0.01 s) -> capacity = 200 rps. - # For instability set lambda >= 200. + # c = 1, mu = 100 rps (service = 0.01 s) -> capacity = 100 rps. + # For instability set lambda >= 100. client = Client(id="client-1") endpoint = Endpoint( @@ -258,17 +252,30 @@ def test_mmc_instability_returns_infinities() -> None: ], ) # 0.01 s -> mu = 100 rps srv = Server( - id="srv-1", server_resources=NodesResources(cpu_cores=2), endpoints=[endpoint], + id="srv-1", + server_resources=NodesResources(cpu_cores=2), + endpoints=[endpoint], ) + + # Minimal LinkEdge topology (MMc expects LinkEdge, not NetworkEdge). + edges = [ + LinkEdge(id="gen-client", source="gen", target="client-1"), + LinkEdge(id="client-srv", source="client-1", target="srv-1"), + LinkEdge(id="srv-client", source="srv-1", target="client-1"), + ] + nodes = TopologyNodes(servers=[srv], client=client, load_balancer=None) - graph = TopologyGraph(nodes=nodes, edges=[]) + graph = TopologyGraph(nodes=nodes, edges=edges) - # λ = 200 rps (== capacity) -> rho = 1 - arrivals = ArrivalsGenerator(id="gen", lambda_rps=200.0, model=Distribution.POISSON) + # λ = 200 rps (>= capacity 100) -> rho >= 1 + arrivals = ArrivalsGenerator( + id="gen", lambda_rps=200.0, model=Distribution.POISSON, + ) settings = SimulationSettings(total_simulation_time=5) payload = SimulationPayload( - arrivals=arrivals, topology_graph=graph, sim_settings=settings) + arrivals=arrivals, topology_graph=graph, sim_settings=settings, + ) mmc = MMc() res = mmc.evaluate(payload) @@ -278,106 +285,6 @@ def test_mmc_instability_returns_infinities() -> None: assert res[key] == float("inf") - -def test_mmc_incompatible_edge_latency_too_large() -> None: - """Latency must be deterministic and <= 1 ms.""" - gen = ArrivalsGenerator( - id="rqs-1", - lambda_rps=20, - model="poisson", - ) - client = Client(id="client-1") - endpoint = Endpoint( - endpoint_name="/api", - probability=1.0, - steps=[ - { - "kind": "initial_parsing", - "step_operation": { - "cpu_time": {"mean": 0.01, "distribution": "exponential"}, - }, - }, - ], - ) - srv1 = Server( - id="srv-1", - server_resources={"cpu_cores": 1, "ram_mb": 2048}, - endpoints=[endpoint], - ) - srv2 = Server( - id="srv-2", - server_resources={"cpu_cores": 1, "ram_mb": 2048}, - endpoints=[endpoint], - ) - lb = LoadBalancer( - id="lb-1", - algorithms="random", - server_covered={"srv-1", "srv-2"}, - ) - edges = [ - Edge( - id="gen-client", - source="rqs-1", - target="client-1", - latency=0.005, # 5 ms -> too large - dropout_rate=0, - ), - Edge( - id="client-lb", - source="client-1", - target="lb-1", - latency=0.00001, - dropout_rate=0, - ), - Edge( - id="lb-srv1", - source="lb-1", - target="srv-1", - latency=0.00001, - dropout_rate=0, - ), - Edge( - id="lb-srv2", - source="lb-1", - target="srv-2", - latency=0.00001, - dropout_rate=0, - ), - Edge( - id="srv1-client", - source="srv-1", - target="client-1", - latency=0.00001, - dropout_rate=0, - ), - Edge( - id="srv2-client", - source="srv-2", - target="client-1", - latency=0.00001, - dropout_rate=0, - ), - ] - settings = SimulationSettings( - total_simulation_time=60, - sample_period_s=0.05, - ) - payload = ( - AsyncFlow() - .add_arrivals_generator(gen) - .add_client(client) - .add_servers(srv1, srv2) - .add_load_balancer(lb) - .add_edges(*edges) - .add_simulation_settings(settings) - ).build_payload() - - mmc = MMc() - assert not mmc.is_compatible(payload) - reasons = mmc.explain_incompatibilities(payload) - assert any("<= 1 ms" in r for r in reasons) - - def test_mmc_incompatible_server_model_requires_single_cpu_step() -> None: """Each server endpoint must have exactly one CPU step.""" gen = ArrivalsGenerator( @@ -436,47 +343,35 @@ def test_mmc_incompatible_server_model_requires_single_cpu_step() -> None: server_covered={"srv-1", "srv-2"}, ) edges = [ - Edge( + LinkEdge( id="gen-client", source="rqs-1", target="client-1", - latency=0.00001, - dropout_rate=0, ), - Edge( + LinkEdge( id="client-lb", source="client-1", target="lb-1", - latency=0.00001, - dropout_rate=0, ), - Edge( + LinkEdge( id="lb-srv1", source="lb-1", target="srv-1", - latency=0.00001, - dropout_rate=0, ), - Edge( + LinkEdge( id="lb-srv2", source="lb-1", target="srv-2", - latency=0.00001, - dropout_rate=0, ), - Edge( + LinkEdge( id="srv1-client", source="srv-1", target="client-1", - latency=0.00001, - dropout_rate=0, ), - Edge( + LinkEdge( id="srv2-client", source="srv-2", target="client-1", - latency=0.00001, - dropout_rate=0, ), ] settings = SimulationSettings( diff --git a/tests/unit/runner/test_simulation.py b/tests/unit/runner/test_simulation.py index de6dbf9..30be498 100644 --- a/tests/unit/runner/test_simulation.py +++ b/tests/unit/runner/test_simulation.py @@ -8,7 +8,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, cast import pytest import simpy @@ -22,7 +22,7 @@ from asyncflow.schemas.events.injection import EventInjection from asyncflow.schemas.payload import SimulationPayload from asyncflow.schemas.settings.simulation import SimulationSettings -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import LinkEdge, NetworkEdge from asyncflow.schemas.topology.graph import TopologyGraph from asyncflow.schemas.topology.nodes import ( Client, @@ -121,27 +121,42 @@ def test_build_load_balancer_noop_when_absent( # --------------------------------------------------------------------------- # # Edges builder # # --------------------------------------------------------------------------- # -def test_build_edges_with_stub_edge(runner: SimulationRunner) -> None: - """ - `_build_edges()` must register exactly one `EdgeRuntime`, corresponding - to a stub edge (generator → client). We inject that edge here. - """ - # Inject one stub edge into the payload graph. +def test_build_edges_with_stub_network_edge(runner: SimulationRunner) -> None: + """Register exactly one EdgeRuntime for a NetworkEdge (gen → cli).""" arrivals_id = runner.arrivals.id client_id = runner.client.id - stub_edge = Edge( + stub_edge = NetworkEdge( id="gen-cli", source=arrivals_id, target=client_id, latency=RVConfig(mean=0.001, distribution=Distribution.POISSON), ) - runner.edges.append(stub_edge) + + # Tipizza esplicitamente la lista come list[NetworkEdge] + net_edges: list[NetworkEdge] = [stub_edge] + runner.edges = cast("list[NetworkEdge] | list[LinkEdge]", net_edges) runner._build_rqs_generator() # noqa: SLF001 - runner._build_client() # noqa: SLF001 - runner._build_edges() # noqa: SLF001 + runner._build_client() # noqa: SLF001 + runner._build_edges() # noqa: SLF001 + assert len(runner._edges_runtime) == 1 # noqa: SLF001 +def test_build_edges_with_stub_link_edge(runner: SimulationRunner) -> None: + """Register exactly one EdgeRuntime for a LinkEdge (gen → cli).""" + arrivals_id = runner.arrivals.id + client_id = runner.client.id + stub_edge = LinkEdge(id="gen-cli", source=arrivals_id, target=client_id) + + # Tipizza esplicitamente la lista come list[LinkEdge] + link_edges: list[LinkEdge] = [stub_edge] + runner.edges = cast("list[NetworkEdge] | list[LinkEdge]", link_edges) + + runner._build_rqs_generator() # noqa: SLF001 + runner._build_client() # noqa: SLF001 + runner._build_edges() # noqa: SLF001 + + assert len(runner._edges_runtime) == 1 # noqa: SLF001 # --------------------------------------------------------------------------- # # from_yaml utility # @@ -185,19 +200,19 @@ def _payload_with_lb_one_server_and_edges( lb = LoadBalancer(id="lb-1") nodes = TopologyNodes(servers=[server], client=client, load_balancer=lb) - e_gen_lb = Edge( + e_gen_lb = NetworkEdge( id="gen-lb", source=arrivals.id, target=lb.id, latency=RVConfig(mean=0.001, distribution=Distribution.POISSON), ) - e_lb_srv = Edge( + e_lb_srv = NetworkEdge( id="lb-srv", source=lb.id, target=server.id, latency=RVConfig(mean=0.002, distribution=Distribution.POISSON), ) - e_net = Edge( + e_net = NetworkEdge( id="net-edge", source=arrivals.id, target=client.id, @@ -322,3 +337,4 @@ def test_build_events_attaches_shared_views(env: simpy.Environment) -> None: for er in sr._edges_runtime.values(): # noqa: SLF001 assert er.edges_spike is not None assert er.edges_affected is events_rt.edges_affected + diff --git a/tests/unit/runtime/actors/test_edge_rt.py b/tests/unit/runtime/actors/test_edge_rt.py index 5da6165..4131303 100644 --- a/tests/unit/runtime/actors/test_edge_rt.py +++ b/tests/unit/runtime/actors/test_edge_rt.py @@ -19,7 +19,7 @@ from asyncflow.runtime.actors.edge import EdgeRuntime from asyncflow.runtime.rqs_state import RequestState from asyncflow.schemas.common.random_variables import RVConfig -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import NetworkEdge if TYPE_CHECKING: # pragma: no cover import numpy as np @@ -78,7 +78,7 @@ def _make_edge( rng = DummyRNG(uniform_value=uniform_value, exp_value=exp_value) store: simpy.Store = simpy.Store(env) - edge_cfg = Edge( + edge_cfg = NetworkEdge( id="edge-1", source="src", target="dst", diff --git a/tests/unit/runtime/events/test_injection_edges.py b/tests/unit/runtime/events/test_injection_edges.py index a1618fe..2fd08b9 100644 --- a/tests/unit/runtime/events/test_injection_edges.py +++ b/tests/unit/runtime/events/test_injection_edges.py @@ -16,7 +16,7 @@ ) from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.events.injection import EventInjection -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import LinkEdge, NetworkEdge if TYPE_CHECKING: import simpy @@ -24,9 +24,10 @@ # ----------------------------- Helpers ------------------------------------- # -def _edge(edge_id: str, source: str, target: str) -> Edge: +def _edge(edge_id: str, source: str, target: str) -> NetworkEdge: """Minimal edge with negligible latency.""" - return Edge(id=edge_id, source=source, target=target, latency=RVConfig(mean=0.001)) + return NetworkEdge( + id=edge_id, source=source, target=target, latency=RVConfig(mean=0.001)) def _spike_event( @@ -294,3 +295,44 @@ def test_zero_time_batch_draining_makes_first_event_visible( env.step() assert env.now == pytest.approx(1.0) assert inj.edges_spike[e.id] == pytest.approx(0.1) + +def _link_edge(edge_id: str, source: str, target: str) -> LinkEdge: + """Minimal LinkEdge without latency.""" + return LinkEdge(id=edge_id, source=source, target=target) + + +def _spike_event_on_edge(edge_id: str) -> EventInjection: + """Simple spike event targeting the given edge.""" + return EventInjection( + event_id="ev-link", + target_id=edge_id, + start={ + "kind": EventDescription.NETWORK_SPIKE_START, + "t_start": 1.0, + "spike_s": 0.2, + }, + end={ + "kind": EventDescription.NETWORK_SPIKE_END, + "t_end": 2.0, + }, + ) + + +def test_edge_events_rejected_for_linkedge_topology( + env: simpy.Environment, + ) -> None: + """ + If any event targets an edge, but edges are LinkEdge, + a ValueError must be raised. + """ + edges = [_link_edge("link-1", "A", "B")] + ev = _spike_event_on_edge("link-1") + + with pytest.raises(ValueError, match="Edge events are present.*NetworkEdge"): + EventInjectionRuntime( + events=[ev], + edges=edges, + env=env, + servers=[], + lb_out_edges=OrderedDict[str, EdgeRuntime](), + ) diff --git a/tests/unit/runtime/events/test_injection_servers.py b/tests/unit/runtime/events/test_injection_servers.py index 1fd2255..9c5c6ec 100644 --- a/tests/unit/runtime/events/test_injection_servers.py +++ b/tests/unit/runtime/events/test_injection_servers.py @@ -14,7 +14,7 @@ from asyncflow.runtime.events.injection import EventInjectionRuntime from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.events.injection import EventInjection -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import NetworkEdge from asyncflow.schemas.topology.nodes import NodesResources, Server if TYPE_CHECKING: @@ -27,9 +27,9 @@ -def _edge(edge_id: str, source: str, target: str) -> Edge: +def _edge(edge_id: str, source: str, target: str) -> NetworkEdge: """Create a minimal LB→server edge with negligible latency.""" - return Edge( + return NetworkEdge( id=edge_id, source=source, target=target, diff --git a/tests/unit/runtime/events/test_injection_servers_edges.py b/tests/unit/runtime/events/test_injection_servers_edges.py index b15a53c..9451f00 100644 --- a/tests/unit/runtime/events/test_injection_servers_edges.py +++ b/tests/unit/runtime/events/test_injection_servers_edges.py @@ -14,7 +14,7 @@ from asyncflow.runtime.events.injection import EventInjectionRuntime from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.events.injection import EventInjection -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import NetworkEdge from asyncflow.schemas.topology.nodes import NodesResources, Server if TYPE_CHECKING: @@ -25,9 +25,10 @@ # Helpers # # --------------------------------------------------------------------------- # -def _edge(edge_id: str, source: str, target: str) -> Edge: +def _edge(edge_id: str, source: str, target: str) -> NetworkEdge: """Create a minimal edge with negligible latency.""" - return Edge(id=edge_id, source=source, target=target, latency=RVConfig(mean=0.001)) + return NetworkEdge( + id=edge_id, source=source, target=target, latency=RVConfig(mean=0.001)) def _srv(server_id: str) -> Server: diff --git a/tests/unit/schemas/test_edge.py b/tests/unit/schemas/test_edge.py index 2c80abc..0d8a1c6 100644 --- a/tests/unit/schemas/test_edge.py +++ b/tests/unit/schemas/test_edge.py @@ -16,7 +16,7 @@ from asyncflow.config.constants import NetworkParameters from asyncflow.config.enums import SystemEdges from asyncflow.schemas.common.random_variables import RVConfig -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import LinkEdge, NetworkEdge # --------------------------------------------------------------------------- # # Helpers # @@ -35,7 +35,7 @@ def _rv(mean: float, variance: float | None = None) -> RVConfig: def test_edge_minimal_construction_uses_default_edge_type() -> None: """Minimal valid Edge uses NETWORK_CONNECTION as default edge_type.""" - e = Edge( + e = NetworkEdge( id="e1", source="a", target="b", @@ -47,19 +47,19 @@ def test_edge_minimal_construction_uses_default_edge_type() -> None: def test_edge_requires_id_source_target() -> None: """Omitting required fields raises ValidationError.""" with pytest.raises(ValidationError): - Edge( # type: ignore[call-arg] + NetworkEdge( # type: ignore[call-arg] source="a", target="b", latency=_rv(mean=0.01), ) with pytest.raises(ValidationError): - Edge( # type: ignore[call-arg] + NetworkEdge( # type: ignore[call-arg] id="e1", target="b", latency=_rv(mean=0.01), ) with pytest.raises(ValidationError): - Edge( # type: ignore[call-arg] + NetworkEdge( # type: ignore[call-arg] id="e1", source="a", latency=_rv(mean=0.01), @@ -74,7 +74,7 @@ def test_edge_requires_id_source_target() -> None: def test_edge_source_equals_target_fails() -> None: """Validator forbids identical source and target.""" with pytest.raises(ValidationError): - Edge( + NetworkEdge( id="loop", source="x", target="x", @@ -94,7 +94,7 @@ def test_edge_source_equals_target_fails() -> None: def test_edge_dropout_rate_out_of_bounds(bad_rate: float) -> None: """Dropout rate outside configured bounds is rejected.""" with pytest.raises(ValidationError): - Edge( + NetworkEdge( id="ed", source="a", target="b", @@ -113,7 +113,7 @@ def test_edge_dropout_rate_out_of_bounds(bad_rate: float) -> None: ) def test_edge_dropout_rate_in_bounds(ok_rate: float) -> None: """Boundary and mid-range dropout rates are accepted.""" - e = Edge( + e = NetworkEdge( id="ed", source="a", target="b", @@ -131,7 +131,7 @@ def test_edge_dropout_rate_in_bounds(ok_rate: float) -> None: @pytest.mark.parametrize("good_latency", [0.001, 0.1, 5.0]) def test_edge_deterministic_latency_positivefloat_ok(good_latency: float) -> None: """Deterministic latency validates as PositiveFloat when > 0.""" - e = Edge( + e = NetworkEdge( id="dt", source="a", target="b", @@ -146,7 +146,7 @@ def test_edge_deterministic_latency_positivefloat_ok(good_latency: float) -> Non def test_edge_deterministic_latency_non_positive_fails(bad_latency: float) -> None: """Non-positive deterministic latency is rejected by PositiveFloat.""" with pytest.raises(ValidationError): - Edge( + NetworkEdge( id="dt-bad", source="a", target="b", @@ -161,7 +161,7 @@ def test_edge_deterministic_latency_non_positive_fails(bad_latency: float) -> No def test_edge_rvconfig_latency_ok_with_zero_variance() -> None: """RVConfig with mean>0 and variance==0 is accepted.""" - e = Edge( + e = NetworkEdge( id="rv0", source="a", target="b", @@ -174,7 +174,7 @@ def test_edge_rvconfig_latency_ok_with_zero_variance() -> None: def test_edge_rvconfig_latency_ok_with_none_variance() -> None: """RVConfig with mean>0 and variance=None is accepted.""" - e = Edge( + e = NetworkEdge( id="rvn", source="a", target="b", @@ -189,7 +189,7 @@ def test_edge_rvconfig_latency_ok_with_none_variance() -> None: def test_edge_rvconfig_latency_non_positive_mean_fails(bad_mean: float) -> None: """RVConfig with non-positive mean is rejected by the field validator.""" with pytest.raises(ValidationError): - Edge( + NetworkEdge( id="rv-bad-mean", source="a", target="b", @@ -200,9 +200,75 @@ def test_edge_rvconfig_latency_non_positive_mean_fails(bad_mean: float) -> None: def test_edge_rvconfig_latency_negative_variance_fails() -> None: """RVConfig with negative variance is rejected by the field validator.""" with pytest.raises(ValidationError): - Edge( + NetworkEdge( id="rv-bad-var", source="a", target="b", latency=_rv(mean=0.02, variance=-0.0001), ) + +# --------------------------------------------------------------------------- # +# LinkEdge: required fields and defaults # +# --------------------------------------------------------------------------- # + +def test_link_edge_minimal_construction_uses_default_edge_type() -> None: + """Minimal LinkEdge uses LINK_CONNECTION as the default edge_type.""" + e = LinkEdge(id="l1", source="a", target="b") + assert e.edge_type is SystemEdges.LINK_CONNECTION + + +def test_link_edge_requires_id_source_target() -> None: + """Omitting required fields raises ValidationError.""" + with pytest.raises(ValidationError): + LinkEdge( # type: ignore[call-arg] + source="a", + target="b", + ) + with pytest.raises(ValidationError): + LinkEdge( # type: ignore[call-arg] + id="l1", + target="b", + ) + with pytest.raises(ValidationError): + LinkEdge( # type: ignore[call-arg] + id="l1", + source="a", + ) + +# --------------------------------------------------------------------------- # +# LinkEdge: source != target # +# --------------------------------------------------------------------------- # + +def test_link_edge_source_equals_target_fails() -> None: + """Validator forbids identical source and target in LinkEdge.""" + with pytest.raises(ValidationError): + LinkEdge(id="loop", source="x", target="x") + +# --------------------------------------------------------------------------- # +# LinkEdge: edge_type must be LINK_CONNECTION # +# --------------------------------------------------------------------------- # + +def test_link_edge_wrong_edge_type_fails() -> None: + """Setting edge_type to anything other than LINK_CONNECTION is rejected.""" + with pytest.raises(ValidationError): + LinkEdge( + id="lbad", + source="a", + target="b", + edge_type=SystemEdges.NETWORK_CONNECTION, # invalid + ) + +# --------------------------------------------------------------------------- # +# NetworkEdge: edge_type must be NETWORK_CONNECTION # +# --------------------------------------------------------------------------- # + +def test_network_edge_wrong_edge_type_fails() -> None: + """NetworkEdge rejects edge_type values other than NETWORK_CONNECTION.""" + with pytest.raises(ValidationError): + NetworkEdge( + id="nbad", + source="a", + target="b", + latency=0.01, + edge_type=SystemEdges.LINK_CONNECTION, # invalid + ) diff --git a/tests/unit/schemas/test_payload.py b/tests/unit/schemas/test_payload.py index 93aff13..9a7d8cb 100644 --- a/tests/unit/schemas/test_payload.py +++ b/tests/unit/schemas/test_payload.py @@ -19,7 +19,7 @@ from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.events.injection import End, EventInjection, Start from asyncflow.schemas.payload import SimulationPayload -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import NetworkEdge from asyncflow.schemas.topology.graph import TopologyGraph from asyncflow.schemas.topology.nodes import Client, Server, TopologyNodes @@ -75,7 +75,7 @@ def _mk_server_window( def _topology_with_min_edge() -> TopologyGraph: """Create a tiny topology with one client and one minimal edge.""" client = Client(id="client-1") - edge = Edge( + edge = NetworkEdge( id="gen-to-client", source="rqs-1", target="client-1", @@ -100,7 +100,7 @@ def _topology_with_two_servers_and_edge() -> TopologyGraph: endpoints=[make_min_ep()], ), ] - edge = Edge( + edge = NetworkEdge( id="gen-to-client", source="rqs-1", target="client-1", diff --git a/tests/unit/schemas/test_topology.py b/tests/unit/schemas/test_topology.py index 246d37d..edb3306 100644 --- a/tests/unit/schemas/test_topology.py +++ b/tests/unit/schemas/test_topology.py @@ -13,7 +13,7 @@ SystemNodes, ) from asyncflow.schemas.common.random_variables import RVConfig -from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.edges import NetworkEdge from asyncflow.schemas.topology.endpoint import Endpoint, Step from asyncflow.schemas.topology.graph import TopologyGraph from asyncflow.schemas.topology.nodes import ( @@ -141,7 +141,7 @@ def test_edge_source_equals_target_fails() -> None: """Edge with identical source/target raises ValidationError.""" latency_cfg = RVConfig(mean=0.05) with pytest.raises(ValidationError): - Edge( + NetworkEdge( id="edge-dup", source="same", target="same", @@ -154,7 +154,7 @@ def test_edge_missing_id_raises() -> None: """Omitting mandatory ``id`` field raises ValidationError.""" latency_cfg = RVConfig(mean=0.01) with pytest.raises(ValidationError): - Edge( # type: ignore[call-arg] + NetworkEdge( # type: ignore[call-arg] source="a", target="b", latency=latency_cfg, @@ -168,7 +168,7 @@ def test_edge_missing_id_raises() -> None: def test_edge_dropout_rate_bounds(bad_rate: float) -> None: """Drop-out rate outside valid range triggers ValidationError.""" with pytest.raises(ValidationError): - Edge( + NetworkEdge( id="edge-bad-drop", source="n1", target="n2", @@ -188,7 +188,7 @@ def _latency() -> RVConfig: def _topology_with_lb( cover: set[str], - extra_edges: list[Edge] | None = None, + extra_edges: list[NetworkEdge] | None = None, ) -> TopologyGraph: """Build a minimal graph with 1 client, 1 server and a load balancer.""" nodes = _single_node_topology() @@ -199,14 +199,14 @@ def _topology_with_lb( load_balancer=lb, ) - edges: list[Edge] = [ - Edge( # client -> LB + edges: list[NetworkEdge] = [ + NetworkEdge( # client -> LB id="cli-lb", source="browser", target="lb-1", latency=_latency(), ), - Edge( # LB -> server (may be removed in invalid tests) + NetworkEdge( # LB -> server (may be removed in invalid tests) id="lb-srv", source="lb-1", target="svc-A", @@ -221,7 +221,7 @@ def _topology_with_lb( def test_valid_topology_graph() -> None: """Happy-path graph passes validation.""" nodes = _single_node_topology() - edge = Edge( + edge = NetworkEdge( id="edge-1", source="browser", target="svc-A", @@ -234,7 +234,7 @@ def test_valid_topology_graph() -> None: def test_topology_graph_without_lb_still_valid() -> None: """Graph without load balancer validates just like before.""" nodes = _single_node_topology() - edge = Edge( + edge = NetworkEdge( id="edge-1", source="browser", target="svc-A", @@ -248,7 +248,7 @@ def test_topology_graph_without_lb_still_valid() -> None: def test_edge_refers_unknown_node() -> None: """Edge pointing to a non-existent node fails validation.""" nodes = _single_node_topology() - bad_edge = Edge( + bad_edge = NetworkEdge( id="edge-ghost", source="browser", target="ghost-srv", @@ -291,7 +291,7 @@ def test_lb_missing_edge_to_covered_server() -> None: load_balancer=lb, ) edges = [ - Edge( + NetworkEdge( id="cli-lb", source="browser", target="lb-1",