Skip to content

Commit 87d30ea

Browse files
authored
Structure input for multiprocessing, improved tests, improved description (#23)
1 parent 6e4355b commit 87d30ea

File tree

25 files changed

+238
-106
lines changed

25 files changed

+238
-106
lines changed

docs/api/components.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ These classes are Pydantic models with strict validation and are the
1717
from asyncflow.components import (
1818
Client,
1919
Server,
20-
ServerResources,
20+
NodesResources,
2121
LoadBalancer,
2222
Endpoint,
2323
Edge,
@@ -32,7 +32,7 @@ from asyncflow.enums import Distribution
3232

3333
```python
3434
from asyncflow.components import (
35-
Client, Server, ServerResources, LoadBalancer, Endpoint, Edge
35+
Client, Server, NodesResources, LoadBalancer, Endpoint, Edge
3636
)
3737

3838
# Nodes
@@ -49,7 +49,7 @@ endpoint = Endpoint(
4949

5050
server = Server(
5151
id="srv-1",
52-
server_resources=ServerResources(cpu_cores=2, ram_mb=2048),
52+
server_resources=NodesResources(cpu_cores=2, ram_mb=2048),
5353
endpoints=[endpoint],
5454
)
5555

@@ -103,18 +103,18 @@ Client(id: str)
103103

104104
---
105105

106-
### `ServerResources`
106+
### `NodesResources`
107107

108108
```python
109-
ServerResources(
109+
NodesResources(
110110
cpu_cores: int = 1, # ≥ 1 NOW MUST BE FIXED TO ONE
111111
ram_mb: int = 1024, # ≥ 256
112112
db_connection_pool: int | None = None,
113113
)
114114
```
115115

116116
* Server capacity knobs used by the runtime (CPU tokens, RAM reservoir, optional DB pool).
117-
* You may pass a **dict** instead of `ServerResources`; Pydantic will coerce it.
117+
* You may pass a **dict** instead of `NodesResources`; Pydantic will coerce it.
118118

119119
**Bounds & defaults**
120120

@@ -166,7 +166,7 @@ Each step is a dict with **exactly one** operation:
166166
```python
167167
Server(
168168
id: str,
169-
server_resources: ServerResources | dict,
169+
server_resources: NodesResources | dict,
170170
endpoints: list[Endpoint],
171171
)
172172
```
@@ -234,7 +234,7 @@ Edge(
234234
## Type coercion & enums
235235

236236
* You may pass strings for enums (`kind`, `distribution`, etc.); they will be validated against the allowed values.
237-
* For `ServerResources` and `Edge.latency` you can pass dictionaries; Pydantic will coerce them to typed models.
237+
* For `NodesResources` and `Edge.latency` you can pass dictionaries; Pydantic will coerce them to typed models.
238238
* If you prefer, you can import and use the enums:
239239

240240
```python

docs/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ AsyncFlow is a discrete-event simulator for Python async backends (FastAPI/Uvico
1414
## Public API (stable surface)
1515

1616
* **[High-Level API](api/high-level.md)** — The two entry points you’ll use most: `AsyncFlow` (builder) and `SimulationRunner` (orchestrator).
17-
* **[Components](api/components.md)** — Public Pydantic models for topology: `Client`, `Server`, `Endpoint`, `Edge`, `LoadBalancer`, `ServerResources`.
17+
* **[Components](api/components.md)** — Public Pydantic models for topology: `Client`, `Server`, `Endpoint`, `Edge`, `LoadBalancer`, `NodesResources`.
1818
* **[Workload](api/workload.md)** — Traffic inputs: `RqsGenerator` and `RVConfig` (random variables).
1919
* **[Settings](api/settings.md)** — Global controls: `SimulationSettings` (duration, sampling cadence, metrics).
2020
* **[Enums](api/enums.md)** — Optional importable enums: distributions, step kinds/ops, metric names, node/edge types, LB algorithms.

docs/internals/runtime-and-resources.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ AsyncFlow mirrors that physical constraint through the **Resource layer**, which
8888

8989
| Responsibility | Implementation detail |
9090
| --------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
91-
| **Discover capacity** | Walks the *validated* `TopologyGraph.nodes.servers`, reading `cpu_cores` and `ram_mb` from each `ServerResources` spec. |
91+
| **Discover capacity** | Walks the *validated* `TopologyGraph.nodes.servers`, reading `cpu_cores` and `ram_mb` from each `NodesResources` spec. |
9292
| **Mint containers** | Calls `build_containers(env, spec)` which returns<br>`{"CPU": simpy.Container(init=cpu_cores), "RAM": simpy.Container(init=ram_mb)}` — the containers start **full** so a server can immediately consume tokens. |
9393
| **Registry map** | Stores them in a private dict `_by_server: dict[str, ServerContainers]`. |
9494
| **Public API** | `registry[server_id] → ServerContainers` (raises `KeyError` if the ID is unknown). |

docs/internals/simulation-input.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -114,15 +114,15 @@ class Client(BaseModel):
114114
# validator: type must equal SystemNodes.CLIENT
115115
```
116116

117-
#### `ServerResources`
117+
#### `NodesResources`
118118

119119
```python
120-
class ServerResources(BaseModel):
121-
cpu_cores: PositiveInt = Field(ServerResourcesDefaults.CPU_CORES,
122-
ge=ServerResourcesDefaults.MINIMUM_CPU_CORES)
123-
db_connection_pool: PositiveInt | None = Field(ServerResourcesDefaults.DB_CONNECTION_POOL)
124-
ram_mb: PositiveInt = Field(ServerResourcesDefaults.RAM_MB,
125-
ge=ServerResourcesDefaults.MINIMUM_RAM_MB)
120+
class NodesResources(BaseModel):
121+
cpu_cores: PositiveInt = Field(NodesResourcesDefaults.CPU_CORES,
122+
ge=NodesResourcesDefaults.MINIMUM_CPU_CORES)
123+
db_connection_pool: PositiveInt | None = Field(NodesResourcesDefaults.DB_CONNECTION_POOL)
124+
ram_mb: PositiveInt = Field(NodesResourcesDefaults.RAM_MB,
125+
ge=NodesResourcesDefaults.MINIMUM_RAM_MB)
126126
```
127127

128128
Each attribute maps directly to a SimPy primitive (core tokens, RAM container, optional DB pool).
@@ -164,7 +164,7 @@ Canonical lowercase names avoid accidental duplicates by case.
164164
class Server(BaseModel):
165165
id: str
166166
type: SystemNodes = SystemNodes.SERVER
167-
server_resources: ServerResources
167+
server_resources: NodesResources
168168
endpoints: list[Endpoint]
169169
# validator: type must equal SystemNodes.SERVER
170170
```
@@ -302,7 +302,7 @@ class SimulationSettings(BaseModel):
302302
### Nodes
303303

304304
* `Client.type == client`, `Server.type == server`, `LoadBalancer.type == load_balancer` (enforced).
305-
* `ServerResources` obey lower bounds: `cpu_cores ≥ 1`, `ram_mb ≥ 256`.
305+
* `NodesResources` obey lower bounds: `cpu_cores ≥ 1`, `ram_mb ≥ 256`.
306306
* `TopologyNodes` contains **unique ids** across `client`, `servers[]`, and (optional) `load_balancer`. Duplicates → `ValueError`.
307307
* `TopologyNodes` forbids unknown fields (`extra="forbid"`).
308308

src/asyncflow/components/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
from asyncflow.schemas.topology.nodes import (
88
Client,
99
LoadBalancer,
10+
NodesResources,
1011
Server,
11-
ServerResources,
1212
)
1313

1414
__all__ = [
@@ -17,8 +17,8 @@
1717
"Endpoint",
1818
"EventInjection",
1919
"LoadBalancer",
20+
"NodesResources",
2021
"Server",
21-
"ServerResources",
2222
]
2323

2424

src/asyncflow/config/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ class StepOperation(StrEnum):
117117
# CONSTANTS FOR THE RESOURCES OF A SERVER
118118
# ======================================================================
119119

120-
class ServerResourcesDefaults:
120+
class NodesResourcesDefaults:
121121
"""Resources available for a single server"""
122122

123123
CPU_CORES = 1

src/asyncflow/resources/server_containers.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import simpy
1313

1414
from asyncflow.config.constants import ServerResourceName
15-
from asyncflow.schemas.topology.nodes import ServerResources
15+
from asyncflow.schemas.topology.nodes import NodesResources
1616

1717
# ==============================================================
1818
# DICT FOR THE REGISTRY TO INITIALIZE RESOURCES FOR EACH SERVER
@@ -33,12 +33,12 @@ class ServerContainers(TypedDict):
3333
# Central funcrion to initialize the dictionary with ram and cpu container
3434
def build_containers(
3535
env: simpy.Environment,
36-
spec: ServerResources,
36+
spec: NodesResources,
3737
) -> ServerContainers:
3838
"""
3939
Construct and return a mapping of SimPy Containers for a server's CPU and RAM.
4040
41-
Given a SimPy environment and a validated ServerResources spec, this function
41+
Given a SimPy environment and a validated NodesResources spec, this function
4242
initializes one simpy.Container for CPU (with capacity equal to cpu_cores)
4343
and one for RAM (with capacity equal to ram_mb), then returns them in a
4444
ServerContainers TypedDict keyed by "CPU" and "RAM".
@@ -47,7 +47,7 @@ def build_containers(
4747
----------
4848
env : simpy.Environment
4949
The simulation environment in which the Containers will be created.
50-
spec : ServerResources
50+
spec : NodesResources
5151
A Pydantic model instance defining the server's cpu_cores and ram_mb.
5252
5353
Returns

src/asyncflow/runtime/actors/server.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901
196196

197197
for step in selected_endpoint.steps:
198198

199-
if step.kind in EndpointStepCPU:
199+
if isinstance(step.kind, EndpointStepCPU):
200200
# with the boolean we avoid redundant operation of asking
201201
# the core multiple time on a given step
202202
# for example if we have two consecutive cpu bound step
@@ -232,7 +232,7 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901
232232

233233
# since the object is of an Enum class we check if the step.kind
234234
# is one member of enum
235-
elif step.kind in EndpointStepIO:
235+
elif isinstance(step.kind, EndpointStepIO):
236236
# define the io time
237237
io_time = step.step_operation[StepOperation.IO_WAITING_TIME]
238238

@@ -244,7 +244,7 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901
244244
if not is_in_io_queue:
245245
is_in_io_queue = True
246246
self._el_io_queue_len += 1
247-
247+
248248
# here is a sage check: the first step should always
249249
# be a cpu bound (parsing of the request), if an user
250250
# start with a I/O this allow to don't break the flux
@@ -305,6 +305,21 @@ def _dispatcher(self) -> Generator[simpy.Event, None, None]:
305305
The main dispatcher loop. It pulls requests from the inbox and
306306
spawns a new '_handle_request' process for each one.
307307
"""
308+
# we assume in the current model that there is a one
309+
# to one correspondence between cpu cores and workers
310+
# before entering in the loop in the current implementation
311+
# we reserve the ram necessary to run the processes
312+
if self.server_config.ram_per_process:
313+
processes_ram = (
314+
self.server_config.ram_per_process *
315+
self.server_config.server_resources.cpu_cores
316+
)
317+
318+
yield self.server_resources[
319+
ServerResourceName.RAM.value
320+
].get(processes_ram)
321+
322+
308323
while True:
309324
# Wait for a request to arrive in the server's inbox
310325
raw_state = yield self.server_box.get()

0 commit comments

Comments
 (0)