
Commit ed80dad

Fix rounding error in htex block scale in (#3721)
# Description

PR #2196 calculates a number of blocks to scale in, in the htex strategy, rather than scaling in one block per strategy iteration. However, it rounds the wrong way: it scales in a rounded-up, rather than rounded-down, number of blocks. Issue #3696 shows this resulting in oscillating behaviour: with 14 tasks and 48 workers per block, alternating strategy runs either scale up to the rounded-up number of needed blocks (14/48 => 1) or scale down to the rounded-down number of needed blocks (14/48 => 0).

This PR changes the rounding introduced in #2196 to be consistent: round up the number of blocks when scaling up, and round down the number of blocks when scaling down.

# Changed Behaviour

HTEX scale-down should oscillate less.

# Fixes

Fixes #3696

## Type of change

- Bug fix
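As a quick illustration of the rounding described above, here is a standalone sketch using the #3696 numbers (14 tasks, 48 workers per block, one node per block, parallelism 1). The variable names mirror those in the diff below, but the snippet is only illustrative arithmetic, not code taken from parsl:

import math

active_tasks = 14
tasks_per_node = 48
nodes_per_block = 1
parallelism = 1

slots_per_block = tasks_per_node * nodes_per_block

# Scale out (unchanged by this PR) rounds up, so 14 tasks request 1 block.
blocks_needed = math.ceil(active_tasks * parallelism / slots_per_block)
assert blocks_needed == 1

# With that 1 block running there are 48 slots, 34 of them surplus.
active_slots = blocks_needed * slots_per_block
excess_slots = active_slots - (active_tasks * parallelism)

# Old behaviour: rounding up treats a fractional surplus as a whole block,
# scaling in the only block and re-triggering scale out next iteration.
assert math.ceil(excess_slots / slots_per_block) == 1

# New behaviour: rounding down ignores the fractional surplus, so the block
# is kept and the oscillation stops.
assert math.floor(excess_slots / slots_per_block) == 0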
1 parent ab5e247 commit ed80dad

File tree

2 files changed: +105, -2 lines changed


parsl/jobs/strategy.py

Lines changed: 2 additions & 2 deletions
@@ -298,8 +298,8 @@ def _general_strategy(self, executors: List[BlockProviderExecutor], *, strategy_
                 # Scale in for htex
                 if isinstance(executor, HighThroughputExecutor):
                     if active_blocks > min_blocks:
-                        excess_slots = math.ceil(active_slots - (active_tasks * parallelism))
-                        excess_blocks = math.ceil(float(excess_slots) / (tasks_per_node * nodes_per_block))
+                        excess_slots = math.floor(active_slots - (active_tasks * parallelism))
+                        excess_blocks = math.floor(float(excess_slots) / (tasks_per_node * nodes_per_block))
                         excess_blocks = min(excess_blocks, active_blocks - min_blocks)
                         logger.debug(f"Requesting scaling in by {excess_blocks} blocks with idle time {self.max_idletime}s")
                         executor.scale_in_facade(excess_blocks, max_idletime=self.max_idletime)
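
To check the effect of the two changed lines in isolation, the following hedged sketch wraps the patched arithmetic in a standalone helper. The helper name blocks_to_scale_in and the assumption that active_slots equals active_blocks * tasks_per_node * nodes_per_block are illustrative only, not part of the parsl API:

import math

def blocks_to_scale_in(active_blocks, active_tasks, tasks_per_node,
                       nodes_per_block=1, parallelism=1, min_blocks=0):
    # Assumed relationship between blocks and slots, for illustration only.
    active_slots = active_blocks * tasks_per_node * nodes_per_block
    excess_slots = math.floor(active_slots - (active_tasks * parallelism))
    excess_blocks = math.floor(float(excess_slots) / (tasks_per_node * nodes_per_block))
    return min(excess_blocks, active_blocks - min_blocks)

# 149 tasks on three 50-worker blocks leave only 1 surplus slot; rounding
# down keeps all three blocks, where math.ceil would have scaled one in.
assert blocks_to_scale_in(3, 149, 50) == 0

# With zero outstanding tasks the surplus is three whole blocks, so all of
# them are still scaled in, clamped by min_blocks.
assert blocks_to_scale_in(3, 0, 50) == 3

Rounding down only defers scale-in when the surplus is a fraction of a block; whole-block surpluses are still reclaimed, which is what the new test below checks under zero load.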
Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
+import math
+from unittest.mock import MagicMock
+
+import pytest
+
+from parsl.executors.high_throughput.executor import HighThroughputExecutor
+from parsl.jobs.states import JobState, JobStatus
+from parsl.jobs.strategy import Strategy
+
+
+# the parameterize tuple consists of:
+# Input:
+# * number of tasks to mock the load as
+# * number of workers per node
+# Expected output:
+# * the number of blocks we should expect to be launched
+#   in this situation
+#
+# This test will configure an executor, then run strategize
+# a few times, asserting that it converges to the correct
+# number of blocks without oscillating.
+@pytest.mark.local
+@pytest.mark.parametrize("ns", [(14, 48, 1),   # values from issue #3696
+
+                                (1, 1, 1),     # one task needs one block
+
+                                (100, 1, 20),  # many one-task blocks, hitting hard-coded max blocks
+
+                                (47, 48, 1),   # some edge cases around #3696 values
+                                (48, 48, 1),   # "
+                                (49, 48, 2),   # "
+                                (149, 50, 3)]) # "
+def test_htex_strategy_does_not_oscillate(ns):
+    """Check for oscillations in htex scaling.
+    In issue 3696, with a large number of workers per block
+    and a smaller number of active tasks, the htex scaling
+    strategy oscillates between 0 and 1 active block, rather
+    than converging to 1 active block.
+    """
+
+    n_tasks, n_workers, n_blocks = ns
+
+    s = Strategy(strategy='htex_auto_scale', max_idletime=0)
+
+    provider = MagicMock()
+    executor = MagicMock(spec=HighThroughputExecutor)
+
+    statuses = {}
+
+    executor.provider = provider
+    executor.outstanding = n_tasks
+    executor.status_facade = statuses
+    executor.workers_per_node = n_workers
+
+    provider.parallelism = 1
+    provider.init_blocks = 0
+    provider.min_blocks = 0
+    provider.max_blocks = 20
+    provider.nodes_per_block = 1
+
+    def scale_out(n):
+        for _ in range(n):
+            statuses[len(statuses)] = JobStatus(state=JobState.PENDING)
+
+    executor.scale_out_facade.side_effect = scale_out
+
+    def scale_in(n, max_idletime=None):
+        # find n PENDING jobs and set them to CANCELLED
+        for k in statuses:
+            if n == 0:
+                return
+            if statuses[k].state == JobState.PENDING:
+                statuses[k].state = JobState.CANCELLED
+                n -= 1
+
+    executor.scale_in_facade.side_effect = scale_in
+
+    s.add_executors([executor])
+
+    # In issue #3696, this first strategise does initial and load based
+    # scale outs, because n_tasks > n_workers*0
+    s.strategize([executor])
+
+    executor.scale_out_facade.assert_called()
+    assert len(statuses) == n_blocks, "Should have launched n_blocks"
+    assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == n_blocks
+    # there might be several calls to scale_out_facade inside strategy,
+    # but the end effect should be that exactly one block is scaled out.
+
+    executor.scale_in_facade.assert_not_called()
+
+    # In issue #3696, this second strategize does a scale in, because n_tasks < n_workers*1
+    s.strategize([executor])
+
+    # assert that there should still be n_blocks pending blocks
+    assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == n_blocks
+    # this assert fails due to issue #3696
+
+    # Now check scale in happens with 0 load
+    executor.outstanding = 0
+    s.strategize([executor])
+    executor.scale_in_facade.assert_called()
+    assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == 0
