diff --git a/docs/dev_workflow_guide.md b/docs/dev_workflow_guide.md index d1379db..c0a9705 100644 --- a/docs/dev_workflow_guide.md +++ b/docs/dev_workflow_guide.md @@ -38,6 +38,7 @@ fastsim-backend/ │ │ ├── rqs_state.py # RequestState & Hop │ │ ├── simulation_runner.py # logic to initialize the whole simulation | └── actors/ # SimPy “actors”: Edge, Server, Client, RqsGenerator + │ ├── pybuilder/ # Pythonic way to build the simulation payload │ ├── samplers/ # stochastic samplers (Gaussian-Poisson, etc.) │ ├── schemas/ # Pydantic input/output models ├── poetry.lock diff --git a/examples/single_server_pybuilder.png b/examples/single_server_pybuilder.png new file mode 100644 index 0000000..e851f7b Binary files /dev/null and b/examples/single_server_pybuilder.png differ diff --git a/examples/single_server_pybuilder.py b/examples/single_server_pybuilder.py new file mode 100644 index 0000000..4d8b757 --- /dev/null +++ b/examples/single_server_pybuilder.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python3 +""" +Didactic example: build and run a FastSim scenario **without** YAML, +using the 'pybuilder' (AsyncFlow) to assemble the SimulationPayload. + +Scenario reproduced (same as the YAML-based example): + generator ──edge──> client ──edge──> server ──edge──> client + +Load: + ~100 active users, 20 req/min each. + +Server: + 1 CPU core, 2GB RAM, endpoint with steps: + CPU(1ms) → RAM(100MB) → IO(100ms) + +Network: + 3ms mean (exponential) latency on each edge. + +What this script does: + 1) Build Pydantic models (generator, client, server, edges, settings). + 2) Compose the final SimulationPayload via AsyncFlow (builder pattern). + 3) Run the simulation with SimulationRunner. + 4) Print latency stats, throughput timeline, and a sampled-metrics preview. + 5) (Optional) Save a 2x2 Matplotlib figure (latency, throughput, server queues, RAM). 
+ +Run: + python examples/single_server_pybuilder.py +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Iterable, List, Mapping + +import numpy as np +import simpy + +# ── FastSim domain imports ─────────────────────────────────────────────────── +from app.pybuilder.input_builder import AsyncFlow +from app.runtime.simulation_runner import SimulationRunner +from app.metrics.analyzer import ResultsAnalyzer +from app.schemas.full_simulation_input import SimulationPayload +from app.schemas.rqs_generator_input import RqsGeneratorInput +from app.schemas.simulation_settings_input import SimulationSettings +from app.schemas.system_topology.endpoint import Endpoint +from app.schemas.system_topology.full_system_topology import ( + Client, + Edge, + Server, +) + +from app.config.constants import LatencyKey, SampledMetricName + + +# ───────────────────────────────────────────────────────────── +# Pretty printers (compact, readable output) +# ───────────────────────────────────────────────────────────── +def print_latency_stats(res: ResultsAnalyzer) -> None: + """Print latency statistics calculated by the analyzer.""" + stats: Mapping[LatencyKey, float] = res.get_latency_stats() + print("\n════════ LATENCY STATS ════════") + if not stats: + print("(empty)") + return + + order: List[LatencyKey] = [ + LatencyKey.TOTAL_REQUESTS, + LatencyKey.MEAN, + LatencyKey.MEDIAN, + LatencyKey.STD_DEV, + LatencyKey.P95, + LatencyKey.P99, + LatencyKey.MIN, + LatencyKey.MAX, + ] + for key in order: + if key in stats: + print(f"{key.name:<20} = {stats[key]:.6f}") + + +def print_throughput(res: ResultsAnalyzer) -> None: + """Print the 1-second throughput buckets.""" + timestamps, rps = res.get_throughput_series() + print("\n════════ THROUGHPUT (req/sec) ════════") + if not timestamps: + print("(empty)") + return + + for t, rate in zip(timestamps, rps): + print(f"t={t:4.1f}s → {rate:6.2f} rps") + + +def print_sampled_preview(res: ResultsAnalyzer) -> None: + """ + Print a small preview for each sampled metric series (first 5 values). + This helps verify that sampler pipelines are running. + """ + sampled = res.get_sampled_metrics() + print("\n════════ SAMPLED METRICS (preview) ════════") + if not sampled: + print("(empty)") + return + + for metric, series in sampled.items(): + metric_name = ( + metric.name if isinstance(metric, SampledMetricName) else str(metric) + ) + print(f"\n📈 {metric_name}:") + for entity, vals in series.items(): + head = list(vals[:5]) if vals else [] + print(f" - {entity}: len={len(vals)}, first={head}") + + +# ───────────────────────────────────────────────────────────── +# Tiny helpers for sanity checks (optional) +# ───────────────────────────────────────────────────────────── +def _mean(series: Iterable[float]) -> float: + """Mean of a float iterable; returns 0.0 for an empty series.""" + arr = np.asarray(list(series), dtype=float) + return float(np.mean(arr)) if arr.size else 0.0 + + +def run_sanity_checks( + runner: SimulationRunner, + res: ResultsAnalyzer, +) -> None: + """ + Back-of-the-envelope checks to compare rough expectations vs observations. + These are intentionally simplistic approximations. 
+ """ + print("\n════════ SANITY CHECKS (rough) ════════") + w = runner.simulation_input.rqs_input + lam_rps = ( + float(w.avg_active_users.mean) + * float(w.avg_request_per_minute_per_user.mean) + / 60.0 + ) + + # Observed throughput + _, rps_series = res.get_throughput_series() + rps_observed = _mean(rps_series) + print(f"• Mean throughput (rps) expected≈{lam_rps:.3f} " + f"observed={rps_observed:.3f}") + + # A few sampled signals (RAM, queues) just to show they are populated. + sampled = res.get_sampled_metrics() + ram_series = sampled.get(SampledMetricName.RAM_IN_USE, {}) + ioq_series = sampled.get(SampledMetricName.EVENT_LOOP_IO_SLEEP, {}) + ready_series = sampled.get(SampledMetricName.READY_QUEUE_LEN, {}) + + ram_mean = _mean([_mean(v) for v in ram_series.values()]) if ram_series else 0.0 + ioq_mean = _mean([_mean(v) for v in ioq_series.values()]) if ioq_series else 0.0 + ready_mean = ( + _mean([_mean(v) for v in ready_series.values()]) if ready_series else 0.0 + ) + + print(f"• Mean RAM in use (MB) observed={ram_mean:.3f}") + print(f"• Mean I/O queue length observed={ioq_mean:.3f}") + print(f"• Mean ready queue length observed={ready_mean:.3f}") + + +# ───────────────────────────────────────────────────────────── +# Build the same scenario via AsyncFlow (pybuilder) +# ───────────────────────────────────────────────────────────── +def build_payload_with_pybuilder() -> SimulationPayload: + """ + Construct the SimulationPayload programmatically using the builder. + + This mirrors the YAML: + - Generator (100 users, 20 rpm each) + - Client + - One server with a single endpoint (CPU → RAM → IO) + - Three edges with exponential latency (3ms mean) + - Simulation settings: 500s total, sample period 50ms + """ + # 1) Request generator + generator = RqsGeneratorInput( + id="rqs-1", + avg_active_users={"mean": 100}, + avg_request_per_minute_per_user={"mean": 20}, + user_sampling_window=60, + ) + + # 2) Client + client = Client(id="client-1") + + # 3) Server (1 CPU core, 2GB RAM) with one endpoint and three steps + # We let Pydantic coerce nested dicts for the endpoint steps. + endpoint = Endpoint( + endpoint_name="ep-1", + probability=1.0, + steps=[ + {"kind": "initial_parsing", "step_operation": {"cpu_time": 0.001}}, + {"kind": "ram", "step_operation": {"necessary_ram": 100}}, + {"kind": "io_wait", "step_operation": {"io_waiting_time": 0.1}}, + ], + ) + + server = Server( + id="srv-1", + server_resources={"cpu_cores": 1, "ram_mb": 2048}, + endpoints=[endpoint], + ) + + # 4) Edges: exponential latency with 3ms mean (same as YAML) + e_gen_client = Edge( + id="gen-to-client", + source="rqs-1", + target="client-1", + latency={"mean": 0.003, "distribution": "exponential"}, + ) + e_client_server = Edge( + id="client-to-server", + source="client-1", + target="srv-1", + latency={"mean": 0.003, "distribution": "exponential"}, + ) + e_server_client = Edge( + id="server-to-client", + source="srv-1", + target="client-1", + latency={"mean": 0.003, "distribution": "exponential"}, + ) + + # 5) Simulation settings + settings = SimulationSettings( + total_simulation_time=500, + sample_period_s=0.05, + enabled_sample_metrics=[ + "ready_queue_len", + "event_loop_io_sleep", + "ram_in_use", + "edge_concurrent_connection", + ], + enabled_event_metrics=["rqs_clock"], + ) + + # 6) Assemble the payload via the builder (AsyncFlow). + # The builder will validate the final structure on build. 
flow = ( + AsyncFlow() + .add_generator(generator) + .add_client(client) + .add_servers(server) + .add_edges(e_gen_client, e_client_server, e_server_client) + .add_simulation_settings(settings) + ) + + return flow.build_payload() + + +# ───────────────────────────────────────────────────────────── +# Main entry-point +# ───────────────────────────────────────────────────────────── +def main() -> None: + """ + Build → wire → run the simulation, then print diagnostics. + Mirrors run_from_yaml.py but uses the pybuilder to construct the input. + Also saves a 2x2 plot figure (latency, throughput, server queues, RAM). + """ + env = simpy.Environment() + payload = build_payload_with_pybuilder() + + runner = SimulationRunner(env=env, simulation_input=payload) + results: ResultsAnalyzer = runner.run() + + # Human-friendly diagnostics + print_latency_stats(results) + print_throughput(results) + print_sampled_preview(results) + + # Optional sanity checks (very rough) + run_sanity_checks(runner, results) + + # Save plots (2x2 figure), same layout as in the YAML-based example + try: + from matplotlib import pyplot as plt # noqa: PLC0415 + + fig, axes = plt.subplots(2, 2, figsize=(12, 8)) + results.plot_latency_distribution(axes[0, 0]) + results.plot_throughput(axes[0, 1]) + results.plot_server_queues(axes[1, 0]) + results.plot_ram_usage(axes[1, 1]) + fig.tight_layout() + + out_path = Path(__file__).parent / "single_server_pybuilder.png" + fig.savefig(out_path) + print(f"\n🖼️ Plots saved to: {out_path}") + except Exception as exc: # Matplotlib not installed or plotting failed + print(f"\n[plotting skipped] {exc!r}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/app/pybuilder/input_builder.py b/src/app/pybuilder/input_builder.py new file mode 100644 index 0000000..223cd32 --- /dev/null +++ b/src/app/pybuilder/input_builder.py @@ -0,0 +1,127 @@ +"""Definition of the simulation input through Python objects.""" + +from __future__ import annotations + +from typing import Self + +from app.schemas.full_simulation_input import SimulationPayload +from app.schemas.rqs_generator_input import RqsGeneratorInput +from app.schemas.simulation_settings_input import SimulationSettings +from app.schemas.system_topology.full_system_topology import ( + Client, + Edge, + LoadBalancer, + Server, + TopologyGraph, + TopologyNodes, +) + + +class AsyncFlow: + """Builder with chainable methods to create the input for the simulation.""" + + def __init__(self) -> None: + """Initialize the attributes needed to define the simulation payload.""" + self._generator: RqsGeneratorInput | None = None + self._client: Client | None = None + self._servers: list[Server] | None = None + self._edges: list[Edge] | None = None + self._sim_settings: SimulationSettings | None = None + self._load_balancer: LoadBalancer | None = None + + def add_generator(self, rqs_generator: RqsGeneratorInput) -> Self: + """Add the request generator.""" + if not isinstance(rqs_generator, RqsGeneratorInput): + msg = "You must add a RqsGeneratorInput instance" + raise TypeError(msg) + self._generator = rqs_generator + return self + + def add_client(self, client: Client) -> Self: + """Add the client.""" + if not isinstance(client, Client): + msg = "You must add a Client instance" + raise TypeError(msg) + + self._client = client + return self + + def add_servers(self, *servers: Server) -> Self: + """Add one or more servers.""" + if self._servers is None: + self._servers = [] + + for server in servers: + if 
not isinstance(server, Server): + msg = "All the instances must be of the type Server" + raise TypeError(msg) + self._servers.append(server) + return self + + def add_edges(self, *edges: Edge) -> Self: + """Add one or more edges.""" + if self._edges is None: + self._edges = [] + + for edge in edges: + if not isinstance(edge, Edge): + msg = "All the instances must be of the type Edge" + raise TypeError(msg) + self._edges.append(edge) + return self + + def add_simulation_settings(self, sim_settings: SimulationSettings) -> Self: + """Add the settings for the simulation.""" + if not isinstance(sim_settings, SimulationSettings): + msg = "The instance must be of the type SimulationSettings" + raise TypeError(msg) + + self._sim_settings = sim_settings + return self + + def add_load_balancer(self, load_balancer: LoadBalancer) -> Self: + """Add a load balancer.""" + if not isinstance(load_balancer, LoadBalancer): + msg = "The instance must be of the type LoadBalancer" + raise TypeError(msg) + + self._load_balancer = load_balancer + return self + + def build_payload(self) -> SimulationPayload: + """Validate the collected components and build the simulation payload.""" + if self._generator is None: + msg = "The generator input must be instantiated before the simulation" + raise ValueError(msg) + if self._client is None: + msg = "The client input must be instantiated before the simulation" + raise ValueError(msg) + if not self._servers: + msg = "You must instantiate at least one server before the simulation" + raise ValueError(msg) + if not self._edges: + msg = "You must instantiate edges before the simulation" + raise ValueError(msg) + if self._sim_settings is None: + msg = "The simulation settings must be instantiated before the simulation" + raise ValueError(msg) + + nodes = TopologyNodes( + servers=self._servers, + client=self._client, + load_balancer=self._load_balancer, + ) + + graph = TopologyGraph( + nodes=nodes, + edges=self._edges, + ) + + return SimulationPayload.model_validate({ + "rqs_input": self._generator, + "topology_graph": graph, + "sim_settings": self._sim_settings, + }) + + + diff --git a/src/app/schemas/system_topology/full_system_topology.py b/src/app/schemas/system_topology/full_system_topology.py index ddd4c5f..f70b16c 100644 --- a/src/app/schemas/system_topology/full_system_topology.py +++ b/src/app/schemas/system_topology/full_system_topology.py @@ -155,6 +155,10 @@ def unique_ids( ) -> "TopologyNodes": """Check that all id are unique""" ids = [server.id for server in model.servers] + [model.client.id] + + if model.load_balancer is not None: + ids.append(model.load_balancer.id) + counter = Counter(ids) duplicate = [node_id for node_id, value in counter.items() if value > 1] if duplicate: @@ -193,7 +197,6 @@ class Edge(BaseModel): source: str target: str latency: RVConfig - probability: float = Field(1.0, ge=0.0, le=1.0) edge_type: SystemEdges = SystemEdges.NETWORK_CONNECTION dropout_rate: float = Field( NetworkParameters.DROPOUT_RATE, @@ -230,7 +233,7 @@ def ensure_latency_is_non_negative( if variance is not None and variance < 0: # Variance can be zero msg = ( f"The variance of the latency of the edge {edge_id}" - "must be positive" + " must be non-negative" ) raise ValueError(msg) return v @@ -327,7 +330,7 @@ def edge_refs_valid( @model_validator(mode="after") # type: ignore[arg-type] def valid_load_balancer(cls, model: "TopologyGraph") -> "TopologyGraph": # noqa: N805 """ - Check de validity of the load balancer: first we check + Check the 
validity of the load balancer: first we check if is present in the simulation, second we check if the LB list is a proper subset of the server sets of ids, then we check if edge from LB to the servers are well defined @@ -343,7 +346,7 @@ def valid_load_balancer(cls, model: "TopologyGraph") -> "TopologyGraph": # noqa: if missing: msg = (f"Load balancer '{lb.id}'" - "references unknown servers: {sorted(missing)}") + f" references unknown servers: {sorted(missing)}") raise ValueError(msg) # edge are well defined @@ -360,3 +363,29 @@ def valid_load_balancer(cls, model: "TopologyGraph") -> "TopologyGraph": # noqa: return model + @model_validator(mode="after") # type: ignore[arg-type] + def no_fanout_except_lb(cls, model: "TopologyGraph") -> "TopologyGraph": # noqa: N805 + """Ensure only the LB (declared node) can have multiple outgoing edges.""" + lb_id = model.nodes.load_balancer.id if model.nodes.load_balancer else None + + # let us consider only nodes declared in the topology + node_ids: set[str] = {server.id for server in model.nodes.servers} + node_ids.add(model.nodes.client.id) + if lb_id: + node_ids.add(lb_id) + + counts: dict[str, int] = {} + for edge in model.edges: + if edge.source not in node_ids: + continue + counts[edge.source] = counts.get(edge.source, 0) + 1 + + offenders = [src for src, c in counts.items() if c > 1 and src != lb_id] + if offenders: + msg = ( + "Only the load balancer can have multiple outgoing edges. " + f"Offending sources: {offenders}" + ) + raise ValueError(msg) + + return model diff --git a/tests/unit/pybuilder/test_input_builder.py b/tests/unit/pybuilder/test_input_builder.py new file mode 100644 index 0000000..1bdddf8 --- /dev/null +++ b/tests/unit/pybuilder/test_input_builder.py @@ -0,0 +1,280 @@ +""" +Unit tests for the AsyncFlow builder. + +The goal is to verify that: +- The builder enforces types on each `add_*` method. +- Missing components produce clear ValueError exceptions on `build_payload()`. +- A valid, minimal scenario builds a `SimulationPayload` successfully. +- Methods return `self` to support fluent chaining. +- Servers and edges can be added in multiples and preserve order. 
+""" + +from __future__ import annotations + +import pytest + +from app.pybuilder.input_builder import AsyncFlow +from app.schemas.full_simulation_input import SimulationPayload +from app.schemas.rqs_generator_input import RqsGeneratorInput +from app.schemas.simulation_settings_input import SimulationSettings +from app.schemas.system_topology.endpoint import Endpoint +from app.schemas.system_topology.full_system_topology import Client, Edge, Server + + +# --------------------------------------------------------------------------- # +# Helpers: build minimal, valid components # +# --------------------------------------------------------------------------- # +def make_generator() -> RqsGeneratorInput: + """Return a minimal valid request generator.""" + return RqsGeneratorInput( + id="rqs-1", + avg_active_users={"mean": 10}, + avg_request_per_minute_per_user={"mean": 30}, + user_sampling_window=60, + ) + + +def make_client() -> Client: + """Return a minimal valid client.""" + return Client(id="client-1") + + +def make_endpoint() -> Endpoint: + """Return a minimal endpoint with CPU and IO steps.""" + return Endpoint( + endpoint_name="ep-1", + probability=1.0, + steps=[ + {"kind": "initial_parsing", "step_operation": {"cpu_time": 0.001}}, + {"kind": "io_wait", "step_operation": {"io_waiting_time": 0.001}}, + ], + ) + + +def make_server(server_id: str = "srv-1") -> Server: + """Return a minimal valid server with 1 core, 2GB RAM, and one endpoint.""" + return Server( + id=server_id, + server_resources={"cpu_cores": 1, "ram_mb": 2048}, + endpoints=[make_endpoint()], + ) + + +def make_edges() -> list[Edge]: + """Return a valid edge triplet for the minimal single-server scenario.""" + e1 = Edge( + id="gen-to-client", + source="rqs-1", + target="client-1", + latency={"mean": 0.003, "distribution": "exponential"}, + ) + e2 = Edge( + id="client-to-server", + source="client-1", + target="srv-1", + latency={"mean": 0.003, "distribution": "exponential"}, + ) + e3 = Edge( + id="server-to-client", + source="srv-1", + target="client-1", + latency={"mean": 0.003, "distribution": "exponential"}, + ) + return [e1, e2, e3] + + +def make_settings() -> SimulationSettings: + """Return minimal simulation settings within validation bounds.""" + return SimulationSettings( + total_simulation_time=5.0, # lower bound is 5 seconds + sample_period_s=0.1, + enabled_sample_metrics=[ + "ready_queue_len", + "event_loop_io_sleep", + "ram_in_use", + "edge_concurrent_connection", + ], + enabled_event_metrics=["rqs_clock"], + ) + + +# --------------------------------------------------------------------------- # +# Positive / “happy path” # +# --------------------------------------------------------------------------- # +def test_builder_happy_path_returns_payload() -> None: + """Building a minimal scenario returns a validated SimulationPayload.""" + flow = AsyncFlow() + generator = make_generator() + client = make_client() + server = make_server() + e1, e2, e3 = make_edges() + settings = make_settings() + + payload = ( + flow.add_generator(generator) + .add_client(client) + .add_servers(server) + .add_edges(e1, e2, e3) + .add_simulation_settings(settings) + .build_payload() + ) + + assert isinstance(payload, SimulationPayload) + assert payload.topology_graph.nodes.client.id == client.id + assert len(payload.topology_graph.nodes.servers) == 1 + assert {e.id for e in payload.topology_graph.edges} == { + "gen-to-client", + "client-to-server", + "server-to-client", + } + + +def test_add_methods_return_self_for_chaining() -> None: + 
"""Every add_* method returns `self` to support fluent chaining.""" + flow = AsyncFlow() + ret = ( + flow.add_generator(make_generator()) + .add_client(make_client()) + .add_servers(make_server()) + .add_edges(*make_edges()) + .add_simulation_settings(make_settings()) + ) + assert ret is flow + + +def test_add_servers_accepts_multiple_and_keeps_order() -> None: + """Adding multiple servers keeps insertion order.""" + flow = AsyncFlow().add_generator(make_generator()).add_client(make_client()) + s1 = make_server("srv-1") + s2 = make_server("srv-2") + s3 = make_server("srv-3") + + flow.add_servers(s1, s2).add_servers(s3) + e1, e2, e3 = make_edges() + settings = make_settings() + payload = ( + flow.add_edges(e1, e2, e3) + .add_simulation_settings(settings) + .build_payload() + ) + + ids = [srv.id for srv in payload.topology_graph.nodes.servers] + assert ids == ["srv-1", "srv-2", "srv-3"] + + +# --------------------------------------------------------------------------- # +# Negative cases: missing components # +# --------------------------------------------------------------------------- # +def test_build_without_generator_raises() -> None: + """Building without a generator fails with a clear error.""" + flow = AsyncFlow() + flow.add_client(make_client()) + flow.add_servers(make_server()) + flow.add_edges(*make_edges()) + flow.add_simulation_settings(make_settings()) + + with pytest.raises( + ValueError, + match="The generator input must be instantiated before the simulation", + ): + flow.build_payload() + + +def test_build_without_client_raises() -> None: + """Building without a client fails with a clear error.""" + flow = AsyncFlow() + flow.add_generator(make_generator()) + flow.add_servers(make_server()) + flow.add_edges(*make_edges()) + flow.add_simulation_settings(make_settings()) + + with pytest.raises( + ValueError, + match="The client input must be instantiated before the simulation", + ): + flow.build_payload() + + +def test_build_without_servers_raises() -> None: + """Building without servers fails with a clear error.""" + flow = AsyncFlow() + flow.add_generator(make_generator()) + flow.add_client(make_client()) + flow.add_edges(*make_edges()) + flow.add_simulation_settings(make_settings()) + + with pytest.raises( + ValueError, + match="You must instantiate at least one server before the simulation", + ): + flow.build_payload() + + +def test_build_without_edges_raises() -> None: + """Building without edges fails with a clear error.""" + flow = AsyncFlow() + flow.add_generator(make_generator()) + flow.add_client(make_client()) + flow.add_servers(make_server()) + flow.add_simulation_settings(make_settings()) + + with pytest.raises( + ValueError, + match="You must instantiate edges before the simulation", + ): + flow.build_payload() + + +def test_build_without_settings_raises() -> None: + """Building without settings fails with a clear error.""" + flow = AsyncFlow() + flow.add_generator(make_generator()) + flow.add_client(make_client()) + flow.add_servers(make_server()) + flow.add_edges(*make_edges()) + + with pytest.raises( + ValueError, + match="The simulation settings must be instantiated before the simulation", + ): + flow.build_payload() + + +# --------------------------------------------------------------------------- # +# Negative cases: type enforcement in add_* methods # +# --------------------------------------------------------------------------- # +def test_add_generator_rejects_wrong_type() -> None: + """`add_generator` rejects non-RqsGeneratorInput instances.""" + flow = 
AsyncFlow() + with pytest.raises(TypeError): + flow.add_generator("not-a-generator") # type: ignore[arg-type] + + +def test_add_client_rejects_wrong_type() -> None: + """`add_client` rejects non-Client instances.""" + flow = AsyncFlow() + with pytest.raises(TypeError): + flow.add_client(1234) # type: ignore[arg-type] + + +def test_add_servers_rejects_wrong_type() -> None: + """`add_servers` rejects any non-Server in the varargs.""" + flow = AsyncFlow() + good = make_server() + with pytest.raises(TypeError): + flow.add_servers(good, "not-a-server") # type: ignore[arg-type] + + +def test_add_edges_rejects_wrong_type() -> None: + """`add_edges` rejects any non-Edge in the varargs.""" + flow = AsyncFlow() + good = make_edges()[0] + with pytest.raises(TypeError): + flow.add_edges(good, 3.14) # type: ignore[arg-type] + + +def test_add_settings_rejects_wrong_type() -> None: + """`add_simulation_settings` rejects non-SimulationSettings instances.""" + flow = AsyncFlow() + with pytest.raises(TypeError): + flow.add_simulation_settings({"total_simulation_time": 1.0}) # type: ignore[arg-type]
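+
+
+# Sketch of one additional check worth adding (a suggestion, not part of the tests
+# above): `add_load_balancer` is the only builder method without a type-enforcement
+# test, and it follows the same isinstance/TypeError pattern as the other add_* methods.
+def test_add_load_balancer_rejects_wrong_type() -> None:
+    """`add_load_balancer` rejects non-LoadBalancer instances."""
+    flow = AsyncFlow()
+    with pytest.raises(TypeError):
+        flow.add_load_balancer("not-a-load-balancer")  # type: ignore[arg-type]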