Skip to content

Commit f9210c4

Browse files
authored
Make executor.outstanding a regular method (#3915)
Prior to this PR it was a dynamically generated attribute, which has made understanding of hangs hard in the past - for example, by disguising that these innocent-looking lines might hang despite "only" being attribute references: logger.debug(f"Marking all {self.outstanding} outstanding tasks as failed") or active_tasks = executor.outstanding. After this PR, it should be clearer that those references to outstanding are function invocations that might have complex behaviour. # Changed Behaviour External users of the parsl executor abstractions (e.g. Globus Compute) might need to change their API use. ## Type of change - New feature
1 parent c6b2d66 commit f9210c4

File tree

9 files changed

+10
-13
lines changed

9 files changed

+10
-13
lines changed

parsl/executors/high_throughput/executor.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,6 @@ def _hold_manager(self, manager_id: str) -> None:
621621
self.command_client.run("HOLD_WORKER;{}".format(manager_id))
622622
logger.debug("Sent hold request to manager: {}".format(manager_id))
623623

624-
@property
625624
def outstanding(self) -> int:
626625
"""Returns the count of tasks outstanding across the interchange
627626
and managers"""

parsl/executors/status_handling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def status_polling_interval(self):
122122
else:
123123
return self._provider.status_polling_interval
124124

125-
@abstractproperty
125+
@abstractmethod
126126
def outstanding(self) -> int:
127127
"""This should return the number of tasks that the executor has been given to run (waiting to run, and running now)"""
128128

parsl/executors/taskvine/executor.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,6 @@ def initialize_scaling(self):
573573
self._worker_command = self._construct_worker_command()
574574
self._patch_providers()
575575

576-
@property
577576
def outstanding(self) -> int:
578577
"""Count the number of outstanding tasks."""
579578
logger.debug(f"Counted {self._outstanding_tasks} outstanding tasks")
@@ -659,7 +658,7 @@ def _collect_taskvine_results(self):
659658
with self._outstanding_tasks_lock:
660659
self._outstanding_tasks -= 1
661660
finally:
662-
logger.debug(f"Marking all {self.outstanding} outstanding tasks as failed")
661+
logger.debug(f"Marking all {self.outstanding()} outstanding tasks as failed")
663662
logger.debug("Acquiring tasks_lock")
664663
with self._tasks_lock:
665664
logger.debug("Acquired tasks_lock")

parsl/executors/workqueue/executor.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -674,7 +674,6 @@ def initialize_scaling(self):
674674
self.worker_command = self._construct_worker_command()
675675
self._patch_providers()
676676

677-
@property
678677
def outstanding(self) -> int:
679678
"""Count the number of outstanding slots required. This is inefficiently
680679
implemented and probably could be replaced with a counter.

parsl/jobs/strategy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ def _general_strategy(self, executors: List[BlockProviderExecutor], *, strategy_
193193
self.executors[label]['first'] = False
194194

195195
# Tasks that are either pending completion
196-
active_tasks = executor.outstanding
196+
active_tasks = executor.outstanding()
197197

198198
status = executor.status_facade
199199

parsl/tests/site_tests/test_site.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def test_platform(n=2, sleep_dur=10):
3636

3737
print("Executor : ", dfk.executors[name])
3838
print("Connected : ", dfk.executors[name].connected_workers)
39-
print("Outstanding : ", dfk.executors[name].outstanding)
39+
print("Outstanding : ", dfk.executors[name].outstanding())
4040

4141
d = []
4242
for i in range(0, n):

parsl/tests/test_scaling/test_regression_3696_oscillation.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def test_htex_strategy_does_not_oscillate(ns):
4848
statuses = {}
4949

5050
executor.provider = provider
51-
executor.outstanding = n_tasks
51+
executor.outstanding = lambda: n_tasks
5252
executor.status_facade = statuses
5353
executor.workers_per_node = n_workers
5454

@@ -97,7 +97,7 @@ def scale_in(n, max_idletime=None):
9797
# this assert fails due to issue #3696
9898

9999
# Now check scale in happens with 0 load
100-
executor.outstanding = 0
100+
executor.outstanding = lambda: 0
101101
s.strategize([executor])
102102
executor.scale_in_facade.assert_called()
103103
assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == 0

parsl/tests/test_scaling/test_scale_down.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def test_scale_out(tmpd_cwd, try_assert):
6666
num_managers = len(dfk.executors['htex_local'].connected_managers())
6767

6868
assert num_managers == 0, "Expected 0 managers at start"
69-
assert dfk.executors['htex_local'].outstanding == 0, "Expected 0 tasks at start"
69+
assert dfk.executors['htex_local'].outstanding() == 0, "Expected 0 tasks at start"
7070

7171
ntasks = 10
7272
ready_path = tmpd_cwd / "workers_ready"
@@ -85,7 +85,7 @@ def test_scale_out(tmpd_cwd, try_assert):
8585
finish_path.touch() # Approximation of Event, via files
8686
[x.result() for x in futs]
8787

88-
assert dfk.executors['htex_local'].outstanding == 0
88+
assert dfk.executors['htex_local'].outstanding() == 0
8989

9090
def assert_kernel():
9191
return len(dfk.executors['htex_local'].connected_managers()) == _min_blocks

parsl/tests/test_scaling/test_scale_down_htex_auto_scale.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def test_scale_out(tmpd_cwd, try_assert):
6363
num_managers = len(dfk.executors['htex_local'].connected_managers())
6464

6565
assert num_managers == 0, "Expected 0 managers at start"
66-
assert dfk.executors['htex_local'].outstanding == 0, "Expected 0 tasks at start"
66+
assert dfk.executors['htex_local'].outstanding() == 0, "Expected 0 tasks at start"
6767

6868
ntasks = _max_blocks * 2
6969
ready_path = tmpd_cwd / "workers_ready"
@@ -84,7 +84,7 @@ def test_scale_out(tmpd_cwd, try_assert):
8484
finish_path.touch() # Approximation of Event, via files
8585
[x.result() for x in futs]
8686

87-
assert dfk.executors['htex_local'].outstanding == 0
87+
assert dfk.executors['htex_local'].outstanding() == 0
8888

8989
# now we can launch one "long" task -
9090
# and what should happen is that the connected_managers count "eventually" (?) converges to 1 and stays there.

0 commit comments

Comments
 (0)