diff --git a/docs/api/high-level.md b/docs/api/high-level.md deleted file mode 100644 index 65c3d24..0000000 --- a/docs/api/high-level.md +++ /dev/null @@ -1,299 +0,0 @@ -# AsyncFlow — High-Level API (`AsyncFlow`, `SimulationRunner`) - -This page explains how to programmatically **assemble a validated simulation payload** and **run** it, returning metrics and plots through the analyzer. - -* **Builder**: `AsyncFlow` – compose workload, topology, and settings into a single `SimulationPayload`. -* **Runner**: `SimulationRunner` – wire actors, start processes, collect metrics, and return a `ResultsAnalyzer`. - ---- - -## Imports - -```python -# High-level API -from asyncflow import AsyncFlow, SimulationRunner - -# Public leaf schemas (components & workload) -from asyncflow.components import Client, Server, Endpoint, Edge -from asyncflow.workload import RqsGenerator, RVConfig -from asyncflow.settings import SimulationSettings -``` - -> These are the **only** imports end users need. Internals (actors, registries, etc.) remain private. 
- ---- - -## Quick start - -A minimal end-to-end example: - -```python -from __future__ import annotations -import simpy - -from asyncflow import AsyncFlow, SimulationRunner -from asyncflow.components import Client, Server, Endpoint, Edge -from asyncflow.workload import RqsGenerator, RVConfig -from asyncflow.settings import SimulationSettings - -# 1) Workload -rqs = RqsGenerator( - id="rqs-1", - avg_active_users=RVConfig(mean=50, # Poisson by default - # or Distribution.NORMAL with variance auto=mean - ), - avg_request_per_minute_per_user=RVConfig(mean=30), # MUST be Poisson - user_sampling_window=60, # seconds -) - -# 2) Topology components -client = Client(id="client-1") - -endpoint = Endpoint( - endpoint_name="/hello", - steps=[ - {"kind": "ram", "step_operation": {"necessary_ram": 32}}, - {"kind": "initial_parsing", "step_operation": {"cpu_time": 0.002}}, - {"kind": "io_wait", "step_operation": {"io_waiting_time": 0.010}}, - ], -) - -server = Server( - id="srv-1", - server_resources={"cpu_cores": 1, "ram_mb": 1024}, - endpoints=[endpoint], -) - -edges = [ - Edge(id="gen-client", source="rqs-1", target="client-1", - latency={"mean": 0.003, "distribution": "exponential"}), - Edge(id="client-srv1", source="client-1", target="srv-1", - latency={"mean": 0.003, "distribution": "exponential"}), - Edge(id="srv1-client", source="srv-1", target="client-1", - latency={"mean": 0.003, "distribution": "exponential"}), -] - -# 3) Settings (baseline sampled metrics are mandatory) -settings = SimulationSettings( - total_simulation_time=300, # seconds (≥ 5) - sample_period_s=0.01, # 0.001 ≤ value ≤ 0.1 - # enabled_sample_metrics and enabled_event_metrics: safe defaults already set -) - -# 4) Build (validates everything with Pydantic) -payload = ( - AsyncFlow() - .add_generator(rqs) - .add_client(client) - .add_servers(server) - .add_edges(*edges) - .add_simulation_settings(settings) - .build_payload() -) - -# 5) Run -env = simpy.Environment() -results = SimulationRunner(env=env, 
simulation_input=payload).run() - -# 6) Use the analyzer (examples) -print(results.get_latency_stats()) -ts, rps = results.get_throughput_series() -sampled = results.get_sampled_metrics() -``` - ---- - -## `AsyncFlow` — builder (public) - -`AsyncFlow` helps you construct a **self-consistent** `SimulationPayload` with fluent, chainable calls. Every piece you add is type-checked; the final `build_payload()` validates the full graph and settings. - -### API - -```python -class AsyncFlow: - def add_generator(self, rqs_generator: RqsGenerator) -> Self: ... - def add_client(self, client: Client) -> Self: ... - def add_servers(self, *servers: Server) -> Self: ... - def add_edges(self, *edges: Edge) -> Self: ... - def add_simulation_settings(self, sim_settings: SimulationSettings) -> Self: ... - def add_load_balancer(self, load_balancer: LoadBalancer) -> Self: ... - def build_payload(self) -> SimulationPayload: ... -``` - -### Validation performed by `build_payload()` - -On build, the composed payload is validated by the Pydantic schemas: - -1. **Presence** - - * Generator, client, **≥ 1 server**, **≥ 1 edge**, settings are required. - -2. **Unique IDs** - - * Duplicate server IDs or edge IDs are rejected. - -3. **Node types** - - * Fixed enums: `client`, `server`, `load_balancer`; validated on each node. - -4. **Edge integrity** - - * Every edge **target** must be a declared node ID. - * **External IDs** (e.g., the generator id) are allowed **only as sources**. - * **No self-loops** (`source != target`). - -5. **Load balancer sanity** (if present) - - * `server_covered ⊆ declared servers`. - * There must be an **edge from the LB to every covered server**. - -6. **(Engine rule)** “No fan-out except LB” - - * Only the LB may have multiple outgoing edges among declared nodes. - -7. **Latency RV constraints (edges)** - - * `latency.mean > 0`, and if `variance` exists, `variance ≥ 0`. - -If a rule fails, a **descriptive `ValueError`** points at the offending entity/field. 
- -### Typical errors you might see - -* Missing parts: - `ValueError: The generator input must be instantiated before the simulation` -* Type mis-match: - `TypeError: All the instances must be of the type Server` -* Graph violations: - `ValueError: Edge client-1->srv-X references unknown target node 'srv-X'` -* LB coverage: - `ValueError: Servers ['srv-2'] are covered by LB 'lb-1' but have no outgoing edge from it.` - ---- - -## `SimulationRunner` — orchestrator (public) - -`SimulationRunner` takes a validated `SimulationPayload`, **instantiates all runtimes**, **wires** edges to their target mailboxes, **starts** every actor, **collects** sampled metrics, and advances the SimPy clock. - -### API - -```python -class SimulationRunner: - def __init__(self, *, env: simpy.Environment, simulation_input: SimulationPayload) -> None: ... - def run(self) -> ResultsAnalyzer: ... - @classmethod - def from_yaml(cls, *, env: simpy.Environment, yaml_path: str | Path) -> "SimulationRunner": ... -``` - -* **`env`**: your SimPy environment (you control its lifetime). - -* **`simulation_input`**: the payload returned by `AsyncFlow.build_payload()` (or parsed from YAML). - -* **`run()`**: - - * Builds and wires all runtime actors (`RqsGeneratorRuntime`, `ClientRuntime`, `ServerRuntime`, `LoadBalancerRuntime`, `EdgeRuntime`). - * Starts the **SampledMetricCollector** (baseline sampled metrics are mandatory and collected automatically). - * Runs until `SimulationSettings.total_simulation_time`. - * Returns a **`ResultsAnalyzer`** with helpers like: - - * `get_latency_stats()` - * `get_throughput_series()` - * `get_sampled_metrics()` - * plotting helpers (`plot_latency_distribution`, `plot_throughput`, …). - -* **`from_yaml`**: convenience constructor for loading a full payload from a YAML file and running it immediately. - -### Determinism & RNG - -* The runner uses `numpy.random.default_rng()` internally. 
- Seeding is not yet exposed as a public parameter; exact reproducibility across runs is **not guaranteed** in this version. - ---- - -## Extended example: with Load Balancer - -```python -from asyncflow.components import Client, Server, Endpoint, Edge -from asyncflow.components import LoadBalancer -from asyncflow import AsyncFlow, SimulationRunner -from asyncflow.workload import RqsGenerator, RVConfig -from asyncflow.settings import SimulationSettings -import simpy - -client = Client(id="client-1") - -srv1 = Server( - id="srv-1", - server_resources={"cpu_cores": 1, "ram_mb": 1024}, - endpoints=[Endpoint(endpoint_name="/api", steps=[{"kind":"ram","step_operation":{"necessary_ram":64}}])] -) -srv2 = Server( - id="srv-2", - server_resources={"cpu_cores": 2, "ram_mb": 2048}, - endpoints=[Endpoint(endpoint_name="/api", steps=[{"kind":"io_db","step_operation":{"io_waiting_time":0.012}}])] -) - -lb = LoadBalancer(id="lb-1", algorithms="round_robin", server_covered={"srv-1","srv-2"}) - -edges = [ - Edge(id="gen-client", source="rqs-1", target="client-1", latency={"mean":0.002,"distribution":"exponential"}), - Edge(id="client-lb", source="client-1", target="lb-1", latency={"mean":0.002,"distribution":"exponential"}), - Edge(id="lb-srv1", source="lb-1", target="srv-1", latency={"mean":0.002,"distribution":"exponential"}), - Edge(id="lb-srv2", source="lb-1", target="srv-2", latency={"mean":0.002,"distribution":"exponential"}), - Edge(id="srv1-client", source="srv-1", target="client-1", latency={"mean":0.003,"distribution":"exponential"}), - Edge(id="srv2-client", source="srv-2", target="client-1", latency={"mean":0.003,"distribution":"exponential"}), -] - -payload = ( - AsyncFlow() - .add_generator(RqsGenerator( - id="rqs-1", - avg_active_users=RVConfig(mean=120), - avg_request_per_minute_per_user=RVConfig(mean=20), - user_sampling_window=60, - )) - .add_client(client) - .add_servers(srv1, srv2) - .add_load_balancer(lb) - .add_edges(*edges) - 
.add_simulation_settings(SimulationSettings(total_simulation_time=600, sample_period_s=0.02)) - .build_payload() -) - -env = simpy.Environment() -results = SimulationRunner(env=env, simulation_input=payload).run() -``` - ---- - -## Performance tips - -* **Sampling cost** grows with `total_simulation_time / sample_period_s × (#sampled metrics × entities)`. - For long runs, consider a larger `sample_period_s` (e.g., `0.02–0.05`) to reduce memory while keeping the baseline metrics intact. - -* **Validation first**: prefer failing early by letting `build_payload()` validate everything before the runner starts. - ---- - -## Error handling (what throws) - -* **Type errors** on builder inputs (`TypeError`) when passing the wrong class to `add_*`. -* **Validation errors** (`ValueError`) on `build_payload()` if the graph is inconsistent (unknown targets, duplicates, LB edges missing, self-loops, illegal fan-out, latency rules, etc.). -* **Runtime wiring errors** (`TypeError`) if an unknown runtime target/source type appears while wiring edges (should not occur with a validated payload). - ---- - -## YAML path (alternative) - -You can construct the payload in YAML (see “YAML Input Guide”), then: - -```python -import simpy -from asyncflow import SimulationRunner - -env = simpy.Environment() -runner = SimulationRunner.from_yaml(env=env, yaml_path="scenario.yml") -results = runner.run() -``` - ---- - diff --git a/docs/api/high-level/analyzer.md b/docs/api/high-level/analyzer.md new file mode 100644 index 0000000..8804449 --- /dev/null +++ b/docs/api/high-level/analyzer.md @@ -0,0 +1,211 @@ +# AsyncFlow — Public API Reference: `ResultsAnalyzer` + +`ResultsAnalyzer` is the public object you use **after** a run to compute +latency statistics, derive throughput time-series, and visualize sampled +metrics collected from servers and edges. 
+ +* **Input:** created and returned by `SimulationRunner.run()` +* **Output:** dictionaries and time-series you can print, log, chart, or export + +> **Import (public):** +> +> ```python +> from asyncflow.analysis import ResultsAnalyzer +> ``` + + +--- + +## TL;DR (minimal usage) + +```python +results = SimulationRunner(env=env, simulation_input=payload).run() + +# Aggregates +lat = results.get_latency_stats() # dict of p50, p95, p99, ... +ts, rps = results.get_throughput_series() # per-second timestamps & RPS +series = results.get_sampled_metrics() # nested dict of time-series + +# Plotting (matplotlib) +import matplotlib.pyplot as plt +fig, axes = plt.subplots(2, 2, figsize=(12, 8)) +results.plot_latency_distribution(axes[0, 0]) +results.plot_throughput(axes[0, 1]) +results.plot_server_queues(axes[1, 0]) +results.plot_ram_usage(axes[1, 1]) +fig.tight_layout() +``` + +--- + +## What the analyzer computes + +### Event-level aggregates (from `RQS_CLOCK`) + +* **Latency stats** from per-request `(start_time, finish_time)` tuples: + + * keys: `total_requests, mean, median, std_dev, p95, p99, min, max` +* **Throughput (RPS)** as a time-series: + + * 1-second windows by default (see “Advanced: throughput window”) + +### Sampled time-series (from runtime collectors) + +* Per-entity (server/edge) series for the **baseline mandatory** metrics: + + * `ready_queue_len` (server) + * `event_loop_io_sleep` (server) + * `ram_in_use` (server) + * `edge_concurrent_connection` (edge) + +> These are sampled every `sample_period_s` defined in `SimulationSettings`. + +--- + +## Public API + +### Aggregates + +```python +get_latency_stats() -> dict[LatencyKey, float] +``` + +Returns latency summary statistics. If no requests completed, returns `{}`. + +```python +get_throughput_series() -> tuple[list[float], list[float]] +``` + +Returns `(timestamps_in_seconds, rps_values)`. If no traffic, returns `([], [])`. 
+ +### Sampled metrics + +```python +get_sampled_metrics() -> dict[str, dict[str, list[float]]] +``` + +Returns a nested dictionary: + +```python +{ + "<metric_name>": { "<entity_id>": [v0, v1, ...] } +} +``` + +* Metric names are strings matching the public enums (e.g. `"ready_queue_len"`). +* `entity_id` is a **server id** (for server metrics) or an **edge id** (for edge metrics). + +### Plotting helpers + +All plotting helpers draw on a provided `matplotlib.axes.Axes`: + +```python +plot_latency_distribution(ax: Axes) -> None +plot_throughput(ax: Axes) -> None +plot_server_queues(ax: Axes) -> None +plot_ram_usage(ax: Axes) -> None +``` + +Behavior: + +* If data is missing/empty, the plot shows a “no data” message. +* With a load balancer (multiple servers), per-server lines are labeled by server id automatically. + +--- + +## Return contracts (shapes & keys) + +### `get_latency_stats()` + +Example: + +```python +{ + 'total_requests': 1200.0, + 'mean': 0.0123, + 'median': 0.0108, + 'std_dev': 0.0041, + 'p95': 0.0217, + 'p99': 0.0302, + 'min': 0.0048, + 'max': 0.0625 +} +``` + +### `get_throughput_series()` + +Example: + +```python +timestamps = [1.0, 2.0, 3.0, ...] # seconds from t=0 +rps = [ 36, 41, 38, ...] # requests per second +``` + +### `get_sampled_metrics()` + +Example subset: + +```python +{ + "ready_queue_len": { + "srv-1": [0, 1, 2, 1, ...], + "srv-2": [0, 0, 1, 0, ...], + }, + "event_loop_io_sleep": { + "srv-1": [3, 5, 4, 6, ...], + }, + "ram_in_use": { + "srv-1": [128.0, 160.0, 192.0, ...], + }, + "edge_concurrent_connection": { + "lb-1->srv-1": [0, 1, 1, 2, ...], # your edge ids + } +} +``` + +Time base for these lists is implicit: index `i` corresponds to time `i * sample_period_s`. 
+ +--- + +## Plotting recipes + +### Multi-panel overview + +```python +import matplotlib.pyplot as plt + +fig, axes = plt.subplots(2, 2, figsize=(12, 8)) +results.plot_latency_distribution(axes[0, 0]) +results.plot_throughput(axes[0, 1]) +results.plot_server_queues(axes[1, 0]) +results.plot_ram_usage(axes[1, 1]) + +fig.suptitle("AsyncFlow – Simulation Overview", y=1.02) +fig.tight_layout() +``` + +## Edge cases & guarantees + +* **No traffic:** all getters are safe: + + * `get_latency_stats()` → `{}` + * `get_throughput_series()` → `([], [])` + * Plots show “no data”. +* **Multiple servers / LB:** queue and RAM plots include **one line per server id**. +* **Metric availability:** the analyzer only exposes the **baseline mandatory** sampled metrics; if a metric wasn’t enabled/recorded, it won’t appear in the nested dict. +* **Units:** times are in **seconds**; RAM is in **MB**; RPS is **requests/second**. + +--- + +## Performance characteristics + +* Aggregations (percentiles, std) are **vectorized** via NumPy. +* Memory footprint of sampled series ≈ + `total_simulation_time / sample_period_s × (#metrics × #entities)`. +* Prefer a coarser `sample_period_s` for very long runs. + +--- + + + + diff --git a/docs/api/high-level/builder.md b/docs/api/high-level/builder.md new file mode 100644 index 0000000..4a8ff6c --- /dev/null +++ b/docs/api/high-level/builder.md @@ -0,0 +1,288 @@ +# AsyncFlow — Public API Reference: `AsyncFlow` Builder + +`AsyncFlow` is the **fluent builder** that assembles a complete, validated +`SimulationPayload`. It lets you compose workload, topology, edges, and global +settings with clear types and fail-fast validation. The resulting payload can be +run with `SimulationRunner`. 
+ +* **You write:** small, typed building blocks (workload + components + settings) +* **Builder does:** composition & Pydantic validation (graph integrity, rules) +* **Runner does:** execution & metrics collection + +--- + +## Imports + +```python +# Builder + Runner +from asyncflow import AsyncFlow, SimulationRunner + +# Public leaf schemas +from asyncflow.workload import RqsGenerator, RVConfig +from asyncflow.components import Client, Server, Endpoint, Edge, LoadBalancer +from asyncflow.settings import SimulationSettings +``` + +--- + +## Quick start + +```python +import simpy +from asyncflow import AsyncFlow, SimulationRunner +from asyncflow.workload import RqsGenerator, RVConfig +from asyncflow.components import Client, Server, Endpoint, Edge +from asyncflow.settings import SimulationSettings + +# 1) Workload +rqs = RqsGenerator( + id="rqs-1", + avg_active_users=RVConfig(mean=50), # Poisson by default + avg_request_per_minute_per_user=RVConfig(mean=30), # MUST be Poisson +) + +# 2) Components +client = Client(id="client-1") +server = Server( + id="srv-1", + server_resources={"cpu_cores": 1, "ram_mb": 1024}, + endpoints=[ + Endpoint( + endpoint_name="/hello", + steps=[ + {"kind": "ram", "step_operation": {"necessary_ram": 32}}, + {"kind": "initial_parsing", "step_operation": {"cpu_time": 0.002}}, + {"kind": "io_wait", "step_operation": {"io_waiting_time": 0.010}}, + ], + ) + ], +) + +edges = [ + Edge(id="gen-client", source="rqs-1", target="client-1", + latency={"mean": 0.003, "distribution": "exponential"}), + Edge(id="client-srv1", source="client-1", target="srv-1", + latency={"mean": 0.003, "distribution": "exponential"}), + Edge(id="srv1-client", source="srv-1", target="client-1", + latency={"mean": 0.003, "distribution": "exponential"}), +] + +# 3) Settings (baseline sampled metrics are mandatory by design) +settings = SimulationSettings(total_simulation_time=300, sample_period_s=0.01) + +# 4) Build (validates everything) +payload = ( + AsyncFlow() + 
.add_generator(rqs) + .add_client(client) + .add_servers(server) + .add_edges(*edges) + .add_simulation_settings(settings) + .build_payload() +) + +# 5) Run +env = simpy.Environment() +results = SimulationRunner(env=env, simulation_input=payload).run() +``` + +--- + +## API + +```python +class AsyncFlow: + def add_generator(self, rqs_generator: RqsGenerator) -> Self: ... + def add_client(self, client: Client) -> Self: ... + def add_servers(self, *servers: Server) -> Self: ... + def add_edges(self, *edges: Edge) -> Self: ... + def add_simulation_settings(self, sim_settings: SimulationSettings) -> Self: ... + def add_load_balancer(self, load_balancer: LoadBalancer) -> Self: ... + def build_payload(self) -> SimulationPayload: ... +``` + +### Method details + +* **`add_generator(rqs_generator)`** + Adds the stochastic workload model. + Errors: `TypeError` if not a `RqsGenerator`. + +* **`add_client(client)`** + Adds the single client node. + Errors: `TypeError` if not a `Client`. + +* **`add_servers(*servers)`** + Adds one or more servers (varargs). + Errors: `TypeError` if any arg is not a `Server`. + +* **`add_edges(*edges)`** + Adds one or more directed edges (varargs). + Errors: `TypeError` if any arg is not an `Edge`. + Notes: *Targets must be declared nodes; sources may be external (e.g. `"rqs-1"`).* + +* **`add_load_balancer(load_balancer)`** *(optional)* + Adds a load balancer node. + Errors: `TypeError` if not a `LoadBalancer`. + +* **`add_simulation_settings(sim_settings)`** + Adds global settings (duration, sampling period, metric selection). + Errors: `TypeError` if not a `SimulationSettings`. + +* **`build_payload()` → `SimulationPayload`** + Finalize composition and run full validation. + Errors: `ValueError` on missing parts or invalid graph. + +--- + +## Validation performed by `build_payload()` + +(Implemented via Pydantic model validation across the payload’s schemas.) + +1. 
**Presence** + + * Requires: generator, client, **≥ 1 server**, **≥ 1 edge**, settings. + +2. **Unique IDs** + + * Duplicate server IDs or edge IDs are rejected. + +3. **Node types** + + * `client`, `server`, and `load_balancer` are fixed enums; enforced per node. + +4. **Edge integrity** + + * Every **target** must be a declared node ID. + * **External IDs** (e.g. the generator id) are allowed **only** as **sources**. + * **No self-loops** (`source != target`). + +5. **Load balancer sanity** (if present) + + * `server_covered ⊆ declared servers`. + * There is an **outgoing edge from the LB to every covered server**. + +6. **Engine rule: no fan-out except LB** + + * Among declared nodes, only the LB may have multiple outgoing edges. + +7. **Latency RV constraints (edges)** + + * `latency.mean > 0`; if `variance` provided, `variance ≥ 0`. + +If any rule fails, a **descriptive `ValueError`** points to the offending field/entity. + +--- + +## Typical errors & how to fix + +* **Missing parts** + `ValueError: The generator input must be instantiated before the simulation` + → Call the missing `add_*` method before `build_payload()`. + +* **Wrong type passed** + `TypeError: All the instances must be of the type Server` + → Ensure you pass `Server` objects to `add_servers(...)` (not dicts). + +* **Unknown edge target** + `ValueError: Edge client-1->srv-X references unknown target node 'srv-X'` + → Add a `Server(id="srv-X", ...)` or fix the edge target. + +* **LB coverage without edges** + `ValueError: Servers ['srv-2'] are covered by LB 'lb-1' but have no outgoing edge from it.` + → Add `Edge(source="lb-1", target="srv-2", ...)`. + +* **Illegal fan-out** + `ValueError: Only the load balancer can have multiple outgoing edges. Offending sources: ['client-1']` + → Route fan-out through a `LoadBalancer`. 
+ +--- + +## Extended example — with Load Balancer + +```python +from asyncflow import AsyncFlow, SimulationRunner +from asyncflow.workload import RqsGenerator, RVConfig +from asyncflow.components import Client, Server, Endpoint, Edge, LoadBalancer +from asyncflow.settings import SimulationSettings +import simpy + +client = Client(id="client-1") + +srv1 = Server( + id="srv-1", + server_resources={"cpu_cores": 1, "ram_mb": 1024}, + endpoints=[Endpoint(endpoint_name="/api", + steps=[{"kind":"ram","step_operation":{"necessary_ram":64}}])], +) +srv2 = Server( + id="srv-2", + server_resources={"cpu_cores": 2, "ram_mb": 2048}, + endpoints=[Endpoint(endpoint_name="/api", + steps=[{"kind":"io_db","step_operation":{"io_waiting_time":0.012}}])], +) + +lb = LoadBalancer(id="lb-1", algorithms="round_robin", server_covered={"srv-1","srv-2"}) + +edges = [ + Edge(id="gen-client", source="rqs-1", target="client-1", + latency={"mean":0.002,"distribution":"exponential"}), + Edge(id="client-lb", source="client-1", target="lb-1", + latency={"mean":0.002,"distribution":"exponential"}), + Edge(id="lb-srv1", source="lb-1", target="srv-1", + latency={"mean":0.002,"distribution":"exponential"}), + Edge(id="lb-srv2", source="lb-1", target="srv-2", + latency={"mean":0.002,"distribution":"exponential"}), + Edge(id="srv1-client", source="srv-1", target="client-1", + latency={"mean":0.003,"distribution":"exponential"}), + Edge(id="srv2-client", source="srv-2", target="client-1", + latency={"mean":0.003,"distribution":"exponential"}), +] + +payload = ( + AsyncFlow() + .add_generator(RqsGenerator( + id="rqs-1", + avg_active_users=RVConfig(mean=120), + avg_request_per_minute_per_user=RVConfig(mean=20), + user_sampling_window=60, + )) + .add_client(client) + .add_servers(srv1, srv2) + .add_load_balancer(lb) + .add_edges(*edges) + .add_simulation_settings(SimulationSettings(total_simulation_time=600, sample_period_s=0.02)) + .build_payload() +) + +env = simpy.Environment() +results = 
SimulationRunner(env=env, simulation_input=payload).run() +``` + +--- + +## Tips & pitfalls + +* **IDs are case-sensitive** and must be unique per category (servers, edges, LB). +* **Edge targets must be declared nodes.** External IDs (like the generator) can only appear as **sources**. +* **LB fan-out only.** If you need branching, introduce a `LoadBalancer`. +* **RqsGenerator constraints:** + `avg_request_per_minute_per_user` **must** be Poisson; + `avg_active_users` must be **Poisson** or **Normal** (variance auto-filled if missing). +* **Step coherence:** + CPU step → `cpu_time`; RAM step → `necessary_ram`; I/O step → `io_waiting_time`. Exactly **one** per step. + +--- + +## Interop: YAML ↔ Python + +You can build the same payload from YAML and then use `SimulationRunner.from_yaml(...)`. Field names mirror the Python model names and the enum values (strings) are identical. + +--- + +## Versioning & stability + +* Exceptions: `TypeError` for wrong types passed to builder; `ValueError` for invalid or incomplete payloads. +* Validation rules and enum names are part of the public contract (semantic versioning applies). +* The builder does not mutate your objects; it assembles and validates them into a `SimulationPayload`. + + diff --git a/docs/api/high-level/runner.md b/docs/api/high-level/runner.md new file mode 100644 index 0000000..27a0c87 --- /dev/null +++ b/docs/api/high-level/runner.md @@ -0,0 +1,230 @@ +# AsyncFlow — Public API Reference: `SimulationRunner` + +`SimulationRunner` is the **orchestrator** of a simulation run. It takes a fully +validated `SimulationPayload`, instantiates the runtime actors, wires their +connections, starts the processes inside a `simpy.Environment`, collects sampled +metrics, advances the virtual clock, and returns a `ResultsAnalyzer` for +post-run querying and plotting. + +Use it together with the `AsyncFlow` builder (Python) or a YAML payload. 
+ +--- + +## Imports + +```python +from asyncflow import SimulationRunner, AsyncFlow # high-level API +from asyncflow.settings import SimulationSettings +from asyncflow.components import Client, Server, Endpoint, Edge, LoadBalancer +from asyncflow.workload import RqsGenerator, RVConfig +import simpy +``` + +--- + +## Quick start + +```python +# 1) Build a validated payload (see the builder docs for details) +payload = ( + AsyncFlow() + .add_generator(RqsGenerator( + id="rqs-1", + avg_active_users=RVConfig(mean=50), + avg_request_per_minute_per_user=RVConfig(mean=30), + )) + .add_client(Client(id="client-1")) + .add_servers( + Server( + id="srv-1", + server_resources={"cpu_cores": 1, "ram_mb": 1024}, + endpoints=[Endpoint(endpoint_name="/hello", steps=[ + {"kind": "ram", "step_operation": {"necessary_ram": 32}}, + {"kind": "initial_parsing", "step_operation": {"cpu_time": 0.002}}, + {"kind": "io_wait", "step_operation": {"io_waiting_time": 0.010}}, + ])], + ) + ) + .add_edges( + Edge(id="gen-client", source="rqs-1", target="client-1", + latency={"mean": 0.003, "distribution": "exponential"}), + Edge(id="client-srv1", source="client-1", target="srv-1", + latency={"mean": 0.003, "distribution": "exponential"}), + Edge(id="srv1-client", source="srv-1", target="client-1", + latency={"mean": 0.003, "distribution": "exponential"}), + ) + .add_simulation_settings(SimulationSettings(total_simulation_time=300, sample_period_s=0.01)) + .build_payload() +) + +# 2) Run +env = simpy.Environment() +results = SimulationRunner(env=env, simulation_input=payload).run() + +# 3) Analyze +print(results.get_latency_stats()) +ts, rps = results.get_throughput_series() +sampled = results.get_sampled_metrics() +``` + +--- + +## Class reference + +```python +class SimulationRunner: + def __init__(self, *, env: simpy.Environment, simulation_input: SimulationPayload) -> None: ... + def run(self) -> ResultsAnalyzer: ... 
+ @classmethod + def from_yaml(cls, *, env: simpy.Environment, yaml_path: str | Path) -> "SimulationRunner": ... +``` + +### Parameters + +* **`env: simpy.Environment`** + The SimPy environment that controls virtual time. You own its lifetime. + +* **`simulation_input: SimulationPayload`** + A fully validated payload (typically created with `AsyncFlow.build_payload()` or + parsed from YAML). It contains workload, topology graph, and settings. + +### Returns + +* **`run() -> ResultsAnalyzer`** + A results façade exposing: + + * `get_latency_stats() -> dict` (mean, median, p95, p99, …) + * `get_throughput_series() -> (timestamps, rps)` + * `get_sampled_metrics() -> dict[str, dict[str, list[float]]]` + * plotting helpers: `plot_latency_distribution(ax)`, `plot_throughput(ax)`, + `plot_server_queues(ax)`, `plot_ram_usage(ax)` + +### Convenience: YAML entry point + +```python +env = simpy.Environment() +runner = SimulationRunner.from_yaml(env=env, yaml_path="scenario.yml") +results = runner.run() +``` + +`from_yaml` uses `yaml.safe_load` and validates with the same Pydantic schemas, +so it enforces the exact same contract as the builder. + +--- + +## Lifecycle & internal phases + +`run()` performs the following steps: + +1. **Build runtimes** + + * `RqsGeneratorRuntime` (workload) + * `ClientRuntime` + * `ServerRuntime` for each server (CPU/RAM resources bound) + * `LoadBalancerRuntime` (optional) + +2. **Wire edges** + Creates an `EdgeRuntime` for each edge and assigns the appropriate *inbox* + (`simpy.Store`) of the target actor. Sets the `out_edge` (or `out_edges` for + the load balancer) on the source actor. + +3. **Start processes** + Registers every actor’s `.start()` coroutine in the environment and starts the + **SampledMetricCollector** that snapshots: + + * server **ready queue length**, **I/O queue length**, **RAM in use** + * edge **concurrent connections** + at the configured `sample_period_s`. These sampled metrics are **mandatory** + in this version. 
+ +4. **Advance the clock** + `env.run(until=SimulationSettings.total_simulation_time)` + +5. **Return analyzer** + Wraps the collected state into `ResultsAnalyzer` for stats & plots. + +--- + +## Input contract (what the runner expects) + +The runner assumes `simulation_input` has already passed full validation: + +* All edge targets are declared nodes; external IDs appear only as sources. +* Load balancer coverage and edges are coherent. +* No self-loops; only the LB fans out among declared nodes. +* Edge latency RVs have `mean > 0` (and `variance ≥ 0` if provided). +* Server resources meet minimums (≥ 1 core, ≥ 256 MB RAM), etc. + +> Build with `AsyncFlow` or load from YAML — both paths enforce the same rules. + +--- + +## Error handling + +* **Type errors (builder misuse)** should not reach the runner; they’re raised by the builder (`TypeError`) before `build_payload()`. +* **Validation errors** (`ValueError`) are raised during payload construction/validation, not by the runner. +* **Wiring errors** (`TypeError`) are guarded by validation and indicate an unexpected mismatch between payload and runtime types. With a validated payload, you shouldn’t see them. + +--- + +## Determinism & RNG + +The runner uses `numpy.random.default_rng()` internally. Seeding is not yet a +public parameter; exact reproducibility across runs is **not guaranteed** in +this version. If you need strict reproducibility, pin your environment and keep +payloads identical; a dedicated seeding hook may be added in a future release. + +--- + +## Performance characteristics + +* **Runtime cost** scales with the number of requests and the complexity of + endpoint steps (CPU vs I/O waits). +* **Sampling memory** roughly scales as + `(#entities × #enabled sampled metrics) × (total_simulation_time / sample_period_s)`. + For long runs, consider a larger `sample_period_s` (e.g., `0.02–0.05`) to + reduce the size of time series. 
+* The collector is a single coroutine that performs `O(entities)` appends on + each tick; the hot path inside actors remains `O(1)` per event. + +--- + +## Usage with Load Balancers + +Topologies **with a LB** are first-class: + +* Only the LB may have multiple outgoing edges (fan-out). +* The analyzer operates on **lists** of servers and edges; plots will naturally + show one line per server/edge where appropriate. +* Validation ensures every `server_covered` by the LB has a corresponding LB→server edge. + +--- + +## One-shot runner (recommended) + +A `SimulationRunner` instance is designed to **run once**. For a new scenario +(or new settings), create a **new** `simpy.Environment` and a **new** +`SimulationRunner`. Reusing a runner after `run()` is not supported. + +--- + +## Best practices + +* **Let the builder fail fast.** Always construct payloads via `AsyncFlow` (or YAML + validation) before running. +* **Keep steps coherent.** CPU step → `cpu_time`, RAM step → `necessary_ram`, I/O step → `io_waiting_time`. Exactly one key per step. +* **Model the network realistically.** Put latency RVs on **every** hop that matters (client↔LB, LB↔server, server↔client). +* **Tune sampling.** High-frequency sampling is useful for short diagnostic runs; increase `sample_period_s` for long capacity sweeps. + +--- + +## See also + +* **Builder:** `AsyncFlow` — compose and validate the payload (workload, topology, settings). +* **Analyzer:** `ResultsAnalyzer` — query KPIs and plot latency/throughput/queues/RAM. +* **Workload:** `RqsGenerator`, `RVConfig` — define traffic models (Poisson or Gaussian–Poisson). +* **Components:** `Client`, `Server`, `Endpoint`, `Edge`, `LoadBalancer`. + +This API keeps **assembly** and **execution** separate: you design and validate +your system with `AsyncFlow`, then hand it to `SimulationRunner` to execute and +measure — a clean workflow that scales from minimal examples to complex, +load-balanced topologies. 
diff --git a/docs/dev-workflow-guide.md b/docs/guides/dev-workflow.md similarity index 99% rename from docs/dev-workflow-guide.md rename to docs/guides/dev-workflow.md index ff0e3ce..c5d06ef 100644 --- a/docs/dev-workflow-guide.md +++ b/docs/guides/dev-workflow.md @@ -1,6 +1,6 @@ # **Development Workflow & Architecture Guide** -This document describes the development workflow, repository architecture, and branching strategy for **AsyncFlow** +This document describes the development workflow, repository architecture, branching strategy and CI/CD for **AsyncFlow** --- ## 1) Repository Layout diff --git a/docs/index.md b/docs/index.md index 651a0ee..a4affc2 100644 --- a/docs/index.md +++ b/docs/index.md @@ -25,6 +25,7 @@ AsyncFlow is a discrete-event simulator for Python async backends (FastAPI/Uvico * **[Builder Guide](guides/builder.md)** — Programmatically assemble a `SimulationPayload` in Python with validation and examples. * **[YAML Input Guide](guides/yaml-builder.md)** — Author scenarios in YAML: exact schema, units, constraints, runnable samples. +* **[Dev workflow Guide](guides/dev-workflow.md)** — Describes the development workflow, repository architecture, branching strategy and CI/CD for **AsyncFlow** --- diff --git a/examples/builder_input/load_balancer/two_servers.png b/examples/builder_input/load_balancer/two_servers.png new file mode 100644 index 0000000..11ceaf3 Binary files /dev/null and b/examples/builder_input/load_balancer/two_servers.png differ diff --git a/examples/builder_input/load_balancer/two_servers.py b/examples/builder_input/load_balancer/two_servers.py new file mode 100644 index 0000000..d179959 --- /dev/null +++ b/examples/builder_input/load_balancer/two_servers.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python3 +""" +Didactic example: build and run an AsyncFlow scenario **with a Load Balancer** +and two backend servers, using the builder (AsyncFlow) — no YAML. 
+ +Topology: + generator ──> client ──> LB ──> srv-1 + └─> srv-2 + srv-1 ──> client + srv-2 ──> client + +Load: + ~120 active users, 20 req/min each (Poisson by default). + +Servers: + srv-1: 1 CPU core, 1GB RAM, endpoint with CPU→RAM→IO + srv-2: 2 CPU cores, 2GB RAM, endpoint with RAM→IO(DB-like) + +Network: + 2–3ms mean (exponential) latency on each edge. + +What this script does: + 1) Build Pydantic models (generator, client, LB, servers, edges, settings). + 2) Compose the SimulationPayload via AsyncFlow (builder pattern). + 3) Run the simulation with SimulationRunner. + 4) Print latency stats, throughput timeline, and a sampled-metrics preview. + 5) Save a 2×2 plot figure (latency, throughput, server queues, RAM). +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Iterable, List, Mapping, TYPE_CHECKING + +import numpy as np +import simpy + +# ── AsyncFlow domain imports (match your working paths) ──────────────────────── +from asyncflow.builder.asyncflow_builder import AsyncFlow +from asyncflow.runtime.simulation_runner import SimulationRunner +from asyncflow.metrics.analyzer import ResultsAnalyzer +from asyncflow.schemas.payload import SimulationPayload +from asyncflow.schemas.workload.rqs_generator import RqsGenerator +from asyncflow.schemas.settings.simulation import SimulationSettings +from asyncflow.schemas.topology.endpoint import Endpoint +from asyncflow.schemas.topology.nodes import Client, Server, LoadBalancer +from asyncflow.schemas.topology.edges import Edge +from asyncflow.config.constants import LatencyKey, SampledMetricName + + + + +# ───────────────────────────────────────────────────────────── +# Pretty printers (compact, readable output) +# ───────────────────────────────────────────────────────────── +def print_latency_stats(res: ResultsAnalyzer) -> None: + stats: Mapping[LatencyKey, float] = res.get_latency_stats() + print("\n════════ LATENCY STATS ════════") + if not stats: + print("(empty)") + return 
+ + order: List[LatencyKey] = [ + LatencyKey.TOTAL_REQUESTS, + LatencyKey.MEAN, + LatencyKey.MEDIAN, + LatencyKey.STD_DEV, + LatencyKey.P95, + LatencyKey.P99, + LatencyKey.MIN, + LatencyKey.MAX, + ] + for key in order: + if key in stats: + print(f"{key.name:<20} = {stats[key]:.6f}") + + +def print_throughput(res: ResultsAnalyzer) -> None: + timestamps, rps = res.get_throughput_series() + print("\n════════ THROUGHPUT (req/sec) ════════") + if not timestamps: + print("(empty)") + return + for t, rate in zip(timestamps, rps): + print(f"t={t:4.1f}s → {rate:6.2f} rps") + + +def print_sampled_preview(res: ResultsAnalyzer) -> None: + sampled = res.get_sampled_metrics() + print("\n════════ SAMPLED METRICS (preview) ════════") + if not sampled: + print("(empty)") + return + + # Keys may be enums or strings depending on your analyzer; handle both. + def _name(m): # pragma: no cover + return m.name if hasattr(m, "name") else str(m) + + for metric, series in sampled.items(): + print(f"\n📈 {_name(metric)}:") + for entity, vals in series.items(): + head = list(vals[:5]) if vals else [] + print(f" - {entity}: len={len(vals)}, first={head}") + + +# ───────────────────────────────────────────────────────────── +# Tiny helpers for sanity checks (optional) +# ───────────────────────────────────────────────────────────── +def _mean(series: Iterable[float]) -> float: + arr = np.asarray(list(series), dtype=float) + return float(np.mean(arr)) if arr.size else 0.0 + + +def run_sanity_checks( + runner: SimulationRunner, + res: ResultsAnalyzer, +) -> None: + print("\n════════ SANITY CHECKS (rough) ════════") + w = runner.simulation_input.rqs_input + lam_rps = ( + float(w.avg_active_users.mean) + * float(w.avg_request_per_minute_per_user.mean) + / 60.0 + ) + + # Observed throughput + _, rps_series = res.get_throughput_series() + rps_observed = _mean(rps_series) + print( + f"• Mean throughput (rps) expected≈{lam_rps:.3f} " + f"observed={rps_observed:.3f}" + ) + + sampled = 
res.get_sampled_metrics() + ram_series = sampled.get(SampledMetricName.RAM_IN_USE, {}) + ioq_series = sampled.get(SampledMetricName.EVENT_LOOP_IO_SLEEP, {}) + ready_series = sampled.get(SampledMetricName.READY_QUEUE_LEN, {}) + + ram_mean = _mean([_mean(v) for v in ram_series.values()]) if ram_series else 0.0 + ioq_mean = _mean([_mean(v) for v in ioq_series.values()]) if ioq_series else 0.0 + ready_mean = _mean([_mean(v) for v in ready_series.values()]) if ready_series else 0.0 + + print(f"• Mean RAM in use (MB) observed={ram_mean:.3f}") + print(f"• Mean I/O queue length observed={ioq_mean:.3f}") + print(f"• Mean ready queue length observed={ready_mean:.3f}") + + +# ───────────────────────────────────────────────────────────── +# Build the LB + 2 servers scenario via AsyncFlow (builder) +# ───────────────────────────────────────────────────────────── +def build_payload_with_lb() -> SimulationPayload: + """ + Construct the SimulationPayload programmatically using the builder: + - Generator (120 users, 20 rpm each) + - Client + - Load balancer (round_robin) covering two servers + - Two servers with distinct endpoints + - Edges for all hops (gen→client, client→lb, lb→srv1/2, srv1/2→client) + - Simulation settings: 600s total, sample period 20ms + """ + # 1) Request generator + generator = RqsGenerator( + id="rqs-1", + avg_active_users={"mean": 120}, # Poisson default + avg_request_per_minute_per_user={"mean": 20}, # MUST be Poisson + user_sampling_window=60, + ) + + # 2) Client + client = Client(id="client-1") + + # 3) Servers with distinct endpoints + ep_srv1 = Endpoint( + endpoint_name="/api", + # include 'probability' if your Endpoint schema supports it + probability=1.0, # remove if your Endpoint doesn't have this field + steps=[ + {"kind": "initial_parsing", "step_operation": {"cpu_time": 0.002}}, + {"kind": "ram", "step_operation": {"necessary_ram": 64}}, + {"kind": "io_wait", "step_operation": {"io_waiting_time": 0.012}}, + ], + ) + srv1 = Server( + id="srv-1", 
+ server_resources={"cpu_cores": 1, "ram_mb": 1024}, + endpoints=[ep_srv1], + ) + + ep_srv2 = Endpoint( + endpoint_name="/api", + probability=1.0, # remove if not supported in your schema + steps=[ + {"kind": "ram", "step_operation": {"necessary_ram": 96}}, + {"kind": "io_db", "step_operation": {"io_waiting_time": 0.020}}, + ], + ) + srv2 = Server( + id="srv-2", + server_resources={"cpu_cores": 2, "ram_mb": 2048}, + endpoints=[ep_srv2], + ) + + # 4) Load balancer (round_robin) + lb = LoadBalancer( + id="lb-1", + algorithms="round_robin", + server_covered={"srv-1", "srv-2"}, + ) + + # 5) Edges with exponential latency (2–3 ms) + edges = [ + Edge( + id="gen-client", + source="rqs-1", + target="client-1", + latency={"mean": 0.003, "distribution": "exponential"}, + ), + Edge( + id="client-lb", + source="client-1", + target="lb-1", + latency={"mean": 0.002, "distribution": "exponential"}, + ), + Edge( + id="lb-srv1", + source="lb-1", + target="srv-1", + latency={"mean": 0.002, "distribution": "exponential"}, + ), + Edge( + id="lb-srv2", + source="lb-1", + target="srv-2", + latency={"mean": 0.002, "distribution": "exponential"}, + ), + Edge( + id="srv1-client", + source="srv-1", + target="client-1", + latency={"mean": 0.003, "distribution": "exponential"}, + ), + Edge( + id="srv2-client", + source="srv-2", + target="client-1", + latency={"mean": 0.003, "distribution": "exponential"}, + ), + ] + + # 6) Simulation settings + settings = SimulationSettings( + total_simulation_time=600, + sample_period_s=0.02, + enabled_sample_metrics=[ + "ready_queue_len", + "event_loop_io_sleep", + "ram_in_use", + "edge_concurrent_connection", + ], + enabled_event_metrics=["rqs_clock"], + ) + + # 7) Assemble the payload via the builder + flow = ( + AsyncFlow() + .add_generator(generator) + .add_client(client) + .add_servers(srv1, srv2) + .add_load_balancer(lb) + .add_edges(*edges) + .add_simulation_settings(settings) + ) + + return flow.build_payload() + + +# 
───────────────────────────────────────────────────────────── +# Main entry-point +# ───────────────────────────────────────────────────────────── +def main() -> None: + """ + Build → wire → run the simulation, then print diagnostics and save plots. + """ + env = simpy.Environment() + payload = build_payload_with_lb() + + runner = SimulationRunner(env=env, simulation_input=payload) + results: ResultsAnalyzer = runner.run() + + # Human-friendly diagnostics + print_latency_stats(results) + print_throughput(results) + print_sampled_preview(results) + + # Optional sanity checks (very rough) + run_sanity_checks(runner, results) + + # Save plots (2×2 figure) + try: + from matplotlib import pyplot as plt # noqa: PLC0415 + + fig, axes = plt.subplots(2, 2, figsize=(12, 8)) + results.plot_latency_distribution(axes[0, 0]) + results.plot_throughput(axes[0, 1]) + results.plot_server_queues(axes[1, 0]) + results.plot_ram_usage(axes[1, 1]) + fig.tight_layout() + + out_path = Path(__file__).parent / "two_servers.png" + fig.savefig(out_path) + print(f"\n🖼️ Plots saved to: {out_path}") + except Exception as exc: # Matplotlib not installed or plotting failed + print(f"\n[plotting skipped] {exc!r}") + + +if __name__ == "__main__": + main() diff --git a/examples/builder_input/single_server/single_server.png b/examples/builder_input/single_server/single_server.png new file mode 100644 index 0000000..f723f7c Binary files /dev/null and b/examples/builder_input/single_server/single_server.png differ diff --git a/examples/single_server_pybuilder.py b/examples/builder_input/single_server/single_server.py similarity index 99% rename from examples/single_server_pybuilder.py rename to examples/builder_input/single_server/single_server.py index cd92121..6088a12 100644 --- a/examples/single_server_pybuilder.py +++ b/examples/builder_input/single_server/single_server.py @@ -283,7 +283,7 @@ def main() -> None: results.plot_ram_usage(axes[1, 1]) fig.tight_layout() - out_path = Path(__file__).parent 
/ "single_server_builder.png" + out_path = Path(__file__).parent / "single_server.png" fig.savefig(out_path) print(f"\n🖼️ Plots saved to: {out_path}") except Exception as exc: # Matplotlib not installed or plotting failed diff --git a/examples/single_server_builder.png b/examples/single_server_builder.png deleted file mode 100644 index 2a4d21e..0000000 Binary files a/examples/single_server_builder.png and /dev/null differ diff --git a/examples/single_server_pybuilder.png b/examples/single_server_pybuilder.png deleted file mode 100644 index 20166b0..0000000 Binary files a/examples/single_server_pybuilder.png and /dev/null differ diff --git a/examples/single_server_yml.png b/examples/single_server_yml.png deleted file mode 100644 index 76c2115..0000000 Binary files a/examples/single_server_yml.png and /dev/null differ diff --git a/examples/data/single_server.yml b/examples/yaml_input/data/single_server.yml similarity index 100% rename from examples/data/single_server.yml rename to examples/yaml_input/data/single_server.yml diff --git a/examples/yaml_input/data/two_servers_lb.yml b/examples/yaml_input/data/two_servers_lb.yml new file mode 100644 index 0000000..9b71943 --- /dev/null +++ b/examples/yaml_input/data/two_servers_lb.yml @@ -0,0 +1,96 @@ +# AsyncFlow SimulationPayload (LB + 2 servers, realistic steps) + +rqs_input: + id: "rqs-1" + avg_active_users: + mean: 120 + avg_request_per_minute_per_user: + mean: 20 + user_sampling_window: 60 + +topology_graph: + nodes: + client: + id: "client-1" + + servers: + - id: "srv-1" + server_resources: + cpu_cores: 1 + ram_mb: 1024 + endpoints: + - endpoint_name: "/api" + steps: + - kind: "initial_parsing" + step_operation: + cpu_time: 0.002 + - kind: "ram" + step_operation: + necessary_ram: 64 + - kind: "io_wait" + step_operation: + io_waiting_time: 0.012 + + - id: "srv-2" + server_resources: + cpu_cores: 2 + ram_mb: 2048 + endpoints: + - endpoint_name: "/api" + steps: + - kind: "ram" + step_operation: + necessary_ram: 96 + - 
kind: "io_db" + step_operation: + io_waiting_time: 0.020 + - kind: "cpu_bound_operation" # <-- was 'final_processing' (invalid) + step_operation: + cpu_time: 0.001 + + load_balancer: + id: "lb-1" + algorithms: "round_robin" + server_covered: ["srv-1", "srv-2"] + + edges: + - id: "gen-client" + source: "rqs-1" + target: "client-1" + latency: { mean: 0.003, distribution: "exponential" } + + - id: "client-lb" + source: "client-1" + target: "lb-1" + latency: { mean: 0.002, distribution: "exponential" } + + - id: "lb-srv1" + source: "lb-1" + target: "srv-1" + latency: { mean: 0.002, distribution: "exponential" } + + - id: "lb-srv2" + source: "lb-1" + target: "srv-2" + latency: { mean: 0.002, distribution: "exponential" } + + - id: "srv1-client" + source: "srv-1" + target: "client-1" + latency: { mean: 0.003, distribution: "exponential" } + + - id: "srv2-client" + source: "srv-2" + target: "client-1" + latency: { mean: 0.003, distribution: "exponential" } + +sim_settings: + total_simulation_time: 600 + sample_period_s: 0.02 + enabled_sample_metrics: + - "ready_queue_len" + - "event_loop_io_sleep" + - "ram_in_use" + - "edge_concurrent_connection" + enabled_event_metrics: + - "rqs_clock" diff --git a/examples/yaml_input/load_balancer/two_servers.png b/examples/yaml_input/load_balancer/two_servers.png new file mode 100644 index 0000000..9ee7796 Binary files /dev/null and b/examples/yaml_input/load_balancer/two_servers.png differ diff --git a/examples/yaml_input/load_balancer/two_servers.py b/examples/yaml_input/load_balancer/two_servers.py new file mode 100644 index 0000000..afc4408 --- /dev/null +++ b/examples/yaml_input/load_balancer/two_servers.py @@ -0,0 +1,283 @@ +#!/usr/bin/env python3 +""" +Run an AsyncFlow scenario with a Load Balancer (2 servers) from YAML and print diagnostics. + +What it does: +- Loads the simulation payload from YAML via `SimulationRunner.from_yaml`. +- Runs the simulation. 
+- Prints latency stats, 1s-bucket throughput, and a preview of sampled metrics. +- Saves four plots (latency histogram, throughput, server queues, RAM). +- Performs sanity checks (expected vs observed) with simple LB-aware heuristics. + +Usage: + python src/app/example/run_lb_from_yaml.py \ + --yaml src/app/example/data/two_servers_lb.yml +""" + +from __future__ import annotations + +from argparse import ArgumentParser +from pathlib import Path +from typing import Dict, Iterable, List, Mapping, Tuple + +import matplotlib.pyplot as plt +import numpy as np +import simpy + +from asyncflow.config.constants import ( # only for basic step-kind/ops inspection + EndpointStepCPU, + EndpointStepIO, + EndpointStepRAM, + LatencyKey, + StepOperation, +) +from asyncflow.metrics.analyzer import ResultsAnalyzer +from asyncflow.runtime.simulation_runner import SimulationRunner + + +# ───────────────────────────────────────────────────────────── +# Pretty printers (same style as your single-server script) +# ───────────────────────────────────────────────────────────── +def print_latency_stats(res: ResultsAnalyzer) -> None: + """Print latency statistics returned by the analyzer.""" + stats: Mapping[LatencyKey, float] = res.get_latency_stats() + print("\n════════ LATENCY STATS ════════") + if not stats: + print("(empty)") + return + + order: List[LatencyKey] = [ + LatencyKey.TOTAL_REQUESTS, + LatencyKey.MEAN, + LatencyKey.MEDIAN, + LatencyKey.STD_DEV, + LatencyKey.P95, + LatencyKey.P99, + LatencyKey.MIN, + LatencyKey.MAX, + ] + for key in order: + if key in stats: + print(f"{key.name:<20} = {stats[key]:.6f}") + + +def print_throughput(res: ResultsAnalyzer) -> None: + """Print 1-second throughput buckets.""" + timestamps, rps = res.get_throughput_series() + print("\n════════ THROUGHPUT (req/sec) ════════") + if not timestamps: + print("(empty)") + return + + for t, rate in zip(timestamps, rps): + print(f"t={t:4.1f}s → {rate:6.2f} rps") + + +def print_sampled_preview(res: 
ResultsAnalyzer) -> None: + """Print first 5 samples of each sampled metric series.""" + sampled: Dict[str, Dict[str, List[float]]] = res.get_sampled_metrics() + print("\n════════ SAMPLED METRICS ════════") + if not sampled: + print("(empty)") + return + + for metric, series in sampled.items(): + print(f"\n📈 {metric}:") + for entity, vals in series.items(): + head = list(vals[:5]) if vals else [] + print(f" - {entity}: len={len(vals)}, first={head}") + + +# ───────────────────────────────────────────────────────────── +# Plotting +# ───────────────────────────────────────────────────────────── +def save_all_plots(res: ResultsAnalyzer, out_path: Path) -> None: + """Generate the 2x2 plot figure and save it to `out_path`.""" + fig, axes = plt.subplots(2, 2, figsize=(12, 8)) + res.plot_latency_distribution(axes[0, 0]) + res.plot_throughput(axes[0, 1]) + res.plot_server_queues(axes[1, 0]) + res.plot_ram_usage(axes[1, 1]) + fig.tight_layout() + fig.savefig(out_path) + print(f"\n🖼️ Plots saved to: {out_path}") + + +# ───────────────────────────────────────────────────────────── +# Sanity checks (LB-aware, still rough) +# ───────────────────────────────────────────────────────────── + +def run_sanity_checks(runner: SimulationRunner, res: ResultsAnalyzer) -> None: + """ + Sanity checks LB-aware (round-robin): observed vs expected + """ + from asyncflow.config.constants import ( + EndpointStepCPU, EndpointStepIO, EndpointStepRAM, StepOperation, LatencyKey + ) + import numpy as np + + def _mean(arr): + a = np.asarray(list(arr), dtype=float) + return float(a.mean()) if a.size else 0.0 + + # 1) λ + w = runner.simulation_input.rqs_input + lam = float(w.avg_active_users.mean) * float(w.avg_request_per_minute_per_user.mean) / 60.0 + + topo = runner.simulation_input.topology_graph + servers = {s.id: s for s in topo.nodes.servers} + client_id = topo.nodes.client.id + lb = topo.nodes.load_balancer + lb_id = lb.id if lb else None + gen_id = runner.simulation_input.rqs_input.id + + # 
2) LB (round_robin -> 1/N) + if lb and lb.server_covered: + covered = [sid for sid in lb.server_covered if sid in servers] + N = max(1, len(covered)) + shares = {sid: 1.0 / N for sid in covered} + else: + only = next(iter(servers.keys())) + shares = {only: 1.0} + + # 3) endpoint totals per server + def endpoint_totals(server): + cpu_s = io_s = ram_mb = 0.0 + for ep in getattr(server, "endpoints", []) or []: + prob = getattr(ep, "probability", 1.0) + for step in ep.steps: + k = step.kind + op = step.step_operation + if isinstance(k, EndpointStepCPU): + cpu_s += prob * float(op[StepOperation.CPU_TIME]) + elif isinstance(k, EndpointStepIO): + io_s += prob * float(op[StepOperation.IO_WAITING_TIME]) + elif isinstance(k, EndpointStepRAM): + ram_mb += prob * float(op[StepOperation.NECESSARY_RAM]) + return cpu_s, io_s, ram_mb + + per_srv = {sid: endpoint_totals(srv) for sid, srv in servers.items()} + + # 4) mappa latencies of edges per role (source,target) + mean_gen_client = 0.0; id_gen_client = None + mean_client_lb = 0.0; id_client_lb = None + mean_lb_srv = {} # sid -> mean + mean_srv_client = {} # sid -> mean + id_lb_srv = {} # sid -> edge_id + id_srv_client = {} # sid -> edge_id + + for e in topo.edges: + s, t, mu = e.source, e.target, float(e.latency.mean) + if s == gen_id and t == client_id: + mean_gen_client = mu; id_gen_client = e.id + elif s == client_id and lb_id and t == lb_id: + mean_client_lb = mu; id_client_lb = e.id + elif lb_id and s == lb_id and t in servers: + mean_lb_srv[t] = mu; id_lb_srv[t] = e.id + elif s in servers and t == client_id: + mean_srv_client[s] = mu; id_srv_client[s] = e.id + + # 5) expected: average latencies + cpu_exp = sum(shares[sid] * per_srv[sid][0] for sid in shares) + io_exp = sum(shares[sid] * per_srv[sid][1] for sid in shares) + net_exp = ( + mean_gen_client + mean_client_lb + + sum(shares[sid] * (mean_lb_srv.get(sid, 0.0) + mean_srv_client.get(sid, 0.0)) for sid in shares) + ) + latency_expected = cpu_exp + io_exp + net_exp + + 
# 6) observed: throughput & latencies + stats = res.get_latency_stats() + latency_observed = float(stats.get(LatencyKey.MEAN, 0.0)) + _, rps_series = res.get_throughput_series() + rps_observed = _mean(rps_series) + + # 7) expected: RAM e I/O queue as a sum over server + ram_expected = sum((shares[sid] * lam) * (per_srv[sid][0] + per_srv[sid][1]) * per_srv[sid][2] for sid in shares) + ioq_expected = sum((shares[sid] * lam) * per_srv[sid][1] for sid in shares) + + # 8) observed: RAM (sum) and I/O queue sum + sampled = res.get_sampled_metrics() + ram_series = sampled.get("ram_in_use", {}) + ioq_series = sampled.get("event_loop_io_sleep", {}) + ram_observed = sum(_mean(vals) for vals in ram_series.values()) if ram_series else 0.0 + ioq_observed = sum(_mean(vals) for vals in ioq_series.values()) if ioq_series else 0.0 + + # 9) print + REL_TOL = 0.30 + def tick(label, exp, obs): + delta = (abs(obs - exp) / abs(exp)) if exp else 0.0 + icon = "✓" if delta <= REL_TOL else "⚠" + print(f"{icon} {label:<28} expected≈{exp:.3f} observed={obs:.3f} Δ={delta*100:.1f}%") + + print("\n════════ SANITY CHECKS (LB-aware) ════════") + tick("Mean throughput (rps)", lam, rps_observed) + tick("Mean latency (s)", latency_expected, latency_observed) + tick("Mean RAM (MB)", ram_expected, ram_observed) + tick("Mean I/O queue", ioq_expected, ioq_observed) + + # 10) Edge concurrency estimation + edge_conc = sampled.get("edge_concurrent_connection", {}) + if edge_conc: + print("\n— Edge concurrency (LB-aware) —") + means_obs = {eid: _mean(vals) for eid, vals in edge_conc.items()} + + if id_gen_client: + tick(f"edge {id_gen_client}", lam * mean_gen_client, means_obs.get(id_gen_client, 0.0)) + if id_client_lb: + tick(f"edge {id_client_lb}", lam * mean_client_lb, means_obs.get(id_client_lb, 0.0)) + + for sid, p in shares.items(): + lam_i = p * lam + eid = id_lb_srv.get(sid) + if eid: + tick(f"edge {eid}", lam_i * mean_lb_srv.get(sid, 0.0), means_obs.get(eid, 0.0)) + eid = id_srv_client.get(sid) + if 
eid: + tick(f"edge {eid}", lam_i * mean_srv_client.get(sid, 0.0), means_obs.get(eid, 0.0)) + + # Extra + print("\n— Diagnostics —") + print("λ={:.3f} rps | E[cpu]={:.3f}s E[io]={:.3f}s E[net]≈{:.3f}s | E[RAM/req]={:.1f} MB" + .format(lam, cpu_exp, io_exp, net_exp, sum(shares[sid]*per_srv[sid][2] for sid in shares))) + + + +# ───────────────────────────────────────────────────────────── +# Main +# ───────────────────────────────────────────────────────────── +def main() -> None: + """Parse args, run simulation, print/plot, sanity-check (LB topology).""" + parser = ArgumentParser(description="Run AsyncFlow LB scenario from YAML and print outputs + sanity checks.") + parser.add_argument( + "--yaml", + type=Path, + default=Path(__file__).parent.parent / "data" / "two_servers_lb.yml", + help="Path to the simulation YAML file.", + ) + parser.add_argument( + "--out", + type=Path, + default=Path(__file__).parent / "two_servers.png", + help="Path to the output image (plots).", + ) + args = parser.parse_args() + + yaml_path: Path = args.yaml + if not yaml_path.exists(): + raise FileNotFoundError(f"YAML not found: {yaml_path}") + + env = simpy.Environment() + runner = SimulationRunner.from_yaml(env=env, yaml_path=yaml_path) + results: ResultsAnalyzer = runner.run() + + print_latency_stats(results) + print_throughput(results) + print_sampled_preview(results) + + run_sanity_checks(runner, results) + save_all_plots(results, args.out) + + +if __name__ == "__main__": + main() diff --git a/examples/yaml_input/single_server/single_server.png b/examples/yaml_input/single_server/single_server.png new file mode 100644 index 0000000..c03f9b2 Binary files /dev/null and b/examples/yaml_input/single_server/single_server.png differ diff --git a/examples/single_server_yaml.py b/examples/yaml_input/single_server/single_server.py similarity index 98% rename from examples/single_server_yaml.py rename to examples/yaml_input/single_server/single_server.py index fb20d50..004cd0c 100644 --- 
a/examples/single_server_yaml.py +++ b/examples/yaml_input/single_server/single_server.py @@ -237,13 +237,13 @@ def main() -> None: parser.add_argument( "--yaml", type=Path, - default=Path(__file__).parent / "data" /"single_server.yml", + default=Path(__file__).parent.parent / "data" /"single_server.yml", help="Path to the simulation YAML file.", ) parser.add_argument( "--out", type=Path, - default=Path(__file__).parent / "single_server_yml.png", + default=Path(__file__).parent / "single_server.png", help="Path to the output image (plots).", ) args = parser.parse_args() diff --git a/examples/your_example.py b/examples/yaml_input/your_example.py similarity index 100% rename from examples/your_example.py rename to examples/yaml_input/your_example.py diff --git a/pyproject.toml b/pyproject.toml index 1348af6..6566c84 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,14 +1,41 @@ [tool.poetry] -name = "AsyncFlow" +name = "asyncflow-simulator" version = "0.1.0" -description = "Simulate distributed system for what if analysis for the capacity planning" +description = "Digital-twin simulator for distributed async systems. Build what-if scenarios and quantify capacity, latency and throughput offline—before you deploy." 
authors = ["Gioele Botta"] readme = "README.md" - +license = "MIT" packages = [ { include = "asyncflow", from = "src" } ] +include = ["LICENSE", "src/asyncflow/py.typed"] +exclude = ["tests", "docs", "scripts", ".github", "alembic"] + +# Better pypi discovery +keywords = [ + "simulation", "simpy", "asyncio", "capacity-planning", "performance", + "fastapi", "uvicorn", "distributed-systems", "queuing-theory" +] + +homepage = "https://github.com//AsyncFlow-Simulator" # TO COMPLETE +repository = "https://github.com//AsyncFlow-Simulator" # TO COMPLETE +documentation = "https://github.com//AsyncFlow-Simulator/tree/main/docs" # TO COMPLETE + +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Intended Audience :: Science/Research", + "Topic :: Scientific/Engineering :: Simulation", + "Topic :: System :: Distributed Computing", + "Topic :: Software Development :: Testing", + "Topic :: System :: Benchmark", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3.12", + "Typing :: Typed", + "Operating System :: OS Independent", +] + [tool.poetry.dependencies] python = "^3.12" pydantic-settings = "^2.10.1" @@ -96,4 +123,4 @@ ignore_missing_imports = true warn_unused_ignores = true show_error_codes = true pretty = true -exclude = ["^alembic/", "^.venv/"] +exclude = ["^.venv/"] diff --git a/src/asyncflow/__init__.py b/src/asyncflow/__init__.py index 5bdbdea..0f38c83 100644 --- a/src/asyncflow/__init__.py +++ b/src/asyncflow/__init__.py @@ -4,4 +4,4 @@ from asyncflow.builder.asyncflow_builder import AsyncFlow from asyncflow.runtime.simulation_runner import SimulationRunner -__all__ = ["AsyncFlow", "SimulationRunner"] +__all__ = ["AsyncFlow", "SimulationRunner"] diff --git a/src/asyncflow/analysis/__init__.py b/src/asyncflow/analysis/__init__.py new file mode 100644 index 0000000..825de6e --- /dev/null +++ b/src/asyncflow/analysis/__init__.py @@ -0,0 +1,5 @@ +"""Public module exposing the results analyzer""" + 
+from asyncflow.metrics.analyzer import ResultsAnalyzer + +__all__ = ["ResultsAnalyzer"] diff --git a/src/asyncflow/metrics/analyzer.py b/src/asyncflow/metrics/analyzer.py index 35b43bb..16448bc 100644 --- a/src/asyncflow/metrics/analyzer.py +++ b/src/asyncflow/metrics/analyzer.py @@ -193,8 +193,8 @@ def plot_throughput(self, ax: Axes) -> None: def plot_server_queues(self, ax: Axes) -> None: """Plot the server queues""" metrics = self.get_sampled_metrics() - ready = metrics.get(SampledMetricName.READY_QUEUE_LEN, {}) - io_q = metrics.get(SampledMetricName.EVENT_LOOP_IO_SLEEP, {}) + ready = metrics.get(SampledMetricName.READY_QUEUE_LEN.value, {}) + io_q = metrics.get(SampledMetricName.EVENT_LOOP_IO_SLEEP.value, {}) if not (ready or io_q): ax.text(0.5, 0.5, SERVER_QUEUES_PLOT.no_data, ha="center", va="center") @@ -219,7 +219,7 @@ def plot_server_queues(self, ax: Axes) -> None: def plot_ram_usage(self, ax: Axes) -> None: """Plot the ram usage""" metrics = self.get_sampled_metrics() - ram = metrics.get(SampledMetricName.RAM_IN_USE, {}) + ram = metrics.get(SampledMetricName.RAM_IN_USE.value, {}) if not ram: ax.text(0.5, 0.5, RAM_PLOT.no_data, ha="center", va="center") diff --git a/src/asyncflow/py.typed b/src/asyncflow/py.typed new file mode 100644 index 0000000..e69de29