
Commit 011e074

Scale in blocks at shutdown using Job Status Poller
This will now scale in blocks using the job status poller scale-in code, which means the DFK does not need to send its own BLOCK_INFO monitoring messages.

This is a minimal-ish change to which blocks get scaled in at shutdown: they now come from the job status poller's list. That will get pending blocks scaled in at shutdown, I think, but will now push the dynamically updated list to the cached side of the cache poll. What does that change? We will now be delayed in seeing ended jobs, but the executor.status data is already out of date in that sense the moment the call returns (although *less* out of date).

This patch is deliberately minimalist in that it does not attempt to move the scale-down code: this is a PR about changing behaviour, not about rewriting the scale-down strategy more seriously. The behaviour change is to move towards treating the job status poller's poll-item status as the source of best-estimated truth. Other work should probably do that moving, to complement the recent init_blocks handling PR #3283.
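To make the intended shutdown behaviour concrete, here is a minimal, self-contained Python sketch of the new loop. The FakeExecutor and FakePollItem classes are stand-ins invented for illustration; only the attributes actually used in the diff below (executor.bad_state_is_set, executor.label, pi.status, pi.scale_in) mirror the real interfaces, and everything else is an assumption.

import logging
from dataclasses import dataclass, field
from typing import Dict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class FakeExecutor:
    # Stand-in for a BlockProviderExecutor: only the attributes the
    # shutdown loop reads are modelled here.
    label: str
    bad_state_is_set: bool = False


@dataclass
class FakePollItem:
    # Stand-in for a JobStatusPoller poll item: `status` maps block id
    # to a job status, and is treated as the best-estimated truth.
    executor: FakeExecutor
    status: Dict[str, str] = field(default_factory=dict)

    def scale_in(self, n: int) -> None:
        # The real scale-in cancels up to n blocks; asking for more
        # blocks than exist is tolerated.
        logger.info("Asking %s to cancel %d block(s)", self.executor.label, n)


def scale_in_at_shutdown(poll_items) -> None:
    # Mirrors the new cleanup() behaviour: every block known to the
    # poll item (pending or running) is asked to scale in, unless the
    # executor is in a bad state.
    for pi in poll_items:
        if not pi.executor.bad_state_is_set:
            pi.scale_in(len(pi.status))
        else:
            logger.warning("Not scaling in %s: bad state", pi.executor.label)


if __name__ == "__main__":
    items = [
        FakePollItem(FakeExecutor("htex_ok"), status={"0": "PENDING", "1": "RUNNING"}),
        FakePollItem(FakeExecutor("htex_bad", bad_state_is_set=True), status={"0": "RUNNING"}),
    ]
    scale_in_at_shutdown(items)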
1 parent 19e998a commit 011e074

File tree

1 file changed: +14 −16 lines


parsl/dataflow/dflow.py

Lines changed: 14 additions & 16 deletions
@@ -34,7 +34,6 @@
 from parsl.dataflow.taskrecord import TaskRecord
 from parsl.errors import ConfigurationError, InternalConsistencyError, NoDataFlowKernelError
 from parsl.jobs.job_status_poller import JobStatusPoller
-from parsl.jobs.states import JobStatus, JobState
 from parsl.usage_tracking.usage import UsageTracker
 from parsl.executors.base import ParslExecutor
 from parsl.executors.status_handling import BlockProviderExecutor
@@ -1216,22 +1215,21 @@ def cleanup(self) -> None:
 
         logger.info("Scaling in and shutting down executors")
 
+        for pi in self.job_status_poller._poll_items:
+            if not pi.executor.bad_state_is_set:
+                logger.info(f"Scaling in executor {pi.executor.label}")
+
+                # this code needs to be at least as many blocks as need
+                # cancelling, but it is safe to be more, as the scaling
+                # code will cope with being asked to cancel more blocks
+                # than exist.
+                block_count = len(pi.status)
+                pi.scale_in(block_count)
+
+            else:  # and bad_state_is_set
+                logger.warning(f"Not scaling in executor {pi.executor.label} because it is in bad state")
+
         for executor in self.executors.values():
-            if isinstance(executor, BlockProviderExecutor):
-                if not executor.bad_state_is_set:
-                    logger.info(f"Scaling in executor {executor.label}")
-                    if executor.provider:
-                        job_ids = executor.provider.resources.keys()
-                        block_ids = executor.scale_in(len(job_ids))
-                        if self.monitoring and block_ids:
-                            new_status = {}
-                            for bid in block_ids:
-                                new_status[bid] = JobStatus(JobState.CANCELLED)
-                            msg = executor.create_monitoring_info(new_status)
-                            logger.debug("Sending message {} to hub from DFK".format(msg))
-                            self.monitoring.send(MessageType.BLOCK_INFO, msg)
-                else:  # and bad_state_is_set
-                    logger.warning(f"Not scaling in executor {executor.label} because it is in bad state")
             logger.info(f"Shutting down executor {executor.label}")
             executor.shutdown()
             logger.info(f"Shut down executor {executor.label}")
