Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions src/app/api/simulation.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,4 @@
""""Api to simulate the process"""

import numpy as np
from fastapi import APIRouter

from app.core.simulation.simulation_run import run_simulation
from app.schemas.full_simulation_input import SimulationPayload
from app.schemas.simulation_output import SimulationOutput

router = APIRouter()

@router.post("/simulation")
async def event_loop_simulation(input_data: SimulationPayload) -> SimulationOutput:
"""Run the simulation and return aggregate KPIs."""
rng = np.random.default_rng()
return run_simulation(input_data, rng=rng)


15 changes: 14 additions & 1 deletion src/app/config/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class Distribution(StrEnum):
NORMAL = "normal"
LOG_NORMAL = "log_normal"
EXPONENTIAL = "exponential"
UNIFORM = "uniform"

# ======================================================================
# CONSTANTS FOR ENDPOINT STEP DEFINITION (REQUEST-HANDLER)
Expand Down Expand Up @@ -123,7 +124,7 @@ class EndpointStepRAM(StrEnum):
RAM = "ram"


class Metrics(StrEnum):
class StepOperation(StrEnum):
"""
Keys used inside the ``metrics`` dictionary of a *step*.

Expand Down Expand Up @@ -153,6 +154,17 @@ class ServerResourcesDefaults:
MINIMUM_RAM_MB = 256
DB_CONNECTION_POOL = None

# ======================================================================
# CONSTANTS FOR NETWORK PARAMETERS
# ======================================================================

class NetworkParameters:
"""parameters for the network"""

MIN_DROPOUT_RATE = 0.0
DROPOUT_RATE = 0.01
MAX_DROPOUT_RATE = 1.0

# ======================================================================
# CONSTANTS FOR THE MACRO-TOPOLOGY GRAPH
# ======================================================================
Expand All @@ -165,6 +177,7 @@ class SystemNodes(StrEnum):
resources (CPU cores, DB pool, etc.).
"""

GENERATOR = "generator"
SERVER = "server"
CLIENT = "client"
LOAD_BALANCER = "load_balancer"
Expand Down
50 changes: 50 additions & 0 deletions src/app/config/rqs_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""
defining a state in a one to one correspondence
with the requests generated that will go through
all the node necessary to accomplish the user request
"""

from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class RequestState:
"""
State object carried by each request through the simulation.
Attributes:
id: Unique identifier of the request.
t0: Timestamp (simulated env.now) when the request was generated.
history: List of hop records, each noting a node/edge visit.
Copy link

Copilot AI Jul 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring uses incorrect attribute name 'history' instead of the actual attribute name 'hops'. However, looking at the implementation, the attribute is actually called 'history', so this comment is about the inconsistency in the test file that references 'hops'.

