Skip to content

Commit 796cbb1

Browse files
committed
defined architecture for the central collector + documentation
1 parent 13d31d6 commit 796cbb1

File tree

8 files changed

+306
-19
lines changed

8 files changed

+306
-19
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
## Time‑Series Metrics: Architectural Overview
2+
3+
Collecting high‑frequency, time‑series metrics from a complex simulation requires an architecture that is **performant, maintainable, and extensible**. Our design meets those goals by keeping metric *declaration*, *state management,* and *data collection* in strictly separate layers.
4+
5+
\### 1  Guiding Principles & Architecture
6+
7+
1. **Minimal Hot‑Path Overhead** — every state update in the simulation core is `O(1)`.
8+
2. **Single Source of Truth** — one “Registry” enumerates every metric that can exist.
9+
3. **User‑Defined Extensibility** — advanced users can register custom metrics without touching the framework.
10+
4. **Predictable Memory Footprint** — data structures are pre‑allocated once, never rebuilt at each sample tick.
11+
12+
| Layer | Responsibility | Lifetime |
13+
| ------------- | ---------------------------------------------------------------- | ---------------------- |
14+
| **Registry** | Declare *which* metrics exist for each component type | Module import (once) |
15+
| **Runtime** | Maintain the **current value** of each metric per component | Per component instance |
16+
| **Collector** | Periodically read runtime values and append to time‑series lists | One per simulation run |
17+
18+
---
19+
20+
\### 2  Case Study — Edge Metric Collection
21+
22+
```mermaid
23+
graph TD
24+
subgraph Init
25+
A(Registry: EDGE_METRICS) -- builds --> B{Metric Dict}
26+
end
27+
subgraph Loop
28+
C[EdgeRuntime] -- inc/dec --> D(_concurrent_connections)
29+
E[SampledMetricCollector] -->|every N ms| F{iterate}
30+
F -->|read property| D
31+
F -->|append| B
32+
end
33+
C -- owns --> B
34+
```
35+
36+
#### Layer Walk‑through
37+
38+
1. **Registry (`metrics/edge.py`)**
39+
40+
```python
41+
EDGE_METRICS = (SampledMetricName.EDGE_CONCURRENT_CONNECTION,)
42+
43+
def build_edge_metrics(enabled):
44+
return {m: [] for m in EDGE_METRICS if m in enabled}
45+
```
46+
47+
2. **Runtime (`EdgeRuntime`)**
48+
49+
* Updates the counter `_concurrent_connections` in `O(1)`.
50+
* Holds the dict produced by `build_edge_metrics`.
51+
* **New:** exposes read‑only properties so external modules never touch private fields directly.
52+
53+
```python
54+
class EdgeRuntime:
55+
@property
56+
def concurrent_connections(self) -> int:
57+
return self._concurrent_connections
58+
59+
@property
60+
def enabled_metrics(self) -> dict[SampledMetricName, list[float | int]]:
61+
return self._edge_enabled_metrics
62+
```
63+
64+
3. **Collector (`SampledMetricCollector`)**
65+
66+
```python
67+
while True:
68+
yield env.timeout(sample_period_s)
69+
for edge in self.edges:
70+
key = SampledMetricName.EDGE_CONCURRENT_CONNECTION
71+
# properties keep encapsulation intact
72+
if key in edge.enabled_metrics:
73+
edge.enabled_metrics[key].append(edge.concurrent_connections)
74+
```
75+
76+
---
77+
78+
\### 3  Why the `if key in …` Guard Still Matters
79+
80+
Even with the new properties, the guard remains essential:
81+
82+
* **Robustness** — prevents `KeyError` when a metric is disabled for a given edge.
83+
* **Extensibility** — a user can add `EDGE_PACKET_LOSS` (or any custom metric) to a subset of edges; the collector automatically respects that configuration.
84+
85+
This single `O(1)` check keeps the system plug‑and‑play while preserving full encapsulation:
86+
87+
* Runtime internals stay protected behind properties.
88+
* The collector never needs to know which metrics exist ahead of time.
89+
90+
---
91+

