Skip to content

Commit 16ab11b

Browse files
authored
feat: pipeline data (#295)
We need to save pipeline data so that we can identify each pipeline's worker identifiers and total throughput. For that, whenever a new config is generated, we extract this data and save it in a list. Whenever we need to scale in, we take the most recently added pipeline's data from that list and use it to decide whether the pipeline can be removed.
1 parent 4008ce6 commit 16ab11b

File tree

2 files changed

+49
-6
lines changed

2 files changed

+49
-6
lines changed

infscale/configs/job.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ def max_world_id(self) -> int:
332332

333333
return max_id
334334

335-
def _server_id(self) -> str | None:
335+
def server_id(self) -> str | None:
336336
"""Return server id."""
337337
for worker in self.workers:
338338
if worker.is_server:
@@ -342,7 +342,7 @@ def _server_id(self) -> str | None:
342342

343343
def server_ip(self) -> str:
344344
"""Return IP address of server."""
345-
server_id = self._server_id()
345+
server_id = self.server_id()
346346
if server_id is None:
347347
return ""
348348

@@ -399,6 +399,20 @@ def is_identical(x: JobConfig, y: JobConfig) -> bool:
399399
def world_name(world_id: int) -> str:
    """Build the canonical world name for *world_id* (e.g. 3 -> "w3")."""
    return "w" + str(world_id)
402+
403+
@staticmethod
404+
def get_pipeline_identifiers(new_cfg: JobConfig) -> set[str]:
405+
"""Get pipeline identifiers based on server id."""
406+
server_id = new_cfg.server_id()
407+
408+
wrk_ids = set()
409+
410+
for wid, worlds_list in new_cfg.flow_graph.items():
411+
for world_info in worlds_list:
412+
if server_id in world_info.peers:
413+
wrk_ids.add(wid)
414+
415+
return wrk_ids
402416

403417
@staticmethod
404418
def merge(base: JobConfig, extra: JobConfig) -> JobConfig:

infscale/controller/planner.py

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
"""planner.py."""
1818

19+
from dataclasses import dataclass
1920
import json
2021
from pathlib import Path
2122

@@ -80,6 +81,14 @@ def enumerate(self) -> ExecPlan:
8081
yield plan
8182

8283

84+
@dataclass
class PipelineData:
    """PipelineData class.

    Records the identity and capacity of one generated pipeline so the
    planner can later use it when deciding about scale-in.
    """

    # Worker ids attributed to this pipeline (the ids newly introduced
    # by the config that created it).
    worker_ids: set[str]
    # Total throughput reported by the placement solution for this config.
    total_throughput: float
90+
91+
8392
class Planner:
8493
"""Planner class."""
8594

@@ -90,6 +99,8 @@ def __init__(self, path: str, autoscale: bool) -> None:
9099
self._autoscale = autoscale
91100

92101
self._colls: dict[str, PlanCollection] = {}
102+
103+
self.pipeline_data: dict[str, list[PipelineData]] = {}
93104

94105
def build_config(
95106
self,
@@ -115,8 +126,13 @@ def build_config(
115126
if solution is None:
116127
raise InsufficientResources("No placement solution found")
117128

118-
gen2 = CfgGen2(solution[0], solution[1], source, "cuda", base_cfg)
129+
placement, agent_ctxts_list, total_throughput = solution
130+
131+
gen2 = CfgGen2(placement, agent_ctxts_list, source, "cuda", base_cfg)
119132
cfg = gen2.generate()
133+
134+
self._set_pipeline_data(cfg, total_throughput)
135+
120136
return cfg
121137

122138
#####
@@ -129,6 +145,19 @@ def build_config(
129145
# plan_list = self._colls[source.model].pick_plans(demand)
130146
# gen = CfgGen(agent_ctxts, source, plan_list, "cuda", base_cfg)
131147
# return gen.generate()
148+
149+
def _set_pipeline_data(self, cfg: JobConfig, total_throughput: float) -> None:
    """Set pipeline data.

    Record which worker ids the new config introduced — relative to the
    pipelines already tracked for this job — together with the placement's
    total throughput, so later scale-in decisions can identify the most
    recently added pipeline and its capacity.

    Args:
        cfg: newly generated job config.
        total_throughput: total throughput of the placement solution
            that produced ``cfg``.
    """
    job_id = cfg.job_id

    # One history list per job; created lazily on the first config.
    history = self.pipeline_data.setdefault(job_id, [])

    pipeline_identifiers = JobConfig.get_pipeline_identifiers(cfg)
    # Worker ids already attributed to earlier pipelines of this job.
    prev_identifiers = {wid for data in history for wid in data.worker_ids}
    new_identifiers = pipeline_identifiers - prev_identifiers

    history.append(PipelineData(new_identifiers, total_throughput))
132161

133162
def _search_feasible_placement(
134163
self,
@@ -138,15 +167,15 @@ def _search_feasible_placement(
138167
gpu_count: int,
139168
ctx_list: list[AgentContext],
140169
dispatcher_on_gpu: bool = True,
141-
) -> tuple[dict, list[AgentContext]] | None:
170+
) -> tuple[dict, list[AgentContext], float] | None:
142171
# we'd like to search a feasible solution by increasing the number of nodes
143172
for num_nodes in range(1, len(ctx_list) + 1):
144173
res = placement.calculate_placement(
145174
gpu_count, len(ctx_list[:num_nodes]), nfaults, dispatcher_on_gpu
146175
)
147176
meta = res["meta"]
148177
if meta["total_throughput"] > demand:
149-
return (res, ctx_list[:num_nodes])
178+
return (res, ctx_list[:num_nodes], meta["total_throughput"])
150179

151180
return None
152181

@@ -156,7 +185,7 @@ def _calculate_placement(
156185
agent_ctxts: dict[str, AgentContext],
157186
demand: float,
158187
dispatcher_on_gpu: bool = True,
159-
) -> tuple[dict, list[AgentContext]] | None:
188+
) -> tuple[dict, list[AgentContext], float] | None:
160189
gpu_count_and_nodes: dict[int, list[AgentContext]] = {}
161190
for ctx in agent_ctxts.values():
162191
count = ctx.avail_gpu_count()

0 commit comments

Comments (0)