Copilot uses AI. Check for mistakes.
finish_time: Timestamp when the requests is satisfied
"""

id: int # Unique request identifier
initial_time: float # Generation timestamp (env.now)
finish_time: float | None = None # a requests might be dropped
history: list[str] = field(default_factory=list) # Trace of hops

def record_hop(self, node_name: str, now: float) -> None:
"""
Append a record of visiting a node or edge.
Args:
node_name: Name of the node or edge being recorded.
now: register the time of the operation
"""
# Record hop as "NodeName@Timestamp"
self.history.append(f"{node_name}@{now:.3f}")

@property
def latency(self) -> float | None:
"""
Return the total time in the system (finish_time - initial_time),
or None if the request hasn't completed yet.
"""
if self.finish_time is None:
return None
return self.finish_time - self.initial_time
67 changes: 55 additions & 12 deletions src/app/core/event_samplers/common_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,74 @@

import numpy as np

from app.config.constants import Distribution
from app.schemas.random_variables_config import RVConfig

def uniform_variable_generator(rng: np.random.Generator | None = None) -> float:
"""Return U~Uniform(0, 1)."""
rng = rng or np.random.default_rng()
return float(rng.random())

def uniform_variable_generator(rng: np.random.Generator) -> float:
"""Return U~Uniform(0, 1)."""
# rng is guaranteed to be a valid np.random.Generator due to the type signature.
return rng.random()

def poisson_variable_generator(
mean: float,
rng: np.random.Generator | None = None,
) -> int:
rng: np.random.Generator,
) -> float:
"""Return a Poisson-distributed integer with expectation *mean*."""
rng = rng or np.random.default_rng()
return int(rng.poisson(mean))

return rng.poisson(mean)

def truncated_gaussian_generator(
mean: float,
variance: float,
rng: np.random.Generator,
) -> int:
) -> float:
"""
Generate a Normal-distributed variable
with mean and variance
"""
rng = rng or np.random.default_rng()
value = rng.normal(mean, variance)
return max(0, int(value))
return max(0.0, value)

def lognormal_variable_generator(
mean: float,
variance: float,
rng: np.random.Generator,
) -> float:
"""Return a Poisson-distributed floateger with expectation *mean*."""
return rng.lognormal(mean, variance)

def exponential_variable_generator(
mean: float,
rng: np.random.Generator,
) -> float:
"""Return an exponentially-distributed float with mean *mean*."""
return float(rng.exponential(mean))

def general_sampler(random_variable: RVConfig, rng: np.random.Generator) -> float:
"""Sample a number according to the distribution described in `random_variable`."""
dist = random_variable.distribution
mean = random_variable.mean

match dist:
case Distribution.UNIFORM:

assert random_variable.variance is None
return uniform_variable_generator(rng)

case _:

variance = random_variable.variance
assert variance is not None

match dist:
case Distribution.NORMAL:
return truncated_gaussian_generator(mean, variance, rng)
case Distribution.LOG_NORMAL:
return lognormal_variable_generator(mean, variance, rng)
case Distribution.POISSON:
return float(poisson_variable_generator(mean, rng))
case Distribution.EXPONENTIAL:
return exponential_variable_generator(mean, rng)
case _:
msg = f"Unsupported distribution: {dist}"
raise ValueError(msg)
4 changes: 1 addition & 3 deletions src/app/core/event_samplers/gaussian_poisson.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def gaussian_poisson_sampling(
input_data: RqsGeneratorInput,
sim_settings: SimulationSettings,
*,
rng: np.random.Generator | None = None,
rng: np.random.Generator,
) -> Generator[float, None, None]:
"""
Yield inter-arrival gaps (seconds) for the compound Gaussian-Poisson process.
Expand All @@ -39,8 +39,6 @@ def gaussian_poisson_sampling(
Δt ~ Exponential(Λ) using inverse-CDF.
4. Stop once the virtual clock exceeds *total_simulation_time*.
"""
rng = rng or np.random.default_rng()

simulation_time = sim_settings.total_simulation_time
user_sampling_window = input_data.user_sampling_window

Expand Down
4 changes: 1 addition & 3 deletions src/app/core/event_samplers/poisson_poisson.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def poisson_poisson_sampling(
input_data: RqsGeneratorInput,
sim_settings: SimulationSettings,
*,
rng: np.random.Generator | None = None,
rng: np.random.Generator,
) -> Generator[float, None, None]:
"""
Yield inter-arrival gaps (seconds) for the compound Poisson-Poisson process.
Expand All @@ -36,8 +36,6 @@ def poisson_poisson_sampling(
Δt ~ Exponential(Λ) using inverse-CDF.
4. Stop once the virtual clock exceeds *total_simulation_time*.
"""
rng = rng or np.random.default_rng()

simulation_time = sim_settings.total_simulation_time
user_sampling_window = input_data.user_sampling_window

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
"""
Continuous-time event sampling for the Poisson-Poisson
and Gaussian-Poisson workload model.
SimPy process that generates user requests at stochastic intervals.

This node samples inter-arrival times according to the configured
distribution (Gaussian-Poisson or Poisson-Poisson), constructs a
RequestState for each new request, records its origin hop, and
immediately pushes it into the next pipeline stage via an EdgeRuntime.
"""

from __future__ import annotations
Expand All @@ -24,7 +28,7 @@ def requests_generator(
input_data: RqsGeneratorInput,
sim_settings: SimulationSettings,
*,
rng: np.random.Generator | None = None,
rng: np.random.Generator,
) -> Generator[float, None, None]:
"""
Return an iterator of inter-arrival gaps (seconds) according to the model
Expand Down
1 change: 1 addition & 0 deletions src/app/core/runtime/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""module for the runtime folder"""
69 changes: 69 additions & 0 deletions src/app/core/runtime/edge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Unidirectional link that simulates message transmission between nodes.
Encapsulates network behavior—latency sampling (LogNormal, Exponential, etc.),
drop probability, and optional connection-pool contention—by exposing a
`send(msg)` method. Each `send` call schedules a SimPy subprocess that
waits the sampled delay (and any resource wait) before delivering the
message to the target node's inbox.
"""
from collections.abc import Generator
from typing import TYPE_CHECKING

import numpy as np
import simpy

from app.config.rqs_state import RequestState
from app.core.event_samplers.common_helpers import general_sampler
from app.schemas.system_topology_schema.full_system_topology_schema import Edge

if TYPE_CHECKING:
from app.schemas.random_variables_config import RVConfig



class EdgeRuntime:
"""definining the logic to handle the edges during the simulation"""

def __init__(
self,
*,
env: simpy.Environment,
edge_config: Edge,
rng: np.random.Generator | None = None,
target_box: simpy.Store,
) -> None:
"""Definition of the instance attributes"""
self.env = env
self.edge_config = edge_config
self.target_box = target_box
self.rng = rng or np.random.default_rng()

def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]:
"""Function to deliver the state to the next node"""
# extract the random variables defining the latency of the edge
random_variable: RVConfig = self.edge_config.latency

uniform_variable = self.rng.uniform()
if uniform_variable < self.edge_config.dropout_rate:
state.finish_time = self.env.now
state.record_hop(f"{self.edge_config.id}-dropped", state.finish_time)
return

transit_time = general_sampler(random_variable, self.rng)
yield self.env.timeout(transit_time)
state.record_hop(self.edge_config.id, self.env.now)
yield self.target_box.put(state)


def transport(self, state: RequestState) -> simpy.Process:
"""
Called by the upstream node. Immediately spins off a SimPy process
that will handle drop + delay + delivery of `state`.
"""
return self.env.process(self._deliver(state))






Loading