Skip to content

Commit 2b0017a

Browse files
authored
feat: scale in (#296)
Added logic to determine whether the job is underutilized based on effective throughput and queue trends. The system is underutilized when it can handle the load without the last pipeline (remaining capacity > demand) and the queue is decreasing or stably low. If this condition holds, we scale in by removing the last pipeline and performing an update.
1 parent c735a9a commit 2b0017a

File tree

5 files changed

+125
-10
lines changed

5 files changed

+125
-10
lines changed

infscale/common/exceptions.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ def __init__(self, err_msg: str):
5959
super().__init__(err_msg)
6060

6161

62+
class InsufficientThroughput(InfScaleException):
    """Exception raised when available throughput is insufficient."""

    def __init__(self, err_msg: str):
        """Create the exception from a human-readable error message."""
        super().__init__(err_msg)
68+
69+
6270
class DifferentResourceAmount(InfScaleException):
6371
"""Exception for different resource amounts."""
6472

infscale/common/metrics.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ class PerfMetrics:
9797
_sensitivity_factor: float = 1
9898
_qthresh: float = 10**9
9999

100+
# defines how much the queue must decrease (as a ratio of its average)
101+
# For example:
102+
# q_drop_factor = 0.8 -> current queue < 80% of recent avg -> queue is dropping (~20% drop)
103+
# lower values = more aggressive scale-in detection (more sensitive to noise)
104+
# higher values = more conservative (requires stronger drop signal)
105+
_q_drop_factor: float = 0.8
106+
100107
_qlevel_rs: RollingStats = None
101108
_in_rate_rs: RollingStats = None
102109
_out_rate_rs: RollingStats = None
@@ -140,6 +147,37 @@ def is_congested(self) -> bool:
140147
"""Return true if queue continues to build up."""
141148
return self.qlevel > self._qthresh
142149

150+
def is_underutilized(self) -> bool:
    """Return True when the queue shows a sustained downward trend."""
    rs = self._qlevel_rs

    # judge only once the rolling window is fully populated,
    # otherwise the average is not stable enough
    if not rs.is_filled():
        return False

    # the current queue length must fall below a fraction of its rolling
    # mean; dropping under that threshold means demand trails capacity
    drop_threshold = rs.mean() * self._q_drop_factor
    return self.qlevel < drop_threshold
163+
164+
def rate_to_scale_in(self, margin: float = 0.2) -> float:
    """Return a safe arrival rate threshold to trigger scale-in.

    The threshold is the rolling-average input rate inflated by a safety
    margin; while the effective arrival rate stays below it, the system
    is considered underutilized and resources can safely be reduced.

    Args:
        margin (float): A fractional buffer (e.g., 0.2 for 20%) added to
            the average input rate to absorb short-term fluctuations and
            avoid premature scaling in.

    Returns:
        float: The adjusted input rate threshold for scale-in decisions.
    """
    safety_factor = 1 + margin
    return safety_factor * self._in_rate_rs.mean()
180+
143181
def rate_to_decongest(self) -> float:
144182
"""Return a required rate to relieve congestion.
145183

infscale/controller/autoscaler.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from infscale import get_logger
2626
from infscale.common.metrics import PerfMetrics
2727
from infscale.controller.job_context import JobContext, JobStateEnum
28+
from infscale.controller.planner import DemandData
2829

2930

3031
if TYPE_CHECKING:
@@ -75,8 +76,11 @@ async def run(self) -> None:
7576
continue
7677

7778
if not metrics.is_congested():
78-
# TODO: if not congested, check if scale-in is necessary
7979
self._congestion_count = 0
80+
81+
if metrics.is_underutilized():
82+
await self._scale_in(job_ctx, metrics)
83+
8084
continue
8185

8286
if self._last_output_rate >= metrics.output_rate:
@@ -92,7 +96,8 @@ async def run(self) -> None:
9296

9397
async def _scale_out(self, ctx: JobContext, metrics: PerfMetrics) -> None:
9498
rate = metrics.rate_to_decongest()
95-
ctx.set_desired_rate(rate)
99+
demand_data = DemandData(rate)
100+
ctx.set_demand_data(demand_data)
96101

97102
logger.debug(f"congested, desired rate = {rate}")
98103

@@ -107,6 +112,24 @@ async def _scale_out(self, ctx: JobContext, metrics: PerfMetrics) -> None:
107112
self._last_output_rate = metrics.output_rate
108113
logger.debug("finished scaling-out")
109114

115+
async def _scale_in(self, ctx: JobContext, metrics: PerfMetrics) -> None:
    """Ask the planner (via a job update) to shed capacity for the job."""
    rate = metrics.rate_to_scale_in()
    # scale_out=False signals the planner to build a scale-in config
    ctx.set_demand_data(DemandData(rate, False))

    logger.debug(f"underutilized, desired rate = {rate}")

    try:
        await ctx.update()
    except Exception as err:
        # update failures are logged, not propagated; the next
        # autoscaling cycle will re-evaluate
        logger.warning(f"exception: {err}")
        return
    else:
        logger.info("finished scaling-in")
    finally:
        # stamp the run time on both success and failure paths
        self._last_run = time.perf_counter()
132+
110133
async def set_event(self, job_id: str, wrkr_id: str) -> None:
111134
"""Set an autoscaling event for a given job and worker."""
112135
await self._event_queue.put((job_id, wrkr_id))

infscale/controller/job_context.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from infscale.controller.ctrl_dtype import CommandAction, CommandActionModel
4444
from infscale.controller.deployment.assignment import AssignmentCollection
4545
from infscale.controller.job_checker import JobChecker
46+
from infscale.controller.planner import DemandData
4647

4748

4849
if TYPE_CHECKING:
@@ -674,14 +675,14 @@ def __init__(self, ctrl: Controller, job_id: str):
674675
self.past_running_agent_info: dict[str, AgentMetaData] = {}
675676
self.job_checker = JobChecker(self.wrk_status)
676677

677-
self._desired_rate = 0.0
678+
self._demand_data: DemandData = DemandData()
678679

679680
global logger
680681
logger = get_logger()
681682

682-
def set_desired_rate(self, rate: float) -> None:
683-
"""Set diresed output rate for a job."""
684-
self._desired_rate = rate
683+
def set_demand_data(self, demand_data: DemandData) -> None:
    """Record the demand data used the next time this job's config is built."""
    self._demand_data = demand_data
685686

686687
def get_agent_data(self, agent_id: str) -> AgentMetaData:
687688
"""Return agent metadata."""
@@ -821,7 +822,7 @@ def process_cfg(self) -> None:
821822
self._new_cfg = self.ctrl.planner.build_config(
822823
self.req.config,
823824
self.ctrl.agent_contexts,
824-
self._desired_rate,
825+
self._demand_data,
825826
self._cur_cfg,
826827
)
827828

infscale/controller/planner.py

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from dataclasses import dataclass
2121
from pathlib import Path
2222

23-
from infscale.common.exceptions import InsufficientResources
23+
from infscale.common.exceptions import InsufficientResources, InsufficientThroughput
2424
from infscale.configs.job import JobConfig
2525
from infscale.configs.plan import ExecPlan
2626
from infscale.controller.agent_context import AgentContext
@@ -89,6 +89,14 @@ class PipelineData:
8989
total_throughput: float
9090

9191

92+
@dataclass
class DemandData:
    """Demand information that drives a scaling decision."""

    # target arrival rate the deployment must be able to sustain
    rate: float = 0.0
    # True -> build a scaled-out config; False -> build a scaled-in config
    scale_out: bool = True
98+
99+
92100
class Planner:
93101
"""Planner class."""
94102

def build_config(
    self,
    source: JobConfig,
    agent_ctxts: dict[str, AgentContext],
    demand_data: DemandData,
    base_cfg: JobConfig = None,
) -> JobConfig:
    """Build a config based on source config."""
    # without autoscaling, the source config is used verbatim
    if not self._autoscale:
        return source

    if demand_data.scale_out:
        return self._get_scaled_out_cfg(
            source, agent_ctxts, demand_data.rate, base_cfg
        )

    return self._get_scaled_in_cfg(base_cfg, demand_data.rate)
131+
132+
def _get_scaled_out_cfg(
133+
self,
134+
source: JobConfig,
135+
agent_ctxts: dict[str, AgentContext],
136+
rate: float,
137+
base_cfg: JobConfig = None,
138+
) -> JobConfig:
117139
# if base_cfg is none, this is the first time we build a config,
118140
# so we need to place the dispatcher on a GPU
119141
# otherwise, we already have a base config, so we don't need to
120142
# spare a GPU for the dispatcher
121143
dispatcher_on_gpu = base_cfg is None
122144
solution = self._calculate_placement(
123-
source, agent_ctxts, demand, dispatcher_on_gpu=dispatcher_on_gpu
145+
source, agent_ctxts, rate, dispatcher_on_gpu=dispatcher_on_gpu
124146
)
125147

126148
if solution is None:
@@ -146,6 +168,29 @@ def build_config(
146168
# gen = CfgGen(agent_ctxts, source, plan_list, "cuda", base_cfg)
147169
# return gen.generate()
148170

171+
def _get_scaled_in_cfg(self, cfg: JobConfig, rate: float) -> JobConfig:
    """Build a config with the job's last pipeline removed.

    Args:
        cfg (JobConfig): Current job config to scale in.
        rate (float): Arrival-rate threshold (including the caller's
            safety margin) that the remaining pipelines must still be
            able to handle.

    Returns:
        JobConfig: A new config without the last pipeline.

    Raises:
        InsufficientThroughput: If there is no pipeline to remove, or if
            removing the last pipeline would leave less capacity than the
            current demand.
    """
    pipelines = self.pipeline_data[cfg.job_id]

    # nothing to remove: raise the domain exception instead of crashing
    # with an IndexError on the [-1] access below
    if not pipelines:
        raise InsufficientThroughput("No pipeline available for scale in")

    # remaining capacity if we drop the last pipeline
    remaining_throughput = sum(
        data.total_throughput for data in pipelines[:-1]
    )

    # the remaining capacity must still comfortably exceed the current
    # arrival rate; `rate` carries a margin so we don't scale in too
    # early due to random dips
    if remaining_throughput <= rate:
        raise InsufficientThroughput("Not enough remaining throughput for scale in")

    data = pipelines.pop()

    return JobConfig.remove_pipeline(cfg, data.worker_ids)
193+
149194
def _set_pipeline_data(self, cfg: JobConfig, total_throughput) -> None:
150195
"""Set pipeline data."""
151196
job_id = cfg.job_id

0 commit comments

Comments
 (0)