diff --git a/src/app/api/simulation.py b/src/app/api/simulation.py index 73984d5..2775dcc 100644 --- a/src/app/api/simulation.py +++ b/src/app/api/simulation.py @@ -1,18 +1,4 @@ """"Api to simulate the process""" -import numpy as np -from fastapi import APIRouter - -from app.core.simulation.simulation_run import run_simulation -from app.schemas.full_simulation_input import SimulationPayload -from app.schemas.simulation_output import SimulationOutput - -router = APIRouter() - -@router.post("/simulation") -async def event_loop_simulation(input_data: SimulationPayload) -> SimulationOutput: - """Run the simulation and return aggregate KPIs.""" - rng = np.random.default_rng() - return run_simulation(input_data, rng=rng) diff --git a/src/app/config/constants.py b/src/app/config/constants.py index 498e8b7..c611697 100644 --- a/src/app/config/constants.py +++ b/src/app/config/constants.py @@ -49,6 +49,7 @@ class Distribution(StrEnum): NORMAL = "normal" LOG_NORMAL = "log_normal" EXPONENTIAL = "exponential" + UNIFORM = "uniform" # ====================================================================== # CONSTANTS FOR ENDPOINT STEP DEFINITION (REQUEST-HANDLER) @@ -123,7 +124,7 @@ class EndpointStepRAM(StrEnum): RAM = "ram" -class Metrics(StrEnum): +class StepOperation(StrEnum): """ Keys used inside the ``metrics`` dictionary of a *step*. @@ -153,6 +154,17 @@ class ServerResourcesDefaults: MINIMUM_RAM_MB = 256 DB_CONNECTION_POOL = None +# ====================================================================== +# CONSTANTS FOR NETWORK PARAMETERS +# ====================================================================== + +class NetworkParameters: + """parameters for the network""" + + MIN_DROPOUT_RATE = 0.0 + DROPOUT_RATE = 0.01 + MAX_DROPOUT_RATE = 1.0 + # ====================================================================== # CONSTANTS FOR THE MACRO-TOPOLOGY GRAPH # ====================================================================== @@ -165,6 +177,7 @@ class SystemNodes(StrEnum): resources (CPU cores, DB pool, etc.). """ + GENERATOR = "generator" SERVER = "server" CLIENT = "client" LOAD_BALANCER = "load_balancer" diff --git a/src/app/config/rqs_state.py b/src/app/config/rqs_state.py new file mode 100644 index 0000000..dec9200 --- /dev/null +++ b/src/app/config/rqs_state.py @@ -0,0 +1,50 @@ +""" +defining a state in a one to one correspondence +with the requests generated that will go through +all the node necessary to accomplish the user request +""" + +from __future__ import annotations + +from dataclasses import dataclass, field + + +@dataclass +class RequestState: + """ + State object carried by each request through the simulation. + + Attributes: + id: Unique identifier of the request. + t0: Timestamp (simulated env.now) when the request was generated. + history: List of hop records, each noting a node/edge visit. + finish_time: Timestamp when the requests is satisfied + + """ + + id: int # Unique request identifier + initial_time: float # Generation timestamp (env.now) + finish_time: float | None = None # a requests might be dropped + history: list[str] = field(default_factory=list) # Trace of hops + + def record_hop(self, node_name: str, now: float) -> None: + """ + Append a record of visiting a node or edge. + + Args: + node_name: Name of the node or edge being recorded. + now: register the time of the operation + + """ + # Record hop as "NodeName@Timestamp" + self.history.append(f"{node_name}@{now:.3f}") + + @property + def latency(self) -> float | None: + """ + Return the total time in the system (finish_time - initial_time), + or None if the request hasn't completed yet. + """ + if self.finish_time is None: + return None + return self.finish_time - self.initial_time diff --git a/src/app/core/event_samplers/common_helpers.py b/src/app/core/event_samplers/common_helpers.py index 0f80870..e565a20 100644 --- a/src/app/core/event_samplers/common_helpers.py +++ b/src/app/core/event_samplers/common_helpers.py @@ -3,31 +3,74 @@ import numpy as np +from app.config.constants import Distribution +from app.schemas.random_variables_config import RVConfig -def uniform_variable_generator(rng: np.random.Generator | None = None) -> float: - """Return U~Uniform(0, 1).""" - rng = rng or np.random.default_rng() - return float(rng.random()) +def uniform_variable_generator(rng: np.random.Generator) -> float: + """Return U~Uniform(0, 1).""" + # rng is guaranteed to be a valid np.random.Generator due to the type signature. + return rng.random() def poisson_variable_generator( mean: float, - rng: np.random.Generator | None = None, -) -> int: + rng: np.random.Generator, +) -> float: """Return a Poisson-distributed integer with expectation *mean*.""" - rng = rng or np.random.default_rng() - return int(rng.poisson(mean)) - + return rng.poisson(mean) def truncated_gaussian_generator( mean: float, variance: float, rng: np.random.Generator, -) -> int: +) -> float: """ Generate a Normal-distributed variable with mean and variance """ - rng = rng or np.random.default_rng() value = rng.normal(mean, variance) - return max(0, int(value)) + return max(0.0, value) + +def lognormal_variable_generator( + mean: float, + variance: float, + rng: np.random.Generator, +) -> float: + """Return a Poisson-distributed floateger with expectation *mean*.""" + return rng.lognormal(mean, variance) + +def exponential_variable_generator( + mean: float, + rng: np.random.Generator, +) -> float: + """Return an exponentially-distributed float with mean *mean*.""" + return float(rng.exponential(mean)) + +def general_sampler(random_variable: RVConfig, rng: np.random.Generator) -> float: + """Sample a number according to the distribution described in `random_variable`.""" + dist = random_variable.distribution + mean = random_variable.mean + + match dist: + case Distribution.UNIFORM: + + assert random_variable.variance is None + return uniform_variable_generator(rng) + + case _: + + variance = random_variable.variance + assert variance is not None + + match dist: + case Distribution.NORMAL: + return truncated_gaussian_generator(mean, variance, rng) + case Distribution.LOG_NORMAL: + return lognormal_variable_generator(mean, variance, rng) + case Distribution.POISSON: + return float(poisson_variable_generator(mean, rng)) + case Distribution.EXPONENTIAL: + return exponential_variable_generator(mean, rng) + case _: + msg = f"Unsupported distribution: {dist}" + raise ValueError(msg) diff --git a/src/app/core/event_samplers/gaussian_poisson.py b/src/app/core/event_samplers/gaussian_poisson.py index 0b7818c..f9cc401 100644 --- a/src/app/core/event_samplers/gaussian_poisson.py +++ b/src/app/core/event_samplers/gaussian_poisson.py @@ -24,7 +24,7 @@ def gaussian_poisson_sampling( input_data: RqsGeneratorInput, sim_settings: SimulationSettings, *, - rng: np.random.Generator | None = None, + rng: np.random.Generator, ) -> Generator[float, None, None]: """ Yield inter-arrival gaps (seconds) for the compound Gaussian-Poisson process. @@ -39,8 +39,6 @@ def gaussian_poisson_sampling( Δt ~ Exponential(Λ) using inverse-CDF. 4. Stop once the virtual clock exceeds *total_simulation_time*. """ - rng = rng or np.random.default_rng() - simulation_time = sim_settings.total_simulation_time user_sampling_window = input_data.user_sampling_window diff --git a/src/app/core/event_samplers/poisson_poisson.py b/src/app/core/event_samplers/poisson_poisson.py index 1566f90..1d4787f 100644 --- a/src/app/core/event_samplers/poisson_poisson.py +++ b/src/app/core/event_samplers/poisson_poisson.py @@ -21,7 +21,7 @@ def poisson_poisson_sampling( input_data: RqsGeneratorInput, sim_settings: SimulationSettings, *, - rng: np.random.Generator | None = None, + rng: np.random.Generator, ) -> Generator[float, None, None]: """ Yield inter-arrival gaps (seconds) for the compound Poisson-Poisson process. @@ -36,8 +36,6 @@ def poisson_poisson_sampling( Δt ~ Exponential(Λ) using inverse-CDF. 4. Stop once the virtual clock exceeds *total_simulation_time*. """ - rng = rng or np.random.default_rng() - simulation_time = sim_settings.total_simulation_time user_sampling_window = input_data.user_sampling_window diff --git a/src/app/core/helpers.py b/src/app/core/helpers/dictionary_metrics.py similarity index 100% rename from src/app/core/helpers.py rename to src/app/core/helpers/dictionary_metrics.py diff --git a/src/app/core/simulation/requests_generator.py b/src/app/core/helpers/requests_generator.py similarity index 79% rename from src/app/core/simulation/requests_generator.py rename to src/app/core/helpers/requests_generator.py index 1b177e7..2d04bef 100644 --- a/src/app/core/simulation/requests_generator.py +++ b/src/app/core/helpers/requests_generator.py @@ -1,6 +1,10 @@ """ -Continuous-time event sampling for the Poisson-Poisson -and Gaussian-Poisson workload model. +SimPy process that generates user requests at stochastic intervals. + +This node samples inter-arrival times according to the configured +distribution (Gaussian-Poisson or Poisson-Poisson), constructs a +RequestState for each new request, records its origin hop, and +immediately pushes it into the next pipeline stage via an EdgeRuntime. """ from __future__ import annotations @@ -24,7 +28,7 @@ def requests_generator( input_data: RqsGeneratorInput, sim_settings: SimulationSettings, *, - rng: np.random.Generator | None = None, + rng: np.random.Generator, ) -> Generator[float, None, None]: """ Return an iterator of inter-arrival gaps (seconds) according to the model diff --git a/src/app/core/runtime/__init__.py b/src/app/core/runtime/__init__.py new file mode 100644 index 0000000..fdc562a --- /dev/null +++ b/src/app/core/runtime/__init__.py @@ -0,0 +1 @@ +"""module for the runtime folder""" diff --git a/src/app/core/runtime/edge.py b/src/app/core/runtime/edge.py new file mode 100644 index 0000000..4f22608 --- /dev/null +++ b/src/app/core/runtime/edge.py @@ -0,0 +1,69 @@ +""" +Unidirectional link that simulates message transmission between nodes. +Encapsulates network behavior—latency sampling (LogNormal, Exponential, etc.), +drop probability, and optional connection-pool contention—by exposing a +`send(msg)` method. Each `send` call schedules a SimPy subprocess that +waits the sampled delay (and any resource wait) before delivering the +message to the target node's inbox. +""" +from collections.abc import Generator +from typing import TYPE_CHECKING + +import numpy as np +import simpy + +from app.config.rqs_state import RequestState +from app.core.event_samplers.common_helpers import general_sampler +from app.schemas.system_topology_schema.full_system_topology_schema import Edge + +if TYPE_CHECKING: + from app.schemas.random_variables_config import RVConfig + + + +class EdgeRuntime: + """definining the logic to handle the edges during the simulation""" + + def __init__( + self, + *, + env: simpy.Environment, + edge_config: Edge, + rng: np.random.Generator | None = None, + target_box: simpy.Store, + ) -> None: + """Definition of the instance attributes""" + self.env = env + self.edge_config = edge_config + self.target_box = target_box + self.rng = rng or np.random.default_rng() + + def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]: + """Function to deliver the state to the next node""" + # extract the random variables defining the latency of the edge + random_variable: RVConfig = self.edge_config.latency + + uniform_variable = self.rng.uniform() + if uniform_variable < self.edge_config.dropout_rate: + state.finish_time = self.env.now + state.record_hop(f"{self.edge_config.id}-dropped", state.finish_time) + return + + transit_time = general_sampler(random_variable, self.rng) + yield self.env.timeout(transit_time) + state.record_hop(self.edge_config.id, self.env.now) + yield self.target_box.put(state) + + + def transport(self, state: RequestState) -> simpy.Process: + """ + Called by the upstream node. Immediately spins off a SimPy process + that will handle drop + delay + delivery of `state`. + """ + return self.env.process(self._deliver(state)) + + + + + + diff --git a/src/app/core/runtime/rqs_generator.py b/src/app/core/runtime/rqs_generator.py new file mode 100644 index 0000000..5611c85 --- /dev/null +++ b/src/app/core/runtime/rqs_generator.py @@ -0,0 +1,117 @@ +""" +definition of the class representing the rqs generator +that will be passed as a process in the simpy simulation +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numpy as np + +from app.config.constants import Distribution, SystemNodes +from app.config.rqs_state import RequestState +from app.core.event_samplers.gaussian_poisson import gaussian_poisson_sampling +from app.core.event_samplers.poisson_poisson import poisson_poisson_sampling + +if TYPE_CHECKING: + + from collections.abc import Generator + + import simpy + + from app.core.runtime.edge import EdgeRuntime + from app.schemas.requests_generator_input import RqsGeneratorInput + from app.schemas.simulation_settings_input import SimulationSettings + + +class RqsGeneratorRuntime: + """ + A “node” that produces request contexts at stochastic inter-arrival times + and immediately pushes them down the pipeline via an EdgeRuntime. + """ + + def __init__( + self, + env: simpy.Environment, + out_edge: EdgeRuntime, + rqs_generator_data: RqsGeneratorInput, + sim_settings: SimulationSettings, + *, + rng: np.random.Generator | None = None, + ) -> None: + """ + Definition of the instance attributes for the RqsGeneratorRuntime + + Args: + env (simpy.Environment): environment for the simulation + out_edge (EdgeRuntime): edge connecting this node with the next one + rqs_generator_data (RqsGeneratorInput): data do define the sampler + sim_settings (SimulationSettings): settings to start the simulation + rng (np.random.Generator | None, optional): random variable generator. + + """ + self.rqs_generator_data = rqs_generator_data + self.sim_settings = sim_settings + self.rng = rng or np.random.default_rng() + self.out_edge = out_edge + self.env = env + self.id_counter = 0 + + + def _next_id(self) -> int: + self.id_counter += 1 + return self.id_counter + + + def _requests_generator(self) -> Generator[float, None, None]: + """ + Return an iterator of inter-arrival gaps (seconds) according to the model + chosen in *input_data*. + + Notes + ----- + * If ``avg_active_users.distribution`` is ``"gaussian"`` or ``"normal"``, + the Gaussian-Poisson sampler is used. + * Otherwise the default Poisson-Poisson sampler is returned. + + """ + dist = self.rqs_generator_data.avg_active_users.distribution + + if dist == Distribution.NORMAL: + #Gaussian-Poisson model + return gaussian_poisson_sampling( + input_data=self.rqs_generator_data, + sim_settings=self.sim_settings, + rng=self.rng, + + ) + + # Poisson + Poisson + return poisson_poisson_sampling( + input_data=self.rqs_generator_data, + sim_settings=self.sim_settings, + rng=self.rng, + ) + + def _event_arrival(self) -> Generator[simpy.Event, None, None]: + """Simulating the process of event generation""" + time_gaps = self._requests_generator() + + for gap in time_gaps: + yield self.env.timeout(gap) + + state = RequestState( + id=self._next_id(), + initial_time=self.env.now, + + ) + state.record_hop(SystemNodes.GENERATOR, self.env.now) + # transport is a method of the edge runtime + # which define the step of how the state is moving + # from one node to another + self.out_edge.transport(state) + + def run(self) -> simpy.Process: + """Passing the structure as a simpy process""" + return self.env.process(self._event_arrival()) diff --git a/src/app/core/simulation/simulation_run.py b/src/app/core/simulation_run.py similarity index 95% rename from src/app/core/simulation/simulation_run.py rename to src/app/core/simulation_run.py index 1aea997..c6159ca 100644 --- a/src/app/core/simulation/simulation_run.py +++ b/src/app/core/simulation_run.py @@ -6,7 +6,7 @@ import simpy -from app.core.simulation.requests_generator import requests_generator +from app.core.helpers.requests_generator import requests_generator from app.schemas.simulation_output import SimulationOutput if TYPE_CHECKING: diff --git a/src/app/schemas/random_variables_config.py b/src/app/schemas/random_variables_config.py index 8d17c5f..b09ed13 100644 --- a/src/app/schemas/random_variables_config.py +++ b/src/app/schemas/random_variables_config.py @@ -15,7 +15,7 @@ class RVConfig(BaseModel): @field_validator("mean", mode="before") def ensure_mean_is_numeric( cls, # noqa: N805 - v: object, + v: float, ) -> float: """Ensure `mean` is numeric, then coerce to float.""" err_msg = "mean must be a number (int or float)" @@ -25,7 +25,12 @@ def ensure_mean_is_numeric( @model_validator(mode="after") # type: ignore[arg-type] def default_variance(cls, model: "RVConfig") -> "RVConfig": # noqa: N805 - """Set variance = mean when distribution == 'normal' and variance is missing.""" - if model.variance is None and model.distribution == Distribution.NORMAL: + """Set variance = mean when distribution require and variance is missing.""" + needs_variance: set[Distribution] = { + Distribution.NORMAL, + Distribution.LOG_NORMAL, + } + + if model.variance is None and model.distribution in needs_variance: model.variance = model.mean return model diff --git a/src/app/schemas/requests_generator_input.py b/src/app/schemas/requests_generator_input.py index f56a4f0..35ff361 100644 --- a/src/app/schemas/requests_generator_input.py +++ b/src/app/schemas/requests_generator_input.py @@ -1,9 +1,9 @@ """Define the schemas for the simulator""" -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, field_validator -from app.config.constants import TimeDefaults +from app.config.constants import Distribution, TimeDefaults from app.schemas.random_variables_config import RVConfig @@ -24,4 +24,34 @@ class RqsGeneratorInput(BaseModel): ), ) + @field_validator("avg_request_per_minute_per_user", mode="after") + def ensure_avg_request_is_poisson( + cls, # noqa: N805 + v: RVConfig, + ) -> RVConfig: + """ + Force the distribution for the rqs generator to be poisson + at the moment we have a joint sampler just for the poisson-poisson + and gaussian-poisson case + """ + if v.distribution != Distribution.POISSON: + msg = "At the moment the variable avg request must be Poisson" + raise ValueError(msg) + return v + + @field_validator("avg_active_users", mode="after") + def ensure_avg_user_is_poisson_or_gaussian( + cls, # noqa: N805 + v: RVConfig, + ) -> RVConfig: + """ + Force the distribution for the rqs generator to be poisson + at the moment we have a joint sampler just for the poisson-poisson + and gaussian-poisson case + """ + if v.distribution not in {Distribution.POISSON, Distribution.NORMAL}: + msg = "At the moment the variable active user must be Poisson or Gaussian" + raise ValueError(msg) + return v + diff --git a/src/app/schemas/system_topology_schema/endpoint_schema.py b/src/app/schemas/system_topology_schema/endpoint_schema.py index abe53da..8933dab 100644 --- a/src/app/schemas/system_topology_schema/endpoint_schema.py +++ b/src/app/schemas/system_topology_schema/endpoint_schema.py @@ -12,7 +12,7 @@ EndpointStepCPU, EndpointStepIO, EndpointStepRAM, - Metrics, + StepOperation, ) @@ -23,58 +23,58 @@ class Step(BaseModel): """ kind: EndpointStepIO | EndpointStepCPU | EndpointStepRAM - step_metrics: dict[Metrics, PositiveFloat | PositiveInt] + step_operation: dict[StepOperation, PositiveFloat | PositiveInt] - @field_validator("step_metrics", mode="before") + @field_validator("step_operation", mode="before") def ensure_non_empty( cls, # noqa: N805 - v: dict[Metrics, PositiveFloat | PositiveInt], - ) -> dict[Metrics, PositiveFloat | PositiveInt]: - """Ensure the dict step metrics exist""" + v: dict[StepOperation, PositiveFloat | PositiveInt], + ) -> dict[StepOperation, PositiveFloat | PositiveInt]: + """Ensure the dict step operation exist""" if not v: - msg = "step_metrics cannot be empty" + msg = "step_operation cannot be empty" raise ValueError(msg) return v @model_validator(mode="after") # type: ignore[arg-type] - def ensure_coherence_kind_metrics( + def ensure_coherence_type_operation( cls, # noqa: N805 model: "Step", ) -> "Step": """ - Validation to couple kind and metrics only when they are + Validation to couple kind and operation only when they are valid for example ram cannot have associated a cpu time """ - metrics_keys = set(model.step_metrics) + operation_keys = set(model.step_operation) # Control of the length of the set to be sure only on key is passed - if len(metrics_keys) != 1: - msg = "step_metrics must contain exactly one entry" + if len(operation_keys) != 1: + msg = "step_operation must contain exactly one entry" raise ValueError(msg) - # Coherence CPU bound operation and metric + # Coherence CPU bound operation and operation if isinstance(model.kind, EndpointStepCPU): - if metrics_keys != {Metrics.CPU_TIME}: + if operation_keys != {StepOperation.CPU_TIME}: msg = ( - "The metric to quantify a CPU BOUND step" - f"must be {Metrics.CPU_TIME}" + "The operation to quantify a CPU BOUND step" + f"must be {StepOperation.CPU_TIME}" ) raise ValueError(msg) - # Coherence RAM operation and metric + # Coherence RAM operation and operation elif isinstance(model.kind, EndpointStepRAM): - if metrics_keys != {Metrics.NECESSARY_RAM}: + if operation_keys != {StepOperation.NECESSARY_RAM}: msg = ( - "The metric to quantify a RAM step" - f"must be {Metrics.NECESSARY_RAM}" + "The operation to quantify a RAM step" + f"must be {StepOperation.NECESSARY_RAM}" ) raise ValueError(msg) - # Coherence I/O operation and metric - elif metrics_keys != {Metrics.IO_WAITING_TIME}: + # Coherence I/O operation and operation + elif operation_keys != {StepOperation.IO_WAITING_TIME}: msg = ( - "The metric to quantify an I/O step" - f"must be {Metrics.IO_WAITING_TIME}" + "The operation to quantify an I/O step" + f"must be {StepOperation.IO_WAITING_TIME}" ) raise ValueError(msg) diff --git a/src/app/schemas/system_topology_schema/full_system_topology_schema.py b/src/app/schemas/system_topology_schema/full_system_topology_schema.py index b53f08e..ddbac36 100644 --- a/src/app/schemas/system_topology_schema/full_system_topology_schema.py +++ b/src/app/schemas/system_topology_schema/full_system_topology_schema.py @@ -6,6 +6,8 @@ one structure to another """ +from collections import Counter + from pydantic import ( BaseModel, ConfigDict, @@ -16,6 +18,7 @@ ) from app.config.constants import ( + NetworkParameters, ServerResourcesDefaults, SystemEdges, SystemNodes, @@ -126,8 +129,10 @@ def unique_ids( ) -> "TopologyNodes": """Check that all id are unique""" ids = [server.id for server in model.servers] + [model.client.id] - if len(ids) != len(set(ids)): - msg = "Node ids must be unique" + counter = Counter(ids) + duplicate = [node_id for node_id, value in counter.items() if value > 1] + if duplicate: + msg = f"The following node ids are duplicate {duplicate}" raise ValueError(msg) return model @@ -159,11 +164,21 @@ class Edge(BaseModel): """ + id: str source: str target: str latency: RVConfig probability: float = Field(1.0, ge=0.0, le=1.0) edge_type: SystemEdges = SystemEdges.NETWORK_CONNECTION + dropout_rate: float = Field( + NetworkParameters.DROPOUT_RATE, + ge = NetworkParameters.MIN_DROPOUT_RATE, + le = NetworkParameters.MAX_DROPOUT_RATE, + description=( + "for each nodes representing a network we define" + "a probability to drop the request" + ), + ) @model_validator(mode="after") # type: ignore[arg-type] def check_src_trgt_different(cls, model: "Edge") -> "Edge": # noqa: N805 @@ -188,6 +203,20 @@ class TopologyGraph(BaseModel): nodes: TopologyNodes edges: list[Edge] + @model_validator(mode="after") # type: ignore[arg-type] + def unique_ids( + cls, # noqa: N805 + model: "TopologyGraph", + ) -> "TopologyGraph": + """Check that all id are unique""" + counter = Counter(edge.id for edge in model.edges) + duplicate = [edge_id for edge_id, value in counter.items() if value > 1] + if duplicate: + msg = f"There are multiple edges with the following ids {duplicate}" + raise ValueError(msg) + return model + + @model_validator(mode="after") # type: ignore[arg-type] def edge_refs_valid(cls, model: "TopologyGraph") -> "TopologyGraph": # noqa: N805 """ diff --git a/tests/unit/input_sructure/test_endpoint_input.py b/tests/unit/input_sructure/test_endpoint_input.py index 7e166dd..fc08b5e 100644 --- a/tests/unit/input_sructure/test_endpoint_input.py +++ b/tests/unit/input_sructure/test_endpoint_input.py @@ -9,7 +9,7 @@ EndpointStepCPU, EndpointStepIO, EndpointStepRAM, - Metrics, + StepOperation, ) from app.schemas.system_topology_schema.endpoint_schema import Endpoint, Step @@ -21,7 +21,7 @@ def cpu_step(value: float = 0.1) -> Step: """Return a minimal valid CPU-bound Step.""" return Step( kind=EndpointStepCPU.CPU_BOUND_OPERATION, - step_metrics={Metrics.CPU_TIME: value}, + step_operation={StepOperation.CPU_TIME: value}, ) @@ -29,7 +29,7 @@ def ram_step(value: int = 128) -> Step: """Return a minimal valid RAM Step.""" return Step( kind=EndpointStepRAM.RAM, - step_metrics={Metrics.NECESSARY_RAM: value}, + step_operation={StepOperation.NECESSARY_RAM: value}, ) @@ -37,7 +37,7 @@ def io_step(value: float = 0.05) -> Step: """Return a minimal valid I/O Step.""" return Step( kind=EndpointStepIO.WAIT, - step_metrics={Metrics.IO_WAITING_TIME: value}, + step_operation={StepOperation.IO_WAITING_TIME: value}, ) @@ -45,22 +45,25 @@ def io_step(value: float = 0.05) -> Step: # Positive test cases # --------------------------------------------------------------------------- # def test_valid_cpu_step() -> None: - """Test that a CPU step with correct 'cpu_time' metric passes validation.""" + """Test that a CPU step with correct 'cpu_time' operation passes validation.""" step = cpu_step() - # The metric value must match the input - assert step.step_metrics[Metrics.CPU_TIME] == 0.1 + # The operation value must match the input + assert step.step_operation[StepOperation.CPU_TIME] == 0.1 def test_valid_ram_step() -> None: - """Test that a RAM step with correct 'necessary_ram' metric passes validation.""" + """Test that a RAM step with correct 'necessary_ram' operation passes validation.""" step = ram_step() - assert step.step_metrics[Metrics.NECESSARY_RAM] == 128 + assert step.step_operation[StepOperation.NECESSARY_RAM] == 128 def test_valid_io_step() -> None: - """Test that an I/O step with correct 'io_waiting_time' metric passes validation.""" + """ + Test that an I/O step with correct 'io_waiting_time' + operation passes validation. + """ step = io_step() - assert step.step_metrics[Metrics.IO_WAITING_TIME] == 0.05 + assert step.step_operation[StepOperation.IO_WAITING_TIME] == 0.05 def test_endpoint_with_mixed_steps() -> None: @@ -79,47 +82,50 @@ def test_endpoint_with_mixed_steps() -> None: # Negative test cases # --------------------------------------------------------------------------- # @pytest.mark.parametrize( - ("kind", "bad_metrics"), + ("kind", "bad_operation"), [ - # CPU step with RAM metric - (EndpointStepCPU.CPU_BOUND_OPERATION, {Metrics.NECESSARY_RAM: 64}), - # RAM step with CPU metric - (EndpointStepRAM.RAM, {Metrics.CPU_TIME: 0.2}), - # I/O step with CPU metric - (EndpointStepIO.DB, {Metrics.CPU_TIME: 0.05}), + # CPU step with RAM operation + (EndpointStepCPU.CPU_BOUND_OPERATION, {StepOperation.NECESSARY_RAM: 64}), + # RAM step with CPU operation + (EndpointStepRAM.RAM, {StepOperation.CPU_TIME: 0.2}), + # I/O step with CPU operation + (EndpointStepIO.DB, {StepOperation.CPU_TIME: 0.05}), ], ) -def test_incoherent_kind_metric_pair( +def test_incoherent_kind_operation_pair( kind: EndpointStepCPU | EndpointStepRAM | EndpointStepIO, - bad_metrics: dict[Metrics, float | int], + bad_operation: dict[StepOperation, float | int], ) -> None: - """Test that mismatched kind and metric combinations raise ValidationError.""" + """Test that mismatched kind and operation combinations raise ValidationError.""" with pytest.raises(ValidationError): - Step(kind=kind, step_metrics=bad_metrics) + Step(kind=kind, step_operation=bad_operation) -def test_multiple_metrics_not_allowed() -> None: - """Test that providing multiple metrics in a single Step raises ValidationError.""" +def test_multiple_operation_not_allowed() -> None: + """ + Test that providing multiple operation in a single Step + raises ValidationError. + """ with pytest.raises(ValidationError): Step( kind=EndpointStepCPU.CPU_BOUND_OPERATION, - step_metrics={ - Metrics.CPU_TIME: 0.1, - Metrics.NECESSARY_RAM: 64, + step_operation={ + StepOperation.CPU_TIME: 0.1, + StepOperation.NECESSARY_RAM: 64, }, ) -def test_empty_metrics_rejected() -> None: - """Test that an empty metrics dict is rejected by the validator.""" +def test_empty_operation_rejected() -> None: + """Test that an empty operation dict is rejected by the validator.""" with pytest.raises(ValidationError): - Step(kind=EndpointStepCPU.CPU_BOUND_OPERATION, step_metrics={}) + Step(kind=EndpointStepCPU.CPU_BOUND_OPERATION, step_operation={}) -def test_wrong_metric_name_for_io() -> None: - """Test that an I/O step with a non-I/O metric key is rejected.""" +def test_wrong_operation_name_for_io() -> None: + """Test that an I/O step with a non-I/O operation key is rejected.""" with pytest.raises(ValidationError): Step( kind=EndpointStepIO.CACHE, - step_metrics={Metrics.NECESSARY_RAM: 64}, + step_operation={StepOperation.NECESSARY_RAM: 64}, ) diff --git a/tests/unit/input_sructure/test_full_topology_input.py b/tests/unit/input_sructure/test_full_topology_input.py index 9e17573..7f0d864 100644 --- a/tests/unit/input_sructure/test_full_topology_input.py +++ b/tests/unit/input_sructure/test_full_topology_input.py @@ -1,13 +1,4 @@ -"""Unit-tests for the **topology schemas** (Client, ServerResources, …). - -Every section below is grouped by the object under test, separated by -clear comment banners so that long files remain navigable. - -The tests aim for: -* 100 % branch-coverage on custom validators. -* mypy strict-compatibility (full type hints, no Any). -* ruff compliance (imports ordered, no unused vars, ≤ 88-char lines). -""" +"""Unit-tests for topology schemas (Client, ServerResources, Edge, …)""" from __future__ import annotations @@ -16,8 +7,9 @@ from app.config.constants import ( EndpointStepCPU, - Metrics, + NetworkParameters, ServerResourcesDefaults, + StepOperation, SystemEdges, SystemNodes, ) @@ -32,53 +24,58 @@ TopologyNodes, ) - # --------------------------------------------------------------------------- # -# Client +# Client # # --------------------------------------------------------------------------- # + + def test_valid_client() -> None: - """A client with correct `type` should validate.""" + """A client with correct ``type`` validates.""" cli = Client(id="frontend", type=SystemNodes.CLIENT) assert cli.type is SystemNodes.CLIENT def test_invalid_client_type() -> None: - """Wrong `type` enum on Client must raise ValidationError.""" + """Wrong ``type`` enumeration on Client raises ValidationError.""" with pytest.raises(ValidationError): - Client(id="wrong", type=SystemNodes.SERVER) + Client(id="oops", type=SystemNodes.SERVER) # --------------------------------------------------------------------------- # -# ServerResources +# ServerResources # # --------------------------------------------------------------------------- # + + def test_server_resources_defaults() -> None: - """Default values must match the constant table.""" - res = ServerResources() # all defaults + """All defaults match constant table.""" + res = ServerResources() assert res.cpu_cores == ServerResourcesDefaults.CPU_CORES assert res.ram_mb == ServerResourcesDefaults.RAM_MB assert res.db_connection_pool is ServerResourcesDefaults.DB_CONNECTION_POOL def test_server_resources_min_constraints() -> None: - """cpu_cores and ram_mb < minimum should fail validation.""" + """Values below minimum trigger validation failure.""" with pytest.raises(ValidationError): ServerResources(cpu_cores=0, ram_mb=128) # too small # --------------------------------------------------------------------------- # -# Server +# Server # # --------------------------------------------------------------------------- # + + def _dummy_endpoint() -> Endpoint: - """Return a minimal valid Endpoint needed to build a Server.""" + """Return a minimal valid Endpoint for Server construction.""" step = Step( kind=EndpointStepCPU.CPU_BOUND_OPERATION, - step_metrics={Metrics.CPU_TIME: 0.1}, + step_operation={StepOperation.CPU_TIME: 0.1}, ) return Endpoint(endpoint_name="/ping", steps=[step]) def test_valid_server() -> None: - """Server with correct type, resources and endpoint list.""" + """Server with correct ``type`` and resources passes validation.""" srv = Server( id="api-1", type=SystemNodes.SERVER, @@ -89,10 +86,10 @@ def test_valid_server() -> None: def test_invalid_server_type() -> None: - """Server with wrong `type` enum must be rejected.""" + """Server with wrong ``type`` raises ValidationError.""" with pytest.raises(ValidationError): Server( - id="oops", + id="bad-srv", type=SystemNodes.CLIENT, server_resources=ServerResources(), endpoints=[_dummy_endpoint()], @@ -100,10 +97,12 @@ def test_invalid_server_type() -> None: # --------------------------------------------------------------------------- # -# TopologyNodes +# TopologyNodes # # --------------------------------------------------------------------------- # + + def _single_node_topology() -> TopologyNodes: - """Helper that returns a valid TopologyNodes with one server and one client.""" + """Helper returning one server + one client topology.""" srv = Server( id="svc-A", server_resources=ServerResources(), @@ -114,22 +113,24 @@ def _single_node_topology() -> TopologyNodes: def test_unique_ids_validator() -> None: - """Duplicate node IDs should trigger the unique_ids validator.""" + """Duplicate node IDs trigger the ``unique_ids`` validator.""" nodes = _single_node_topology() - # duplicate client ID dup_srv = nodes.servers[0].model_copy(update={"id": "browser"}) with pytest.raises(ValidationError): TopologyNodes(servers=[dup_srv], client=nodes.client) # --------------------------------------------------------------------------- # -# Edge +# Edge # # --------------------------------------------------------------------------- # + + def test_edge_source_equals_target_fails() -> None: - """Edge with identical source/target must raise.""" + """Edge with identical source/target raises ValidationError.""" latency_cfg = RVConfig(mean=0.05) with pytest.raises(ValidationError): Edge( + id="edge-dup", source="same", target="same", latency=latency_cfg, @@ -137,18 +138,48 @@ def test_edge_source_equals_target_fails() -> None: ) +def test_edge_missing_id_raises() -> None: + """Omitting mandatory ``id`` field raises ValidationError.""" + latency_cfg = RVConfig(mean=0.01) + with pytest.raises(ValidationError): + Edge( # type: ignore[call-arg] + source="a", + target="b", + latency=latency_cfg, + ) + + +@pytest.mark.parametrize( + "bad_rate", + [-0.1, NetworkParameters.MAX_DROPOUT_RATE + 0.1], +) +def test_edge_dropout_rate_bounds(bad_rate: float) -> None: + """Drop-out rate outside valid range triggers ValidationError.""" + with pytest.raises(ValidationError): + Edge( + id="edge-bad-drop", + source="n1", + target="n2", + latency=RVConfig(mean=0.01), + dropout_rate=bad_rate, + ) + + # --------------------------------------------------------------------------- # -# TopologyGraph +# TopologyGraph # # --------------------------------------------------------------------------- # + + def _latency() -> RVConfig: - """A tiny helper for RVConfig latency objects.""" + """Tiny helper for latency objects.""" return RVConfig(mean=0.02) def test_valid_topology_graph() -> None: - """End-to-end happy-path graph passes validation.""" + """Happy-path graph passes validation.""" nodes = _single_node_topology() edge = Edge( + id="edge-1", source="browser", target="svc-A", latency=_latency(), @@ -159,9 +190,10 @@ def test_valid_topology_graph() -> None: def test_edge_refers_unknown_node() -> None: - """Edge pointing to a non-existent node ID must fail.""" + """Edge pointing to a non-existent node fails validation.""" nodes = _single_node_topology() bad_edge = Edge( + id="edge-ghost", source="browser", target="ghost-srv", latency=_latency(), diff --git a/tests/unit/input_sructure/test_requests_generator_input.py b/tests/unit/input_sructure/test_requests_generator_input.py index 39a0b2d..1ca9562 100644 --- a/tests/unit/input_sructure/test_requests_generator_input.py +++ b/tests/unit/input_sructure/test_requests_generator_input.py @@ -1,5 +1,4 @@ """Validation tests for RVConfig, RqsGeneratorInput and SimulationSettings.""" - from __future__ import annotations import pytest @@ -10,23 +9,41 @@ from app.schemas.requests_generator_input import RqsGeneratorInput from app.schemas.simulation_settings_input import SimulationSettings -# --------------------------------------------------------------------------- -# RVCONFIG -# --------------------------------------------------------------------------- +# --------------------------------------------------------------------------- # +# RVCONFIG # +# --------------------------------------------------------------------------- # def test_normal_sets_variance_to_mean() -> None: - """If variance is omitted with 'normal', it defaults to mean.""" + """If variance is omitted for 'normal', it defaults to mean.""" cfg = RVConfig(mean=10, distribution=Distribution.NORMAL) assert cfg.variance == 10.0 +def test_log_normal_sets_variance_to_mean() -> None: + """If variance is omitted for 'log_normal', it defaults to mean.""" + cfg = RVConfig(mean=5, distribution=Distribution.LOG_NORMAL) + assert cfg.variance == 5.0 + + def test_poisson_keeps_variance_none() -> None: - """If variance is omitted with 'poisson', it remains None.""" + """If variance is omitted for 'poisson', it remains None.""" cfg = RVConfig(mean=5, distribution=Distribution.POISSON) assert cfg.variance is None +def test_uniform_keeps_variance_none() -> None: + """If variance is omitted for 'uniform', it remains None.""" + cfg = RVConfig(mean=1, distribution=Distribution.UNIFORM) + assert cfg.variance is None + + +def test_exponential_keeps_variance_none() -> None: + """If variance is omitted for 'exponential', it remains None.""" + cfg = RVConfig(mean=2.5, distribution=Distribution.EXPONENTIAL) + assert cfg.variance is None + + def test_explicit_variance_is_preserved() -> None: """An explicit variance value is not modified.""" cfg = RVConfig(mean=8, distribution=Distribution.NORMAL, variance=4) @@ -34,7 +51,7 @@ def test_explicit_variance_is_preserved() -> None: def test_mean_must_be_numeric() -> None: - """A non numeric mean triggers a ValidationError.""" + """A non-numeric mean triggers a ValidationError.""" with pytest.raises(ValidationError) as exc: RVConfig(mean="not a number", distribution=Distribution.POISSON) @@ -65,24 +82,32 @@ def test_explicit_variance_kept_for_poisson() -> None: assert cfg.variance == pytest.approx(2.2) -def test_invalid_distribution_raises() -> None: +def test_invalid_distribution_literal_raises() -> None: """An unsupported distribution literal raises ValidationError.""" with pytest.raises(ValidationError): RVConfig(mean=5.0, distribution="not_a_dist") -# --------------------------------------------------------------------------- -# RQSGENERATORINPUT - USER_SAMPLING_WINDOW -# --------------------------------------------------------------------------- + +# --------------------------------------------------------------------------- # +# RQSGENERATORINPUT - USER_SAMPLING_WINDOW & DISTRIBUTION CONSTRAINTS # +# --------------------------------------------------------------------------- # + + +def _valid_poisson_cfg(mean: float = 1.0) -> dict[str, float | str]: + """Helper: minimal Poisson config for JSON-style input.""" + return {"mean": mean, "distribution": Distribution.POISSON} + + +def _valid_normal_cfg(mean: float = 1.0) -> dict[str, float | str]: + """Helper: minimal Normal config for JSON-style input.""" + return {"mean": mean, "distribution": Distribution.NORMAL} def test_default_user_sampling_window() -> None: """If user_sampling_window is missing it defaults to the constant.""" inp = RqsGeneratorInput( - avg_active_users={"mean": 1.0, "distribution": Distribution.POISSON}, - avg_request_per_minute_per_user={ - "mean": 1.0, - "distribution": Distribution.POISSON, - }, + avg_active_users=_valid_poisson_cfg(), + avg_request_per_minute_per_user=_valid_poisson_cfg(), ) assert inp.user_sampling_window == TimeDefaults.USER_SAMPLING_WINDOW @@ -90,25 +115,19 @@ def test_default_user_sampling_window() -> None: def test_explicit_user_sampling_window_kept() -> None: """An explicit user_sampling_window is preserved.""" inp = RqsGeneratorInput( - avg_active_users={"mean": 1.0, "distribution": Distribution.POISSON}, - avg_request_per_minute_per_user={ - "mean": 1.0, - "distribution": Distribution.POISSON, - }, + avg_active_users=_valid_poisson_cfg(), + avg_request_per_minute_per_user=_valid_poisson_cfg(), user_sampling_window=30, ) assert inp.user_sampling_window == 30 def test_user_sampling_window_not_int_raises() -> None: - """A non integer user_sampling_window raises ValidationError.""" + """A non-integer user_sampling_window raises ValidationError.""" with pytest.raises(ValidationError): RqsGeneratorInput( - avg_active_users={"mean": 1.0, "distribution": Distribution.POISSON}, - avg_request_per_minute_per_user={ - "mean": 1.0, - "distribution": Distribution.POISSON, - }, + avg_active_users=_valid_poisson_cfg(), + avg_request_per_minute_per_user=_valid_poisson_cfg(), user_sampling_window="not-int", ) @@ -118,19 +137,60 @@ def test_user_sampling_window_above_max_raises() -> None: too_large = TimeDefaults.MAX_USER_SAMPLING_WINDOW + 1 with pytest.raises(ValidationError): RqsGeneratorInput( - avg_active_users={"mean": 1.0, "distribution": Distribution.POISSON}, - avg_request_per_minute_per_user={ - "mean": 1.0, - "distribution": Distribution.POISSON, - }, + avg_active_users=_valid_poisson_cfg(), + avg_request_per_minute_per_user=_valid_poisson_cfg(), user_sampling_window=too_large, ) +def test_avg_request_must_be_poisson() -> None: + """avg_request_per_minute_per_user must be Poisson; Normal raises.""" + with pytest.raises(ValidationError): + RqsGeneratorInput( + avg_active_users=_valid_poisson_cfg(), + avg_request_per_minute_per_user=_valid_normal_cfg(), + ) + + +def test_avg_active_users_invalid_distribution_raises() -> None: + """avg_active_users cannot be Exponential; only Poisson or Normal allowed.""" + bad_cfg = {"mean": 1.0, "distribution": Distribution.EXPONENTIAL} + with pytest.raises(ValidationError): + RqsGeneratorInput( + avg_active_users=bad_cfg, + avg_request_per_minute_per_user=_valid_poisson_cfg(), + ) + + +def test_valid_poisson_poisson_configuration() -> None: + """Poisson-Poisson combo is accepted.""" + cfg = RqsGeneratorInput( + avg_active_users=_valid_poisson_cfg(), + avg_request_per_minute_per_user=_valid_poisson_cfg(), + ) + assert cfg.avg_active_users.distribution is Distribution.POISSON + assert ( + cfg.avg_request_per_minute_per_user.distribution + is Distribution.POISSON + ) + + +def test_valid_normal_poisson_configuration() -> None: + """Normal-Poisson combo is accepted.""" + cfg = RqsGeneratorInput( + avg_active_users=_valid_normal_cfg(), + avg_request_per_minute_per_user=_valid_poisson_cfg(), + ) + assert cfg.avg_active_users.distribution is Distribution.NORMAL + assert ( + cfg.avg_request_per_minute_per_user.distribution + is Distribution.POISSON + ) + -# --------------------------------------------------------------------------- -# SIMULATIONSETTINGS - TOTAL_SIMULATION_TIME -# --------------------------------------------------------------------------- +# --------------------------------------------------------------------------- # +# SIMULATIONSETTINGS - TOTAL_SIMULATION_TIME # +# --------------------------------------------------------------------------- # def test_default_total_simulation_time() -> None: @@ -146,7 +206,7 @@ def test_explicit_total_simulation_time_kept() -> None: def test_total_simulation_time_not_int_raises() -> None: - """A non integer total_simulation_time raises ValidationError.""" + """A non-integer total_simulation_time raises ValidationError.""" with pytest.raises(ValidationError): SimulationSettings(total_simulation_time="three thousand") diff --git a/tests/unit/runtime/test_edge b/tests/unit/runtime/test_edge new file mode 100644 index 0000000..8daaf01 --- /dev/null +++ b/tests/unit/runtime/test_edge @@ -0,0 +1,136 @@ +"""Unit-tests for :class:`EdgeRuntime`. + +The tests cover: + +* correct delivery when the random 'uniform' draw exceeds the drop rate; +* correct drop behaviour when the draw is below the drop rate; +* that the latency sampler is invoked exactly once per message. +""" +from __future__ import annotations + +from collections.abc import Iterator +from typing import TYPE_CHECKING, cast + +import numpy as np +import pytest +import simpy + +from app.config.constants import NetworkParameters, SystemNodes +from app.config.rqs_state import RequestState +from app.core.event_samplers.common_helpers import general_sampler +from app.core.runtime.edge import EdgeRuntime +from app.schemas.random_variables_config import RVConfig +from app.schemas.system_topology_schema.full_system_topology_schema import Edge + +if TYPE_CHECKING: + from collections.abc import Generator + + +pytestmark = pytest.mark.unit # module-level marker + + +# --------------------------------------------------------------------------- # +# Dummy RNG # +# --------------------------------------------------------------------------- # + + +class DummyRNG: + """RNG stub returning preset values for `uniform()` and `normal()`.""" + + def __init__( + self, + *, + uniform_value: float, + sampler_value: float = 0.0, + ) -> None: + self.uniform_value = uniform_value + self.sampler_value = sampler_value + self.uniform_called = False + self.sampler_called = False + + def uniform(self) -> float: # noqa: D401 + self.uniform_called = True + return self.uniform_value + + # EdgeRuntime passes `self` to `general_sampler`; wrap the call + def normal(self, mean: float, sigma: float) -> float: # noqa: D401 + self.sampler_called = True + return self.sampler_value + + +# --------------------------------------------------------------------------- # +# Helper to create a minimal EdgeRuntime # +# --------------------------------------------------------------------------- # + + +def _make_edge( + env: simpy.Environment, + *, + uniform_value: float, + sampler_value: float = 0.0, +) -> tuple[EdgeRuntime, DummyRNG, simpy.Store]: + rng = DummyRNG(uniform_value=uniform_value, sampler_value=sampler_value) + store: simpy.Store = simpy.Store(env) + + edge_cfg = Edge( + id="edge-1", + source="src", + target="dst", + latency=RVConfig(mean=0.0, distribution="uniform"), # value ignored in test + ) + + edge_rt = EdgeRuntime( + env=env, + edge_config=edge_cfg, + rng=cast("np.random.Generator", rng), + target_box=store, + ) + return edge_rt, rng, store + + +# --------------------------------------------------------------------------- # +# Tests # +# --------------------------------------------------------------------------- # + + +def test_edge_delivers_message_when_not_dropped() -> None: + """Message is delivered and latency sampler is called once.""" + env = simpy.Environment() + edge, rng, store = _make_edge(env, uniform_value=0.9, sampler_value=0.5) + + # prepare request state + state = RequestState(id=1, initial_time=0.0) + state.record_hop(SystemNodes.GENERATOR, env.now) + + edge.transport(state) + env.run() + + # exactly one message delivered + assert len(store.items) == 1 + delivered: RequestState = store.items[0] + assert delivered.hops[-1].node == "edge-1" # last hop is the edge id + assert rng.uniform_called is True + assert rng.sampler_called is True + + +def test_edge_drops_message_when_uniform_below_threshold( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Message is dropped and never placed in the target store.""" + # override global drop rate to deterministic 0.5 for the test + monkeypatch.setattr(NetworkParameters, "DROPOUT_RATE", 0.5, raising=False) + + env = simpy.Environment() + edge, rng, store = _make_edge(env, uniform_value=0.1) # below 0.5 ⇒ drop + + state = RequestState(id=2, initial_time=0.0) + state.record_hop(SystemNodes.GENERATOR, env.now) + + edge.transport(state) + env.run() + + assert len(store.items) == 0 # nothing delivered + assert state.hops[-1].node.endswith("dropped") + assert rng.uniform_called is True + # sampler must not be invoked when dropped + assert rng.sampler_called is False diff --git a/tests/unit/runtime/test_requests_generator.py b/tests/unit/runtime/test_requests_generator.py new file mode 100644 index 0000000..be03583 --- /dev/null +++ b/tests/unit/runtime/test_requests_generator.py @@ -0,0 +1,151 @@ +"""Unit-tests for the :class:`RqsGeneratorRuntime` dispatcher and event flow.""" +from __future__ import annotations + +from collections.abc import Iterator +from typing import TYPE_CHECKING, cast + +import numpy as np +import simpy + +from app.config.constants import Distribution +from app.core.runtime.rqs_generator import RqsGeneratorRuntime + +if TYPE_CHECKING: + + import pytest + from numpy.random import Generator + + from app.config.rqs_state import RequestState + from app.core.runtime.edge import EdgeRuntime + from app.schemas.requests_generator_input import RqsGeneratorInput + from app.schemas.simulation_settings_input import SimulationSettings + +import importlib + +# --------------------------------------------------------------------------- # +# Helpers # +# --------------------------------------------------------------------------- # + + +class DummyEdgeRuntime: + """Minimal stub capturing transported :class:`RequestState`.""" + + def __init__(self) -> None: + """Definition of the attributes""" + self.received: list[RequestState] = [] + + def transport(self, state: RequestState) -> None: + """Collect every state passed through the edge.""" + self.received.append(state) + + +def _make_runtime( + env: simpy.Environment, + edge: DummyEdgeRuntime, + rqs_input: RqsGeneratorInput, + sim_settings: SimulationSettings, + *, + seed: int = 0, +) -> RqsGeneratorRuntime: + """Factory returning a fully wired :class:`RqsGeneratorRuntime`.""" + rng: Generator = np.random.default_rng(seed) + return RqsGeneratorRuntime( + env=env, + out_edge=cast("EdgeRuntime", edge), + rqs_generator_data=rqs_input, + sim_settings=sim_settings, + rng=rng, + ) + + +# --------------------------------------------------------------------------- # +# Dispatcher behaviour # +# --------------------------------------------------------------------------- # + + +RGR_MODULE = importlib.import_module("app.core.runtime.rqs_generator") + +def test_dispatcher_selects_poisson_poisson( + monkeypatch: pytest.MonkeyPatch, + rqs_input: RqsGeneratorInput, + sim_settings: SimulationSettings, +) -> None: + """Default (Poisson) distribution must invoke *poisson_poisson_sampling*.""" + called = {"pp": False} + + def _fake_pp(*args: object, **kwargs: object) -> Iterator[float]: + called["pp"] = True + return iter(()) # iterator already exhausted + + monkeypatch.setattr(RGR_MODULE, "poisson_poisson_sampling", _fake_pp) + + env = simpy.Environment() + edge = DummyEdgeRuntime() + runtime = _make_runtime(env, edge, rqs_input, sim_settings) + + gen = runtime._requests_generator() # noqa: SLF001 + for _ in gen: + pass + + assert called["pp"] is True + assert isinstance(gen, Iterator) + + +def test_dispatcher_selects_gaussian_poisson( + monkeypatch: pytest.MonkeyPatch, + rqs_input: RqsGeneratorInput, + sim_settings: SimulationSettings, +) -> None: + """Normal distribution must invoke *gaussian_poisson_sampling*.""" + rqs_input.avg_active_users.distribution = Distribution.NORMAL + called = {"gp": False} + + def _fake_gp(*args: object, **kwargs: object) -> Iterator[float]: + called["gp"] = True + return iter(()) + + monkeypatch.setattr(RGR_MODULE, "gaussian_poisson_sampling", _fake_gp) + + env = simpy.Environment() + edge = DummyEdgeRuntime() + runtime = _make_runtime(env, edge, rqs_input, sim_settings) + + gen = runtime._requests_generator() # noqa: SLF001 + for _ in gen: + pass + + assert called["gp"] is True + assert isinstance(gen, Iterator) + +# --------------------------------------------------------------------------- # +# Event-arrival flow # +# --------------------------------------------------------------------------- # + + +def test_event_arrival_generates_expected_number_of_requests( + monkeypatch: pytest.MonkeyPatch, + rqs_input: RqsGeneratorInput, + sim_settings: SimulationSettings, +) -> None: + """Given a deterministic gap list, exactly that many requests are sent.""" + gaps = [1.0, 2.0, 3.0] + + def _fake_gen(self: object) -> Iterator[float]: + yield from gaps + + monkeypatch.setattr( + RqsGeneratorRuntime, + "_requests_generator", + _fake_gen, + ) + + env = simpy.Environment() + edge = DummyEdgeRuntime() + runtime = _make_runtime(env, edge, rqs_input, sim_settings) + + env.process(runtime._event_arrival()) # noqa: SLF001 + env.run(until=sum(gaps) + 0.1) # run slightly past the last gap + + assert len(edge.received) == len(gaps) + ids = [s.id for s in edge.received] + assert ids == [1, 2, 3] diff --git a/tests/unit/sampler/test_sampler_helper.py b/tests/unit/sampler/test_sampler_helper.py index 0222196..7d34990 100644 --- a/tests/unit/sampler/test_sampler_helper.py +++ b/tests/unit/sampler/test_sampler_helper.py @@ -1,130 +1,196 @@ +"""Unit-tests for helper-functions in +`app.core.event_samplers.common_helpers`. +""" +from __future__ import annotations + from typing import cast import numpy as np +import pytest +from app.config.constants import Distribution from app.core.event_samplers.common_helpers import ( + exponential_variable_generator, + general_sampler, + lognormal_variable_generator, poisson_variable_generator, truncated_gaussian_generator, uniform_variable_generator, ) +from app.schemas.random_variables_config import RVConfig + +# --------------------------------------------------------------------------- # +# Dummy RNG # +# --------------------------------------------------------------------------- # class DummyRNG: - """Dummy RNG for testing: returns fixed values for random(), poisson(), normal().""" + """Minimal stub mimicking the subset of the NumPy RNG API used in tests.""" - def __init__( + def __init__( # noqa: D107 self, + *, uniform_value: float | None = None, poisson_value: int | None = None, normal_value: float | None = None, + lognormal_value: float | None = None, + exponential_value: float | None = None, ) -> None: - """ - Initialize the dummy RNG with optional preset outputs. - - Args: - uniform_value: value to return from random(), if not None. - poisson_value: value to return from poisson(), if not None. - normal_value: value to return from normal(), if not None. - - """ self.uniform_value = uniform_value self.poisson_value = poisson_value self.normal_value = normal_value + self.lognormal_value = lognormal_value + self.exponential_value = exponential_value - def random(self) -> float: - """ - Return the preset uniform_value or fall back to a real RNG. + # --- uniform ----------------------------------------------------------- # - Returns: - A float in [0.0, 1.0). - - """ + def random(self) -> float: + """Return the preset ``uniform_value`` or fall back to a real RNG.""" if self.uniform_value is not None: return self.uniform_value - return np.random.default_rng().random() - - def poisson(self, mean: float) -> int: - """ - Return the preset poisson_value or fall back to a real RNG. - - Args: - mean: the λ parameter for a Poisson draw (ignored if poisson_value is set). + return float(np.random.default_rng().random()) - Returns: - An integer sample from a Poisson distribution. + # --- Poisson ----------------------------------------------------------- # - """ + def poisson(self, lam: float) -> int: + """Return the preset ``poisson_value`` or draw from a real Poisson.""" if self.poisson_value is not None: return self.poisson_value - return int(np.random.default_rng().poisson(mean)) + return int(np.random.default_rng().poisson(lam)) - def normal(self, mean: float, sigma: float) -> float: - """ - Return the preset normal_value or fall back to a real RNG. - - Args: - mean: the mean of the Normal distribution. - sigma: the standard deviation of the Normal distribution. - - Returns: - A float sample from a Normal distribution. + # --- Normal ------------------------------------------------------------ # - """ + def normal(self, mean: float, sigma: float) -> float: + """Return the preset ``normal_value`` or draw from a real Normal.""" if self.normal_value is not None: return self.normal_value return float(np.random.default_rng().normal(mean, sigma)) + # --- Log-normal -------------------------------------------------------- # + + def lognormal(self, mean: float, sigma: float) -> float: + """Return the preset ``lognormal_value`` or draw from a real LogNormal.""" + if self.lognormal_value is not None: + return self.lognormal_value + return float(np.random.default_rng().lognormal(mean, sigma)) + + # --- Exponential ------------------------------------------------------- # + + def exponential(self, scale: float) -> float: + """Return the preset ``exponential_value`` or draw from a real Exponential.""" + if self.exponential_value is not None: + return self.exponential_value + return float(np.random.default_rng().exponential(scale)) + + +# --------------------------------------------------------------------------- # +# Tests for low-level generators # +# --------------------------------------------------------------------------- # + def test_uniform_variable_generator_with_dummy_rng() -> None: - """Ensure uniform_variable_generator returns the dummy RNGs uniform_value.""" + """`uniform_variable_generator` returns the dummy's ``uniform_value``.""" dummy = cast("np.random.Generator", DummyRNG(uniform_value=0.75)) assert uniform_variable_generator(dummy) == 0.75 -def test_uniform_variable_generator_default_rng_range() -> None: - """Ensure the default RNG produces a float in [0.0, 1.0).""" - for _ in range(100): - val = uniform_variable_generator() - assert isinstance(val, float) - assert 0.0 <= val < 1.0 +def test_uniform_variable_generator_bounds() -> None: + """Calling with a real RNG yields a value in the half-open interval [0, 1).""" + rng = np.random.default_rng(1_234) + val = uniform_variable_generator(rng) + assert 0.0 <= val < 1.0 def test_poisson_variable_generator_with_dummy_rng() -> None: - """Ensure poisson_variable_generator returns the dummy RNGs poisson_value.""" + """`poisson_variable_generator` returns the dummy's ``poisson_value``.""" dummy = cast("np.random.Generator", DummyRNG(poisson_value=3)) assert poisson_variable_generator(mean=5.0, rng=dummy) == 3 def test_poisson_variable_generator_reproducible() -> None: - """Ensure two generators with the same seed produce the same Poisson sample.""" - rng1 = np.random.default_rng(12345) - rng2 = np.random.default_rng(12345) - v1 = poisson_variable_generator(mean=10.0, rng=rng1) - v2 = poisson_variable_generator(mean=10.0, rng=rng2) + """Two RNGs with the same seed produce identical Poisson draws.""" + rng1 = np.random.default_rng(42) + rng2 = np.random.default_rng(42) + v1 = poisson_variable_generator(7.0, rng1) + v2 = poisson_variable_generator(7.0, rng2) assert v1 == v2 -def test_truncated_gaussian_generator_truncates_negative() -> None: - """Ensure truncated_gaussian_generator clamps negative draws to zero.""" +def test_truncated_gaussian_generator_negative_clamped() -> None: + """Negative Normal draws are clamped to zero.""" dummy = cast("np.random.Generator", DummyRNG(normal_value=-2.7)) - result = truncated_gaussian_generator(mean=10.0, variance=5.0, rng=dummy) - assert result == 0 + assert truncated_gaussian_generator(10.0, 5.0, dummy) == 0.0 -def test_truncated_gaussian_generator_truncates_toward_zero() -> None: - """Ensure truncated_gaussian_generator rounds toward zero for positive draws.""" +def test_truncated_gaussian_generator_positive_passthrough() -> None: + """Positive Normal draws pass through unchanged.""" dummy = cast("np.random.Generator", DummyRNG(normal_value=3.9)) - result = truncated_gaussian_generator(mean=10.0, variance=5.0, rng=dummy) - assert isinstance(result, int) - assert result == 3 + val = truncated_gaussian_generator(10.0, 5.0, dummy) + assert isinstance(val, float) + assert val == 3.9 -def test_truncated_gaussian_generator_default_rng_non_negative_int() -> None: - """ - Ensure the default RNG produces - a non-negative integer from the truncated Gaussian. - """ +def test_truncated_gaussian_generator_default_rng_non_negative() -> None: + """Real RNG always yields a non-negative float after truncation.""" rng = np.random.default_rng(321) - val = truncated_gaussian_generator(mean=10.0, variance=2.0, rng=rng) - assert isinstance(val, int) - assert val >= 0 + assert truncated_gaussian_generator(10.0, 2.0, rng) >= 0.0 + + +def test_lognormal_variable_generator_reproducible() -> None: + """`lognormal_variable_generator` is reproducible with a fixed seed.""" + rng1 = np.random.default_rng(99) + rng2 = np.random.default_rng(99) + v1 = lognormal_variable_generator(1.0, 0.5, rng1) + v2 = lognormal_variable_generator(1.0, 0.5, rng2) + assert v1 == pytest.approx(v2) + + +def test_exponential_variable_generator_reproducible() -> None: + """`exponential_variable_generator` is reproducible with a fixed seed.""" + rng1 = np.random.default_rng(54_321) + rng2 = np.random.default_rng(54_321) + v1 = exponential_variable_generator(2.0, rng1) + v2 = exponential_variable_generator(2.0, rng2) + assert v1 == pytest.approx(v2) + + +# --------------------------------------------------------------------------- # +# Tests for `general_sampler` # +# --------------------------------------------------------------------------- # + + +def test_general_sampler_uniform_path() -> None: + """Uniform branch returns the dummy's preset value.""" + dummy = cast("np.random.Generator", DummyRNG(uniform_value=0.42)) + cfg = RVConfig(mean=1.0, distribution=Distribution.UNIFORM) + assert general_sampler(cfg, dummy) == 0.42 + + +def test_general_sampler_normal_path() -> None: + """Normal branch applies truncation logic (negative → 0).""" + dummy = cast("np.random.Generator", DummyRNG(normal_value=-1.2)) + cfg = RVConfig(mean=0.0, variance=1.0, distribution=Distribution.NORMAL) + assert general_sampler(cfg, dummy) == 0.0 + + +def test_general_sampler_poisson_path() -> None: + """Poisson branch returns the dummy's preset integer as *float*.""" + dummy = cast("np.random.Generator", DummyRNG(poisson_value=4)) + cfg = RVConfig(mean=5.0, variance=5.0, distribution=Distribution.POISSON) + result = general_sampler(cfg, dummy) + assert isinstance(result, float) + assert result == 4.0 + + +def test_general_sampler_lognormal_path() -> None: + """Log-normal branch produces a strictly positive float.""" + rng = np.random.default_rng(2_025) + cfg = RVConfig(mean=0.0, variance=0.5, distribution=Distribution.LOG_NORMAL) + assert general_sampler(cfg, rng) > 0.0 + + +def test_general_sampler_exponential_path() -> None: + """Exponential branch produces a strictly positive float.""" + rng = np.random.default_rng(7) + cfg = RVConfig(mean=1.5, variance=1.5, distribution=Distribution.EXPONENTIAL) + assert general_sampler(cfg, rng) > 0.0 diff --git a/tests/unit/simulation/test_requests_generator.py b/tests/unit/simulation/test_requests_generator.py deleted file mode 100644 index fe72f2e..0000000 --- a/tests/unit/simulation/test_requests_generator.py +++ /dev/null @@ -1,145 +0,0 @@ -"""Unit-tests for the requests generator and the SimPy runner. - -All common fixtures (`rng`, `rqs_input`, `sim_settings`, `payload_base`, …) -are defined once in *tests/conftest.py*. -This module focuses purely on behavioural checks. -""" - -from __future__ import annotations - -from types import GeneratorType -from typing import TYPE_CHECKING - -import pytest - -from app.core.simulation.requests_generator import requests_generator -from app.core.simulation.simulation_run import run_simulation - -if TYPE_CHECKING: # static-typing only - from collections.abc import Iterator - - from numpy.random import Generator - - from app.schemas.full_simulation_input import SimulationPayload - from app.schemas.requests_generator_input import RqsGeneratorInput - from app.schemas.simulation_output import SimulationOutput - from app.schemas.simulation_settings_input import SimulationSettings - - -# --------------------------------------------------------------------------- -# REQUESTS-GENERATOR - dispatcher tests -# --------------------------------------------------------------------------- - - -def test_default_requests_generator_uses_poisson_poisson_sampling( - rqs_input: RqsGeneratorInput, - sim_settings: SimulationSettings, - rng: Generator, -) -> None: - """Default distribution must map to *poisson_poisson_sampling*.""" - gen = requests_generator(rqs_input, sim_settings, rng=rng) - - assert isinstance(gen, GeneratorType) - assert gen.gi_code.co_name == "poisson_poisson_sampling" - - -@pytest.mark.parametrize( - ("dist", "expected_sampler"), - [ - ("poisson", "poisson_poisson_sampling"), - ("normal", "gaussian_poisson_sampling"), - ], -) -def test_requests_generator_dispatches_to_correct_sampler( - dist: str, - expected_sampler: str, - rqs_input: RqsGeneratorInput, - sim_settings: SimulationSettings, - rng: Generator, -) -> None: - """Dispatcher must select the sampler matching *dist*.""" - rqs_input.avg_active_users.distribution = dist # type: ignore[assignment] - gen = requests_generator(rqs_input, sim_settings, rng=rng) - - assert isinstance(gen, GeneratorType) - assert gen.gi_code.co_name == expected_sampler - - -# --------------------------------------------------------------------------- -# SIMULATION-RUNNER - horizon handling -# --------------------------------------------------------------------------- - - -def _patch_generator( - monkeypatch: pytest.MonkeyPatch, - gaps: list[float], -) -> None: - """Monkey-patch *requests_generator* with a deterministic gap sequence.""" - - def _fake( - data: RqsGeneratorInput, - config: SimulationSettings, # unused, keeps signature - *, - rng: Generator | None = None, - ) -> Iterator[float]: - yield from gaps - - monkeypatch.setattr( - "app.core.simulation.simulation_run.requests_generator", - _fake, - ) - - -def test_run_simulation_counts_events_up_to_horizon( - monkeypatch: pytest.MonkeyPatch, - payload_base: SimulationPayload, - rng: Generator, -) -> None: - """All events with cumulative time ≤ horizon must be counted.""" - _patch_generator(monkeypatch, gaps=[1.0, 2.0, 3.0, 4.0]) - - output: SimulationOutput = run_simulation(payload_base, rng=rng) - - assert output.total_requests["total_requests"] == 4 - assert output.metric_2 == str( - payload_base.rqs_input.avg_request_per_minute_per_user.mean, - ) - assert output.metric_n == str(payload_base.rqs_input.avg_active_users.mean) - - -def test_run_simulation_skips_event_at_exact_horizon( - monkeypatch: pytest.MonkeyPatch, - payload_base: SimulationPayload, - rng: Generator, -) -> None: - """An event scheduled exactly at *t == horizon* is ignored.""" - horizon = payload_base.sim_settings.total_simulation_time - _patch_generator(monkeypatch, gaps=[float(horizon)]) - - output: SimulationOutput = run_simulation(payload_base, rng=rng) - assert output.total_requests["total_requests"] == 0 - - -def test_run_simulation_excludes_event_beyond_horizon( - monkeypatch: pytest.MonkeyPatch, - payload_base: SimulationPayload, - rng: Generator, -) -> None: - """Events strictly after the horizon must not be counted.""" - horizon = payload_base.sim_settings.total_simulation_time - _patch_generator(monkeypatch, gaps=[float(horizon) + 0.1]) - - output: SimulationOutput = run_simulation(payload_base, rng=rng) - assert output.total_requests["total_requests"] == 0 - - -def test_run_simulation_zero_events_when_generator_empty( - monkeypatch: pytest.MonkeyPatch, - payload_base: SimulationPayload, - rng: Generator, -) -> None: - """No gaps => no requests counted.""" - _patch_generator(monkeypatch, gaps=[]) - - output: SimulationOutput = run_simulation(payload_base, rng=rng) - assert output.total_requests["total_requests"] == 0 diff --git a/tests/unit/test_state.py b/tests/unit/test_state.py new file mode 100644 index 0000000..4612f1b --- /dev/null +++ b/tests/unit/test_state.py @@ -0,0 +1,47 @@ +"""Unit-tests for :class:`RequestState`.""" +from __future__ import annotations + +from app.config.rqs_state import RequestState + +# --------------------------------------------------------------------------- # +# Helpers # +# --------------------------------------------------------------------------- # + + +def _state() -> RequestState: + """Return a fresh RequestState with id=42 and t0=0.0.""" + return RequestState(id=42, initial_time=0.0) + + +# --------------------------------------------------------------------------- # +# Tests # +# --------------------------------------------------------------------------- # + + +def test_record_hop_appends_formatted_entry() -> None: + """Calling *record_hop* stores 'node@timestamp' with 3-dec precision.""" + st = _state() + st.record_hop("generator", now=1.23456) + assert st.history == ["generator@1.235"] # rounded to 3 decimals + + +def test_multiple_hops_preserve_order() -> None: + """History keeps insertion order for consecutive hops.""" + st = _state() + st.record_hop("A", 0.1) + st.record_hop("B", 0.2) + st.record_hop("C", 0.3) + assert st.history == ["A@0.100", "B@0.200", "C@0.300"] + + +def test_latency_none_until_finish_time_set() -> None: + """Latency is None if *finish_time* not assigned.""" + st = _state() + assert st.latency is None + + +def test_latency_returns_difference() -> None: + """Latency equals finish_time - initial_time once completed.""" + st = _state() + st.finish_time = 5.5 + assert st.latency == 5.5 # 5.5 - 0.0