diff --git a/docs/api/event-injection.md b/docs/api/event-injection.md new file mode 100644 index 0000000..9156364 --- /dev/null +++ b/docs/api/event-injection.md @@ -0,0 +1,246 @@ +# EventInjection — Public API Documentation + +## Overview + +`EventInjection` declares a **time-bounded event** that affects a component in the simulation. Each event targets either a **server** or a **network edge**, and is delimited by a `start` marker and an `end` marker. + +Supported families (per code): + +* **Server availability**: `SERVER_DOWN` → `SERVER_UP` +* **Network latency spike (deterministic offset in seconds)**: `NETWORK_SPIKE_START` → `NETWORK_SPIKE_END` + For network spikes, the `Start` marker carries the amplitude in seconds via `spike_s`. + +Strictness: + +* Models use `ConfigDict(extra="forbid", frozen=True)` + → unknown fields are rejected; instances are immutable at runtime. + +--- + +## Data Model + +### `Start` + +* `kind: Literal[SERVER_DOWN, NETWORK_SPIKE_START]` + Event family selector. +* `t_start: NonNegativeFloat` + Start time in **seconds** from simulation start; **≥ 0.0**. +* `spike_s: PositiveFloat | None` + **Required** and **> 0** **only** when `kind == NETWORK_SPIKE_START`. + **Forbidden** (must be omitted/`None`) for any other kind. + +### `End` + +* `kind: Literal[SERVER_UP, NETWORK_SPIKE_END]` + Must match the start family (see invariants). +* `t_end: PositiveFloat` + End time in **seconds**; **> 0.0**. + +### `EventInjection` + +* `event_id: str` + Unique identifier within the simulation payload. +* `target_id: str` + Identifier of the affected component (server or edge) as defined in the topology. +* `start: Start` + Start marker. +* `end: End` + End marker. + +--- + +## Validation & Invariants (as implemented) + +### Within `EventInjection` + +1. 
**Family coherence**
+
+   * `SERVER_DOWN` → `SERVER_UP`
+   * `NETWORK_SPIKE_START` → `NETWORK_SPIKE_END`
+     Any other pairing raises:
+
+   ```
+   The event {event_id} must have as value of kind in end {expected}
+   ```
+
+   (`{expected}` renders as the enum's string value, e.g. `network_spike_end`.)
+2. **Temporal ordering**
+
+   * `t_start < t_end` (with `t_start ≥ 0.0`, `t_end > 0.0`)
+     Error:
+
+   ```
+   The starting time for the event {event_id} must be smaller than the ending time
+   ```
+3. **Network spike parameter**
+
+   * If `start.kind == NETWORK_SPIKE_START` ⇒ `start.spike_s` **must** be provided and be a positive float (seconds).
+     Error:
+
+   ```
+   The field spike_s for the event {event_id} must be defined as a positive float
+   ```
+   * Otherwise (`SERVER_DOWN`) ⇒ `start.spike_s` **must be omitted** / `None`.
+     Error:
+
+   ```
+   Event {event_id}: spike_s must be omitted
+   ```
+
+### Enforced at `SimulationPayload` level
+
+4. **Unique event IDs**
+   Error:
+
+   ```
+   The id's representing different events must be unique
+   ```
+5. **Target existence & compatibility**
+
+   * For server events (`SERVER_DOWN`), `target_id` must refer to a **server**.
+   * For network spikes (`NETWORK_SPIKE_START`), `target_id` must refer to an **edge**.
+     Errors:
+
+   ```
+   The target id {target_id} related to the event {event_id} does not exist
+   ```
+
+   ```
+   The event {event_id} regarding a server does not have a compatible target id
+   ```
+
+   ```
+   The event {event_id} regarding an edge does not have a compatible target id
+   ```
+6. **Times within simulation horizon** (with `T = sim_settings.total_simulation_time`)
+
+   * `t_start >= 0.0` and `t_start <= T`
+   * `t_end <= T`
+     Errors:
+
+   ```
+   Event '{event_id}': start time t_start={t:.6f} must be >= 0.0
+   Event '{event_id}': start time t_start={t:.6f} exceeds simulation horizon T={T:.6f}
+   Event '{event_id}': end time t_end={t:.6f} exceeds simulation horizon T={T:.6f}
+   ```
+7. **Global liveness rule (servers)**
+   The payload is rejected if **all servers are down at the same moment**.
+ Implementation detail: the timeline is ordered so that, at identical timestamps, **`END` is processed before `START`** to avoid transient all-down states. + Error: + + ``` + At time {time:.6f} all servers are down; keep at least one up + ``` + +--- + +## Runtime Semantics (summary) + +* **Server events**: the targeted server is unavailable between the start and end markers; the system enforces that at least one server remains up at all times. +* **Network spike events**: the targeted edge’s latency sampler is deterministically **shifted by `spike_s` seconds** during the event window (additive congestion model). The underlying distribution is not reshaped—samples are translated by a constant offset. + +*(This reflects the agreed model: deterministic additive offset on edges.)* + +--- + +## Units & Precision + +* All times and offsets are in **seconds** (floating-point). +* Provide values with the precision your simulator supports; microsecond-level precision is acceptable if needed. + +--- + +## Authoring Guidelines + +* **Do not include `spike_s`** for non-network events. +* Use **stable, meaningful `event_id`** values for auditability. +* Keep events within the **simulation horizon**. +* When multiple markers share the same timestamp, rely on the engine’s **END-before-START** ordering for determinism. 
+
+---
+
+## Examples
+
+YAML `kind` values are the lowercase `EventDescription` string values (e.g. `server_down`), which is what the `Literal[...]`-constrained fields accept.
+
+### 1) Valid — Server maintenance window
+
+```yaml
+event_id: ev-maint-001
+target_id: srv-1
+start: { kind: server_down, t_start: 120.0 }
+end: { kind: server_up, t_end: 240.0 }
+```
+
+### 2) Valid — Network spike on an edge (+8 ms)
+
+```yaml
+event_id: ev-spike-008ms
+target_id: edge-12
+start: { kind: network_spike_start, t_start: 10.0, spike_s: 0.008 }
+end: { kind: network_spike_end, t_end: 25.0 }
+```
+
+### 3) Invalid — Missing `spike_s` for a network spike
+
+```yaml
+event_id: ev-missing-spike
+target_id: edge-5
+start: { kind: network_spike_start, t_start: 5.0 }
+end: { kind: network_spike_end, t_end: 15.0 }
+```
+
+Error:
+
+```
+The field spike_s for the event ev-missing-spike must be defined as a positive float
+```
+
+### 4) Invalid — `spike_s` present for a server event
+
+```yaml
+event_id: ev-bad-spike
+target_id: srv-2
+start: { kind: server_down, t_start: 50.0, spike_s: 0.005 }
+end: { kind: server_up, t_end: 60.0 }
+```
+
+Error:
+
+```
+Event ev-bad-spike: spike_s must be omitted
+```
+
+### 5) Invalid — Mismatched families
+
+```yaml
+event_id: ev-bad-kinds
+target_id: edge-1
+start: { kind: network_spike_start, t_start: 5.0, spike_s: 0.010 }
+end: { kind: server_up, t_end: 15.0 }
+```
+
+Error:
+
+```
+The event ev-bad-kinds must have as value of kind in end network_spike_end
+```
+
+### 6) Invalid — Start not before End
+
+```yaml
+event_id: ev-bad-time
+target_id: srv-2
+start: { kind: server_down, t_start: 300.0 }
+end: { kind: server_up, t_end: 300.0 }
+```
+
+Error:
+
+```
+The starting time for the event ev-bad-time must be smaller than the ending time
+```
+
+---
+
+## Notes for Consumers
+
+* The schema is **strict**: misspelled fields (e.g., `t_strat`) are rejected.
+* The engine may combine multiple active network spikes on the same edge by **summing** their `spike_s` values while they overlap (handled by runtime bookkeeping).
+* This document describes exactly what is present in the provided code and validators; no additional fields or OpenAPI metadata are assumed. diff --git a/src/asyncflow/components/__init__.py b/src/asyncflow/components/__init__.py index 52d66c7..2f40f69 100644 --- a/src/asyncflow/components/__init__.py +++ b/src/asyncflow/components/__init__.py @@ -1,6 +1,7 @@ """Public components: re-exports Pydantic schemas (topology).""" from __future__ import annotations +from asyncflow.schemas.event.injection import EventInjection from asyncflow.schemas.topology.edges import Edge from asyncflow.schemas.topology.endpoint import Endpoint from asyncflow.schemas.topology.nodes import ( @@ -10,6 +11,14 @@ ServerResources, ) -__all__ = ["Client", "Edge", "Endpoint", "LoadBalancer", "Server", "ServerResources"] +__all__ = [ + "Client", + "Edge", + "Endpoint", + "EventInjection", + "LoadBalancer", + "Server", + "ServerResources", + ] diff --git a/src/asyncflow/config/constants.py b/src/asyncflow/config/constants.py index de79a33..29b2229 100644 --- a/src/asyncflow/config/constants.py +++ b/src/asyncflow/config/constants.py @@ -175,6 +175,19 @@ class SystemEdges(StrEnum): NETWORK_CONNECTION = "network_connection" +# ====================================================================== +# CONSTANTS FOR THE EVENT TO INJECT IN THE SIMULATION +# ====================================================================== + +class EventDescription(StrEnum): + """Description for the events you may inject during the simulation""" + + SERVER_UP = "server_up" + SERVER_DOWN = "server_down" + NETWORK_SPIKE_START = "network_spike_start" + NETWORK_SPIKE_END = "network_spike_end" + + # ====================================================================== # CONSTANTS FOR SAMPLED METRICS # ====================================================================== diff --git a/src/asyncflow/schemas/event/injection.py b/src/asyncflow/schemas/event/injection.py new file mode 100644 index 0000000..266f920 --- 
/dev/null +++ b/src/asyncflow/schemas/event/injection.py @@ -0,0 +1,119 @@ +"""Pydantic model to inject event during the simulation""" + +from typing import Literal + +from pydantic import ( + BaseModel, + ConfigDict, + NonNegativeFloat, + PositiveFloat, + model_validator, +) + +from asyncflow.config.constants import EventDescription + +# Event input schema: +# - Each event has its own identifier (event_id) and references the affected +# component via target_id. +# - The event window is represented by two markers, Start and End. +# - We constrain kind with Literal[...] over EventDescription (a StrEnum), +# so Pydantic enforces allowed values automatically for both Start and End. +# - Both marker models use ConfigDict(extra="forbid", frozen=True): +# extra="forbid" rejects unknown fields (e.g., catches t_strat vs t_start); +# frozen=True makes instances immutable at runtime for stability. + +class Start(BaseModel): + """Start marker for an event window.""" + + model_config = ConfigDict(extra="forbid", frozen=True) + + # Only "start" kinds allowed here + kind: Literal[ + EventDescription.SERVER_DOWN, + EventDescription.NETWORK_SPIKE_START, + ] + t_start: NonNegativeFloat # seconds from simulation start + spike_s: None | PositiveFloat = None + + + +class End(BaseModel): + """End marker for an event window.""" + + model_config = ConfigDict(extra="forbid", frozen=True) + + # Only "end" kinds allowed here + kind: Literal[ + EventDescription.SERVER_UP, + EventDescription.NETWORK_SPIKE_END, + ] + t_end: PositiveFloat # strictly > 0 + +class EventInjection(BaseModel): + """Definition of the input structure to define an event in the simulation""" + + event_id: str + target_id: str + start: Start + end: End + + # Yaml example: + # event_id: ev-1 + # target_id: srv-1 + # start: { kind: SERVER_DOWN, t_start: 120.0 } + # end: { kind: SERVER_UP, t_end: 240.0 } + + @model_validator(mode="after") # type: ignore[arg-type] + def ensure_start_end_compatibility( + cls, # noqa: N805 + 
model: "EventInjection", + ) -> "EventInjection": + """ + Check the compatibility between Start and End both at level + of time interval and kind + """ + # Ensure kind for Start and End are compatible + start_to_end = { + EventDescription.SERVER_DOWN: EventDescription.SERVER_UP, + EventDescription.NETWORK_SPIKE_START: EventDescription.NETWORK_SPIKE_END, + } + + expected = start_to_end[model.start.kind] + if model.end.kind != expected: + msg = (f"The event {model.event_id} must have " + f"as value of kind in end {expected}") + raise ValueError(msg) + + # Ensure the time sequence is well defined + if model.start.t_start >= model.end.t_end: + msg=(f"The starting time for the event {model.event_id} " + "must be smaller than the ending time") + raise ValueError(msg) + + return model + + + @model_validator(mode="after") # type: ignore[arg-type] + def ensure_spike_exist_on_network_event( + cls, # noqa: N805 + model: "EventInjection", + ) -> "EventInjection": + """ + When a network event is selected the spike must be + indicated + """ + if (model.start.kind == EventDescription.NETWORK_SPIKE_START + and model.start.spike_s is None): + msg = ( + f"The field spike_s for the event {model.event_id} " + "must be defined as a positive float" + ) + raise ValueError(msg) + + if (model.start.kind != EventDescription.NETWORK_SPIKE_START + and model.start.spike_s is not None): + msg = f"Event {model.event_id}: spike_s must be omitted" + raise ValueError(msg) + + + return model diff --git a/src/asyncflow/schemas/payload.py b/src/asyncflow/schemas/payload.py index 3c889e4..4b593b0 100644 --- a/src/asyncflow/schemas/payload.py +++ b/src/asyncflow/schemas/payload.py @@ -1,7 +1,9 @@ """Definition of the full input for the simulation""" -from pydantic import BaseModel +from pydantic import BaseModel, field_validator, model_validator +from asyncflow.config.constants import EventDescription +from asyncflow.schemas.event.injection import EventInjection from 
asyncflow.schemas.settings.simulation import SimulationSettings from asyncflow.schemas.topology.graph import TopologyGraph from asyncflow.schemas.workload.rqs_generator import RqsGenerator @@ -13,3 +15,188 @@ class SimulationPayload(BaseModel): rqs_input: RqsGenerator topology_graph: TopologyGraph sim_settings: SimulationSettings + events: list[EventInjection] | None = None + + @field_validator("events", mode="after") + def ensure_event_id_is_unique( + cls, # noqa: N805 + v: list[EventInjection] | None, + ) -> list[EventInjection] | None: + """Ensure the id uniqueness of the events id""" + if v is None: + return v + + event_id = [event.event_id for event in v] + set_event_id = set(event_id) + + if len(event_id) != len(set_event_id): + msg = "The id's representing different events must be unique" + raise ValueError(msg) + return v + + @model_validator(mode="after") # type: ignore[arg-type] + def ensure_components_ids_is_compatible( + cls, # noqa: N805 + model: "SimulationPayload", + ) -> "SimulationPayload": + """ + Ensure the id related to the target component of the event + exist + """ + if model.events is None: + return model + + servers_list = model.topology_graph.nodes.servers + edges_list = model.topology_graph.edges + valid_ids = ( + {server.id for server in servers_list} + | {edge.id for edge in edges_list} + ) + + for event in model.events: + if event.target_id not in valid_ids: + msg = (f"The target id {event.target_id} related to " + f"the event {event.event_id} does not exist") + raise ValueError(msg) + + return model + + @model_validator(mode="after") # type: ignore[arg-type] + def ensure_event_time_inside_simulation_horizon( + cls, # noqa: N805 + model: "SimulationPayload", + ) -> "SimulationPayload": + """ + The time interval associated to each event must be in + the simulation horizon + """ + if model.events is None: + return model + + horizon = float(model.sim_settings.total_simulation_time) + + for ev in model.events: + t_start = ev.start.t_start + 
t_end = ev.end.t_end + + if t_start < 0.0: + msg = ( + f"Event '{ev.event_id}': start time t_start={t_start:.6f} " + "must be >= 0.0" + ) + raise ValueError(msg) + + if t_start > horizon: + msg = ( + f"Event '{ev.event_id}': start time t_start={t_start:.6f} " + f"exceeds simulation horizon T={horizon:.6f}" + ) + raise ValueError(msg) + + # t_end is PositiveFloat by schema, but still guard the horizon. + if t_end > horizon: + msg = ( + f"Event '{ev.event_id}': end time t_end={t_end:.6f} " + f"exceeds simulation horizon T={horizon:.6f}" + ) + raise ValueError(msg) + + return model + + @model_validator(mode="after") # type: ignore[arg-type] + def ensure_compatibility_event_kind_target_id( + cls, # noqa: N805 + model: "SimulationPayload", + ) -> "SimulationPayload": + """ + The kind of the event must be compatible with the target id + type, for example we cannot have an event regarding a server + with a target id associated to an edge + """ + if model.events is None: + return model + + servers_list = model.topology_graph.nodes.servers + edges_list = model.topology_graph.edges + + # We need just the Start or End kind because + # we have a validation for the coherence between + # the starting event kind and the finishing event kind + server_kind = {EventDescription.SERVER_DOWN} + edge_kind = {EventDescription.NETWORK_SPIKE_START} + + servers_ids = {server.id for server in servers_list} + edges_ids = {edge.id for edge in edges_list} + + for event in model.events: + if event.start.kind in server_kind and event.target_id not in servers_ids: + msg = (f"The event {event.event_id} regarding a server does not have " + "a compatible target id") + raise ValueError(msg) + if event.start.kind in edge_kind and event.target_id not in edges_ids: + msg = (f"The event {event.event_id} regarding an edge does not have " + "a compatible target id") + raise ValueError(msg) + + + return model + + + @model_validator(mode="after") # type: ignore[arg-type] + def 
ensure_not_all_servers_are_down_simultaneously( + cls, # noqa: N805 + model: "SimulationPayload", + ) -> "SimulationPayload": + """ + We will not accept the condition to have all server down + at the same moment, always at least one server must be up + and running + """ + if model.events is None: + return model + + # First let us build a list of events related to the servers + servers_list = model.topology_graph.nodes.servers + servers_ids = {server.id for server in servers_list} + server_events = [ + event for event in model.events + if event.target_id in servers_ids + ] + + # Helpers needed in the algorithm to define a specific ordering + # procedure + start = "start" + end = "end" + + # Let us define a list of tuple as a timeline, this approach ensure + # the possibility to have different servers going up or down at the + # same time, a more elegant approach through an hashmap has been + # considered however it would require an extra assumption that all + # the times had to be different, we thought that this would be too + # strict + timeline: list[tuple[float, str, str]] = [] + for event in server_events: + timeline.append((event.start.t_start, start, event.target_id)) + timeline.append((event.end.t_end, end, event.target_id)) + + # Let us order the timeline by time if there are multiple events at the + # same time process first the end type events + timeline.sort(key=lambda x: (x[0], x[1] == start)) + + # Definition of a set to verify the condition that at least one server must + # be up + server_down = set() + for time, kind, server_id in timeline: + if kind == end: + server_down.discard(server_id) + else: # "start" + server_down.add(server_id) + if len(server_down) == len(servers_ids): + msg = ( + f"At time {time:.6f} all servers are down; keep at least one up" + ) + raise ValueError(msg) + + return model + + diff --git a/tests/conftest.py b/tests/conftest.py index 80955f0..b0fd5de 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -150,4 +150,5 @@ 
def payload_base( rqs_input=rqs_input, topology_graph=topology_minimal, sim_settings=sim_settings, + ) diff --git a/tests/unit/public_api/test_import.py b/tests/unit/public_api/test_import.py index cd708bc..2bea333 100644 --- a/tests/unit/public_api/test_import.py +++ b/tests/unit/public_api/test_import.py @@ -14,6 +14,7 @@ Client, Edge, Endpoint, + EventInjection, LoadBalancer, Server, ServerResources, @@ -43,6 +44,7 @@ def test_components_public_symbols() -> None: "Client", "Edge", "Endpoint", + "EventInjection", "LoadBalancer", "Server", "ServerResources", @@ -57,6 +59,7 @@ def test_components_symbols_are_importable_classes() -> None: (Client, "Client"), (Edge, "Edge"), (Endpoint, "Endpoint"), + (EventInjection, "EventInjection"), (LoadBalancer, "LoadBalancer"), (Server, "Server"), (ServerResources, "ServerResources"), diff --git a/tests/unit/schemas/test_event_injection.py b/tests/unit/schemas/test_event_injection.py new file mode 100644 index 0000000..36edbaa --- /dev/null +++ b/tests/unit/schemas/test_event_injection.py @@ -0,0 +1,196 @@ +"""Unit tests for the EventInjection Pydantic models. + +This suite verifies: +- Family coherence: SERVER_DOWN→SERVER_UP and + NETWORK_SPIKE_START→NETWORK_SPIKE_END. +- Temporal ordering: t_start < t_end, with field constraints. +- Spike semantics: spike_s is required only for NETWORK_SPIKE_START + and forbidden otherwise. +- Strictness: unknown fields are rejected; models are frozen. 
+""" + +from __future__ import annotations + +import re +from typing import Any + +import pytest +from pydantic import ValidationError + +from asyncflow.config.constants import EventDescription +from asyncflow.schemas.event.injection import End, EventInjection, Start + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _mk_server_down(start_t: float, end_t: float) -> EventInjection: + """Build a minimal server down/up event with the given times.""" + start = Start(kind=EventDescription.SERVER_DOWN, t_start=start_t) + end = End(kind=EventDescription.SERVER_UP, t_end=end_t) + return EventInjection( + event_id="ev-server-1", + target_id="srv-1", + start=start, + end=end, + ) + + +def _mk_network_spike( + start_t: float, + end_t: float, + spike_s: float | None, +) -> EventInjection: + """Build a minimal network spike event with the given times and spike.""" + start = Start( + kind=EventDescription.NETWORK_SPIKE_START, + t_start=start_t, + spike_s=spike_s, + ) + end = End(kind=EventDescription.NETWORK_SPIKE_END, t_end=end_t) + return EventInjection( + event_id="ev-spike-1", + target_id="edge-1", + start=start, + end=end, + ) + + +# --------------------------------------------------------------------------- +# Start/End family coherence +# --------------------------------------------------------------------------- + +def test_family_coherence_server_ok() -> None: + """SERVER_DOWN followed by SERVER_UP should validate.""" + model = _mk_server_down(start_t=10.0, end_t=20.0) + assert model.start.kind is EventDescription.SERVER_DOWN + assert model.end.kind is EventDescription.SERVER_UP + + +def test_family_coherence_network_ok() -> None: + """NETWORK_SPIKE_START followed by NETWORK_SPIKE_END should validate.""" + model = _mk_network_spike(start_t=1.0, end_t=2.0, spike_s=0.005) + assert model.start.kind is EventDescription.NETWORK_SPIKE_START + assert 
model.end.kind is EventDescription.NETWORK_SPIKE_END + + +def test_family_mismatch_raises() -> None: + """Mismatched start/end families must raise a ValueError.""" + start = Start(kind=EventDescription.SERVER_DOWN, t_start=1.0) + end = End(kind=EventDescription.NETWORK_SPIKE_END, t_end=2.0) + with pytest.raises(ValueError, match=r"must have .* kind in end"): + EventInjection( + event_id="ev-bad", + target_id="srv-1", + start=start, + end=end, + ) + + +# --------------------------------------------------------------------------- +# Temporal ordering & per-field constraints +# --------------------------------------------------------------------------- + +def test_time_ordering_start_before_end() -> None: + """t_start must be strictly less than t_end.""" + with pytest.raises(ValueError, match=r"smaller than the ending time"): + _mk_server_down(start_t=10.0, end_t=10.0) + + +def test_start_non_negative_enforced() -> None: + """Start.t_start is NonNegativeFloat; negatives raise ValidationError.""" + with pytest.raises(ValidationError): + Start(kind=EventDescription.SERVER_DOWN, t_start=-1.0) + + +def test_end_positive_enforced() -> None: + """End.t_end is PositiveFloat; non-positive values raise ValidationError.""" + with pytest.raises(ValidationError): + End(kind=EventDescription.SERVER_UP, t_end=0.0) + + +# --------------------------------------------------------------------------- +# Spike semantics +# --------------------------------------------------------------------------- + +def test_network_spike_requires_spike_s() -> None: + """NETWORK_SPIKE_START requires spike_s (seconds) to be present.""" + # Define the event id for the matching condition. 
+ event_id = "ev-spike-1" + + # Define the full message to be matched + expected_message = ( + f"The field spike_s for the event {event_id} " + "must be defined as a positive float" + ) + + with pytest.raises(ValidationError, match=re.escape(expected_message)): + _mk_network_spike(start_t=0.5, end_t=1.5, spike_s=None) + + +def test_network_spike_positive_spike_s_enforced() -> None: + """spike_s uses PositiveFloat; negative values raise ValidationError.""" + with pytest.raises(ValidationError): + _mk_network_spike(start_t=0.0, end_t=1.0, spike_s=-0.001) + + +def test_spike_s_forbidden_for_server_events() -> None: + """For non-network events, spike_s must be omitted.""" + event_id = "ev-bad-spike" + expected_message = f"Event {event_id}: spike_s must be omitted" + start = Start( + kind=EventDescription.SERVER_DOWN, + t_start=0.0, + spike_s=0.001, + ) + end = End(kind=EventDescription.SERVER_UP, t_end=1.0) + with pytest.raises(ValueError, match=re.escape(expected_message)): + EventInjection( + event_id="ev-bad-spike", + target_id="srv-1", + start=start, + end=end, + ) + + +# --------------------------------------------------------------------------- +# Strictness (extra fields) and immutability (frozen models) +# --------------------------------------------------------------------------- + +def test_extra_fields_forbidden_in_start() -> None: + """Unknown fields in Start must be rejected due to extra='forbid'.""" + payload: dict[str, Any] = { + "kind": EventDescription.SERVER_DOWN, + "t_start": 0.0, + "unknown_field": 123, + } + with pytest.raises(ValidationError): + Start.model_validate(payload) + + +def test_extra_fields_forbidden_in_end() -> None: + """Unknown fields in End must be rejected due to extra='forbid'.""" + payload: dict[str, Any] = { + "kind": EventDescription.SERVER_UP, + "t_end": 1.0, + "unknown_field": True, + } + with pytest.raises(ValidationError): + End.model_validate(payload) + + +def test_start_is_frozen_and_immutable() -> None: + """Start is 
frozen; attempting to mutate fields must raise an error.""" + start = Start(kind=EventDescription.SERVER_DOWN, t_start=0.0) + # Cast to Any to avoid mypy's read-only property check; runtime must fail. + start_any: Any = start + with pytest.raises(ValidationError, match="Instance is frozen"): + start_any.t_start = 1.0 + + +def test_end_is_frozen_and_immutable() -> None: + """End is frozen; attempting to mutate fields must raise an error.""" + end = End(kind=EventDescription.SERVER_UP, t_end=1.0) + end_any: Any = end + with pytest.raises(ValidationError, match="Instance is frozen"): + end_any.t_end = 2.0 diff --git a/tests/unit/schemas/test_payload.py b/tests/unit/schemas/test_payload.py new file mode 100644 index 0000000..5ae8d55 --- /dev/null +++ b/tests/unit/schemas/test_payload.py @@ -0,0 +1,333 @@ +"""Unit tests for the SimulationPayload Pydantic model. + +This suite verifies: +- Unique event IDs constraint. +- Target existence against the topology graph. +- Event times inside the simulation horizon. +- Kind/target compatibility (server vs. edge). +- Global liveness: not all servers down simultaneously. + +All tests are ruff- and mypy-friendly (short lines, precise raises, and +single statements inside raises blocks). They reuse fixtures from +conftest.py where convenient and build custom topologies when needed. 
+""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from asyncflow.config.constants import Distribution, EventDescription +from asyncflow.schemas.common.random_variables import RVConfig +from asyncflow.schemas.event.injection import End, EventInjection, Start +from asyncflow.schemas.payload import SimulationPayload +from asyncflow.schemas.topology.edges import Edge +from asyncflow.schemas.topology.graph import TopologyGraph +from asyncflow.schemas.topology.nodes import ( + Client, + Server, + TopologyNodes, +) + +if TYPE_CHECKING: + from asyncflow.schemas.settings.simulation import SimulationSettings + from asyncflow.schemas.workload.rqs_generator import RqsGenerator + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _mk_network_spike( + event_id: str, + target_id: str, + start_t: float, + end_t: float, + spike_s: float, +) -> EventInjection: + """Build a NETWORK_SPIKE event for the given target edge.""" + start = Start( + kind=EventDescription.NETWORK_SPIKE_START, + t_start=start_t, + spike_s=spike_s, + ) + end = End(kind=EventDescription.NETWORK_SPIKE_END, t_end=end_t) + return EventInjection( + event_id=event_id, + target_id=target_id, + start=start, + end=end, + ) + + +def _mk_server_window( + event_id: str, + target_id: str, + start_t: float, + end_t: float, +) -> EventInjection: + """Build a SERVER_DOWN → SERVER_UP event for the given server.""" + start = Start(kind=EventDescription.SERVER_DOWN, t_start=start_t) + end = End(kind=EventDescription.SERVER_UP, t_end=end_t) + return EventInjection( + event_id=event_id, + target_id=target_id, + start=start, + end=end, + ) + + +def _topology_with_min_edge() -> TopologyGraph: + """Create a tiny topology with one client and one minimal edge.""" + client = Client(id="client-1") + edge = Edge( + id="gen-to-client", + source="rqs-1", + 
target="client-1", + latency=RVConfig(mean=0.001, distribution=Distribution.POISSON), + ) + nodes = TopologyNodes(servers=[], client=client) + return TopologyGraph(nodes=nodes, edges=[edge]) + + +def _topology_with_two_servers_and_edge() -> TopologyGraph: + """Create a topology with two servers and a minimal edge.""" + client = Client(id="client-1") + servers = [ + Server(id="srv-1", server_resources={"cpu_cores": 1}, endpoints=[]), + Server(id="srv-2", server_resources={"cpu_cores": 1}, endpoints=[]), +] + edge = Edge( + id="gen-to-client", + source="rqs-1", + target="client-1", + latency=RVConfig(mean=0.001, distribution=Distribution.POISSON), + ) + nodes = TopologyNodes(servers=servers, client=client) + return TopologyGraph(nodes=nodes, edges=[edge]) + + +# --------------------------------------------------------------------------- +# Unique event IDs +# --------------------------------------------------------------------------- + + +def test_unique_event_ids_ok( + rqs_input: RqsGenerator, sim_settings: SimulationSettings, +) -> None: + """Different event_id values should validate.""" + topo = _topology_with_min_edge() + ev1 = _mk_network_spike( + "ev-a", "gen-to-client", start_t=0.0, end_t=1.0, spike_s=0.001, + ) + ev2 = _mk_network_spike( + "ev-b", "gen-to-client", start_t=2.0, end_t=3.0, spike_s=0.002, + ) + payload = SimulationPayload( + rqs_input=rqs_input, + topology_graph=topo, + sim_settings=sim_settings, + events=[ev1, ev2], + ) + assert payload.events is not None + assert len(payload.events) == 2 + + +def test_duplicate_event_ids_rejected( + rqs_input: RqsGenerator, sim_settings: SimulationSettings, +) -> None: + """Duplicate event_id values must be rejected.""" + topo = _topology_with_min_edge() + ev1 = _mk_network_spike( + "ev-dup", "gen-to-client", start_t=0.0, end_t=1.0, spike_s=0.001, + ) + ev2 = _mk_network_spike( + "ev-dup", "gen-to-client", start_t=2.0, end_t=3.0, spike_s=0.002, + ) + with pytest.raises(ValueError, match=r"must be unique"): + 
SimulationPayload( + rqs_input=rqs_input, + topology_graph=topo, + sim_settings=sim_settings, + events=[ev1, ev2], + ) + + +# --------------------------------------------------------------------------- +# Target existence +# --------------------------------------------------------------------------- + + +def test_target_id_must_exist( + rqs_input: RqsGenerator, sim_settings: SimulationSettings, +) -> None: + """Target IDs not present in the topology must be rejected.""" + topo = _topology_with_min_edge() + ev = _mk_network_spike( + "ev-x", "missing-edge", start_t=0.0, end_t=1.0, spike_s=0.001, + ) + with pytest.raises(ValueError, match=r"does not exist"): + SimulationPayload( + rqs_input=rqs_input, + topology_graph=topo, + sim_settings=sim_settings, + events=[ev], + ) + + +# --------------------------------------------------------------------------- +# Event times within horizon +# --------------------------------------------------------------------------- + + +def test_start_time_exceeds_horizon_rejected( + rqs_input: RqsGenerator, sim_settings: SimulationSettings, +) -> None: + """Start time greater than the horizon must be rejected.""" + topo = _topology_with_min_edge() + horizon = float(sim_settings.total_simulation_time) + ev = _mk_network_spike( + "ev-hz-start", + "gen-to-client", + start_t=horizon + 0.1, + end_t=horizon + 0.2, + spike_s=0.001, + ) + with pytest.raises(ValueError, match=r"exceeds simulation horizon"): + SimulationPayload( + rqs_input=rqs_input, + topology_graph=topo, + sim_settings=sim_settings, + events=[ev], + ) + + +def test_end_time_exceeds_horizon_rejected( + rqs_input: RqsGenerator, sim_settings: SimulationSettings, +) -> None: + """End time greater than the horizon must be rejected.""" + topo = _topology_with_min_edge() + horizon = float(sim_settings.total_simulation_time) + ev = _mk_network_spike( + "ev-hz-end", + "gen-to-client", + start_t=horizon - 0.1, + end_t=horizon + 0.1, + spike_s=0.001, + ) + with pytest.raises(ValueError, 
match=r"exceeds simulation horizon"): + SimulationPayload( + rqs_input=rqs_input, + topology_graph=topo, + sim_settings=sim_settings, + events=[ev], + ) + + +# --------------------------------------------------------------------------- +# Kind/target compatibility +# --------------------------------------------------------------------------- + + +def test_server_event_cannot_target_edge( + rqs_input: RqsGenerator, sim_settings: SimulationSettings, +) -> None: + """SERVER_DOWN should not target an edge ID.""" + topo = _topology_with_min_edge() + ev = _mk_server_window( + "ev-srv-bad", + target_id="gen-to-client", + start_t=0.0, + end_t=1.0, + ) + with pytest.raises(ValueError, match=r"regarding a server .* compatible"): + SimulationPayload( + rqs_input=rqs_input, + topology_graph=topo, + sim_settings=sim_settings, + events=[ev], + ) + + +def test_edge_event_ok_on_edge( + rqs_input: RqsGenerator, sim_settings: SimulationSettings, +) -> None: + """NETWORK_SPIKE event is valid when it targets an edge ID.""" + topo = _topology_with_min_edge() + ev = _mk_network_spike( + "ev-edge-ok", "gen-to-client", start_t=0.0, end_t=1.0, spike_s=0.001, + ) + payload = SimulationPayload( + rqs_input=rqs_input, + topology_graph=topo, + sim_settings=sim_settings, + events=[ev], + ) + assert payload.events is not None + assert payload.events[0].target_id == "gen-to-client" + + +# --------------------------------------------------------------------------- +# Global liveness: not all servers down simultaneously +# --------------------------------------------------------------------------- + + +def test_reject_when_all_servers_down_at_same_time( + rqs_input: RqsGenerator, sim_settings: SimulationSettings, +) -> None: + """ + It should raise a ValidationError if there is any time interval during which + all servers are scheduled to be down simultaneously. 
+    """
+    topo = _topology_with_two_servers_and_edge()
+
+    # The default `sim_settings` fixture uses a short horizon to keep most
+    # tests fast. Copy it with a longer horizon so the event times below are
+    # valid, instead of mutating the shared fixture in place (the payload
+    # models are frozen, and in-place mutation would leak across tests).
+    sim_settings = sim_settings.model_copy(
+        update={"total_simulation_time": 30.0},
+    )
+
+    # srv-1 is down over [10, 20) and srv-2 over [15, 25); both are down
+    # during the overlap [15, 20), so the payload must be rejected.
+    ev_a = _mk_server_window("ev-a", "srv-1", start_t=10.0, end_t=20.0)
+    ev_b = _mk_server_window("ev-b", "srv-2", start_t=15.0, end_t=25.0)
+
+    with pytest.raises(ValueError, match=r"all servers are down"):
+        SimulationPayload(
+            rqs_input=rqs_input,
+            topology_graph=topo,
+            sim_settings=sim_settings,
+            events=[ev_a, ev_b],
+        )
+
+
+def test_accept_when_never_all_down(
+    rqs_input: RqsGenerator, sim_settings: SimulationSettings,
+) -> None:
+    """Payload is valid when at least one server stays up at all times."""
+    topo = _topology_with_two_servers_and_edge()
+
+    # As above, copy the settings with a longer horizon so the event times
+    # are valid within the simulation's total duration.
+    sim_settings = sim_settings.model_copy(
+        update={"total_simulation_time": 30.0},
+    )
+
+    # Staggered windows: srv-1 down [10, 15), srv-2 down [15, 20); there is
+    # no instant at which both servers are down, so validation should pass.
+    ev_a = _mk_server_window("ev-a", "srv-1", start_t=10.0, end_t=15.0)
+    ev_b = _mk_server_window("ev-b", "srv-2", start_t=15.0, end_t=20.0)
+ payload = SimulationPayload( + rqs_input=rqs_input, + topology_graph=topo, + sim_settings=sim_settings, + events=[ev_a, ev_b], + ) + assert payload.events is not None + assert len(payload.events) == 2
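+
+
+# ---------------------------------------------------------------------------
+# Temporal ordering
+# ---------------------------------------------------------------------------
+
+
+def test_start_time_must_precede_end_time() -> None:
+    """Events with t_start >= t_end must be rejected.
+
+    Sketch of the documented ordering invariant (t_start < t_end); it
+    assumes `_mk_network_spike` constructs the `EventInjection` eagerly,
+    so the per-event validator fires here without building a payload.
+    """
+    with pytest.raises(
+        ValueError, match=r"must be smaller than the ending time",
+    ):
+        _mk_network_spike(
+            "ev-order", "gen-to-client", start_t=2.0, end_t=1.0, spike_s=0.001,
+        )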