Commit d23ecc3

move executor.status() API boundary to return the cached/mutated data, not fresh provider+simulated data

Potentially refresh the cache on every call instead of having the refresh driven by the poller. This might change the refresh cadence.

Code that assumes repeated calls to PollItem.status will be constant, except across poll loop iterations, might break. Does anything make that assumption? For example:

* handle_errors vs strategize now happen with potentially different status info (that should be eventually convergent)
* the monitoring loop in poll now can't see the update happening and can't get a before/after state inside the loop, because it's decoupled now. It probably needs some more persistent (across loop iterations) storage of the previous state...

Things to test:

* what test makes sure the provider isn't polled too often? If there isn't one here, write one.
* what tests that we don't scale too much? (e.g. if we ignore the PENDING status added by scale_out to poller_mutable_status, and let the strategy keep running, then we should see excessive blocks being launched)
* test that monitoring delta recording works

This breaks:

pytest parsl/tests/test_scaling/test_scale_down_htex_unregistered.py --config local

I think it's because the status update hasn't happened yet at the point that it's being asserted, because .status() is now cached...

1 parent 69ad2aa commit d23ecc3

2 files changed: +10 -11 lines changed

parsl/executors/status_handling.py

Lines changed: 8 additions & 7 deletions
@@ -114,7 +114,7 @@ def outstanding(self) -> int:
         raise NotImplementedError("Classes inheriting from BlockProviderExecutor must implement "
                                   "outstanding()")
 
-    def status(self) -> Dict[str, JobStatus]:
+    def _old_status_impl(self) -> Dict[str, JobStatus]:
         """Return the status of all jobs/blocks currently known to this executor.
 
         :return: a dictionary mapping block ids (in string) to job status
@@ -128,6 +128,13 @@ def status(self) -> Dict[str, JobStatus]:
 
         return status
 
+    def status(self) -> Dict[str, JobStatus]:
+        now = time.time()
+        if self._should_poll(now):
+            self._poller_mutable_status = self._old_status_impl()
+            self._last_poll_time = now
+        return self._poller_mutable_status
+
     def set_bad_state_and_fail_all(self, exception: Exception):
         """Allows external error handlers to mark this executor as irrecoverably bad and cause
         all tasks submitted to it now and in the future to fail. The executor is responsible
@@ -242,9 +249,3 @@ def workers_per_node(self) -> Union[int, float]:
 
     def _should_poll(self, now: float) -> bool:
         return now >= self._last_poll_time + self.status_polling_interval
-
-    def _refresh_poll_mutable_status_if_time(self):
-        now = time.time()
-        if self._should_poll(now):
-            self._poller_mutable_status = self.status()
-            self._last_poll_time = now
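
One of the "things to test" in the commit message is that the provider isn't polled too often. The following is a minimal sketch of how such a check could look, not a test from the parsl suite. `CachedStatusSketch`, `FakeClock` and `count_provider_polls` are hypothetical names; the class only mirrors the time-gated `status()` logic from the diff above, with an injectable clock (an assumption, the real code calls `time.time()` directly) so the check is deterministic.

```python
from typing import Callable, Dict, List


class FakeClock:
    """Hypothetical controllable clock, standing in for time.time()."""

    def __init__(self, start: float) -> None:
        self.now = start

    def __call__(self) -> float:
        return self.now


class CachedStatusSketch:
    """Hypothetical stand-in that mirrors the time-gated status() in the diff.

    This is not parsl's BlockProviderExecutor; `fetch` plays the role of
    _old_status_impl() and `interval` the role of status_polling_interval.
    """

    def __init__(self, fetch: Callable[[], Dict[str, str]],
                 interval: float, clock: Callable[[], float]) -> None:
        self._fetch = fetch
        self._interval = interval
        self._clock = clock
        self._last_poll_time = 0.0
        self._cached: Dict[str, str] = {}

    def _should_poll(self, now: float) -> bool:
        return now >= self._last_poll_time + self._interval

    def status(self) -> Dict[str, str]:
        now = self._clock()
        if self._should_poll(now):
            # Only this branch reaches the (fake) provider.
            self._cached = self._fetch()
            self._last_poll_time = now
        return self._cached


def count_provider_polls(call_times: List[float], interval: float) -> int:
    """Call status() at each given time and count the provider fetches."""
    clock = FakeClock(call_times[0])
    polls = 0

    def fetch() -> Dict[str, str]:
        nonlocal polls
        polls += 1
        return {"0": "RUNNING"}

    executor = CachedStatusSketch(fetch, interval, clock)
    for t in call_times:
        clock.now = t
        executor.status()
    return polls
```

With a 5 second interval, calls at times 100, 101, 104, 105, 109 and 110 should reach the provider only at 100, 105 and 110. A real test would probably assert a similar bound against a mock provider rather than an exact count.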

parsl/jobs/job_status_poller.py

Lines changed: 2 additions & 4 deletions
@@ -20,9 +20,7 @@ def __init__(self, executor: BlockProviderExecutor, monitoring: Optional["parsl.
         self._monitoring = monitoring
 
     def poll(self) -> None:
-        previous_status = self.executor._poller_mutable_status
-
-        self._executor._refresh_poll_mutable_status_if_time()
+        previous_status = self._executor.status()
 
         if previous_status != self.executor._poller_mutable_status:
             # short circuit the case where the two objects are identical so
@@ -50,7 +48,7 @@ def status(self) -> Dict[str, JobStatus]:
 
         :return: a dictionary mapping block ids (in string) to job status
         """
-        return self._executor._poller_mutable_status
+        return self._executor.status()
 
     @property
     def executor(self) -> BlockProviderExecutor:
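
The commit message notes that the monitoring loop in `poll` can no longer see a before/after state within a single iteration and probably needs the previous state stored across iterations. A minimal sketch of that idea follows, assuming statuses can be compared as plain values. `StatusDeltaTracker` is a hypothetical helper, not part of parsl. It keeps a copy of the last status it saw, which also guards against the cached dict being mutated in place between calls.

```python
from typing import Dict


class StatusDeltaTracker:
    """Hypothetical helper: remembers the last seen status across poll iterations."""

    def __init__(self) -> None:
        self._previous: Dict[str, str] = {}

    def changed_since_last(self, current: Dict[str, str]) -> Dict[str, str]:
        """Return the entries that are new or differ from the previous call."""
        delta = {
            block_id: state
            for block_id, state in current.items()
            if self._previous.get(block_id) != state
        }
        # Store a copy, so later in-place mutation of `current` by the
        # caller does not silently change what we compare against.
        self._previous = dict(current)
        return delta
```

A poll loop could call `changed_since_last(executor.status())` once per iteration and send only the returned entries to monitoring. This sketch does not report blocks that disappear from the status dict; whether that matters depends on how monitoring is expected to treat them.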