src/app/config/constants.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,17 @@ class SampledMetricName(StrEnum):
208208
EVENT_LOOP_IO_SLEEP = "event_loop_io_sleep"
209209
RAM_IN_USE = "ram_in_use"
210210
THROUGHPUT_RPS = "throughput_rps"
211+
EDGE_CONCURRENT_CONNECTION = "edge_concurrent_connection"
212+
213+
class SamplePeriods(IntEnum):
214+
"""
215+
defining the value of the sample periods for the metrics for which
216+
we have to extract a time series
217+
"""
218+
219+
STANDARD_TIME = 0.005 # 5 MILLISECONDS
220+
MINIMUM_TIME = 0.001 # 1 MILLISECOND
221+
MAXIMUM_TIME = 0.1 # 10 MILLISECONDS
211222

212223
# ======================================================================
213224
# CONSTANTS FOR EVENT METRICS

src/app/metrics/collector.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
"""class to centralized the the collection of time series regarding metrics"""
2+
3+
from collections.abc import Generator
4+
5+
import simpy
6+
7+
from app.config.constants import SampledMetricName
8+
from app.runtime.actors.edge import EdgeRuntime
9+
from app.schemas.simulation_settings_input import SimulationSettings
10+
11+
# The idea for this class is to gather list of runtime objects that
12+
# are defined in the central class to build the simulation, in this
13+
# way we optimize the initialization of various objects reducing
14+
# the global overhead
15+
16+
17+
class SampledMetricCollector:
18+
"""class to define a centralized object to collect sampled metrics"""
19+
20+
def __init__(
21+
self,
22+
*,
23+
edges: list[EdgeRuntime],
24+
env: simpy.Environment,
25+
sim_settings: SimulationSettings,
26+
) -> None:
27+
"""Docstring to complete"""
28+
self.edges = edges
29+
self.sim_settings = sim_settings
30+
self.env = env
31+
self._sample_period = sim_settings.sample_period_s
32+
33+
env.process(self._build_time_series())
34+
35+
def _build_time_series(self) -> Generator[simpy.Event, None, None]:
36+
"""Function to build time series for enabled metrics"""
37+
connection_key = SampledMetricName.EDGE_CONCURRENT_CONNECTION
38+
while True:
39+
40+
yield self.env.timeout(self._sample_period)
41+
42+
for edge in self.edges:
43+
if connection_key in edge.enabled_metrics:
44+
edge.enabled_metrics[connection_key].append(
45+
edge.concurrent_connections,
46+
)
47+
48+
49+
50+
51+
52+
53+
54+
55+

