Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions asyncflow_queue_limit/asyncflow_mm1.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
},
{
"cell_type": "code",
"execution_count": 34,
"execution_count": 1,
"id": "c3a69413",
"metadata": {},
"outputs": [],
Expand All @@ -34,7 +34,7 @@
"from asyncflow import AsyncFlow, SimulationRunner\n",
"from asyncflow.analysis import MMc, ResultsAnalyzer, SweepAnalyzer\n",
"from asyncflow.components import (\n",
" Client, Server, Edge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
" Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
")\n",
"from asyncflow.settings import SimulationSettings\n",
"\n",
Expand All @@ -43,18 +43,22 @@
},
{
"cell_type": "code",
"execution_count": 35,
"execution_count": null,
"metadata": {
"tags": [
"imports"
]
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Imports OK.\n"
"ename": "ImportError",
"evalue": "cannot import name 'Edge' from 'asyncflow.components' (/home/gioele/projects/AsyncFlow/src/asyncflow/components/__init__.py)",
"output_type": "error",
"traceback": [
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
"\u001b[31mImportError\u001b[39m Traceback (most recent call last)",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[2]\u001b[39m\u001b[32m, line 6\u001b[39m\n\u001b[32m 4\u001b[39m \u001b[38;5;66;03m# Public AsyncFlow API\u001b[39;00m\n\u001b[32m 5\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01masyncflow\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m AsyncFlow, SimulationRunner, Sweep\n\u001b[32m----> \u001b[39m\u001b[32m6\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01masyncflow\u001b[39;00m\u001b[34;01m.\u001b[39;00m\u001b[34;01mcomponents\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m Client, Server, Edge, Endpoint, ArrivalsGenerator\n\u001b[32m 7\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01masyncflow\u001b[39;00m\u001b[34;01m.\u001b[39;00m\u001b[34;01msettings\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m SimulationSettings\n\u001b[32m 8\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01masyncflow\u001b[39;00m\u001b[34;01m.\u001b[39;00m\u001b[34;01manalysis\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m MMc, ResultsAnalyzer, SweepAnalyzer\n",
"\u001b[31mImportError\u001b[39m: cannot import name 'Edge' from 'asyncflow.components' (/home/gioele/projects/AsyncFlow/src/asyncflow/components/__init__.py)"
]
}
],
Expand All @@ -64,7 +68,7 @@
"\n",
"# Public AsyncFlow API\n",
"from asyncflow import AsyncFlow, SimulationRunner, Sweep\n",
"from asyncflow.components import Client, Server, Edge, Endpoint, ArrivalsGenerator\n",
"from asyncflow.components import Client, Server, LinkEdge, Endpoint, ArrivalsGenerator\n",
"from asyncflow.settings import SimulationSettings\n",
"from asyncflow.analysis import MMc, ResultsAnalyzer, SweepAnalyzer\n",
"from asyncflow.enums import Distribution\n",
Expand Down Expand Up @@ -103,7 +107,7 @@
},
{
"cell_type": "code",
"execution_count": 36,
"execution_count": null,
"metadata": {
"tags": [
"build"
Expand Down Expand Up @@ -139,9 +143,9 @@
" endpoints=[endpoint],\n",
" )\n",
"\n",
" e_gen_client = Edge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\", latency=0.0001, dropout_rate=0.0)\n",
" e_client_app = Edge(id=\"client-app\", source=\"client-1\", target=\"app-1\", latency=0.0001, dropout_rate=0.0)\n",
" e_app_client = Edge(id=\"app-client\", source=\"app-1\", target=\"client-1\", latency=0.0001, dropout_rate=0.0)\n",
" e_gen_client = LinkEdge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\")\n",
" e_client_app = LinkEdge(id=\"client-app\", source=\"client-1\", target=\"app-1\")\n",
" e_app_client = LinkEdge(id=\"app-client\", source=\"app-1\", target=\"client-1\")\n",
"\n",
" settings = SimulationSettings(\n",
" total_simulation_time=2400,\n",
Expand All @@ -168,7 +172,7 @@
},
{
"cell_type": "code",
"execution_count": 37,
"execution_count": null,
"metadata": {
"tags": [
"run"
Expand Down Expand Up @@ -296,12 +300,12 @@
"\\end{cases}\n",
"$$\n",
"\n",
"> **Why small deltas appear:** warm-up effects, the user-sampling window (piecewise-constant rate), finite simulation horizon, and a (small) deterministic network latency naturally introduce small Theory vs Observed gaps. Increasing the simulation time and reducing network latency typically shrinks these deltas.\n"
"> **Why small deltas appear:** warm-up effects, the user-sampling window (piecewise-constant rate), finite simulation horizon. Increasing the simulation time typically shrinks these deltas.\n"
]
},
{
"cell_type": "code",
"execution_count": 38,
"execution_count": null,
"metadata": {
"tags": [
"mm1"
Expand Down Expand Up @@ -378,7 +382,7 @@
},
{
"cell_type": "code",
"execution_count": 39,
"execution_count": null,
"metadata": {
"tags": [
"plots"
Expand Down Expand Up @@ -467,7 +471,7 @@
},
{
"cell_type": "code",
"execution_count": 40,
"execution_count": null,
"id": "c9063bbe",
"metadata": {},
"outputs": [
Expand Down Expand Up @@ -514,7 +518,7 @@
},
{
"cell_type": "code",
"execution_count": 41,
"execution_count": null,
"id": "48716bc8",
"metadata": {},
"outputs": [
Expand Down Expand Up @@ -564,7 +568,7 @@
},
{
"cell_type": "code",
"execution_count": 42,
"execution_count": null,
"id": "9b9f0236",
"metadata": {},
"outputs": [
Expand Down
24 changes: 12 additions & 12 deletions asyncflow_queue_limit/asyncflow_mmc_split.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": null,
"id": "b8a94d93",
"metadata": {},
"outputs": [],
Expand All @@ -36,7 +36,7 @@
"from asyncflow import AsyncFlow, SimulationRunner\n",
"from asyncflow.analysis import MMc, ResultsAnalyzer\n",
"from asyncflow.components import (\n",
" Client, Server, Edge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
" Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
")\n",
"from asyncflow.settings import SimulationSettings\n",
"\n",
Expand All @@ -46,7 +46,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"id": "d1b7ad7d",
"metadata": {},
"outputs": [
Expand All @@ -64,7 +64,7 @@
"\n",
"# Public AsyncFlow API\n",
"from asyncflow import AsyncFlow, SimulationRunner, Sweep\n",
"from asyncflow.components import Client, Server, Edge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
"from asyncflow.components import Client, Server, LinkEdge, Endpoint, LoadBalancer, ArrivalsGenerator\n",
"from asyncflow.settings import SimulationSettings\n",
"from asyncflow.analysis import ResultsAnalyzer, SweepAnalyzer, MMc\n",
"from asyncflow.enums import Distribution\n",
Expand Down Expand Up @@ -94,7 +94,7 @@
" Arrivals are produced by the same **two-stage, windowed Poisson sampler**: in each user-sampling window \\$\\Delta\\$, we draw the active users \\$U\\$ (Poisson or Normal, per config).\n",
" Within the window, arrivals are a **homogeneous Poisson process** with rate \\$\\Lambda = U \\cdot \\lambda\\_r/60\\$.\n",
"\n",
" With **small \\$\\Delta\\$**, **Poisson users**, **long runs**, and **tiny edge latency**, the aggregate arrivals seen by the load balancer approximate a global Poisson input, yielding a good empirical match to the M/M/c model.\n",
" \n",
"\n",
"---\n",
"\n",
Expand Down Expand Up @@ -128,7 +128,7 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"id": "ba93587a",
"metadata": {},
"outputs": [],
Expand Down Expand Up @@ -173,12 +173,12 @@
" )\n",
"\n",
" edges = [\n",
" Edge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\", latency=0.00001, dropout_rate=0),\n",
" Edge(id=\"client-lb\", source=\"client-1\", target=\"lb-1\", latency=0.00001, dropout_rate=0),\n",
" Edge(id=\"lb-srv1\", source=\"lb-1\", target=\"srv-1\", latency=0.00001, dropout_rate=0),\n",
" Edge(id=\"lb-srv2\", source=\"lb-1\", target=\"srv-2\", latency=0.00001, dropout_rate=0),\n",
" Edge(id=\"srv1-client\", source=\"srv-1\", target=\"client-1\", latency=0.00001, dropout_rate=0),\n",
" Edge(id=\"srv2-client\", source=\"srv-2\", target=\"client-1\", latency=0.00001, dropout_rate=0),\n",
" LinkEdge(id=\"gen-client\", source=\"rqs-1\", target=\"client-1\",),\n",
" LinkEdge(id=\"client-lb\", source=\"client-1\", target=\"lb-1\", ),\n",
" LinkEdge(id=\"lb-srv1\", source=\"lb-1\", target=\"srv-1\", ),\n",
" LinkEdge(id=\"lb-srv2\", source=\"lb-1\", target=\"srv-2\", ),\n",
" LinkEdge(id=\"srv1-client\", source=\"srv-1\", target=\"client-1\",),\n",
" LinkEdge(id=\"srv2-client\", source=\"srv-2\", target=\"client-1\",),\n",
" ]\n",
"\n",
" settings = SimulationSettings(\n",
Expand Down
76 changes: 61 additions & 15 deletions src/asyncflow/builder/asyncflow_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

from typing import Self

from asyncflow.config.enums import EventDescription
from asyncflow.config.enums import EventDescription, SystemEdges
from asyncflow.schemas.arrivals.generator import ArrivalsGenerator
from asyncflow.schemas.events.injection import End, EventInjection, Start
from asyncflow.schemas.payload import SimulationPayload
from asyncflow.schemas.settings.simulation import SimulationSettings
from asyncflow.schemas.topology.edges import Edge
from asyncflow.schemas.topology.edges import LinkEdge, NetworkEdge
from asyncflow.schemas.topology.graph import TopologyGraph
from asyncflow.schemas.topology.nodes import (
Client,
Expand All @@ -27,10 +27,12 @@ def __init__(self) -> None:
self._arrivals: ArrivalsGenerator | None = None
self._client: Client | None = None
self._servers: list[Server] | None = None
self._edges: list[Edge] | None = None
self._net_edges: list[NetworkEdge] | None = None
self._link_edges: list[LinkEdge] | None = None
self._sim_settings: SimulationSettings | None = None
self._load_balancer: LoadBalancer | None = None
self._events: list[EventInjection] = []
self._edges_kind: SystemEdges | None = None

def add_arrivals_generator(
self,
Expand Down Expand Up @@ -64,18 +66,48 @@ def add_servers(self, *servers: Server) -> Self:
self._servers.append(server)
return self

def add_edges(self, *edges: Edge) -> Self:
"""Method to instantiate the list of edges"""
if self._edges is None:
self._edges = []
def add_edges(self, *edges: NetworkEdge | LinkEdge) -> Self:
"""Add edges; enforces homogeneous type (all NetworkEdge or all LinkEdge)."""
if not edges:
return self

if self._edges_kind is None:
first = edges[0]
if isinstance(first, NetworkEdge):
self._edges_kind = SystemEdges.NETWORK_CONNECTION
self._net_edges = []
elif isinstance(first, LinkEdge):
self._edges_kind = SystemEdges.LINK_CONNECTION
self._link_edges = []
else:
msg = "Edges must be NetworkEdge or LinkEdge."
raise TypeError(msg)

assert self._edges_kind is not None

for edge in edges:
if not isinstance(edge, Edge):
msg = "All the instances must be of the type Edge"
if self._edges_kind == SystemEdges.NETWORK_CONNECTION:
assert self._net_edges is not None
if any(not isinstance(e, NetworkEdge) for e in edges):
msg = "Cannot mix LinkEdge with NetworkEdge."
raise TypeError(msg)
self._edges.append(edge)
# ⬇️ Build a typed batch so mypy is happy
net_batch: list[NetworkEdge] = [
e for e in edges if isinstance(e, NetworkEdge)
]
self._net_edges.extend(net_batch)
else:
assert self._link_edges is not None
if any(not isinstance(e, LinkEdge) for e in edges):
msg = "Cannot mix NetworkEdge with LinkEdge."
raise TypeError(msg)
# ⬇️ Typed batch for LinkEdge
link_batch: list[LinkEdge] = [e for e in edges if isinstance(e, LinkEdge)]
self._link_edges.extend(link_batch)

return self



def add_simulation_settings(self, sim_settings: SimulationSettings) -> Self:
"""Method to instantiate the settings for the simulation"""
if not isinstance(sim_settings, SimulationSettings):
Expand Down Expand Up @@ -146,17 +178,31 @@ def add_server_outage(
def build_payload(self) -> SimulationPayload:
"""Method to build the payload for the simulation"""
if self._arrivals is None:
msg = "The generator input must be instantiated before the simulation"
msg = "The arrivals generator must be instantiated before the simulation"
raise ValueError(msg)
if self._client is None:
msg = "The client input must be instantiated before the simulation"
raise ValueError(msg)
if not self._servers:
msg = "You must instantiate at least one server before the simulation"
raise ValueError(msg)
if not self._edges:
msg = "You must instantiate edges before the simulation"
if self._edges_kind is None:
msg = "You must instantiate edges before the simulation."
raise ValueError(msg)

# mypy facilitator
edges_u: list[NetworkEdge] | list[LinkEdge]
if self._edges_kind == SystemEdges.NETWORK_CONNECTION:
if not self._net_edges:
msg = "You must instantiate edges before the simulation."
raise ValueError(msg)
edges_u = self._net_edges
else:
if not self._link_edges:
msg = "You must instantiate edges before the simulation."
raise ValueError(msg)
edges_u = self._link_edges

if self._sim_settings is None:
msg = "The simulation settings must be instantiated before the simulation"
raise ValueError(msg)
Expand All @@ -169,7 +215,7 @@ def build_payload(self) -> SimulationPayload:

graph = TopologyGraph(
nodes = nodes,
edges=self._edges,
edges=edges_u,
)

return SimulationPayload.model_validate({
Expand Down
5 changes: 3 additions & 2 deletions src/asyncflow/components/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from asyncflow.schemas.arrivals.generator import ArrivalsGenerator
from asyncflow.schemas.events.injection import EventInjection
from asyncflow.schemas.topology.edges import Edge
from asyncflow.schemas.topology.edges import LinkEdge, NetworkEdge
from asyncflow.schemas.topology.endpoint import Endpoint
from asyncflow.schemas.topology.nodes import (
Client,
Expand All @@ -15,10 +15,11 @@
__all__ = [
"ArrivalsGenerator",
"Client",
"Edge",
"Endpoint",
"EventInjection",
"LinkEdge",
"LoadBalancer",
"NetworkEdge",
"NodesResources",
"Server",
]
Expand Down
1 change: 1 addition & 0 deletions src/asyncflow/config/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class SystemEdges(StrEnum):
"""

NETWORK_CONNECTION = "network_connection"
LINK_CONNECTION = "link_connection"

# ======================================================================
# CONSTANTS FOR THE EVENT TO INJECT IN THE SIMULATION
Expand Down
Loading