Commit 2a306ac

Added pybuilder and unit tests (#12)
1 parent 43eb8c5 commit 2a306ac

6 files changed (+734, -4 lines changed)

docs/dev_workflow_guide.md

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ fastsim-backend/
 │ │ ├── rqs_state.py           # RequestState & Hop
 │ │ ├── simulation_runner.py   # logic to initialize the whole simulation
 │ │ └── actors/                # SimPy “actors”: Edge, Server, Client, RqsGenerator
+│ ├── pybuilder/               # Pythonic way to build the simulation payload
 │ ├── samplers/                # stochastic samplers (Gaussian-Poisson, etc.)
 │ ├── schemas/                 # Pydantic input/output models
poetry.lock

161 KB (large diff not shown)
Lines changed: 293 additions & 0 deletions
@@ -0,0 +1,293 @@
#!/usr/bin/env python3
"""
Didactic example: build and run a FastSim scenario **without** YAML,
using the 'pybuilder' (AsyncFlow) to assemble the SimulationPayload.

Scenario reproduced (same as the previous YAML):
    generator ──edge──> client ──edge──> server ──edge──> client

Load:
    ~100 active users, 20 req/min each.

Server:
    1 CPU core, 2GB RAM, endpoint with steps:
    CPU(1ms) → RAM(100MB) → IO(100ms)

Network:
    3ms mean (exponential) latency on each edge.

What this script does:
    1) Build Pydantic models (generator, client, server, edges, settings).
    2) Compose the final SimulationPayload via AsyncFlow (builder pattern).
    3) Run the simulation with SimulationRunner.
    4) Print latency stats, throughput timeline, and a sampled-metrics preview.
    5) (Optional) Visualize the topology with Matplotlib.

Run:
    python run_with_pybuilder.py
"""

from __future__ import annotations

from pathlib import Path
from typing import Dict, Iterable, List, Mapping, Tuple

import numpy as np
import simpy

# ── FastSim domain imports ───────────────────────────────────────────────────
from app.pybuilder.input_builder import AsyncFlow
from app.runtime.simulation_runner import SimulationRunner
from app.metrics.analyzer import ResultsAnalyzer
from app.schemas.full_simulation_input import SimulationPayload
from app.schemas.rqs_generator_input import RqsGeneratorInput
from app.schemas.simulation_settings_input import SimulationSettings
from app.schemas.system_topology.endpoint import Endpoint
from app.schemas.system_topology.full_system_topology import (
    Client,
    Edge,
    Server,
)

from app.config.constants import LatencyKey, SampledMetricName


# ─────────────────────────────────────────────────────────────
# Pretty printers (compact, readable output)
# ─────────────────────────────────────────────────────────────
def print_latency_stats(res: ResultsAnalyzer) -> None:
    """Print latency statistics calculated by the analyzer."""
    stats: Mapping[LatencyKey, float] = res.get_latency_stats()
    print("\n════════ LATENCY STATS ════════")
    if not stats:
        print("(empty)")
        return

    order: List[LatencyKey] = [
        LatencyKey.TOTAL_REQUESTS,
        LatencyKey.MEAN,
        LatencyKey.MEDIAN,
        LatencyKey.STD_DEV,
        LatencyKey.P95,
        LatencyKey.P99,
        LatencyKey.MIN,
        LatencyKey.MAX,
    ]
    for key in order:
        if key in stats:
            print(f"{key.name:<20} = {stats[key]:.6f}")


def print_throughput(res: ResultsAnalyzer) -> None:
    """Print the 1-second throughput buckets."""
    timestamps, rps = res.get_throughput_series()
    print("\n════════ THROUGHPUT (req/sec) ════════")
    if not timestamps:
        print("(empty)")
        return

    for t, rate in zip(timestamps, rps):
        print(f"t={t:4.1f}s → {rate:6.2f} rps")


def print_sampled_preview(res: ResultsAnalyzer) -> None:
    """
    Print a small preview for each sampled metric series (first 5 values).
    This helps verify that sampler pipelines are running.
    """
    sampled = res.get_sampled_metrics()
    print("\n════════ SAMPLED METRICS (preview) ════════")
    if not sampled:
        print("(empty)")
        return

    for metric, series in sampled.items():
        metric_name = (
            metric.name if isinstance(metric, SampledMetricName) else str(metric)
        )
        print(f"\n📈 {metric_name}:")
        for entity, vals in series.items():
            head = list(vals[:5]) if vals else []
            print(f" - {entity}: len={len(vals)}, first={head}")


# ─────────────────────────────────────────────────────────────
# Tiny helpers for sanity checks (optional)
# ─────────────────────────────────────────────────────────────
def _mean(series: Iterable[float]) -> float:
    """Numerically stable mean for a generic float iterable."""
    arr = np.asarray(list(series), dtype=float)
    return float(np.mean(arr)) if arr.size else 0.0


def run_sanity_checks(
    runner: SimulationRunner,
    res: ResultsAnalyzer,
) -> None:
    """
    Back-of-the-envelope checks to compare rough expectations vs observations.
    These are intentionally simplistic approximations.
    """
    print("\n════════ SANITY CHECKS (rough) ════════")
    w = runner.simulation_input.rqs_input
    lam_rps = (
        float(w.avg_active_users.mean)
        * float(w.avg_request_per_minute_per_user.mean)
        / 60.0
    )
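    # For the load above (~100 users, 20 req/min each): 100 * 20 / 60 ≈ 33.3 expected rps.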

    # Observed throughput
    _, rps_series = res.get_throughput_series()
    rps_observed = _mean(rps_series)
    print(f"• Mean throughput (rps) expected≈{lam_rps:.3f} "
          f"observed={rps_observed:.3f}")

    # A few sampled signals (RAM, queues) just to show they are populated.
    sampled = res.get_sampled_metrics()
    ram_series = sampled.get(SampledMetricName.RAM_IN_USE, {})
    ioq_series = sampled.get(SampledMetricName.EVENT_LOOP_IO_SLEEP, {})
    ready_series = sampled.get(SampledMetricName.READY_QUEUE_LEN, {})

    ram_mean = _mean([_mean(v) for v in ram_series.values()]) if ram_series else 0.0
    ioq_mean = _mean([_mean(v) for v in ioq_series.values()]) if ioq_series else 0.0
    ready_mean = (
        _mean([_mean(v) for v in ready_series.values()]) if ready_series else 0.0
    )

    print(f"• Mean RAM in use (MB) observed={ram_mean:.3f}")
    print(f"• Mean I/O queue length observed={ioq_mean:.3f}")
    print(f"• Mean ready queue length observed={ready_mean:.3f}")


# ─────────────────────────────────────────────────────────────
# Build the same scenario via AsyncFlow (pybuilder)
# ─────────────────────────────────────────────────────────────
def build_payload_with_pybuilder() -> SimulationPayload:
    """
    Construct the SimulationPayload programmatically using the builder.

    This mirrors the YAML:
      - Generator (100 users, 20 rpm each)
      - Client
      - One server with a single endpoint (CPU → RAM → IO)
      - Three edges with exponential latency (3ms mean)
      - Simulation settings: 500s total, sample period 50ms
    """
    # 1) Request generator
    generator = RqsGeneratorInput(
        id="rqs-1",
        avg_active_users={"mean": 100},
        avg_request_per_minute_per_user={"mean": 20},
        user_sampling_window=60,
    )

    # 2) Client
    client = Client(id="client-1")

    # 3) Server (1 CPU core, 2GB RAM) with one endpoint and three steps
    #    We let Pydantic coerce nested dicts for the endpoint steps.
    endpoint = Endpoint(
        endpoint_name="ep-1",
        probability=1.0,
        steps=[
            {"kind": "initial_parsing", "step_operation": {"cpu_time": 0.001}},
            {"kind": "ram", "step_operation": {"necessary_ram": 100}},
            {"kind": "io_wait", "step_operation": {"io_waiting_time": 0.1}},
        ],
    )
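    # Step units above: cpu_time and io_waiting_time are seconds, necessary_ram is MB
    # (i.e. 1 ms CPU, 100 MB RAM, 100 ms I/O, matching the module docstring).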

    server = Server(
        id="srv-1",
        server_resources={"cpu_cores": 1, "ram_mb": 2048},
        endpoints=[endpoint],
    )

    # 4) Edges: exponential latency with 3ms mean (same as YAML)
    e_gen_client = Edge(
        id="gen-to-client",
        source="rqs-1",
        target="client-1",
        latency={"mean": 0.003, "distribution": "exponential"},
    )
    e_client_server = Edge(
        id="client-to-server",
        source="client-1",
        target="srv-1",
        latency={"mean": 0.003, "distribution": "exponential"},
    )
    e_server_client = Edge(
        id="server-to-client",
        source="srv-1",
        target="client-1",
        latency={"mean": 0.003, "distribution": "exponential"},
    )

    # 5) Simulation settings
    settings = SimulationSettings(
        total_simulation_time=500,
        sample_period_s=0.05,
        enabled_sample_metrics=[
            "ready_queue_len",
            "event_loop_io_sleep",
            "ram_in_use",
            "edge_concurrent_connection",
        ],
        enabled_event_metrics=["rqs_clock"],
    )
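    # Sampling note: 500 s of simulated time at a 0.05 s sample period gives
    # roughly 10,000 samples per enabled metric series.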

    # 6) Assemble the payload via the builder (AsyncFlow).
    #    The builder will validate the final structure on build.
    flow = (
        AsyncFlow()
        .add_generator(generator)
        .add_client(client)
        .add_servers(server)
        .add_edges(e_gen_client, e_client_server, e_server_client)
        .add_simulation_settings(settings)
    )

    return flow.build_payload()


# ─────────────────────────────────────────────────────────────
# Main entry-point
# ─────────────────────────────────────────────────────────────
def main() -> None:
    """
    Build → wire → run the simulation, then print diagnostics.
    Mirrors run_from_yaml.py but uses the pybuilder to construct the input.
    Also saves a 2x2 plot figure (latency, throughput, server queues, RAM).
    """
    env = simpy.Environment()
    payload = build_payload_with_pybuilder()

    runner = SimulationRunner(env=env, simulation_input=payload)
    results: ResultsAnalyzer = runner.run()

    # Human-friendly diagnostics
    print_latency_stats(results)
    print_throughput(results)
    print_sampled_preview(results)

    # Optional sanity checks (very rough)
    run_sanity_checks(runner, results)

    # Save plots (2x2 figure), same layout as in the YAML-based example
    try:
        from matplotlib import pyplot as plt  # noqa: PLC0415

        fig, axes = plt.subplots(2, 2, figsize=(12, 8))
        results.plot_latency_distribution(axes[0, 0])
        results.plot_throughput(axes[0, 1])
        results.plot_server_queues(axes[1, 0])
        results.plot_ram_usage(axes[1, 1])
        fig.tight_layout()

        out_path = Path(__file__).parent / "single_server_pybuilder.png"
        fig.savefig(out_path)
        print(f"\n🖼️ Plots saved to: {out_path}")
    except Exception as exc:  # Matplotlib not installed or plotting failed
        print(f"\n[plotting skipped] {exc!r}")


if __name__ == "__main__":
    main()
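
The commit message also mentions unit tests, which are not shown in this excerpt. As a rough, hypothetical sketch only (not the tests shipped with this commit), a minimal pytest check of the builder could import the example script and assert that the builder returns a validated payload; the module name run_with_pybuilder is taken from the script's own "Run" instructions, and its real location in the repo may differ.

# test_pybuilder_payload.py: hypothetical sketch, not the commit's actual tests
from app.schemas.full_simulation_input import SimulationPayload


def test_build_payload_with_pybuilder_returns_validated_payload() -> None:
    # Import the example module; adjust the import path to where the script lives.
    from run_with_pybuilder import build_payload_with_pybuilder

    payload = build_payload_with_pybuilder()

    # AsyncFlow.build_payload() should hand back a fully validated SimulationPayload.
    assert isinstance(payload, SimulationPayload)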

0 commit comments