src/app/metrics/edge.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""initialization of the structure to gather the metrics for the edges of the system"""
2+
3+
from collections.abc import Iterable
4+
5+
from app.config.constants import SampledMetricName
6+
7+
# Initialize one time outside the function all possible metrics
8+
# related to the edges, the idea of this structure is to
9+
# guarantee scalability in the long term if multiple metrics
10+
# will be considered
11+
12+
EDGE_METRICS = (
13+
SampledMetricName.EDGE_CONCURRENT_CONNECTION,
14+
)
15+
16+
def build_edge_metrics(
17+
enabled_sample_metrics: Iterable[SampledMetricName],
18+
) -> dict[SampledMetricName, list[float | int]]:
19+
"""
20+
Function to populate a dictionary to collect values for
21+
time series of sampled metrics related to the edges of
22+
the system.
23+
"""
24+
# The edge case of the empty dict is avoided since at least
25+
# one metric is always measured by default.
26+
return {
27+
metric: [] for metric in EDGE_METRICS
28+
if metric in enabled_sample_metrics
29+
}

src/app/runtime/actors/edge.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
import numpy as np
1313
import simpy
1414

15-
from app.config.constants import SystemEdges
15+
from app.config.constants import SampledMetricName, SystemEdges
16+
from app.metrics.edge import build_edge_metrics
1617
from app.runtime.rqs_state import RequestState
1718
from app.samplers.common_helpers import general_sampler
19+
from app.schemas.simulation_settings_input import SimulationSettings
1820
from app.schemas.system_topology.full_system_topology import Edge
1921

2022
if TYPE_CHECKING:
@@ -32,12 +34,26 @@ def __init__(
3234
edge_config: Edge,
3335
rng: np.random.Generator | None = None,
3436
target_box: simpy.Store,
37+
settings: SimulationSettings,
3538
) -> None:
3639
"""Definition of the instance attributes"""
3740
self.env = env
3841
self.edge_config = edge_config
3942
self.target_box = target_box
4043
self.rng = rng or np.random.default_rng()
44+
self.setting = settings
45+
self._edge_enabled_metrics = build_edge_metrics(
46+
settings.enabled_sample_metrics,
47+
)
48+
self._concurrent_connections: int = 0
49+
50+
# We keep a reference to `settings` because this class needs to observe but not
51+
# persist the edge-related metrics the user has enabled.
52+
# The actual persistence (appending snapshots to the time series lists)
53+
# is handled centrally in metrics/collector.py,which runs every Xmilliseconds.
54+
# Here we only expose the current metric values, guarded by a few if checks to
55+
# verify that each optional metric is active. For deafult metric settings
56+
# is not needed but as we will scale as explained above we will need it
4157

4258
def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]:
4359
"""Function to deliver the state to the next node"""
@@ -54,13 +70,17 @@ def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]:
5470
)
5571
return
5672

73+
self._concurrent_connections +=1
74+
5775
transit_time = general_sampler(random_variable, self.rng)
5876
yield self.env.timeout(transit_time)
77+
5978
state.record_hop(
6079
SystemEdges.NETWORK_CONNECTION,
6180
self.edge_config.id,
6281
self.env.now,
6382
)
83+
self._concurrent_connections -=1
6484
yield self.target_box.put(state)
6585

6686

@@ -71,7 +91,15 @@ def transport(self, state: RequestState) -> simpy.Process:
7191
"""
7292
return self.env.process(self._deliver(state))
7393

94+
@property
95+
def enabled_metrics(self) -> dict[SampledMetricName, list[float | int]]:
96+
"""Read-only access to the metric store."""
97+
return self._edge_enabled_metrics
7498

99+
@property
100+
def concurrent_connections(self) -> int:
101+
"""Current number of open connections on this edge."""
102+
return self._concurrent_connections
75103

76104

77105

src/app/schemas/simulation_settings_input.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@
22

33
from pydantic import BaseModel, Field
44

5-
from app.config.constants import EventMetricName, SampledMetricName, TimeDefaults
5+
from app.config.constants import (
6+
EventMetricName,
7+
SampledMetricName,
8+
SamplePeriods,
9+
TimeDefaults,
10+
)
611

712

813
class SimulationSettings(BaseModel):
@@ -19,6 +24,7 @@ class SimulationSettings(BaseModel):
1924
SampledMetricName.READY_QUEUE_LEN,
2025
SampledMetricName.CORE_BUSY,
2126
SampledMetricName.RAM_IN_USE,
27+
SampledMetricName.EDGE_CONCURRENT_CONNECTION,
2228
},
2329
description="Which time-series KPIs to collect by default.",
2430
)
@@ -29,3 +35,10 @@ class SimulationSettings(BaseModel):
2935
description="Which per-event KPIs to collect by default.",
3036
)
3137

38+
sample_period_s: int = Field(
39+
default = SamplePeriods.STANDARD_TIME,
40+
ge = SamplePeriods.MINIMUM_TIME,
41+
le = SamplePeriods.MAXIMUM_TIME,
42+
description="constant interval of time to build time series for metrics",
43+
)
44+

tests/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from app.config.constants import (
2424
EventMetricName,
2525
SampledMetricName,
26+
SamplePeriods,
2627
TimeDefaults,
2728
)
2829
from app.config.settings import settings
@@ -195,6 +196,7 @@ def sim_settings(
195196
total_simulation_time=TimeDefaults.MIN_SIMULATION_TIME,
196197
enabled_sample_metrics=enabled_sample_metrics,
197198
enabled_event_metrics=enabled_event_metrics,
199+
sample_period_s=SamplePeriods.STANDARD_TIME,
198200
)
199201

200202

0 commit comments

Comments
 (0)