Skip to content

Commit caa3b94

Browse files
committed
enh(net): endurance remebers canceled ops
1 parent 956e256 commit caa3b94

File tree

1 file changed

+15
-5
lines changed

1 file changed

+15
-5
lines changed

graphtik/network.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class AbortedException(Exception):
4949

5050

5151
def abort_run():
52+
"""Signal to the 1st running network to stop :term:`execution`."""
5253
_execution_configs.get()["abort"] = True
5354

5455

@@ -57,22 +58,27 @@ def _reset_abort():
5758

5859

5960
def is_abort():
61+
"""Return `True` if networks have been signaled to stop :term:`execution`."""
6062
return _execution_configs.get()["abort"]
6163

6264

6365
def set_skip_evictions(skipped):
66+
"""If true, keep all intermediate solution values, regardless of asked outputs."""
6467
_execution_configs.get()["skip_evictions"] = bool(skipped)
6568

6669

6770
def is_skip_evictions():
71+
"""Return `True` if keeping all intermediate solution values, regardless of asked outputs."""
6872
return _execution_configs.get()["skip_evictions"]
6973

7074

7175
def set_endure_execution(endure):
76+
"""If set to true, keep executing even of some operations fail."""
7277
_execution_configs.get()["endure_execution"] = bool(endure)
7378

7479

7580
def is_endure_execution():
81+
"""Is execution going even of some operations fail?"""
7682
return _execution_configs.get()["endure_execution"]
7783

7884

@@ -157,6 +163,8 @@ class Solution(ChainMap, Plotter):
157163
a "virtual" property with executed operations that had no exception
158164
:ivar failures:
159165
a "virtual" property with executed operations that raised an exception
166+
:ivar canceled:
167+
A sorted set of operations canceled due to upstream failures.
160168
:ivar finished:
161169
a flag denoting that this instance cannot acccept more results
162170
(after the :meth:`finished` has been invoked)
@@ -169,6 +177,7 @@ def __init__(self, plan, *args, **kw):
169177

170178
self.plan = plan
171179
self.executed = {}
180+
self.canceled = iset() # not iterated, order not important, but ...
172181
self.finished = False
173182
self.times = {}
174183

@@ -297,11 +306,11 @@ class _Endurance:
297306
Unsattisfied operations downstream from failed ones.
298307
"""
299308

300-
def __init__(self, dag):
309+
def __init__(self, dag, canceled):
301310
## Clone to remove the downstream edges from the `provides`
302311
# of failed operations.
303312
self.dag = dag.copy()
304-
self.canceled = set() # not iterated, order not important
313+
self.canceled = canceled
305314

306315
def operation_failed(self, op: Operation, inputs):
307316
"""update :attr:`canceled` with the unsatisfiead ops downstream of `op`."""
@@ -443,7 +452,7 @@ def _execute_thread_pool_barrier_method(self, solution: Solution):
443452
"""
444453
pool = _execution_configs.get()["execution_pool"]
445454
# If endurance is enabled, create a collector of canceled ops downstream.
446-
endurance = is_endure_execution() and _Endurance(self.dag)
455+
endurance = is_endure_execution() and _Endurance(self.dag, solution.canceled)
447456

448457
# with each loop iteration, we determine a set of operations that can be
449458
# scheduled, then schedule them onto a thread pool, then collect their
@@ -468,7 +477,8 @@ def _execute_thread_pool_barrier_method(self, solution: Solution):
468477
):
469478
if endurance and node in endurance.canceled:
470479
log.debug(
471-
"+++ SKIPPED op(%r) due to previously failed ops.", node.name
480+
"+++ SKIPPED op(%r) due to previously failed ops.",
481+
node.name,
472482
)
473483
else:
474484
upnext.append(node)
@@ -510,7 +520,7 @@ def _execute_sequential_method(self, solution: Solution):
510520
must contain the input values only, gets modified
511521
"""
512522
# If endurance is enabled, create a collector of canceled ops downstream.
513-
endurance = is_endure_execution() and _Endurance(self.dag)
523+
endurance = is_endure_execution() and _Endurance(self.dag, solution.canceled)
514524
for step in self.steps:
515525
self._check_if_aborted(solution.executed)
516526

0 commit comments

Comments
 (0)