Skip to content

Commit 35d507e

Browse files
GioeleB00Copilot
andauthored
Features/rqs generator runtime (#5)
* definition of state and RqsGeneratorRuntime * defined edge runtime and more central logic for sampler * minor changes * minor changes * minor bug fixed * pytest adapted to the new structure, added pytest for rqs_state * Update src/app/core/runtime/rqs_generator.py Co-authored-by: Copilot <[email protected]> * Update src/app/core/runtime/rqs_generator.py Co-authored-by: Copilot <[email protected]> * Update src/app/core/runtime/edge.py Co-authored-by: Copilot <[email protected]> * Update src/app/core/runtime/edge.py Co-authored-by: Copilot <[email protected]> * Update tests/unit/runtime/test_requests_generator.py Co-authored-by: Copilot <[email protected]> * Update src/app/core/event_samplers/common_helpers.py Co-authored-by: Copilot <[email protected]> * Update src/app/core/event_samplers/common_helpers.py Co-authored-by: Copilot <[email protected]> * Update src/app/core/event_samplers/common_helpers.py Co-authored-by: Copilot <[email protected]> * minor changes --------- Co-authored-by: Copilot <[email protected]>
1 parent 953e318 commit 35d507e

24 files changed

+1085
-389
lines changed

src/app/api/simulation.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,4 @@
11
""""Api to simulate the process"""
22

3-
import numpy as np
4-
from fastapi import APIRouter
5-
6-
from app.core.simulation.simulation_run import run_simulation
7-
from app.schemas.full_simulation_input import SimulationPayload
8-
from app.schemas.simulation_output import SimulationOutput
9-
10-
router = APIRouter()
11-
12-
@router.post("/simulation")
13-
async def event_loop_simulation(input_data: SimulationPayload) -> SimulationOutput:
14-
"""Run the simulation and return aggregate KPIs."""
15-
rng = np.random.default_rng()
16-
return run_simulation(input_data, rng=rng)
173

184

src/app/config/constants.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class Distribution(StrEnum):
4949
NORMAL = "normal"
5050
LOG_NORMAL = "log_normal"
5151
EXPONENTIAL = "exponential"
52+
UNIFORM = "uniform"
5253

5354
# ======================================================================
5455
# CONSTANTS FOR ENDPOINT STEP DEFINITION (REQUEST-HANDLER)
@@ -123,7 +124,7 @@ class EndpointStepRAM(StrEnum):
123124
RAM = "ram"
124125

125126

126-
class Metrics(StrEnum):
127+
class StepOperation(StrEnum):
127128
"""
128129
Keys used inside the ``metrics`` dictionary of a *step*.
129130
@@ -153,6 +154,17 @@ class ServerResourcesDefaults:
153154
MINIMUM_RAM_MB = 256
154155
DB_CONNECTION_POOL = None
155156

157+
# ======================================================================
158+
# CONSTANTS FOR NETWORK PARAMETERS
159+
# ======================================================================
160+
161+
class NetworkParameters:
162+
"""parameters for the network"""
163+
164+
MIN_DROPOUT_RATE = 0.0
165+
DROPOUT_RATE = 0.01
166+
MAX_DROPOUT_RATE = 1.0
167+
156168
# ======================================================================
157169
# CONSTANTS FOR THE MACRO-TOPOLOGY GRAPH
158170
# ======================================================================
@@ -165,6 +177,7 @@ class SystemNodes(StrEnum):
165177
resources (CPU cores, DB pool, etc.).
166178
"""
167179

180+
GENERATOR = "generator"
168181
SERVER = "server"
169182
CLIENT = "client"
170183
LOAD_BALANCER = "load_balancer"

src/app/config/rqs_state.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
"""
2+
defining a state in a one to one correspondence
3+
with the requests generated that will go through
4+
all the node necessary to accomplish the user request
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from dataclasses import dataclass, field
10+
11+
12+
@dataclass
13+
class RequestState:
14+
"""
15+
State object carried by each request through the simulation.
16+
17+
Attributes:
18+
id: Unique identifier of the request.
19+
t0: Timestamp (simulated env.now) when the request was generated.
20+
history: List of hop records, each noting a node/edge visit.
21+
finish_time: Timestamp when the requests is satisfied
22+
23+
"""
24+
25+
id: int # Unique request identifier
26+
initial_time: float # Generation timestamp (env.now)
27+
finish_time: float | None = None # a requests might be dropped
28+
history: list[str] = field(default_factory=list) # Trace of hops
29+
30+
def record_hop(self, node_name: str, now: float) -> None:
31+
"""
32+
Append a record of visiting a node or edge.
33+
34+
Args:
35+
node_name: Name of the node or edge being recorded.
36+
now: register the time of the operation
37+
38+
"""
39+
# Record hop as "NodeName@Timestamp"
40+
self.history.append(f"{node_name}@{now:.3f}")
41+
42+
@property
43+
def latency(self) -> float | None:
44+
"""
45+
Return the total time in the system (finish_time - initial_time),
46+
or None if the request hasn't completed yet.
47+
"""
48+
if self.finish_time is None:
49+
return None
50+
return self.finish_time - self.initial_time

src/app/core/event_samplers/common_helpers.py

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,31 +3,74 @@
33

44
import numpy as np
55

6+
from app.config.constants import Distribution
7+
from app.schemas.random_variables_config import RVConfig
68

7-
def uniform_variable_generator(rng: np.random.Generator | None = None) -> float:
8-
"""Return U~Uniform(0, 1)."""
9-
rng = rng or np.random.default_rng()
10-
return float(rng.random())
119

10+
def uniform_variable_generator(rng: np.random.Generator) -> float:
11+
"""Return U~Uniform(0, 1)."""
12+
# rng is guaranteed to be a valid np.random.Generator due to the type signature.
13+
return rng.random()
1214

1315
def poisson_variable_generator(
1416
mean: float,
15-
rng: np.random.Generator | None = None,
16-
) -> int:
17+
rng: np.random.Generator,
18+
) -> float:
1719
"""Return a Poisson-distributed integer with expectation *mean*."""
18-
rng = rng or np.random.default_rng()
19-
return int(rng.poisson(mean))
20-
20+
return rng.poisson(mean)
2121

2222
def truncated_gaussian_generator(
2323
mean: float,
2424
variance: float,
2525
rng: np.random.Generator,
26-
) -> int:
26+
) -> float:
2727
"""
2828
Generate a Normal-distributed variable
2929
with mean and variance
3030
"""
31-
rng = rng or np.random.default_rng()
3231
value = rng.normal(mean, variance)
33-
return max(0, int(value))
32+
return max(0.0, value)
33+
34+
def lognormal_variable_generator(
35+
mean: float,
36+
variance: float,
37+
rng: np.random.Generator,
38+
) -> float:
39+
"""Return a Poisson-distributed floateger with expectation *mean*."""
40+
return rng.lognormal(mean, variance)
41+
42+
def exponential_variable_generator(
43+
mean: float,
44+
rng: np.random.Generator,
45+
) -> float:
46+
"""Return an exponentially-distributed float with mean *mean*."""
47+
return float(rng.exponential(mean))
48+
49+
def general_sampler(random_variable: RVConfig, rng: np.random.Generator) -> float:
50+
"""Sample a number according to the distribution described in `random_variable`."""
51+
dist = random_variable.distribution
52+
mean = random_variable.mean
53+
54+
match dist:
55+
case Distribution.UNIFORM:
56+
57+
assert random_variable.variance is None
58+
return uniform_variable_generator(rng)
59+
60+
case _:
61+
62+
variance = random_variable.variance
63+
assert variance is not None
64+
65+
match dist:
66+
case Distribution.NORMAL:
67+
return truncated_gaussian_generator(mean, variance, rng)
68+
case Distribution.LOG_NORMAL:
69+
return lognormal_variable_generator(mean, variance, rng)
70+
case Distribution.POISSON:
71+
return float(poisson_variable_generator(mean, rng))
72+
case Distribution.EXPONENTIAL:
73+
return exponential_variable_generator(mean, rng)
74+
case _:
75+
msg = f"Unsupported distribution: {dist}"
76+
raise ValueError(msg)

src/app/core/event_samplers/gaussian_poisson.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def gaussian_poisson_sampling(
2424
input_data: RqsGeneratorInput,
2525
sim_settings: SimulationSettings,
2626
*,
27-
rng: np.random.Generator | None = None,
27+
rng: np.random.Generator,
2828
) -> Generator[float, None, None]:
2929
"""
3030
Yield inter-arrival gaps (seconds) for the compound Gaussian-Poisson process.
@@ -39,8 +39,6 @@ def gaussian_poisson_sampling(
3939
Δt ~ Exponential(Λ) using inverse-CDF.
4040
4. Stop once the virtual clock exceeds *total_simulation_time*.
4141
"""
42-
rng = rng or np.random.default_rng()
43-
4442
simulation_time = sim_settings.total_simulation_time
4543
user_sampling_window = input_data.user_sampling_window
4644

src/app/core/event_samplers/poisson_poisson.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def poisson_poisson_sampling(
2121
input_data: RqsGeneratorInput,
2222
sim_settings: SimulationSettings,
2323
*,
24-
rng: np.random.Generator | None = None,
24+
rng: np.random.Generator,
2525
) -> Generator[float, None, None]:
2626
"""
2727
Yield inter-arrival gaps (seconds) for the compound Poisson-Poisson process.
@@ -36,8 +36,6 @@ def poisson_poisson_sampling(
3636
Δt ~ Exponential(Λ) using inverse-CDF.
3737
4. Stop once the virtual clock exceeds *total_simulation_time*.
3838
"""
39-
rng = rng or np.random.default_rng()
40-
4139
simulation_time = sim_settings.total_simulation_time
4240
user_sampling_window = input_data.user_sampling_window
4341

File renamed without changes.

src/app/core/simulation/requests_generator.py renamed to src/app/core/helpers/requests_generator.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
"""
2-
Continuous-time event sampling for the Poisson-Poisson
3-
and Gaussian-Poisson workload model.
2+
SimPy process that generates user requests at stochastic intervals.
3+
4+
This node samples inter-arrival times according to the configured
5+
distribution (Gaussian-Poisson or Poisson-Poisson), constructs a
6+
RequestState for each new request, records its origin hop, and
7+
immediately pushes it into the next pipeline stage via an EdgeRuntime.
48
"""
59

610
from __future__ import annotations
@@ -24,7 +28,7 @@ def requests_generator(
2428
input_data: RqsGeneratorInput,
2529
sim_settings: SimulationSettings,
2630
*,
27-
rng: np.random.Generator | None = None,
31+
rng: np.random.Generator,
2832
) -> Generator[float, None, None]:
2933
"""
3034
Return an iterator of inter-arrival gaps (seconds) according to the model

src/app/core/runtime/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""module for the runtime folder"""

src/app/core/runtime/edge.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""
2+
Unidirectional link that simulates message transmission between nodes.
3+
Encapsulates network behavior—latency sampling (LogNormal, Exponential, etc.),
4+
drop probability, and optional connection-pool contention—by exposing a
5+
`send(msg)` method. Each `send` call schedules a SimPy subprocess that
6+
waits the sampled delay (and any resource wait) before delivering the
7+
message to the target node's inbox.
8+
"""
9+
from collections.abc import Generator
10+
from typing import TYPE_CHECKING
11+
12+
import numpy as np
13+
import simpy
14+
15+
from app.config.rqs_state import RequestState
16+
from app.core.event_samplers.common_helpers import general_sampler
17+
from app.schemas.system_topology_schema.full_system_topology_schema import Edge
18+
19+
if TYPE_CHECKING:
20+
from app.schemas.random_variables_config import RVConfig
21+
22+
23+
24+
class EdgeRuntime:
25+
"""definining the logic to handle the edges during the simulation"""
26+
27+
def __init__(
28+
self,
29+
*,
30+
env: simpy.Environment,
31+
edge_config: Edge,
32+
rng: np.random.Generator | None = None,
33+
target_box: simpy.Store,
34+
) -> None:
35+
"""Definition of the instance attributes"""
36+
self.env = env
37+
self.edge_config = edge_config
38+
self.target_box = target_box
39+
self.rng = rng or np.random.default_rng()
40+
41+
def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]:
42+
"""Function to deliver the state to the next node"""
43+
# extract the random variables defining the latency of the edge
44+
random_variable: RVConfig = self.edge_config.latency
45+
46+
uniform_variable = self.rng.uniform()
47+
if uniform_variable < self.edge_config.dropout_rate:
48+
state.finish_time = self.env.now
49+
state.record_hop(f"{self.edge_config.id}-dropped", state.finish_time)
50+
return
51+
52+
transit_time = general_sampler(random_variable, self.rng)
53+
yield self.env.timeout(transit_time)
54+
state.record_hop(self.edge_config.id, self.env.now)
55+
yield self.target_box.put(state)
56+
57+
58+
def transport(self, state: RequestState) -> simpy.Process:
59+
"""
60+
Called by the upstream node. Immediately spins off a SimPy process
61+
that will handle drop + delay + delivery of `state`.
62+
"""
63+
return self.env.process(self._deliver(state))
64+
65+
66+
67+
68+
69+

0 commit comments

Comments
 (0)