Commit c9cce6a

ryankert01 authored and bveeramani committed
[Data] DefaultAutoscalerV2 doesn't scale nodes from zero (ray-project#59896)
## Description

Addresses a critical issue in `DefaultAutoscalerV2` where nodes were not being properly scaled from zero. With this update, clusters managed by Ray automatically provision additional nodes when there is workload demand, even when starting from an idle (zero-node) state.

## Related issues

Closes ray-project#59682

---------

Signed-off-by: Hsien-Cheng Huang <[email protected]>
Co-authored-by: Balaji Veeramani <[email protected]>
Signed-off-by: jasonwrwang <[email protected]>
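For orientation, here is a minimal sketch of the discovery logic the diff below introduces: node types are read from the cluster config and seeded with a count of zero, then live worker nodes are counted on top. The `NodeGroupConfig` stand-in and `node_spec_counts` helper are simplified illustrations for this writeup, not Ray's actual protobuf types or the real method.

```python
# Minimal sketch of the scale-from-zero discovery logic (hypothetical
# stand-ins, not Ray's actual protobuf ClusterConfig/NodeGroupConfig types).
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class NodeResourceSpec:
    cpu: float
    gpu: float
    mem: float


@dataclass
class NodeGroupConfig:
    resources: Dict[str, float]
    max_count: int


def node_spec_counts(
    node_groups: List[NodeGroupConfig],
    alive_worker_resources: List[Dict[str, float]],
) -> Dict[NodeResourceSpec, int]:
    counts: Dict[NodeResourceSpec, int] = defaultdict(int)

    # Seed every scalable node type from the config with a count of zero,
    # so node types are known even when no workers are running.
    for group in node_groups:
        if not group.resources or group.max_count == 0:
            continue
        counts[
            NodeResourceSpec(
                cpu=group.resources.get("CPU", 0),
                gpu=group.resources.get("GPU", 0),
                mem=group.resources.get("memory", 0),
            )
        ] = 0

    # Overlay the live worker nodes on top of the seeded counts.
    for r in alive_worker_resources:
        counts[NodeResourceSpec(r["CPU"], r.get("GPU", 0), r["memory"])] += 1
    return dict(counts)


# With zero workers, the config-declared type still appears with count 0:
groups = [NodeGroupConfig({"CPU": 4, "memory": 1000}, max_count=10)]
assert node_spec_counts(groups, []) == {NodeResourceSpec(4, 0, 1000): 0}
```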
1 parent 822d333 · commit c9cce6a

File tree

2 files changed: +185 -8 lines changed

python/ray/data/_internal/cluster_autoscaler/default_cluster_autoscaler_v2.py

Lines changed: 25 additions & 5 deletions

```diff
@@ -117,16 +117,36 @@ def __init__(
         # so the first `get_total_resources` call can get the allocated resources.
         self._send_resource_request([])

-    def _get_node_resource_spec_and_count(self) -> Dict[_NodeResourceSpec, int]:
-        """Get the unique node resource specs and their count in the cluster."""
-        # Filter out the head node.
+    def get_node_resource_spec_and_count(self) -> Dict[_NodeResourceSpec, int]:
+        """Get node types from cluster config and count alive nodes.
+
+        Enables scaling from zero by discovering node types from cluster config
+        even when no worker nodes are running.
+        """
+        nodes_resource_spec_count = defaultdict(int)
+
+        # Discover node types from cluster config
+        cluster_config = ray._private.state.state.get_cluster_config()
+        if cluster_config and cluster_config.node_group_configs:
+            for node_group_config in cluster_config.node_group_configs:
+                # Skip if no resources or max_count=0 (cannot scale)
+                if not node_group_config.resources or node_group_config.max_count == 0:
+                    continue
+
+                node_resource_spec = _NodeResourceSpec.of(
+                    cpu=node_group_config.resources.get("CPU", 0),
+                    gpu=node_group_config.resources.get("GPU", 0),
+                    mem=node_group_config.resources.get("memory", 0),
+                )
+                nodes_resource_spec_count[node_resource_spec] = 0
+
+        # Count alive worker nodes
         node_resources = [
             node["Resources"]
             for node in ray.nodes()
             if node["Alive"] and "node:__internal_head__" not in node["Resources"]
         ]

-        nodes_resource_spec_count = defaultdict(int)
         for r in node_resources:
             node_resource_spec = _NodeResourceSpec.of(
                 cpu=r["CPU"], gpu=r.get("GPU", 0), mem=r["memory"]
@@ -168,7 +188,7 @@ def try_trigger_scaling(self):
             return

         resource_request = []
-        node_resource_spec_count = self._get_node_resource_spec_and_count()
+        node_resource_spec_count = self.get_node_resource_spec_and_count()
         debug_msg = ""
         if logger.isEnabledFor(logging.DEBUG):
             debug_msg = (
```
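To show how those counts feed a scale-up, here is a hedged sketch of the request-building step, mirroring the from-zero test in the next file: each discovered node type contributes `scale_up_delta` resource bundles. The `build_scale_up_request` helper is illustrative only; the real `try_trigger_scaling` also gates on utilization thresholds, which this sketch omits.

```python
# Illustrative request-building step (hypothetical helper mirroring the
# from-zero test; the real try_trigger_scaling also checks utilization).
from typing import Dict, List, NamedTuple


class Spec(NamedTuple):  # simplified stand-in for _NodeResourceSpec
    cpu: float
    gpu: float
    mem: float


def build_scale_up_request(
    spec_counts: Dict[Spec, int], scale_up_delta: int
) -> List[Dict[str, float]]:
    request: List[Dict[str, float]] = []
    for spec in spec_counts:
        # One resource bundle per requested node of this type.
        request.extend(
            [{"CPU": spec.cpu, "GPU": spec.gpu, "memory": spec.mem}]
            * scale_up_delta
        )
    return request


# Two node types with zero running nodes and delta=1 yield one bundle each:
counts = {Spec(4, 0, 1000): 0, Spec(8, 2, 2000): 0}
assert build_scale_up_request(counts, 1) == [
    {"CPU": 4, "GPU": 0, "memory": 1000},
    {"CPU": 8, "GPU": 2, "memory": 2000},
]
```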

python/ray/data/tests/test_default_cluster_autoscaler_v2.py

Lines changed: 160 additions & 3 deletions

```diff
@@ -3,6 +3,7 @@
 import pytest

 import ray
+from ray.core.generated import autoscaler_pb2
 from ray.data._internal.cluster_autoscaler.default_cluster_autoscaler_v2 import (
     DefaultClusterAutoscalerV2,
     _NodeResourceSpec,
@@ -72,7 +73,7 @@ def teardown_class(self):
         ray.shutdown()

     def test_get_node_resource_spec_and_count(self):
-        # Test _get_node_resource_spec_and_count
+        # Test get_node_resource_spec_and_count
         autoscaler = DefaultClusterAutoscalerV2(
             resource_manager=MagicMock(),
             execution_id="test_execution_id",
@@ -123,8 +124,13 @@ def test_get_node_resource_spec_and_count(self):
             ): 1,
         }

+        # Patch cluster config to return None
         with patch("ray.nodes", return_value=node_table):
-            assert autoscaler._get_node_resource_spec_and_count() == expected
+            with patch(
+                "ray._private.state.state.get_cluster_config",
+                return_value=None,
+            ):
+                assert autoscaler.get_node_resource_spec_and_count() == expected

     @pytest.mark.parametrize("cpu_util", [0.5, 0.75])
     @pytest.mark.parametrize("gpu_util", [0.5, 0.75])
@@ -154,7 +160,7 @@ def test_try_scale_up_cluster(

         resource_spec1 = _NodeResourceSpec.of(cpu=4, gpu=0, mem=1000)
         resource_spec2 = _NodeResourceSpec.of(cpu=8, gpu=1, mem=1000)
-        autoscaler._get_node_resource_spec_and_count = MagicMock(
+        autoscaler.get_node_resource_spec_and_count = MagicMock(
             return_value={
                 resource_spec1: 2,
                 resource_spec2: 1,
@@ -195,6 +201,157 @@ def test_try_scale_up_cluster(

         _send_resource_request.assert_called_with(expected_resource_request)

+    def test_get_node_resource_spec_and_count_from_zero(self):
+        """Test that get_node_resource_spec_and_count can discover node types
+        from cluster config even when there are zero worker nodes."""
+        autoscaler = DefaultClusterAutoscalerV2(
+            resource_manager=MagicMock(),
+            execution_id="test_execution_id",
+        )
+
+        # Simulate a cluster with only head node (no worker nodes)
+        node_table = [
+            {
+                "Resources": self._head_node,
+                "Alive": True,
+            },
+        ]
+
+        # Create a mock cluster config with 2 worker node types
+        cluster_config = autoscaler_pb2.ClusterConfig()
+
+        # Node type 1: 4 CPU, 0 GPU, 1000 memory
+        node_group_config1 = autoscaler_pb2.NodeGroupConfig()
+        node_group_config1.resources["CPU"] = 4
+        node_group_config1.resources["memory"] = 1000
+        node_group_config1.max_count = 10
+        cluster_config.node_group_configs.append(node_group_config1)
+
+        # Node type 2: 8 CPU, 2 GPU, 2000 memory
+        node_group_config2 = autoscaler_pb2.NodeGroupConfig()
+        node_group_config2.resources["CPU"] = 8
+        node_group_config2.resources["GPU"] = 2
+        node_group_config2.resources["memory"] = 2000
+        node_group_config2.max_count = 5
+        cluster_config.node_group_configs.append(node_group_config2)
+
+        expected = {
+            _NodeResourceSpec.of(cpu=4, gpu=0, mem=1000): 0,
+            _NodeResourceSpec.of(cpu=8, gpu=2, mem=2000): 0,
+        }
+
+        with patch("ray.nodes", return_value=node_table):
+            with patch(
+                "ray._private.state.state.get_cluster_config",
+                return_value=cluster_config,
+            ):
+                result = autoscaler.get_node_resource_spec_and_count()
+                assert result == expected
+
+    @patch(
+        "ray.data._internal.cluster_autoscaler.default_cluster_autoscaler_v2.DefaultClusterAutoscalerV2._send_resource_request"
+    )
+    def test_try_scale_up_cluster_from_zero(self, _send_resource_request):
+        """Test that the autoscaler can scale up from zero worker nodes."""
+        scale_up_threshold = 0.75
+        scale_up_delta = 1
+        # High utilization to trigger scaling
+        utilization = ExecutionResources(cpu=0.9, gpu=0.9, object_store_memory=0.9)
+
+        autoscaler = DefaultClusterAutoscalerV2(
+            resource_manager=MagicMock(),
+            execution_id="test_execution_id",
+            cluster_scaling_up_delta=scale_up_delta,
+            resource_utilization_calculator=StubUtilizationGauge(utilization),
+            cluster_scaling_up_util_threshold=scale_up_threshold,
+        )
+        _send_resource_request.assert_called_with([])
+
+        # Mock the node resource spec with zero counts
+        resource_spec1 = _NodeResourceSpec.of(cpu=4, gpu=0, mem=1000)
+        resource_spec2 = _NodeResourceSpec.of(cpu=8, gpu=2, mem=2000)
+        autoscaler.get_node_resource_spec_and_count = MagicMock(
+            return_value={
+                resource_spec1: 0,  # Zero nodes of this type
+                resource_spec2: 0,  # Zero nodes of this type
+            },
+        )
+
+        autoscaler.try_trigger_scaling()
+
+        # Should request scale_up_delta nodes of each type
+        expected_resource_request = []
+
+        expected_resource_request.extend(
+            [
+                {
+                    "CPU": resource_spec1.cpu,
+                    "GPU": resource_spec1.gpu,
+                    "memory": resource_spec1.mem,
+                }
+            ]
+            * scale_up_delta
+        )
+
+        expected_resource_request.extend(
+            [
+                {
+                    "CPU": resource_spec2.cpu,
+                    "GPU": resource_spec2.gpu,
+                    "memory": resource_spec2.mem,
+                }
+            ]
+            * scale_up_delta
+        )
+
+        _send_resource_request.assert_called_with(expected_resource_request)
+
+    def test_get_node_resource_spec_and_count_skips_max_count_zero(self):
+        """Test that node types with max_count=0 are skipped."""
+        autoscaler = DefaultClusterAutoscalerV2(
+            resource_manager=MagicMock(),
+            execution_id="test_execution_id",
+        )
+
+        # Simulate a cluster with only head node (no worker nodes)
+        node_table = [
+            {
+                "Resources": self._head_node,
+                "Alive": True,
+            },
+        ]
+
+        # Create a mock cluster config with one valid node type and one with max_count=0
+        cluster_config = autoscaler_pb2.ClusterConfig()
+
+        # Node type 1: 4 CPU, 0 GPU, 1000 memory, max_count=10
+        node_group_config1 = autoscaler_pb2.NodeGroupConfig()
+        node_group_config1.resources["CPU"] = 4
+        node_group_config1.resources["memory"] = 1000
+        node_group_config1.max_count = 10
+        cluster_config.node_group_configs.append(node_group_config1)
+
+        # Node type 2: 8 CPU, 2 GPU, 2000 memory, max_count=0 (should be skipped)
+        node_group_config2 = autoscaler_pb2.NodeGroupConfig()
+        node_group_config2.resources["CPU"] = 8
+        node_group_config2.resources["GPU"] = 2
+        node_group_config2.resources["memory"] = 2000
+        node_group_config2.max_count = 0  # This should be skipped
+        cluster_config.node_group_configs.append(node_group_config2)
+
+        # Only the first node type should be discovered
+        expected = {
+            _NodeResourceSpec.of(cpu=4, gpu=0, mem=1000): 0,
+        }
+
+        with patch("ray.nodes", return_value=node_table):
+            with patch(
+                "ray._private.state.state.get_cluster_config",
+                return_value=cluster_config,
+            ):
+                result = autoscaler.get_node_resource_spec_and_count()
+                assert result == expected

 if __name__ == "__main__":
     import sys
```
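As a usage illustration (an assumption for this writeup, not part of the PR), a Ray Data workload like the one below is the kind that previously stalled on a zero-worker autoscaling cluster and should now cause workers to be provisioned on demand:

```python
import ray

# Assumes a cluster launched with autoscaling enabled and min_workers=0.
ray.init()

# A CPU-hungry Ray Data job; with the fixed DefaultAutoscalerV2, running it
# against an idle cluster should now trigger worker provisioning.
ds = ray.data.range(10_000_000).map(lambda row: {"id": row["id"] * 2})
print(ds.count())
```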
