Skip to content

Commit 9affe8e

Browse files
committed
move scale in at exit code into close method of job status poller
1 parent 011e074 commit 9affe8e

File tree

2 files changed

+17
-15
lines changed

2 files changed

+17
-15
lines changed

parsl/dataflow/dflow.py

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,21 +1213,7 @@ def cleanup(self) -> None:
12131213
self.job_status_poller.close()
12141214
logger.info("Terminated job status poller")
12151215

1216-
logger.info("Scaling in and shutting down executors")
1217-
1218-
for pi in self.job_status_poller._poll_items:
1219-
if not pi.executor.bad_state_is_set:
1220-
logger.info(f"Scaling in executor {pi.executor.label}")
1221-
1222-
# this code needs to be at least as many blocks as need
1223-
# cancelling, but it is safe to be more, as the scaling
1224-
# code will cope with being asked to cancel more blocks
1225-
# than exist.
1226-
block_count = len(pi.status)
1227-
pi.scale_in(block_count)
1228-
1229-
else: # and bad_state_is_set
1230-
logger.warning(f"Not scaling in executor {pi.executor.label} because it is in bad state")
1216+
logger.info("Shutting down executors")
12311217

12321218
for executor in self.executors.values():
12331219
logger.info(f"Shutting down executor {executor.label}")

parsl/jobs/job_status_poller.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,19 @@ def add_executors(self, executors: Sequence[BlockProviderExecutor]) -> None:
136136
logger.debug("Adding executor {}".format(executor.label))
137137
self._poll_items.append(PolledExecutorFacade(executor, self.dfk))
138138
self._strategy.add_executors(executors)
139+
140+
def close(self):
141+
super().close()
142+
for pi in self._poll_items:
143+
if not pi.executor.bad_state_is_set:
144+
logger.info(f"Scaling in executor {pi.executor.label}")
145+
146+
# this code needs to be at least as many blocks as need
147+
# cancelling, but it is safe to be more, as the scaling
148+
# code will cope with being asked to cancel more blocks
149+
# than exist.
150+
block_count = len(pi.status)
151+
pi.scale_in(block_count)
152+
153+
else: # and bad_state_is_set
154+
logger.warning(f"Not scaling in executor {pi.executor.label} because it is in bad state")

0 commit comments

Comments
 (0)