@@ -79,9 +79,6 @@ def __init__(self, plan, input_values: dict):
7979 self .executed = {}
8080 #: A sorted set of :term:`canceled operation`\\s due to upstream failures.
8181 self .canceled = iset () # not iterated, order not important, but ...
82- #: a flag controlled by `plan` (by invoking :meth:`finalized` is invoked)
83- #: which becomes `True` when this instance has finished accepting results.
84- self .finalized = False
8582 self .elapsed_ms = {}
8683 #: A unique identifier to distinguish separate flows in execution logs.
8784 self .solid = "%X" % random .randint (0 , 2 ** 16 )
@@ -105,7 +102,7 @@ def __init__(self, plan, input_values: dict):
105102 def __copy__ (self ):
106103 clone = type (self )(self .plan , {})
107104 props = (
108- "maps executed canceled finalized elapsed_ms solid _layers"
105+ "maps executed canceled elapsed_ms solid _layers"
109106 " is_endurance is_reschedule is_parallel is_marshal dag"
110107 ).split ()
111108 for p in props :
@@ -168,7 +165,6 @@ def collect_canceled_sideffects(dep, val) -> Collection:
168165 return ()
169166 return dep_singularized (dep )
170167
171- assert not self .finalized , f"Cannot reuse solution: { self } "
172168 self ._layers [op ].update (outputs )
173169 self .executed [op ] = None
174170
@@ -203,16 +199,9 @@ def operation_failed(self, op, ex):
203199 It will update :attr:`executed` with the operation status and
204200 the :attr:`canceled` with the unsatisfied ops downstream of `op`.
205201 """
206- assert not self .finalized , f"Cannot reuse solution: { self } "
207202 self .executed [op ] = ex
208203 self ._reschedule (self .dag , op , op )
209204
210- def finalize (self ):
211- """invoked only once, after all ops have been executed"""
212- # Invert solution so that last value wins
213- if not self .finalized :
214- self .finalized = True
215-
216205 def __delitem__ (self , key ):
217206 for d in self .maps :
218207 d .pop (key , None )
@@ -226,10 +215,8 @@ def overwrites(self) -> Mapping[Any, List]:
226215 The data in the solution that exist more than once.
227216
228217 A "virtual" property to a dictionary with keys the names of values that
229- exist more than once, and values, all those values in a list, ordered:
230-
231- - before :meth:`finalized()`, as computed;
232- - after :meth:`finalized()`, in reverse.
218+ exist more than once, and values, all those values in a list, ordered
219+ in reverse compute order (1st is the last one computed).
233220 """
234221 maps = self .maps
235222 dd = defaultdict (list )
@@ -745,18 +732,19 @@ def execute(
745732 self ,
746733 )
747734
735+ ok = False
748736 try :
749737 executor (solution )
738+ ok = True
750739 finally :
751- solution .finalize ()
752-
753740 ## Log cumulative operations elapsed time.
754741 #
755742 if _isDebugLogging ():
756743 elapsed = sum (solution .elapsed_ms .values ())
757744 log .debug (
758- "=== (%s) Completed pipeline(%s) in %0.3fms." ,
745+ "=== (%s) %s pipeline(%s) in %0.3fms." ,
759746 solution .solid ,
747+ "Completed" if ok else "FAILED" ,
760748 name ,
761749 elapsed ,
762750 )
0 commit comments