Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions src/asyncflow/runtime/actors/edge.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
waits the sampled delay (and any resource wait) before delivering the
message to the target node's inbox.
"""


from collections.abc import Container, Generator, Mapping
from typing import TYPE_CHECKING

Expand All @@ -16,11 +18,14 @@
from asyncflow.metrics.edge import build_edge_metrics
from asyncflow.runtime.rqs_state import RequestState
from asyncflow.samplers.common_helpers import general_sampler
from asyncflow.schemas.common.random_variables import RVConfig
from asyncflow.schemas.settings.simulation import SimulationSettings
from asyncflow.schemas.topology.edges import Edge

if TYPE_CHECKING:
from asyncflow.schemas.common.random_variables import RVConfig
from pydantic import PositiveFloat




class EdgeRuntime:
Expand Down Expand Up @@ -73,7 +78,6 @@ def __init__( # Noqa: PLR0913
def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]:
"""Function to deliver the state to the next node"""
# extract the random variables defining the latency of the edge
random_variable: RVConfig = self.edge_config.latency

uniform_variable = self.rng.uniform()
if uniform_variable < self.edge_config.dropout_rate:
Expand All @@ -85,9 +89,15 @@ def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]:
)
return

# latency
latency: RVConfig | PositiveFloat = self.edge_config.latency

self._concurrent_connections +=1

transit_time = general_sampler(random_variable, self.rng)
if isinstance(latency, RVConfig):
transit_time = general_sampler(latency, self.rng)
else:
transit_time = latency


# Logic to add if exists the event injection for the given edge
Expand Down
62 changes: 55 additions & 7 deletions src/asyncflow/runtime/actors/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
during the simulation
"""


from collections.abc import Generator
from typing import cast

import numpy as np
import simpy
from pydantic import PositiveFloat, PositiveInt

