Skip to content

Commit e4b1106

Browse files
committed
Move PolledExecutorFacade status into BlockProviderExecutor
This makes this status available to the scale_in methods of BlockProviderExecutors, and so can be used as part of a scaling bugfix in a subsequent PR. This is part of ongoing work to minimise and possibly eliminate the PolledExecutorFacade class. This PR leaves accesses to those structures in the PolledExecutorFacade, with future PRs intended to evaporate away more of the PolledExecutorFacade. This should not change behaviour, as it only moves an attribute from one object to a 1:1 paired object.
1 parent e2b4176 commit e4b1106

File tree

2 files changed

+11
-10
lines changed

2 files changed

+11
-10
lines changed

parsl/executors/status_handling.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ def __init__(self, *,
6262
# to keep track of such errors so that they can be handled in one place
6363
# together with errors reported by status()
6464
self._simulated_status: Dict[str, JobStatus] = {}
65+
self._poller_mutable_status: Dict[str, JobStatus] = {}
66+
6567
self._executor_bad_state = threading.Event()
6668
self._executor_exception: Optional[Exception] = None
6769

parsl/jobs/job_status_poller.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,19 @@ class PolledExecutorFacade:
1919
def __init__(self, executor: BlockProviderExecutor, monitoring: Optional["parsl.monitoring.radios.MonitoringRadio"] = None):
2020
self._executor = executor
2121
self._last_poll_time = 0.0
22-
self._status = {} # type: Dict[str, JobStatus]
2322
self._monitoring = monitoring
2423

2524
def poll(self) -> None:
2625
now = time.time()
2726
if now >= self._last_poll_time + self._executor.status_polling_interval:
28-
previous_status = self._status
29-
self._status = self._executor.status()
27+
previous_status = self._executor._poller_mutable_status
28+
self._executor._poller_mutable_status = self._executor.status()
3029
self._last_poll_time = now
3130
delta_status = {}
32-
for block_id in self._status:
31+
for block_id in self._executor._poller_mutable_status:
3332
if block_id not in previous_status \
34-
or previous_status[block_id].state != self._status[block_id].state:
35-
delta_status[block_id] = self._status[block_id]
33+
or previous_status[block_id].state != self._executor._poller_mutable_status[block_id].state:
34+
delta_status[block_id] = self._executor._poller_mutable_status[block_id]
3635

3736
if delta_status:
3837
self.send_monitoring_info(delta_status)
@@ -50,7 +49,7 @@ def status(self) -> Dict[str, JobStatus]:
5049
5150
:return: a dictionary mapping block ids (in string) to job status
5251
"""
53-
return self._status
52+
return self._executor._poller_mutable_status
5453

5554
@property
5655
def executor(self) -> BlockProviderExecutor:
@@ -71,7 +70,7 @@ def scale_in(self, n: int, max_idletime: Optional[float] = None) -> List[str]:
7170
new_status = {}
7271
for block_id in block_ids:
7372
new_status[block_id] = JobStatus(JobState.CANCELLED)
74-
del self._status[block_id]
73+
del self._executor._poller_mutable_status[block_id]
7574
self.send_monitoring_info(new_status)
7675
return block_ids
7776

@@ -82,11 +81,11 @@ def scale_out(self, n: int) -> List[str]:
8281
for block_id in block_ids:
8382
new_status[block_id] = JobStatus(JobState.PENDING)
8483
self.send_monitoring_info(new_status)
85-
self._status.update(new_status)
84+
self._executor._poller_mutable_status.update(new_status)
8685
return block_ids
8786

8887
def __repr__(self) -> str:
89-
return self._status.__repr__()
88+
return self._executor._poller_mutable_status.__repr__()
9089

9190

9291
class JobStatusPoller(Timer):

0 commit comments

Comments
 (0)