|
12 | 12 | from parsl.executors.base import ParslExecutor |
13 | 13 | from parsl.executors.errors import BadStateException, ScalingFailed |
14 | 14 | from parsl.jobs.error_handlers import noop_error_handler, simple_error_handler |
15 | | -from parsl.jobs.states import JobState, JobStatus |
| 15 | +from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus |
16 | 16 | from parsl.monitoring.message_type import MessageType |
17 | 17 | from parsl.providers.base import ExecutionProvider |
18 | 18 | from parsl.utils import AtomicIDCounter |
@@ -205,16 +205,20 @@ def scale_in(self, blocks: int) -> List[str]: |
205 | 205 |
|
206 | 206 | :return: A list of block ids corresponding to the blocks that were removed. |
207 | 207 | """ |
208 | | - # Obtain list of blocks to kill |
209 | | - to_kill = list(self.blocks_to_job_id.keys())[:blocks] |
210 | | - kill_ids = [self.blocks_to_job_id[block] for block in to_kill] |
| 208 | + |
| 209 | + active_blocks = [block_id for block_id, status in self._status.items() |
| 210 | + if status.state not in TERMINAL_STATES] |
| 211 | + |
| 212 | + block_ids_to_kill = active_blocks[:blocks] |
| 213 | + |
| 214 | + job_ids_to_kill = [self.blocks_to_job_id[block] for block in block_ids_to_kill] |
211 | 215 |
|
212 | 216 | # Cancel the blocks provisioned |
213 | 217 | if self.provider: |
214 | | - logger.info(f"Scaling in jobs: {kill_ids}") |
215 | | - r = self.provider.cancel(kill_ids) |
216 | | - job_ids = self._filter_scale_in_ids(kill_ids, r) |
217 | | - block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] |
| 218 | + logger.info(f"Scaling in jobs: {job_ids_to_kill}") |
| 219 | + r = self.provider.cancel(job_ids_to_kill) |
| 220 | + job_ids = self._filter_scale_in_ids(job_ids_to_kill, r) |
| 221 | + block_ids_killed = [self.job_ids_to_block[job_id] for job_id in job_ids] |
218 | 222 | return block_ids_killed |
219 | 223 | else: |
220 | 224 | logger.error("No execution provider available to scale in") |
|
0 commit comments