
Commit e869e67

Feature/event injection runtime (#20)
* Event injection for edges, part 1
* Logic + docs for event injection to simulate a server going down
* Added Pydantic validation + unit + integration tests for event injection
* Added example YAML for event injection + builder; added integration tests
* Improved server model + system tests for event injection
* Small Ruff fix
1 parent e9afe18 commit e869e67

86 files changed: +4128 / -242 lines


CHANGELOG.MD

Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## [Unreleased]

### Added

* **Event Injection input schema** (merged on `develop`):

  * Declarative events with `start` / `end` markers (server **down/up**, network **spike start/end**).
  * Strong validation (Pydantic):

    * `event_id` uniqueness.
    * `target_id` must exist (server or edge).
    * Start/end times within the simulation horizon and well-ordered.
    * Kind ↔ target compatibility (server events must target servers, network events must target edges).
    * Safety guard: **never all servers down simultaneously**.
  * Documentation comments and examples for authoring event windows in YAML.

### In Progress

* **Runtime Event Scheduler**: SimPy process to apply events at scheduled times.
* **Edge spike handling (phase 1)**: deterministic latency **offsets** during spike windows (no changes to base distributions yet).

---

## [0.1.0a2] – 2025-08-17

### Fixed

* **Quickstart YAML in README**: corrected field to ensure a smooth first run for new users.

### Notes

* Minor docs polish only; no runtime changes.

---

## [0.1.0a1] – 2025-08-17

### Changed

* Repository aligned with the **PyPI 0.1.0a1** build.
* Packaging metadata tidy-up in `pyproject.toml`.

### CI

* Main workflow now also triggers on **push** to `main`.

### Notes

* No functional/runtime changes.

---

## [v0.1.0-alpha] – 2025-08-17

**First public alpha** of AsyncFlow — a SimPy-based, **event-loop-aware** simulator for async distributed systems.

### Highlights

* **Event-loop model** per server: explicit **CPU** (blocking), **I/O waits** (non-blocking), **RAM** residency.
* **Topology graph**: generator → client → (LB, optional) → servers; multi-server via **round-robin**; **stochastic network latency** and optional dropouts.
* **Workload**: stochastic traffic via simple RV configs (Poisson defaults).

### Metrics & Analyzer

* **Event metrics**: `RqsClock` (end-to-end latency).
* **Sampled metrics**: `ready_queue_len`, `event_loop_io_sleep`, `ram_in_use`, `edge_concurrent_connection`.
* **Analyzer API** (`ResultsAnalyzer`):

  * `get_latency_stats()`, `get_throughput_series()`
  * Plots: `plot_latency_distribution()`, `plot_throughput()`
  * Per-server: `plot_single_server_ready_queue()`, `plot_single_server_io_queue()`, `plot_single_server_ram()`
  * Compact dashboards.
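A minimal usage sketch of the analyzer calls listed above, assuming an already-built `ResultsAnalyzer` instance (how it is obtained from a finished run is omitted here) and assuming zero-argument signatures:

```python
# Hypothetical sketch: `analyzer` is assumed to be a ResultsAnalyzer built from a
# completed simulation run; the zero-argument signatures below are assumptions.
def report_headline_results(analyzer) -> None:
    print(analyzer.get_latency_stats())        # end-to-end latency summary
    print(analyzer.get_throughput_series())    # throughput time series
    analyzer.plot_latency_distribution()       # latency histogram
    analyzer.plot_throughput()                 # throughput over time
```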
### Examples

* YAML quickstart (single server).
* Pythonic builder:

  * Single server.
  * **Load balancer + two servers** example with saved figures.

### Tooling & CI

* One-shot setup scripts (`dev_setup`, `quality_check`, `run_tests`, `run_sys_tests`) for Linux/macOS/Windows.
* GitHub Actions: Ruff + MyPy + Pytest; **system tests gate merges** into `main`.

### Compatibility

* **Python 3.12+** (Linux/macOS/Windows).
* Install from PyPI: `pip install asyncflow-sim`.

### Known Limitations (alpha)

* Network: latency + optional drops (no bandwidth/payload/TCP yet).
* Single event loop per server (no multi-process/node).
* Linear endpoint pipelines (no branching/fan-out inside endpoints).
* Stationary workload; very short spikes may be missed if `sample_period_s` is large.
Lines changed: 277 additions & 0 deletions
@@ -0,0 +1,277 @@
# Edge Event Injection: Architecture & Operations

This document explains how **edge-level events** (e.g., deterministic latency spikes) are modeled, centralized, and injected into the simulation. It covers:

* Data model (start/end markers & validation)
* The **central event runtime** (timeline, cumulative offsets, live adapters)
* How **SimulationRunner** wires everything
* How **EdgeRuntime** consumes the adapters during delivery
* Ordering, correctness guarantees, and trade-offs
* Extension points and maintenance tips

---

## 1) Conceptual Model

### What’s an “edge event”?

An edge event is a **time-bounded effect** applied to a specific network edge (link). Today we support **latency spikes**: while the event is active, the edge’s transit time is increased by a fixed offset (`spike_s`) in seconds.

### Event markers

Events are defined with two **markers**:

* `Start` (`kind` in `{NETWORK_SPIKE_START, SERVER_DOWN}`)
* `End` (`kind` in `{NETWORK_SPIKE_END, SERVER_UP}`)

Validation guarantees:

* **Kind pairing** is coherent (e.g., `NETWORK_SPIKE_START` ↔ `NETWORK_SPIKE_END`).
* **Time ordering**: `t_start < t_end`.
* For network spike events, **`spike_s` is required** and positive.

> These guarantees are enforced by the Pydantic models and their `model_validator`s in the schema layer, *before* runtime.
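As a rough illustration of that schema layer, here is a minimal sketch of what such marker models and their validators could look like. The class names, field names, and exact checks are assumptions for illustration, not the project’s actual schema:

```python
# Illustrative sketch only: class/field names are assumptions, not the real schema.
from enum import Enum

from pydantic import BaseModel, ConfigDict, model_validator


class EventKind(str, Enum):
    NETWORK_SPIKE_START = "NETWORK_SPIKE_START"
    NETWORK_SPIKE_END = "NETWORK_SPIKE_END"
    SERVER_DOWN = "SERVER_DOWN"
    SERVER_UP = "SERVER_UP"


class StartMarker(BaseModel):
    # frozen + extra="forbid" mirrors "markers are frozen; unknown fields are forbidden"
    model_config = ConfigDict(frozen=True, extra="forbid")
    kind: EventKind
    t_start: float
    spike_s: float | None = None  # required (and positive) only for network spikes


class EndMarker(BaseModel):
    model_config = ConfigDict(frozen=True, extra="forbid")
    kind: EventKind
    t_end: float


class EventInjection(BaseModel):
    model_config = ConfigDict(frozen=True, extra="forbid")
    event_id: str
    target_id: str
    start: StartMarker
    end: EndMarker

    @model_validator(mode="after")
    def _check_markers(self) -> "EventInjection":
        pairs = {
            EventKind.NETWORK_SPIKE_START: EventKind.NETWORK_SPIKE_END,
            EventKind.SERVER_DOWN: EventKind.SERVER_UP,
        }
        if pairs.get(self.start.kind) != self.end.kind:
            raise ValueError("start/end kinds are not a coherent pair")
        if not self.start.t_start < self.end.t_end:
            raise ValueError("t_start must be strictly before t_end")
        if self.start.kind is EventKind.NETWORK_SPIKE_START and (
            self.start.spike_s is None or self.start.spike_s <= 0
        ):
            raise ValueError("network spikes require a positive spike_s")
        return self
```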
---

## 2) Centralized Event Registry: `EventInjectionRuntime`

`EventInjectionRuntime` centralizes all event logic and exposes **live read-only views** (adapters) to edge actors.

### Responsibilities & Data

* **Input**:

  * `events: list[EventInjection] | None`
  * `edges: list[Edge]`, `servers: list[Server]`, `env: simpy.Environment`
* **Internal state**:

  * `self._edges_events: dict[event_id, dict[edge_id, float]]`
    Mapping from event → edge → spike amplitude (`spike_s`). This allows multiple events per edge and distinguishes overlapping events.
  * `self._edges_spike: dict[edge_id, float]`
    **Cumulative** spike currently active per edge (updated at runtime).
  * `self._edges_affected: set[edge_id]`
    All edges that are ever impacted by at least one event.
  * `self._edges_timeline: list[tuple[time, event_id, edge_id, mark]]`
    Absolute timestamps (`time`) with `mark ∈ {start, end}` for **edges**.
  * (We also construct a server timeline, reserved for future server-side effects.)

> If `events` is `None` or empty, the runtime initializes to empty sets/maps and **does nothing** when started.

### Build step (performed in `__init__`)

1. Early return if there are no events (keeps empty adapters).
2. Partition events by **target type** (edge vs server).
3. For each **edge** event:

   * Record `spike_s` in `self._edges_events[event_id][edge_id]`.
   * Append `(t_start, event_id, edge_id, start)` and `(t_end, event_id, edge_id, end)` to the **edge timeline**.
   * Add `edge_id` to `self._edges_affected`.
4. **Sort** timelines by `(time, mark == start, event_id, edge_id)` so that, at equal times, **end** is processed **before start** (because `False < True`, `end` precedes `start`).
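A condensed sketch of this build step; variable and attribute names mirror the description above but are assumptions about the real implementation:

```python
# Illustrative sketch of the build step (assumed names, not the real class).
from collections import defaultdict


def build_edge_timeline(events, edge_ids: set[str]):
    edges_events: dict[str, dict[str, float]] = defaultdict(dict)
    edges_affected: set[str] = set()
    # Entries are (time, event_id, edge_id, mark) with mark in {"start", "end"}.
    timeline: list[tuple[float, str, str, str]] = []

    for ev in events or []:                    # handles None / empty input
        if ev.target_id not in edge_ids:       # server-targeted events handled elsewhere
            continue
        edges_events[ev.event_id][ev.target_id] = ev.start.spike_s
        timeline.append((ev.start.t_start, ev.event_id, ev.target_id, "start"))
        timeline.append((ev.end.t_end, ev.event_id, ev.target_id, "end"))
        edges_affected.add(ev.target_id)

    # At equal times, "end" sorts before "start" because False < True.
    timeline.sort(key=lambda e: (e[0], e[3] == "start", e[1], e[2]))
    return edges_events, edges_affected, timeline
```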
### Runtime step (SimPy process)

The coroutine `self._assign_edges_spike()`:

* Iterates the ordered timeline of **absolute** timestamps.
* Converts absolute `t_event` to relative waits via `dt = t_event - last_t`.
* After waiting `dt`, applies the state change:

  * On **start**: `edges_spike[edge_id] += delta`
  * On **end**: `edges_spike[edge_id] -= delta`

This gives a continuously updated, **cumulative** spike per edge, enabling **overlapping events** to stack linearly.
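A sketch of that timeline process under the same assumed names; the runner would register it with `env.process(...)` before starting the edge actors:

```python
# Illustrative SimPy process (assumed names); applies marks at absolute timestamps.
import simpy


def assign_edges_spike(
    env: simpy.Environment,
    timeline: list[tuple[float, str, str, str]],
    edges_events: dict[str, dict[str, float]],
    edges_spike: dict[str, float],
):
    last_t = 0.0
    for t_event, event_id, edge_id, mark in timeline:
        dt = t_event - last_t                  # absolute -> relative wait
        if dt > 0:
            yield env.timeout(dt)
        last_t = t_event
        delta = edges_events[event_id][edge_id]
        if mark == "start":
            edges_spike[edge_id] = edges_spike.get(edge_id, 0.0) + delta
        else:
            edges_spike[edge_id] = edges_spike.get(edge_id, 0.0) - delta
```

Usage would look like `env.process(assign_edges_spike(env, timeline, edges_events, edges_spike))`.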
### Public adapters (read-only views)

* `edges_spike: dict[str, float]` — current cumulative spike per edge.
* `edges_affected: set[str]` — edges that may ever be affected.

These are **shared** with `EdgeRuntime` instances, so updates made by the central process are immediately visible to the edges **without any signaling or copying**.

---

## 3) Wiring & Lifecycle: `SimulationRunner`

`SimulationRunner` orchestrates creation, wiring, and startup order.

### Build phase

1. Build node runtimes (request generator, client, servers, optional load-balancer).
2. Build **edge runtimes** (`EdgeRuntime`) with their target boxes (stores).
3. **Build events**:

   * If `simulation_input.events` is empty/None → **skip** (no process, no adapters).
   * Else:

     * Construct **one** `EventInjectionRuntime`.
     * Extract adapters: `edges_affected`, `edges_spike`.
     * Attach these **same objects** to **every** `EdgeRuntime` (EdgeRuntime performs a membership check; harmless for unaffected edges).

> We deliberately attach adapters to all edges for simplicity. This is O(1) memory for references, and O(1) runtime per delivery (one membership + dict lookup). If desired, the runner could pass adapters **only** to affected edges—this would save a branch per delivery at the cost of more conditional wiring logic.
### Start phase (order matters)

* `EventInjectionRuntime.start()` runs **first**.
  This ensures the spike timeline is active before edges start delivering; the first edge transport will see the correct offset when due.
* Start all other actors.
* Start the metric collector (RAM / queues / connections snapshots).
* `env.run(until=total_simulation_time)` to advance the clock.

### Why this order?

* It prevents race conditions where the first edge message observes a stale (`0.0`) spike at time ≈ `t_start`.
* It keeps the architecture deterministic and easy to reason about.
---

## 4) Edge Consumption: `EdgeRuntime`

Each edge has:

* `edges_affected: Container[str] | None`
* `edges_spike: Mapping[str, float] | None`

During `_deliver(state)`:

1. Sample base latency from the configured RV.
2. If adapters are present **and** `edge_id ∈ edges_affected`:

   * Read `spike = edges_spike.get(edge_id, 0.0)`
   * `effective = base_latency + spike`
3. `yield env.timeout(effective)`

No further coordination required: the **central** process updates `edges_spike` as time advances, so each delivery observes the **current** spike.
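For concreteness, a sketch of that delivery path as a SimPy generator; the real `EdgeRuntime._deliver` signature and attribute names may differ:

```python
# Illustrative sketch of the delivery path (assumed signature).
import simpy


def deliver(
    env: simpy.Environment,
    edge_id: str,
    sample_latency,                      # callable returning the sampled base latency (s)
    edges_affected: set[str] | None,
    edges_spike: dict[str, float] | None,
):
    base = sample_latency()                               # 1) sample base latency
    effective = base
    if (
        edges_affected is not None
        and edges_spike is not None
        and edge_id in edges_affected
    ):
        effective = base + edges_spike.get(edge_id, 0.0)  # 2) add current cumulative spike
    yield env.timeout(effective)                          # 3) transit time on this edge
```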
---

## 5) Correctness & Guarantees

* **Temporal correctness**: Absolute → relative time conversion (`dt = t_event - last_t`) ensures the process applies changes at the exact timestamps. Sorting ensures **END** is processed before **START** when times coincide, so zero-length events won’t “leak” positive offset.
* **Coherence**: Pydantic validators enforce event pairing and time ordering.
* **Immutability**: Marker models are frozen; unknown fields are forbidden.
* **Overlap**: Multiple events on the same edge stack linearly (`+=`/`-=`).

---

## 6) Performance & Trade-offs

### Centralized vs Distributed

* **Chosen**: one central `EventInjectionRuntime` with live adapters.

  * **Pros**: simple mental model; single source of truth; O(1) read for edges; no per-edge coroutines; minimal memory traffic.
  * **Cons**: single process to maintain (but it’s lightweight); edges branch on membership.

* **Alternative A**: deliver the **full** event runtime object to each edge.

  * **Cons**: wider API surface; tighter coupling; harder to evolve; edges would get capabilities they don’t need (SRP violation).

* **Alternative B**: per-edge local event processes.

  * **Cons**: one coroutine per edge (N processes), more scheduler overhead, duplicated logic & sorting.

### Passing adapters to *all* edges vs only affected edges

* **Chosen**: pass to all edges.

  * **Pros**: wiring stays uniform; negligible memory; O(1) branch in `_deliver`.
  * **Cons**: trivial per-delivery branch even for unaffected edges.
* **Alternative**: only affected edges receive adapters.

  * **Pros**: removes one branch at delivery.
  * **Cons**: more conditional wiring, more moving parts for little gain.

---

## 7) Sequence Overview

```
SimulationRunner.run()
 ├─ _build_rqs_generator()
 ├─ _build_client()
 ├─ _build_servers()
 ├─ _build_load_balancer()
 ├─ _build_edges()
 ├─ _build_events()
 │   └─ EventInjectionRuntime(...):
 │        - build _edges_events, _edges_affected
 │        - build & sort _edges_timeline
 ├─ _start_events()
 │   └─ start _assign_edges_spike()   (central timeline process)
 ├─ _start_all_processes()            (edges, client, servers, etc.)
 ├─ _start_metric_collector()
 └─ env.run(until = T)
```

During `EdgeRuntime._deliver()`:

```
base = sample(latency_rv)
if adapters_present and edge_id in edges_affected:
    spike = edges_spike.get(edge_id, 0.0)
    effective = base + spike
else:
    effective = base
yield env.timeout(effective)
```
---

## 8) Extensibility

* **Other edge effects**: add new event kinds, store the per-edge state (e.g., drop-rate bumps) in `_edges_events`, and update the logic in `_assign_edges_spike()`.
* **Server outages**: the server timeline is already scaffolded; add a server process to open/close resources (e.g., capacity = 0 during downtime).
* **Non-deterministic spikes**: swap the `float` `spike_s` for a small sampler (callable) and apply the sampled value at each **start**, or at each **delivery** (define the semantics).
* **Per-edge filtering in runner** (micro-optimization): only wire adapters to affected edges.

---

## 9) Operational Notes & Best Practices

* **Start order** matters: always start `EventInjectionRuntime` *before* edges.
* **Adapters must be shared** (not copied) to preserve live updates.
* **Keep `edges_spike` additive** (no negative values unless you introduce “negative spikes” intentionally).
* **Time units**: seconds everywhere; keep them consistent with sampling.
* **Validation first**: reject malformed events early (schema layer), *not* in runtime.

---

## 10) Glossary

* **Adapter**: a minimal, read-only view (e.g., `Mapping[str, float]`, `Container[str]`) handed to edges to observe central state without owning it.
* **Timeline**: sorted list of `(time, event_id, edge_id, mark)` where `mark ∈ {start, end}`.
* **Spike**: deterministic latency offset added to the sampled base latency.

---

## 11) Example (end-to-end)

**YAML (conceptual)**

```yaml
events:
  - event_id: ev-spike-1
    target_id: edge-42
    start: { kind: NETWORK_SPIKE_START, t_start: 12.0, spike_s: 0.050 }
    end:   { kind: NETWORK_SPIKE_END, t_end: 18.0 }
```

**Runtime effect**

* For `t ∈ [12, 18)`, `edge-42` adds **+50 ms** to its sampled latency.
* Overlapping events stack: `edges_spike["edge-42"]` is the **sum** of active spikes.

---

## 12) Summary

* We centralize event logic in **`EventInjectionRuntime`** and expose **live adapters** to edges.
* Edges read **current cumulative spikes** at delivery time—**no coupling** and **no extra processes per edge**.
* The runner keeps the flow simple and deterministic: **build → wire → start events → start actors → run**.
* The architecture is **extensible**, **testable**, and **performant** for realistic workloads.