from asyncflow.config.constants import (
EndpointStepCPU,
Expand All @@ -22,6 +24,8 @@
from asyncflow.resources.server_containers import ServerContainers
from asyncflow.runtime.actors.edge import EdgeRuntime
from asyncflow.runtime.rqs_state import RequestState
from asyncflow.samplers.common_helpers import general_sampler
from asyncflow.schemas.common.random_variables import RVConfig
from asyncflow.schemas.settings.simulation import SimulationSettings
from asyncflow.schemas.topology.nodes import Server

Expand Down Expand Up @@ -75,6 +79,45 @@ def __init__( # noqa: PLR0913
settings.enabled_sample_metrics,
)

# ------------------------------------------------------------------
# HELPERS
# ------------------------------------------------------------------

def _sample_duration(
self, time: RVConfig | PositiveFloat | PositiveInt,
) -> float:
"""
Return a non-negative duration in seconds.

- RVConfig -> sample via general_sampler(self.rng)
- float/int -> cast to float
- Negative draws are clamped to 0.0 (e.g., Normal tails).
"""
if isinstance(time, RVConfig):
time = float(general_sampler(time, self.rng))
else:
time = float(time)

return time

def _compute_latency_cpu(
self,
cpu_time:PositiveFloat | PositiveInt | RVConfig,
) -> float:
"""Helper to compute the latency of a cpu bound given step"""
return self._sample_duration(cpu_time)

def _compute_latency_io(
self,
io_time:PositiveFloat | PositiveInt | RVConfig,
) -> float:
"""Helper to compute the latency of a IO bound given step"""
return self._sample_duration(io_time)

# -------------------------------------------------------------------
# Main function to elaborate a request
# -------------------------------------------------------------------

# right now we disable the warnings but a refactor will be done soon
def _handle_request( # noqa: PLR0915, PLR0912, C901
self,
Expand Down Expand Up @@ -103,11 +146,12 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901


# Extract the total ram to execute the endpoint
total_ram = sum(
step.step_operation[StepOperation.NECESSARY_RAM]
for step in selected_endpoint.steps
if isinstance(step.kind, EndpointStepRAM)
)
total_ram = 0
for step in selected_endpoint.steps:
if isinstance(step.kind, EndpointStepRAM):
ram = step.step_operation[StepOperation.NECESSARY_RAM]
assert isinstance(ram, int)
total_ram += ram

# ------------------------------------------------------------------
# CPU & RAM SCHEDULING
Expand Down Expand Up @@ -226,15 +270,19 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901

core_locked = True

cpu_time = step.step_operation[StepOperation.CPU_TIME]
cpu_time = self._compute_latency_cpu(
step.step_operation[StepOperation.CPU_TIME],
)
# Execute the step giving back the control to the simpy env
yield self.env.timeout(cpu_time)

# since the object is of an Enum class we check if the step.kind
# is one member of enum
elif isinstance(step.kind, EndpointStepIO):
# define the io time
io_time = step.step_operation[StepOperation.IO_WAITING_TIME]
io_time = self._compute_latency_io(
step.step_operation[StepOperation.IO_WAITING_TIME],
)

if core_locked:
# release the core coming from a cpu step
Expand Down
18 changes: 3 additions & 15 deletions src/asyncflow/schemas/common/random_variables.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,16 @@
"""Definition of the schema for a Random variable"""

from pydantic import BaseModel, field_validator, model_validator
from pydantic import BaseModel, NonNegativeFloat, model_validator

from asyncflow.config.constants import Distribution


class RVConfig(BaseModel):
"""class to configure random variables"""

mean: float
mean: NonNegativeFloat
distribution: Distribution = Distribution.POISSON
variance: float | None = None

@field_validator("mean", mode="before")
def ensure_mean_is_numeric_and_positive(
cls, # noqa: N805
v: float,
) -> float:
"""Ensure `mean` is numeric, then coerce to float."""
err_msg = "mean must be a number (int or float)"
if not isinstance(v, (float, int)):
raise ValueError(err_msg) # noqa: TRY004

return float(v)
variance: NonNegativeFloat | None = None

@model_validator(mode="after") # type: ignore[arg-type]
def default_variance(cls, model: "RVConfig") -> "RVConfig": # noqa: N805
Expand Down
24 changes: 12 additions & 12 deletions src/asyncflow/schemas/topology/edges.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,7 @@
links between different nodes
"""

from pydantic import (
BaseModel,
Field,
field_validator,
model_validator,
)
from pydantic import BaseModel, Field, PositiveFloat, field_validator, model_validator
from pydantic_core.core_schema import ValidationInfo

from asyncflow.config.constants import (
Expand All @@ -32,8 +27,9 @@ class Edge(BaseModel):
Identifier of the source node (where the request comes from).
target : str
Identifier of the destination node (where the request goes to).
latency : RVConfig
Random-variable configuration for network latency on this link.
latency : RVConfig | PositiveFloat
Random-variable configuration for network latency on this link or
positive float value.
probability : float
Probability of taking this edge when there are multiple outgoing links.
Must be in [0.0, 1.0]. Defaults to 1.0 (always taken).
Expand All @@ -45,7 +41,7 @@ class Edge(BaseModel):
id: str
source: str
target: str
latency: RVConfig
latency: RVConfig | PositiveFloat
edge_type: SystemEdges = SystemEdges.NETWORK_CONNECTION
dropout_rate: float = Field(
NetworkParameters.DROPOUT_RATE,
Expand All @@ -66,10 +62,13 @@ class Edge(BaseModel):
@field_validator("latency", mode="after")
def ensure_latency_is_non_negative(
cls, # noqa: N805
v: RVConfig,
v: RVConfig | PositiveFloat,
info: ValidationInfo,
) -> RVConfig:
) -> RVConfig | PositiveFloat:
"""Ensures that the latency's mean and variance are positive."""
if not isinstance(v, RVConfig):
return v

mean = v.mean
variance = v.variance

Expand All @@ -79,9 +78,10 @@ def ensure_latency_is_non_negative(
if mean <= 0:
msg = f"The mean latency of the edge '{edge_id}' must be positive"
raise ValueError(msg)

if variance is not None and variance < 0: # Variance can be zero
msg = (
f"The variance of the latency of the edge {edge_id}"
f"The variance of the latency of the edge {edge_id} "
"must be non negative"
)
raise ValueError(msg)
Expand Down
52 changes: 51 additions & 1 deletion src/asyncflow/schemas/topology/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
EndpointStepRAM,
StepOperation,
)
from asyncflow.schemas.common.random_variables import RVConfig


class Step(BaseModel):
Expand All @@ -23,7 +24,7 @@ class Step(BaseModel):
"""

kind: EndpointStepIO | EndpointStepCPU | EndpointStepRAM
step_operation: dict[StepOperation, PositiveFloat | PositiveInt]
step_operation: dict[StepOperation, PositiveFloat | PositiveInt | RVConfig]

@field_validator("step_operation", mode="before")
def ensure_non_empty(
Expand Down Expand Up @@ -85,6 +86,55 @@ def ensure_coherence_type_operation(

return model

@model_validator(mode="after") # type: ignore[arg-type]
def ensure_cpu_io_positive_rv(cls, model: "Step") -> "Step": # noqa: N805
"""
For CPU/IO steps: if the operation is an RVConfig, require mean > 0
and variance ≥ 0. Deterministic PositiveFloat è già validato.
"""
# safe anche se per qualche motivo ci fossero 0/2+ chiavi
op_val = next(iter(model.step_operation.values()), None)
if op_val is None:
return model

if isinstance(model.kind, EndpointStepCPU) and isinstance(op_val, RVConfig):
if op_val.mean <= 0:
msg = "CPU_TIME RVConfig.mean must be > 0"
raise ValueError(msg)
if op_val.variance is not None and op_val.variance < 0:
msg = "CPU_TIME RVConfig.variance must be >= 0"
raise ValueError(msg)

if isinstance(model.kind, EndpointStepIO) and isinstance(op_val, RVConfig):
if op_val.mean <= 0:
msg = "IO_WAITING_TIME RVConfig.mean must be > 0"
raise ValueError(msg)
if op_val.variance is not None and op_val.variance < 0:
msg = "IO_WAITING_TIME RVConfig.variance must be >= 0"
raise ValueError(msg)

return model

@model_validator(mode="after") # type: ignore[arg-type]
def ensure_ram_positive_int(cls, model: "Step") -> "Step": # noqa: N805
"""For RAM steps: operation must be a positive integer (no RVs/floats)"""
if not isinstance(model.kind, EndpointStepRAM):
return model

op_val = next(iter(model.step_operation.values()), None)
if op_val is None:
return model

if isinstance(op_val, RVConfig) or not isinstance(op_val, int):
msg = "NECESSARY_RAM must be a positive integer"
raise TypeError(msg)

if op_val <= 0:
msg = "NECESSARY_RAM must be > 0"
raise ValueError(msg)

return model




Expand Down
Loading