Skip to content

Commit 646da29

Browse files
committed
move executor.status() API boundary to return the cached/mutated data, not fresh provider+simulated data
potentially-refresh cache on every call instead of driven by poller - this might change the refresh cadence. code that assumes that repeated calls to PollItem.status will be constant, except across the poll loop iteration, might break -- does anything make that assumption? for example: handle_errors vs strategize now happen with potentially different status info (that should be eventually convergent) things to test: * what test makes sure the provider isn't polled too often? if not here, write one. * what tests that we don't scale too much? (eg. if we ignore the PENDING status added by scale_out to poller_mutable_status, and let the strategy keep running, then we should see excessive blocks being launched)
1 parent 5baca76 commit 646da29

File tree

2 files changed

+10
-11
lines changed

2 files changed

+10
-11
lines changed

parsl/executors/status_handling.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ def outstanding(self) -> int:
114114
raise NotImplementedError("Classes inheriting from BlockProviderExecutor must implement "
115115
"outstanding()")
116116

117-
def status(self) -> Dict[str, JobStatus]:
117+
def _old_status_impl(self) -> Dict[str, JobStatus]:
118118
"""Return the status of all jobs/blocks currently known to this executor.
119119
120120
:return: a dictionary mapping block ids (in string) to job status
@@ -128,6 +128,13 @@ def status(self) -> Dict[str, JobStatus]:
128128

129129
return status
130130

131+
def status(self) -> Dict[str, JobStatus]:
132+
now = time.time()
133+
if self._should_poll(now):
134+
self._poller_mutable_status = self._old_status_impl()
135+
self._last_poll_time = now
136+
return self._poller_mutable_status
137+
131138
def set_bad_state_and_fail_all(self, exception: Exception):
132139
"""Allows external error handlers to mark this executor as irrecoverably bad and cause
133140
all tasks submitted to it now and in the future to fail. The executor is responsible
@@ -242,9 +249,3 @@ def workers_per_node(self) -> Union[int, float]:
242249

243250
def _should_poll(self, now: float) -> bool:
244251
return now >= self._last_poll_time + self.status_polling_interval
245-
246-
def _refresh_poll_mutable_status_if_time(self):
247-
now = time.time()
248-
if self._should_poll(now):
249-
self._poller_mutable_status = self.status()
250-
self._last_poll_time = now

parsl/jobs/job_status_poller.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,7 @@ def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflo
3434
logger.info("Monitoring enabled on job status poller")
3535

3636
def poll(self) -> None:
37-
previous_status = self.executor._poller_mutable_status
38-
39-
self.executor._refresh_poll_mutable_status_if_time()
37+
previous_status = self.executor.status()
4038

4139
if previous_status != self.executor._poller_mutable_status:
4240
# short circuit the case where the two objects are identical so
@@ -64,7 +62,7 @@ def status(self) -> Dict[str, JobStatus]:
6462
6563
:return: a dictionary mapping block ids (in string) to job status
6664
"""
67-
return self.executor._poller_mutable_status
65+
return self.executor.status()
6866

6967
@property
7068
def executor(self) -> BlockProviderExecutor:

0 commit comments

Comments
 (0)