Skip to content

Commit a5936ab

Browse files
authored
better test coverage, edges deterministic, step server random (#24)
1 parent 87d30ea commit a5936ab

File tree

13 files changed

+771
-38
lines changed

13 files changed

+771
-38
lines changed

src/asyncflow/runtime/actors/edge.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
waits the sampled delay (and any resource wait) before delivering the
77
message to the target node's inbox.
88
"""
9+
10+
911
from collections.abc import Container, Generator, Mapping
1012
from typing import TYPE_CHECKING
1113

@@ -16,11 +18,14 @@
1618
from asyncflow.metrics.edge import build_edge_metrics
1719
from asyncflow.runtime.rqs_state import RequestState
1820
from asyncflow.samplers.common_helpers import general_sampler
21+
from asyncflow.schemas.common.random_variables import RVConfig
1922
from asyncflow.schemas.settings.simulation import SimulationSettings
2023
from asyncflow.schemas.topology.edges import Edge
2124

2225
if TYPE_CHECKING:
23-
from asyncflow.schemas.common.random_variables import RVConfig
26+
from pydantic import PositiveFloat
27+
28+
2429

2530

2631
class EdgeRuntime:
@@ -73,7 +78,6 @@ def __init__( # Noqa: PLR0913
7378
def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]:
7479
"""Function to deliver the state to the next node"""
7580
# extract the random variables defining the latency of the edge
76-
random_variable: RVConfig = self.edge_config.latency
7781

7882
uniform_variable = self.rng.uniform()
7983
if uniform_variable < self.edge_config.dropout_rate:
@@ -85,9 +89,15 @@ def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]:
8589
)
8690
return
8791

92+
# latency
93+
latency: RVConfig | PositiveFloat = self.edge_config.latency
94+
8895
self._concurrent_connections +=1
8996

90-
transit_time = general_sampler(random_variable, self.rng)
97+
if isinstance(latency, RVConfig):
98+
transit_time = general_sampler(latency, self.rng)
99+
else:
100+
transit_time = latency
91101

92102

93103
# Logic to add if exists the event injection for the given edge

src/asyncflow/runtime/actors/server.py

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
during the simulation
44
"""
55

6+
67
from collections.abc import Generator
78
from typing import cast
89

910
import numpy as np
1011
import simpy
12+
from pydantic import PositiveFloat, PositiveInt
1113

1214
from asyncflow.config.constants import (
1315
EndpointStepCPU,
@@ -22,6 +24,8 @@
2224
from asyncflow.resources.server_containers import ServerContainers
2325
from asyncflow.runtime.actors.edge import EdgeRuntime
2426
from asyncflow.runtime.rqs_state import RequestState
27+
from asyncflow.samplers.common_helpers import general_sampler
28+
from asyncflow.schemas.common.random_variables import RVConfig
2529
from asyncflow.schemas.settings.simulation import SimulationSettings
2630
from asyncflow.schemas.topology.nodes import Server
2731

@@ -75,6 +79,45 @@ def __init__( # noqa: PLR0913
7579
settings.enabled_sample_metrics,
7680
)
7781

82+
# ------------------------------------------------------------------
83+
# HELPERS
84+
# ------------------------------------------------------------------
85+
86+
def _sample_duration(
87+
self, time: RVConfig | PositiveFloat | PositiveInt,
88+
) -> float:
89+
"""
90+
Return a non-negative duration in seconds.
91+
92+
- RVConfig -> sample via general_sampler(self.rng)
93+
- float/int -> cast to float
94+
- Negative draws are clamped to 0.0 (e.g., Normal tails).
95+
"""
96+
if isinstance(time, RVConfig):
97+
time = float(general_sampler(time, self.rng))
98+
else:
99+
time = float(time)
100+
101+
return time
102+
103+
def _compute_latency_cpu(
104+
self,
105+
cpu_time:PositiveFloat | PositiveInt | RVConfig,
106+
) -> float:
107+
"""Helper to compute the latency of a cpu bound given step"""
108+
return self._sample_duration(cpu_time)
109+
110+
def _compute_latency_io(
111+
self,
112+
io_time:PositiveFloat | PositiveInt | RVConfig,
113+
) -> float:
114+
"""Helper to compute the latency of a IO bound given step"""
115+
return self._sample_duration(io_time)
116+
117+
# -------------------------------------------------------------------
118+
# Main function to elaborate a request
119+
# -------------------------------------------------------------------
120+
78121
# right now we disable the warnings but a refactor will be done soon
79122
def _handle_request( # noqa: PLR0915, PLR0912, C901
80123
self,
@@ -103,11 +146,12 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901
103146

104147

105148
# Extract the total ram to execute the endpoint
106-
total_ram = sum(
107-
step.step_operation[StepOperation.NECESSARY_RAM]
108-
for step in selected_endpoint.steps
109-
if isinstance(step.kind, EndpointStepRAM)
110-
)
149+
total_ram = 0
150+
for step in selected_endpoint.steps:
151+
if isinstance(step.kind, EndpointStepRAM):
152+
ram = step.step_operation[StepOperation.NECESSARY_RAM]
153+
assert isinstance(ram, int)
154+
total_ram += ram
111155

112156
# ------------------------------------------------------------------
113157
# CPU & RAM SCHEDULING
@@ -226,15 +270,19 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901
226270

227271
core_locked = True
228272

229-
cpu_time = step.step_operation[StepOperation.CPU_TIME]
273+
cpu_time = self._compute_latency_cpu(
274+
step.step_operation[StepOperation.CPU_TIME],
275+
)
230276
# Execute the step giving back the control to the simpy env
231277
yield self.env.timeout(cpu_time)
232278

233279
# since the object is of an Enum class we check if the step.kind
234280
# is one member of enum
235281
elif isinstance(step.kind, EndpointStepIO):
236282
# define the io time
237-
io_time = step.step_operation[StepOperation.IO_WAITING_TIME]
283+
io_time = self._compute_latency_io(
284+
step.step_operation[StepOperation.IO_WAITING_TIME],
285+
)
238286

239287
if core_locked:
240288
# release the core coming from a cpu step

src/asyncflow/schemas/common/random_variables.py

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,16 @@
11
"""Definition of the schema for a Random variable"""
22

3-
from pydantic import BaseModel, field_validator, model_validator
3+
from pydantic import BaseModel, NonNegativeFloat, model_validator
44

55
from asyncflow.config.constants import Distribution
66

77

88
class RVConfig(BaseModel):
99
"""class to configure random variables"""
1010

11-
mean: float
11+
mean: NonNegativeFloat
1212
distribution: Distribution = Distribution.POISSON
13-
variance: float | None = None
14-
15-
@field_validator("mean", mode="before")
16-
def ensure_mean_is_numeric_and_positive(
17-
cls, # noqa: N805
18-
v: float,
19-
) -> float:
20-
"""Ensure `mean` is numeric, then coerce to float."""
21-
err_msg = "mean must be a number (int or float)"
22-
if not isinstance(v, (float, int)):
23-
raise ValueError(err_msg) # noqa: TRY004
24-
25-
return float(v)
13+
variance: NonNegativeFloat | None = None
2614

2715
@model_validator(mode="after") # type: ignore[arg-type]
2816
def default_variance(cls, model: "RVConfig") -> "RVConfig": # noqa: N805

src/asyncflow/schemas/topology/edges.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,7 @@
33
links between different nodes
44
"""
55

6-
from pydantic import (
7-
BaseModel,
8-
Field,
9-
field_validator,
10-
model_validator,
11-
)
6+
from pydantic import BaseModel, Field, PositiveFloat, field_validator, model_validator
127
from pydantic_core.core_schema import ValidationInfo
138

149
from asyncflow.config.constants import (
@@ -32,8 +27,9 @@ class Edge(BaseModel):
3227
Identifier of the source node (where the request comes from).
3328
target : str
3429
Identifier of the destination node (where the request goes to).
35-
latency : RVConfig
36-
Random-variable configuration for network latency on this link.
30+
latency : RVConfig | PositiveFloat
31+
Random-variable configuration for network latency on this link or
32+
positive float value.
3733
probability : float
3834
Probability of taking this edge when there are multiple outgoing links.
3935
Must be in [0.0, 1.0]. Defaults to 1.0 (always taken).
@@ -45,7 +41,7 @@ class Edge(BaseModel):
4541
id: str
4642
source: str
4743
target: str
48-
latency: RVConfig
44+
latency: RVConfig | PositiveFloat
4945
edge_type: SystemEdges = SystemEdges.NETWORK_CONNECTION
5046
dropout_rate: float = Field(
5147
NetworkParameters.DROPOUT_RATE,
@@ -66,10 +62,13 @@ class Edge(BaseModel):
6662
@field_validator("latency", mode="after")
6763
def ensure_latency_is_non_negative(
6864
cls, # noqa: N805
69-
v: RVConfig,
65+
v: RVConfig | PositiveFloat,
7066
info: ValidationInfo,
71-
) -> RVConfig:
67+
) -> RVConfig | PositiveFloat:
7268
"""Ensures that the latency's mean and variance are positive."""
69+
if not isinstance(v, RVConfig):
70+
return v
71+
7372
mean = v.mean
7473
variance = v.variance
7574

@@ -79,9 +78,10 @@ def ensure_latency_is_non_negative(
7978
if mean <= 0:
8079
msg = f"The mean latency of the edge '{edge_id}' must be positive"
8180
raise ValueError(msg)
81+
8282
if variance is not None and variance < 0: # Variance can be zero
8383
msg = (
84-
f"The variance of the latency of the edge {edge_id}"
84+
f"The variance of the latency of the edge {edge_id} "
8585
"must be non negative"
8686
)
8787
raise ValueError(msg)

src/asyncflow/schemas/topology/endpoint.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
EndpointStepRAM,
1515
StepOperation,
1616
)
17+
from asyncflow.schemas.common.random_variables import RVConfig
1718

1819

1920
class Step(BaseModel):
@@ -23,7 +24,7 @@ class Step(BaseModel):
2324
"""
2425

2526
kind: EndpointStepIO | EndpointStepCPU | EndpointStepRAM
26-
step_operation: dict[StepOperation, PositiveFloat | PositiveInt]
27+
step_operation: dict[StepOperation, PositiveFloat | PositiveInt | RVConfig]
2728

2829
@field_validator("step_operation", mode="before")
2930
def ensure_non_empty(
@@ -85,6 +86,55 @@ def ensure_coherence_type_operation(
8586

8687
return model
8788

89+
@model_validator(mode="after") # type: ignore[arg-type]
90+
def ensure_cpu_io_positive_rv(cls, model: "Step") -> "Step": # noqa: N805
91+
"""
92+
For CPU/IO steps: if the operation is an RVConfig, require mean > 0
93+
and variance ≥ 0. Deterministic PositiveFloat è già validato.
94+
"""
95+
# safe anche se per qualche motivo ci fossero 0/2+ chiavi
96+
op_val = next(iter(model.step_operation.values()), None)
97+
if op_val is None:
98+
return model
99+
100+
if isinstance(model.kind, EndpointStepCPU) and isinstance(op_val, RVConfig):
101+
if op_val.mean <= 0:
102+
msg = "CPU_TIME RVConfig.mean must be > 0"
103+
raise ValueError(msg)
104+
if op_val.variance is not None and op_val.variance < 0:
105+
msg = "CPU_TIME RVConfig.variance must be >= 0"
106+
raise ValueError(msg)
107+
108+
if isinstance(model.kind, EndpointStepIO) and isinstance(op_val, RVConfig):
109+
if op_val.mean <= 0:
110+
msg = "IO_WAITING_TIME RVConfig.mean must be > 0"
111+
raise ValueError(msg)
112+
if op_val.variance is not None and op_val.variance < 0:
113+
msg = "IO_WAITING_TIME RVConfig.variance must be >= 0"
114+
raise ValueError(msg)
115+
116+
return model
117+
118+
@model_validator(mode="after") # type: ignore[arg-type]
119+
def ensure_ram_positive_int(cls, model: "Step") -> "Step": # noqa: N805
120+
"""For RAM steps: operation must be a positive integer (no RVs/floats)"""
121+
if not isinstance(model.kind, EndpointStepRAM):
122+
return model
123+
124+
op_val = next(iter(model.step_operation.values()), None)
125+
if op_val is None:
126+
return model
127+
128+
if isinstance(op_val, RVConfig) or not isinstance(op_val, int):
129+
msg = "NECESSARY_RAM must be a positive integer"
130+
raise TypeError(msg)
131+
132+
if op_val <= 0:
133+
msg = "NECESSARY_RAM must be > 0"
134+
raise ValueError(msg)
135+
136+
return model
137+
88138

89139

90140

File renamed without changes.
File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)