diff --git a/src/asyncflow/runtime/actors/edge.py b/src/asyncflow/runtime/actors/edge.py index 63c8f45..d741e53 100644 --- a/src/asyncflow/runtime/actors/edge.py +++ b/src/asyncflow/runtime/actors/edge.py @@ -6,6 +6,8 @@ waits the sampled delay (and any resource wait) before delivering the message to the target node's inbox. """ + + from collections.abc import Container, Generator, Mapping from typing import TYPE_CHECKING @@ -16,11 +18,14 @@ from asyncflow.metrics.edge import build_edge_metrics from asyncflow.runtime.rqs_state import RequestState from asyncflow.samplers.common_helpers import general_sampler +from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.settings.simulation import SimulationSettings from asyncflow.schemas.topology.edges import Edge if TYPE_CHECKING: - from asyncflow.schemas.common.random_variables import RVConfig + from pydantic import PositiveFloat + + class EdgeRuntime: @@ -73,7 +78,6 @@ def __init__( # Noqa: PLR0913 def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]: """Function to deliver the state to the next node""" # extract the random variables defining the latency of the edge - random_variable: RVConfig = self.edge_config.latency uniform_variable = self.rng.uniform() if uniform_variable < self.edge_config.dropout_rate: @@ -85,9 +89,15 @@ def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]: ) return + # latency + latency: RVConfig | PositiveFloat = self.edge_config.latency + self._concurrent_connections +=1 - transit_time = general_sampler(random_variable, self.rng) + if isinstance(latency, RVConfig): + transit_time = general_sampler(latency, self.rng) + else: + transit_time = latency # Logic to add if exists the event injection for the given edge diff --git a/src/asyncflow/runtime/actors/server.py b/src/asyncflow/runtime/actors/server.py index d610a9a..2fae18e 100644 --- a/src/asyncflow/runtime/actors/server.py +++ b/src/asyncflow/runtime/actors/server.py @@ -3,11 +3,13 @@ during the simulation """ + from collections.abc import Generator from typing import cast import numpy as np import simpy +from pydantic import PositiveFloat, PositiveInt from asyncflow.config.constants import ( EndpointStepCPU, @@ -22,6 +24,8 @@ from asyncflow.resources.server_containers import ServerContainers from asyncflow.runtime.actors.edge import EdgeRuntime from asyncflow.runtime.rqs_state import RequestState +from asyncflow.samplers.common_helpers import general_sampler +from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.settings.simulation import SimulationSettings from asyncflow.schemas.topology.nodes import Server @@ -75,6 +79,45 @@ def __init__( # noqa: PLR0913 settings.enabled_sample_metrics, ) + # ------------------------------------------------------------------ + # HELPERS + # ------------------------------------------------------------------ + + def _sample_duration( + self, time: RVConfig | PositiveFloat | PositiveInt, + ) -> float: + """ + Return a non-negative duration in seconds. + + - RVConfig -> sample via general_sampler(self.rng) + - float/int -> cast to float + - Negative draws are clamped to 0.0 (e.g., Normal tails). 
+        """
+        if isinstance(time, RVConfig):
+            time = float(general_sampler(time, self.rng))
+        else:
+            time = float(time)
+
+        return max(time, 0.0)
+
+    def _compute_latency_cpu(
+        self,
+        cpu_time: PositiveFloat | PositiveInt | RVConfig,
+    ) -> float:
+        """Helper to compute the latency of a given CPU-bound step."""
+        return self._sample_duration(cpu_time)
+
+    def _compute_latency_io(
+        self,
+        io_time: PositiveFloat | PositiveInt | RVConfig,
+    ) -> float:
+        """Helper to compute the latency of a given IO-bound step."""
+        return self._sample_duration(io_time)
+
+    # -------------------------------------------------------------------
+    # Main function to process a request
+    # -------------------------------------------------------------------
+
     # right now we disable the warnings but a refactor will be done soon
     def _handle_request(  # noqa: PLR0915, PLR0912, C901
         self,
@@ -103,11 +146,12 @@ def _handle_request(  # noqa: PLR0915, PLR0912, C901
 
         # Extract the total ram to execute the endpoint
-        total_ram = sum(
-            step.step_operation[StepOperation.NECESSARY_RAM]
-            for step in selected_endpoint.steps
-            if isinstance(step.kind, EndpointStepRAM)
-        )
+        total_ram = 0
+        for step in selected_endpoint.steps:
+            if isinstance(step.kind, EndpointStepRAM):
+                ram = step.step_operation[StepOperation.NECESSARY_RAM]
+                assert isinstance(ram, int)
+                total_ram += ram
 
         # ------------------------------------------------------------------
         # CPU & RAM SCHEDULING
@@ -226,7 +270,9 @@ def _handle_request(  # noqa: PLR0915, PLR0912, C901
 
             core_locked = True
 
-            cpu_time = step.step_operation[StepOperation.CPU_TIME]
+            cpu_time = self._compute_latency_cpu(
+                step.step_operation[StepOperation.CPU_TIME],
+            )
 
             # Execute the step giving back the control to the simpy env
             yield self.env.timeout(cpu_time)
@@ -234,7 +280,9 @@ def _handle_request(  # noqa: PLR0915, PLR0912, C901
         # is one member of enum
         elif isinstance(step.kind, EndpointStepIO):
             # define the io time
-            io_time = step.step_operation[StepOperation.IO_WAITING_TIME]
+            io_time = self._compute_latency_io(
+                step.step_operation[StepOperation.IO_WAITING_TIME],
+            )
 
             if core_locked:
                 # release the core coming from a cpu step
diff --git a/src/asyncflow/schemas/common/random_variables.py b/src/asyncflow/schemas/common/random_variables.py
index d827b92..464f7a6 100644
--- a/src/asyncflow/schemas/common/random_variables.py
+++ b/src/asyncflow/schemas/common/random_variables.py
@@ -1,6 +1,6 @@
 """Definition of the schema for a Random variable"""
 
-from pydantic import BaseModel, field_validator, model_validator
+from pydantic import BaseModel, NonNegativeFloat, model_validator
 
 from asyncflow.config.constants import Distribution
 
@@ -8,21 +8,9 @@ class RVConfig(BaseModel):
     """class to configure random variables"""
 
-    mean: float
+    mean: NonNegativeFloat
     distribution: Distribution = Distribution.POISSON
-    variance: float | None = None
-
-    @field_validator("mean", mode="before")
-    def ensure_mean_is_numeric_and_positive(
-        cls,  # noqa: N805
-        v: float,
-    ) -> float:
-        """Ensure `mean` is numeric, then coerce to float."""
-        err_msg = "mean must be a number (int or float)"
-        if not isinstance(v, (float, int)):
-            raise ValueError(err_msg)  # noqa: TRY004
-
-        return float(v)
+    variance: NonNegativeFloat | None = None
 
     @model_validator(mode="after")  # type: ignore[arg-type]
     def default_variance(cls, model: "RVConfig") -> "RVConfig":  # noqa: N805
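With `mean` and `variance` typed as `NonNegativeFloat`, pydantic rejects negative values at construction time, so the bespoke `field_validator` is no longer needed. A minimal illustrative sketch of the resulting behaviour (assumed usage, not part of the patch):

from pydantic import ValidationError

from asyncflow.config.constants import Distribution
from asyncflow.schemas.common.random_variables import RVConfig

rv = RVConfig(mean=0.05, variance=0.0)          # accepted
assert rv.distribution is Distribution.POISSON  # default distribution

try:
    RVConfig(mean=-1.0)                         # negative mean is rejected
except ValidationError:
    print("mean must be non-negative")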
diff --git a/src/asyncflow/schemas/topology/edges.py b/src/asyncflow/schemas/topology/edges.py
index 6e3d03b..3e5ccce 100644
--- a/src/asyncflow/schemas/topology/edges.py
+++ b/src/asyncflow/schemas/topology/edges.py
@@ -3,12 +3,7 @@
 links between different nodes
 """
 
-from pydantic import (
-    BaseModel,
-    Field,
-    field_validator,
-    model_validator,
-)
+from pydantic import BaseModel, Field, PositiveFloat, field_validator, model_validator
 from pydantic_core.core_schema import ValidationInfo
 
 from asyncflow.config.constants import (
@@ -32,8 +27,9 @@ class Edge(BaseModel):
         Identifier of the source node (where the request comes from).
     target : str
         Identifier of the destination node (where the request goes to).
-    latency : RVConfig
-        Random-variable configuration for network latency on this link.
+    latency : RVConfig | PositiveFloat
+        Random-variable configuration for the network latency on this link,
+        or a fixed positive float (deterministic latency).
     probability : float
         Probability of taking this edge when there are multiple outgoing
         links. Must be in [0.0, 1.0]. Defaults to 1.0 (always taken).
@@ -45,7 +41,7 @@ class Edge(BaseModel):
     id: str
     source: str
    target: str
-    latency: RVConfig
+    latency: RVConfig | PositiveFloat
     edge_type: SystemEdges = SystemEdges.NETWORK_CONNECTION
     dropout_rate: float = Field(
         NetworkParameters.DROPOUT_RATE,
@@ -66,10 +62,13 @@
     @field_validator("latency", mode="after")
     def ensure_latency_is_non_negative(
         cls,  # noqa: N805
-        v: RVConfig,
+        v: RVConfig | PositiveFloat,
         info: ValidationInfo,
-    ) -> RVConfig:
+    ) -> RVConfig | PositiveFloat:
         """Ensures that the latency's mean and variance are positive."""
+        if not isinstance(v, RVConfig):
+            return v
+
         mean = v.mean
         variance = v.variance
@@ -79,9 +78,10 @@ def ensure_latency_is_non_negative(
         if mean <= 0:
             msg = f"The mean latency of the edge '{edge_id}' must be positive"
             raise ValueError(msg)
+
         if variance is not None and variance < 0:  # Variance can be zero
             msg = (
-                f"The variance of the latency of the edge {edge_id}"
+                f"The variance of the latency of the edge {edge_id} "
                 "must be non negative"
             )
             raise ValueError(msg)
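`Edge.latency` now accepts either a random-variable configuration or a plain positive float. A short sketch of the two accepted shapes, mirroring the new unit tests (identifiers and values are illustrative, not from the patch):

from asyncflow.schemas.common.random_variables import RVConfig
from asyncflow.schemas.topology.edges import Edge

# deterministic latency, validated as PositiveFloat
fast_link = Edge(id="e-det", source="client", target="srv-1", latency=0.005)

# stochastic latency described by a random variable
noisy_link = Edge(
    id="e-rv",
    source="srv-1",
    target="db",
    latency=RVConfig(mean=0.01, variance=0.0),
)

assert isinstance(fast_link.latency, float)
assert isinstance(noisy_link.latency, RVConfig)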
diff --git a/src/asyncflow/schemas/topology/endpoint.py b/src/asyncflow/schemas/topology/endpoint.py
index aa91c7b..54ee0c7 100644
--- a/src/asyncflow/schemas/topology/endpoint.py
+++ b/src/asyncflow/schemas/topology/endpoint.py
@@ -14,6 +14,7 @@
     EndpointStepRAM,
     StepOperation,
 )
+from asyncflow.schemas.common.random_variables import RVConfig
 
 
 class Step(BaseModel):
@@ -23,7 +24,7 @@ class Step(BaseModel):
     """
 
     kind: EndpointStepIO | EndpointStepCPU | EndpointStepRAM
-    step_operation: dict[StepOperation, PositiveFloat | PositiveInt]
+    step_operation: dict[StepOperation, PositiveFloat | PositiveInt | RVConfig]
 
     @field_validator("step_operation", mode="before")
     def ensure_non_empty(
@@ -85,6 +86,55 @@ def ensure_coherence_type_operation(
 
         return model
 
+    @model_validator(mode="after")  # type: ignore[arg-type]
+    def ensure_cpu_io_positive_rv(cls, model: "Step") -> "Step":  # noqa: N805
+        """
+        For CPU/IO steps: if the operation is an RVConfig, require mean > 0
+        and variance >= 0. Deterministic PositiveFloat values are already validated.
+        """
+        # safe even if, for some reason, there are zero or two or more keys
+        op_val = next(iter(model.step_operation.values()), None)
+        if op_val is None:
+            return model
+
+        if isinstance(model.kind, EndpointStepCPU) and isinstance(op_val, RVConfig):
+            if op_val.mean <= 0:
+                msg = "CPU_TIME RVConfig.mean must be > 0"
+                raise ValueError(msg)
+            if op_val.variance is not None and op_val.variance < 0:
+                msg = "CPU_TIME RVConfig.variance must be >= 0"
+                raise ValueError(msg)
+
+        if isinstance(model.kind, EndpointStepIO) and isinstance(op_val, RVConfig):
+            if op_val.mean <= 0:
+                msg = "IO_WAITING_TIME RVConfig.mean must be > 0"
+                raise ValueError(msg)
+            if op_val.variance is not None and op_val.variance < 0:
+                msg = "IO_WAITING_TIME RVConfig.variance must be >= 0"
+                raise ValueError(msg)
+
+        return model
+
+    @model_validator(mode="after")  # type: ignore[arg-type]
+    def ensure_ram_positive_int(cls, model: "Step") -> "Step":  # noqa: N805
+        """For RAM steps: operation must be a positive integer (no RVs/floats)."""
+        if not isinstance(model.kind, EndpointStepRAM):
+            return model
+
+        op_val = next(iter(model.step_operation.values()), None)
+        if op_val is None:
+            return model
+
+        if isinstance(op_val, RVConfig) or not isinstance(op_val, int):
+            msg = "NECESSARY_RAM must be a positive integer"
+            raise TypeError(msg)
+
+        if op_val <= 0:
+            msg = "NECESSARY_RAM must be > 0"
+            raise ValueError(msg)
+
+        return model
+
diff --git a/tests/unit/runtime/actors/test_client.py b/tests/unit/runtime/actors/test_client_rt.py
similarity index 100%
rename from tests/unit/runtime/actors/test_client.py
rename to tests/unit/runtime/actors/test_client_rt.py
diff --git a/tests/unit/runtime/actors/test_edge.py b/tests/unit/runtime/actors/test_edge_rt.py
similarity index 100%
rename from tests/unit/runtime/actors/test_edge.py
rename to tests/unit/runtime/actors/test_edge_rt.py
diff --git a/tests/unit/runtime/actors/test_load_balancer.py b/tests/unit/runtime/actors/test_load_balancer_rt.py
similarity index 100%
rename from tests/unit/runtime/actors/test_load_balancer.py
rename to tests/unit/runtime/actors/test_load_balancer_rt.py
diff --git a/tests/unit/runtime/actors/test_rqs_generator.py b/tests/unit/runtime/actors/test_rqs_generator_rt.py
similarity index 100%
rename from tests/unit/runtime/actors/test_rqs_generator.py
rename to tests/unit/runtime/actors/test_rqs_generator_rt.py
diff --git a/tests/unit/runtime/actors/test_server.py b/tests/unit/runtime/actors/test_server_rt.py
similarity index 72%
rename from tests/unit/runtime/actors/test_server.py
rename to tests/unit/runtime/actors/test_server_rt.py
index 5251ffe..53da3ef 100644
--- a/tests/unit/runtime/actors/test_server.py
+++ b/tests/unit/runtime/actors/test_server_rt.py
@@ -19,7 +19,9 @@
 from typing import TYPE_CHECKING
 
+import pytest
 import simpy
+from numpy.random import Generator as NpGenerator
 from numpy.random import default_rng
 
 from asyncflow.config.constants import (
@@ -30,8 +32,10 @@
     StepOperation,
 )
 from asyncflow.resources.server_containers import build_containers
+from asyncflow.runtime.actors import server as server_mod
 from asyncflow.runtime.actors.server import ServerRuntime
 from asyncflow.runtime.rqs_state import RequestState
+from asyncflow.schemas.common.random_variables import RVConfig
 from asyncflow.schemas.settings.simulation import SimulationSettings
 from asyncflow.schemas.topology.endpoint import Endpoint, Step
 from asyncflow.schemas.topology.nodes import NodesResources, Server
@@ -366,3 +370,112 @@ def test_enabled_metrics_dict_populated() -> None:
SampledMetricName.EVENT_LOOP_IO_SLEEP, } assert mandatory.issubset(server.enabled_metrics.keys()) + + +# --------------------------------------------------------------------------- # +# CPU step: RVConfig is sampled via general_sampler # +# --------------------------------------------------------------------------- # + +def test_cpu_step_uses_rvconfig_sample(monkeypatch: pytest.MonkeyPatch) -> None: + """CPU step duration follows the (patched) sampler result.""" + # Patch sampler: return 7 ms when mean=0.123 (CPU sentinel) + def fake_sampler(cfg: RVConfig, rng: NpGenerator) -> float: + return 0.007 if cfg.mean == 0.123 else 0.0 + + monkeypatch.setattr(server_mod, "general_sampler", fake_sampler) + + env = simpy.Environment() + steps = ( + Step( + kind=EndpointStepRAM.RAM, + step_operation={StepOperation.NECESSARY_RAM: 64}, + ), + Step( + kind=EndpointStepCPU.CPU_BOUND_OPERATION, + step_operation={StepOperation.CPU_TIME: RVConfig(mean=0.123)}, + ), + Step( + kind=EndpointStepIO.WAIT, + step_operation={StepOperation.IO_WAITING_TIME: 0.010}, + ), + ) + server, _ = _make_server_runtime(env, steps=steps, cpu_cores=1) + cpu = server.server_resources["CPU"] + + server.server_box.put(RequestState(id=100, initial_time=0.0)) + server.start() + + # During CPU (7 ms) + env.run(until=0.004) + assert cpu.level == 0 # 1 core, held + # After CPU finished + env.run(until=0.008) + assert cpu.level == 1 # released + + +# --------------------------------------------------------------------------- # +# IO step: RVConfig is sampled via general_sampler # +# --------------------------------------------------------------------------- # + +def test_io_step_uses_rvconfig_sample(monkeypatch: pytest.MonkeyPatch) -> None: + """IO step duration follows the (patched) sampler result.""" + # Patch sampler: return 15 ms when mean=0.456 (IO sentinel) + def fake_sampler(cfg: RVConfig, rng: NpGenerator) -> float: + return 0.015 if cfg.mean == 0.456 else 0.0 + + monkeypatch.setattr(server_mod, "general_sampler", fake_sampler) + + env = simpy.Environment() + steps = ( + Step( + kind=EndpointStepRAM.RAM, + step_operation={StepOperation.NECESSARY_RAM: 64}, + ), + Step( + kind=EndpointStepCPU.CPU_BOUND_OPERATION, + step_operation={StepOperation.CPU_TIME: 0.002}, + ), + Step( + kind=EndpointStepIO.DB, + step_operation={StepOperation.IO_WAITING_TIME: RVConfig(mean=0.456)}, + ), + ) + server, _ = _make_server_runtime(env, steps=steps, cpu_cores=1) + + server.server_box.put(RequestState(id=200, initial_time=0.0)) + server.start() + + # After CPU (2 ms), inside IO (15 ms total) + env.run(until=0.010) + assert server.io_queue_len == 1 + # After IO finished + env.run(until=0.020) + assert server.io_queue_len == 0 + + +# --------------------------------------------------------------------------- # +# Helpers: _compute_latency_cpu/_io dispatch to sampler and accept ints/floats # +# --------------------------------------------------------------------------- # + +def test_helpers_sample_and_deterministic(monkeypatch: pytest.MonkeyPatch) -> None: + """Helpers use sampler for RVConfig and accept int/float deterministics.""" + # Patch sampler to a fixed value + def fake_sampler(cfg: RVConfig, rng: NpGenerator) -> float: + return 0.123 + + monkeypatch.setattr(server_mod, "general_sampler", fake_sampler) + + # Minimal runtime just to call methods + env = simpy.Environment() + + # Call unbound methods with a real ServerRuntime instance to be safe + # (we can build one via the existing factory) + server, _ = _make_server_runtime(env) + + # RVConfig 
paths + assert server._compute_latency_cpu(RVConfig(mean=1.0)) == pytest.approx(0.123) # noqa: SLF001 + assert server._compute_latency_io(RVConfig(mean=1.0)) == pytest.approx(0.123) # noqa: SLF001 + + # Deterministic int/float paths + assert server._compute_latency_cpu(2) == pytest.approx(2.0) # noqa: SLF001 + assert server._compute_latency_io(0.5) == pytest.approx(0.5) # noqa: SLF001 diff --git a/tests/unit/schemas/test_edge.py b/tests/unit/schemas/test_edge.py new file mode 100644 index 0000000..714ea46 --- /dev/null +++ b/tests/unit/schemas/test_edge.py @@ -0,0 +1,207 @@ +""" +Unit tests for the Edge schema. + +Covers: +- Required fields and defaults +- Source/target validator +- Dropout rate bounds +- Latency validator for both deterministic (PositiveFloat) and RVConfig cases +""" + +from __future__ import annotations + +import pytest +from pydantic import ValidationError + +from asyncflow.config.constants import NetworkParameters, SystemEdges +from asyncflow.schemas.common.random_variables import RVConfig +from asyncflow.schemas.topology.edges import Edge + +# --------------------------------------------------------------------------- # +# Helpers # +# --------------------------------------------------------------------------- # + + +def _rv(mean: float, variance: float | None = None) -> RVConfig: + """Build a minimal RVConfig with mean and optional variance.""" + return RVConfig(mean=mean, variance=variance) + + +# --------------------------------------------------------------------------- # +# Basic construction and defaults # +# --------------------------------------------------------------------------- # + + +def test_edge_minimal_construction_uses_default_edge_type() -> None: + """Minimal valid Edge uses NETWORK_CONNECTION as default edge_type.""" + e = Edge( + id="e1", + source="a", + target="b", + latency=_rv(mean=0.01), + ) + assert e.edge_type is SystemEdges.NETWORK_CONNECTION + + +def test_edge_requires_id_source_target() -> None: + """Omitting required fields raises ValidationError.""" + with pytest.raises(ValidationError): + Edge( # type: ignore[call-arg] + source="a", + target="b", + latency=_rv(mean=0.01), + ) + with pytest.raises(ValidationError): + Edge( # type: ignore[call-arg] + id="e1", + target="b", + latency=_rv(mean=0.01), + ) + with pytest.raises(ValidationError): + Edge( # type: ignore[call-arg] + id="e1", + source="a", + latency=_rv(mean=0.01), + ) + + +# --------------------------------------------------------------------------- # +# Source != Target # +# --------------------------------------------------------------------------- # + + +def test_edge_source_equals_target_fails() -> None: + """Validator forbids identical source and target.""" + with pytest.raises(ValidationError): + Edge( + id="loop", + source="x", + target="x", + latency=_rv(mean=0.01), + ) + + +# --------------------------------------------------------------------------- # +# Dropout rate bounds # +# --------------------------------------------------------------------------- # + + +@pytest.mark.parametrize( + "bad_rate", + [-0.001, NetworkParameters.MAX_DROPOUT_RATE + 1e-6], +) +def test_edge_dropout_rate_out_of_bounds(bad_rate: float) -> None: + """Dropout rate outside configured bounds is rejected.""" + with pytest.raises(ValidationError): + Edge( + id="ed", + source="a", + target="b", + latency=_rv(mean=0.01), + dropout_rate=bad_rate, + ) + + +@pytest.mark.parametrize( + "ok_rate", + [ + NetworkParameters.MIN_DROPOUT_RATE, + NetworkParameters.MAX_DROPOUT_RATE, + 
(NetworkParameters.MIN_DROPOUT_RATE + NetworkParameters.MAX_DROPOUT_RATE) / 2, + ], +) +def test_edge_dropout_rate_in_bounds(ok_rate: float) -> None: + """Boundary and mid-range dropout rates are accepted.""" + e = Edge( + id="ed", + source="a", + target="b", + latency=_rv(mean=0.01), + dropout_rate=ok_rate, + ) + assert e.dropout_rate == ok_rate + + +# --------------------------------------------------------------------------- # +# Latency validation: deterministic (PositiveFloat) # +# --------------------------------------------------------------------------- # + + +@pytest.mark.parametrize("good_latency", [0.001, 0.1, 5.0]) +def test_edge_deterministic_latency_positivefloat_ok(good_latency: float) -> None: + """Deterministic latency validates as PositiveFloat when > 0.""" + e = Edge( + id="dt", + source="a", + target="b", + latency=good_latency, + ) + # pydantic casts PositiveFloat to float at runtime + assert isinstance(e.latency, float) + assert e.latency == good_latency + + +@pytest.mark.parametrize("bad_latency", [0.0, -0.001, -5.0]) +def test_edge_deterministic_latency_non_positive_fails(bad_latency: float) -> None: + """Non-positive deterministic latency is rejected by PositiveFloat.""" + with pytest.raises(ValidationError): + Edge( + id="dt-bad", + source="a", + target="b", + latency=bad_latency, + ) + + +# --------------------------------------------------------------------------- # +# Latency validation: RVConfig branch # +# --------------------------------------------------------------------------- # + + +def test_edge_rvconfig_latency_ok_with_zero_variance() -> None: + """RVConfig with mean>0 and variance==0 is accepted.""" + e = Edge( + id="rv0", + source="a", + target="b", + latency=_rv(mean=0.02, variance=0.0), + ) + assert isinstance(e.latency, RVConfig) + assert e.latency.mean == 0.02 + assert e.latency.variance == 0.0 + + +def test_edge_rvconfig_latency_ok_with_none_variance() -> None: + """RVConfig with mean>0 and variance=None is accepted.""" + e = Edge( + id="rvn", + source="a", + target="b", + latency=_rv(mean=0.02, variance=None), + ) + assert isinstance(e.latency, RVConfig) + assert e.latency.mean == 0.02 + assert e.latency.variance is None + + +@pytest.mark.parametrize("bad_mean", [0.0, -1e-9, -1.0]) +def test_edge_rvconfig_latency_non_positive_mean_fails(bad_mean: float) -> None: + """RVConfig with non-positive mean is rejected by the field validator.""" + with pytest.raises(ValidationError): + Edge( + id="rv-bad-mean", + source="a", + target="b", + latency=_rv(mean=bad_mean, variance=0.0), + ) + + +def test_edge_rvconfig_latency_negative_variance_fails() -> None: + """RVConfig with negative variance is rejected by the field validator.""" + with pytest.raises(ValidationError): + Edge( + id="rv-bad-var", + source="a", + target="b", + latency=_rv(mean=0.02, variance=-0.0001), + ) diff --git a/tests/unit/schemas/test_endpoint.py b/tests/unit/schemas/test_endpoint.py index 080f55a..c9d8648 100644 --- a/tests/unit/schemas/test_endpoint.py +++ b/tests/unit/schemas/test_endpoint.py @@ -11,6 +11,7 @@ EndpointStepRAM, StepOperation, ) +from asyncflow.schemas.common.random_variables import RVConfig from asyncflow.schemas.topology.endpoint import Endpoint, Step @@ -129,3 +130,97 @@ def test_wrong_operation_name_for_io() -> None: kind=EndpointStepIO.CACHE, step_operation={StepOperation.NECESSARY_RAM: 64}, ) + + +# --------------------------------------------------------------------------- # +# CPU: RVConfig branch # +# 
--------------------------------------------------------------------------- # + +def test_cpu_step_rvconfig_positive_ok() -> None: + """CPU step with RVConfig(mean>0, variance=0) is accepted.""" + s = Step( + kind=EndpointStepCPU.CPU_BOUND_OPERATION, + step_operation={StepOperation.CPU_TIME: RVConfig(mean=0.05, variance=0.0)}, + ) + assert isinstance(s.step_operation[StepOperation.CPU_TIME], RVConfig) + + +def test_cpu_step_rvconfig_zero_mean_fails() -> None: + """CPU step with RVConfig(mean==0) is rejected by model validator.""" + with pytest.raises(ValidationError): + Step( + kind=EndpointStepCPU.CPU_BOUND_OPERATION, + step_operation={StepOperation.CPU_TIME: RVConfig(mean=0.0)}, + ) + + +def test_cpu_step_deterministic_zero_fails() -> None: + """Deterministic CPU time must be PositiveFloat (>0).""" + with pytest.raises(ValidationError): + Step( + kind=EndpointStepCPU.CPU_BOUND_OPERATION, + step_operation={StepOperation.CPU_TIME: 0.0}, + ) + + +# --------------------------------------------------------------------------- # +# IO: RVConfig branch # +# --------------------------------------------------------------------------- # + +def test_io_step_rvconfig_negative_variance_fails() -> None: + """IO step with negative variance is rejected.""" + with pytest.raises(ValidationError): + Step( + kind=EndpointStepIO.WAIT, + step_operation={ + StepOperation.IO_WAITING_TIME: + RVConfig(mean=0.02, variance=-1.0)}, + ) + + +def test_io_step_rvconfig_positive_ok() -> None: + """IO step with RVConfig(mean>0, variance=None) is accepted.""" + s = Step( + kind=EndpointStepIO.WAIT, + step_operation={StepOperation.IO_WAITING_TIME: RVConfig(mean=0.02)}, + ) + assert isinstance(s.step_operation[StepOperation.IO_WAITING_TIME], RVConfig) + + +# --------------------------------------------------------------------------- # +# RAM: type discipline # +# --------------------------------------------------------------------------- # + +def test_ram_step_rejects_float_and_rvconfig() -> None: + """RAM step must use a positive integer; float and RVConfig are rejected.""" + # float rejected + with pytest.raises(TypeError): + Step( + kind=EndpointStepRAM.RAM, + step_operation={StepOperation.NECESSARY_RAM: 64.0}, + ) + # RVConfig rejected + with pytest.raises(TypeError): + Step( + kind=EndpointStepRAM.RAM, + step_operation={StepOperation.NECESSARY_RAM: RVConfig(mean=128.0)}, + ) + + +def test_ram_step_zero_fails() -> None: + """RAM step with 0 is rejected by model validator.""" + with pytest.raises(ValidationError): + Step( + kind=EndpointStepRAM.RAM, + step_operation={StepOperation.NECESSARY_RAM: 0}, + ) + + + + + + + + + + diff --git a/tests/unit/schemas/test_nodes.py b/tests/unit/schemas/test_nodes.py new file mode 100644 index 0000000..8b1f7a7 --- /dev/null +++ b/tests/unit/schemas/test_nodes.py @@ -0,0 +1,222 @@ +""" +Unit tests for node schemas: +- NodesResources +- Client +- Server +- LoadBalancer +- TopologyNodes +""" + +from __future__ import annotations + +import pytest +from pydantic import ValidationError + +from asyncflow.config.constants import ( + EndpointStepCPU, + LbAlgorithmsName, + NodesResourcesDefaults, + StepOperation, + SystemNodes, +) +from asyncflow.schemas.topology.endpoint import Endpoint, Step +from asyncflow.schemas.topology.nodes import ( + Client, + LoadBalancer, + NodesResources, + Server, + TopologyNodes, +) + +# --------------------------------------------------------------------------- # +# Helpers # +# --------------------------------------------------------------------------- # + + +def 
_dummy_endpoint() -> Endpoint: + """Return a minimal valid endpoint with one CPU step.""" + step = Step( + kind=EndpointStepCPU.CPU_BOUND_OPERATION, + step_operation={StepOperation.CPU_TIME: 0.01}, + ) + return Endpoint(endpoint_name="/ping", steps=[step]) + + +def _one_server_topology() -> TopologyNodes: + """Build a minimal topology with one client and one server.""" + cli = Client(id="cli-1", type=SystemNodes.CLIENT) + srv = Server( + id="srv-1", + type=SystemNodes.SERVER, + server_resources=NodesResources(), + endpoints=[_dummy_endpoint()], + ) + return TopologyNodes(servers=[srv], client=cli) + + +# --------------------------------------------------------------------------- # +# NodesResources # +# --------------------------------------------------------------------------- # + + +def test_nodes_resources_defaults_match_constants() -> None: + """Defaults match NodesResourcesDefaults constants.""" + res = NodesResources() + assert res.cpu_cores == NodesResourcesDefaults.CPU_CORES + assert res.ram_mb == NodesResourcesDefaults.RAM_MB + assert res.db_connection_pool is NodesResourcesDefaults.DB_CONNECTION_POOL + + +def test_nodes_resources_minimum_constraints() -> None: + """Values below minimum bounds raise ValidationError.""" + with pytest.raises(ValidationError): + NodesResources(cpu_cores=0, ram_mb=NodesResourcesDefaults.RAM_MB) + with pytest.raises(ValidationError): + NodesResources( + cpu_cores=NodesResourcesDefaults.CPU_CORES, + ram_mb=NodesResourcesDefaults.MINIMUM_RAM_MB - 1, + ) + + +# --------------------------------------------------------------------------- # +# Client # +# --------------------------------------------------------------------------- # + + +def test_client_type_must_be_client() -> None: + """Client.type must equal SystemNodes.CLIENT.""" + cli = Client(id="c1", type=SystemNodes.CLIENT) + assert cli.type is SystemNodes.CLIENT + + with pytest.raises(ValidationError): + Client(id="bad", type=SystemNodes.SERVER) + + +def test_client_ram_per_process_requires_resources() -> None: + """If ram_per_process is set, client_resources must be provided.""" + with pytest.raises(ValidationError): + Client(id="c1", ram_per_process=64) + + ok = Client( + id="c2", + client_resources=NodesResources(ram_mb=2048), + ram_per_process=64, + ) + assert ok.client_resources is not None + + +# --------------------------------------------------------------------------- # +# Server # +# --------------------------------------------------------------------------- # + + +def test_server_type_must_be_server() -> None: + """Server.type must equal SystemNodes.SERVER.""" + srv = Server( + id="s1", + type=SystemNodes.SERVER, + server_resources=NodesResources(), + endpoints=[_dummy_endpoint()], + ) + assert srv.type is SystemNodes.SERVER + + with pytest.raises(ValidationError): + Server( + id="bad", + type=SystemNodes.CLIENT, + server_resources=NodesResources(), + endpoints=[_dummy_endpoint()], + ) + + +def test_server_requires_at_least_one_endpoint() -> None: + """Endpoints list must be non-empty.""" + with pytest.raises(ValidationError): + Server( + id="s1", + server_resources=NodesResources(), + endpoints=[], + ) + + +# --------------------------------------------------------------------------- # +# LoadBalancer # +# --------------------------------------------------------------------------- # + + +def test_load_balancer_type_and_defaults() -> None: + """LB.type and default algorithm validate.""" + lb = LoadBalancer(id="lb1", type=SystemNodes.LOAD_BALANCER) + assert lb.type is 
SystemNodes.LOAD_BALANCER + assert lb.algorithms is LbAlgorithmsName.ROUND_ROBIN + + +def test_lb_ram_per_process_requires_resources() -> None: + """If ram_per_process is set, lb_resources must be provided.""" + with pytest.raises(ValidationError): + LoadBalancer( + id="lb1", + ram_per_process=64, + ) + + ok = LoadBalancer( + id="lb2", + lb_resources=NodesResources(ram_mb=2048), + ram_per_process=64, + ) + assert ok.lb_resources is not None + + +# --------------------------------------------------------------------------- # +# TopologyNodes # +# --------------------------------------------------------------------------- # + + +def test_topology_nodes_unique_ids_validator() -> None: + """Duplicate node IDs are rejected.""" + topo = _one_server_topology() + dup_srv = topo.servers[0].model_copy(update={"id": "cli-1"}) + with pytest.raises(ValidationError): + TopologyNodes(servers=[dup_srv], client=topo.client) + + +def test_topology_lb_references_unknown_server_fails() -> None: + """LB must only cover servers present in the topology.""" + topo = _one_server_topology() + lb = LoadBalancer(id="lb-1", server_covered={"missing"}) + with pytest.raises(ValidationError): + TopologyNodes( + servers=topo.servers, + client=topo.client, + load_balancer=lb, + ) + + +def test_topology_lb_with_valid_coverage_passes() -> None: + """LB covering an existing server validates.""" + topo = _one_server_topology() + lb = LoadBalancer(id="lb-1", server_covered={"srv-1"}) + ok = TopologyNodes( + servers=topo.servers, + client=topo.client, + load_balancer=lb, + ) + assert ok.load_balancer is not None + assert ok.load_balancer.server_covered == {"srv-1"} + + +def test_topology_server_ram_per_process_total_lt_ram() -> None: + """Total per-process RAM must be strictly less than node RAM.""" + srv = Server( + id="s1", + server_resources=NodesResources(cpu_cores=2, ram_mb=1024), + endpoints=[_dummy_endpoint()], + ram_per_process=200, + ) + topo = TopologyNodes(servers=[srv], client=Client(id="c1")) + assert topo.servers[0].ram_per_process == 200 + + # Now violate the constraint: 2 cores * 600 >= 1024 → invalid + bad_srv = srv.model_copy(update={"ram_per_process": 600}) + with pytest.raises(ValidationError): + TopologyNodes(servers=[bad_srv], client=Client(id="c2"))
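Taken together, the schema and runtime changes let an endpoint mix deterministic and stochastic step durations, with `ServerRuntime` sampling any `RVConfig` through `general_sampler`. A hedged end-to-end sketch (endpoint name and values chosen for illustration only):

from asyncflow.config.constants import (
    EndpointStepCPU,
    EndpointStepIO,
    EndpointStepRAM,
    StepOperation,
)
from asyncflow.schemas.common.random_variables import RVConfig
from asyncflow.schemas.topology.endpoint import Endpoint, Step

steps = [
    # RAM reservation stays a plain positive integer
    Step(
        kind=EndpointStepRAM.RAM,
        step_operation={StepOperation.NECESSARY_RAM: 64},
    ),
    # CPU time drawn from a random variable at runtime
    Step(
        kind=EndpointStepCPU.CPU_BOUND_OPERATION,
        step_operation={StepOperation.CPU_TIME: RVConfig(mean=0.005, variance=0.0)},
    ),
    # IO wait kept deterministic
    Step(
        kind=EndpointStepIO.WAIT,
        step_operation={StepOperation.IO_WAITING_TIME: 0.010},
    ),
]

endpoint = Endpoint(endpoint_name="/ping", steps=steps)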