@@ -63,10 +63,52 @@ def is_skip_evictions():
6363 return _execution_configs .get ()["skip_evictions" ]
6464
6565
66- def _del_chain_key (amap : ChainMap , key ):
67- log .debug ("removing data '%s' from solution." , key )
68- for d in amap .maps :
69- d .pop (key , None )
class Solution(ChainMap):
    """
    Collects outputs from operations, preserving :term:`overwrites`.

    Every executed operation contributes its outputs as a separate layer
    appended at the end of :attr:`maps`, after the layer(s) given on construction.
    While collecting, lookups return the *earliest* value of a key;
    once :meth:`finish` has reversed the layers, the *last* value written wins.
    """

    def __init__(self, plan, *args, **kw):
        """
        :param plan:
            stored as-is in :attr:`plan`
        :param args:
            the initial mapping layer(s), handed over to :class:`ChainMap`
        :param kw:
            forwarded verbatim to :class:`ChainMap`
        """
        super().__init__(*args, **kw)
        # The operations reported through `operation_executed()`.
        self.executed = iset()
        # Flipped by `finish()`; afterwards no more operations are accepted.
        self.finished = False
        self.plan = plan

    def __repr__(self):
        items = ", ".join(f"{k!r}: {v!r}" for k, v in self.items())
        return f"{{{items}}}"

    def operation_executed(self, op, outputs):
        """
        invoked once per operation, with its results

        :param op:
            the operation just executed, recorded in :attr:`executed`
        :param outputs:
            the mapping with the operation's results, appended as a new layer
        :raises AssertionError:
            if the solution has already been finished
        """
        assert not self.finished, f"Cannot reuse solution: {self}"
        self.maps.append(outputs)
        self.executed.add(op)

    def finish(self):
        """
        invoked only once, after all ops have been executed

        Any subsequent call is a no-op, so the layers are reversed exactly once.
        """
        # Invert solution so that last value wins
        if not self.finished:
            self.maps = self.maps[::-1]
            # Must be spelled exactly `finished`, otherwise the guard above
            # (and the one in `operation_executed()`) never triggers.
            self.finished = True

    def __delitem__(self, key):
        """Remove `key` from all layers, silently ignoring the layers lacking it."""
        log.debug("removing data '%s' from solution.", key)
        for d in self.maps:
            d.pop(key, None)

    def overwrites(self) -> Mapping[Any, List]:
        """
        Collect items in the maps that exist more than once.

        :return:
            a dictionary with keys only those items that existed in more than one map,
            and values, all those values, in the order of given `maps`
        """
        dd = defaultdict(list)
        for d in self.maps:
            for k, v in d.items():
                dd[k].append(v)

        return {k: v for k, v in dd.items() if len(v) > 1}
70112
71113
72114class _DataNode (str ):
@@ -241,14 +283,14 @@ def _call_operation(self, op, solution):
241283 try :
242284 return op .compute (solution )
243285 except Exception as ex :
244- jetsam (ex , locals (), plan = "self" )
286+ jetsam (ex , locals (), "solution" , plan = "self" )
245287 finally :
246288 # record execution time
247289 t_complete = round (time .time () - t0 , 5 )
248290 self .times [op .name ] = t_complete
249291 log .debug ("...step completion time: %s" , t_complete )
250292
251- def _execute_thread_pool_barrier_method (self , solution : ChainMap , executed ):
293+ def _execute_thread_pool_barrier_method (self , solution : Solution ):
252294 """
253295 This method runs the graph using a parallel pool of thread executors.
254296 You may achieve lower total latency if your graph is sufficiently
@@ -263,7 +305,7 @@ def _execute_thread_pool_barrier_method(self, solution: ChainMap, executed):
263305 # scheduled, then schedule them onto a thread pool, then collect their
264306 # results onto a memory solution for use upon the next iteration.
265307 while True :
266- self ._check_if_aborted (executed )
308+ self ._check_if_aborted (solution . executed )
267309
268310 # the upnext list contains a list of operations for scheduling
269311 # in the current round of scheduling
@@ -273,14 +315,14 @@ def _execute_thread_pool_barrier_method(self, solution: ChainMap, executed):
273315 # based on what has already been executed.
274316 if (
275317 isinstance (node , Operation )
276- and node not in executed
318+ and node not in solution . executed
277319 # Use `broken_dag` to allow executing operations from given inputs
278320 # regardless of whether their producers have yet to re-calc them.
279321 and set (
280322 n
281323 for n in nx .ancestors (self .broken_dag , node )
282324 if isinstance (n , Operation )
283- ).issubset (executed )
325+ ).issubset (solution . executed )
284326 ):
285327 upnext .append (node )
286328 elif isinstance (node , _EvictInstruction ):
@@ -294,10 +336,12 @@ def _execute_thread_pool_barrier_method(self, solution: ChainMap, executed):
294336 node not in self .dag .nodes
295337 # Scan node's successors in `broken_dag`, not to block
296338 # an op waiting for calced data already given as input.
297- or set (self .broken_dag .successors (node )).issubset (executed )
339+ or set (self .broken_dag .successors (node )).issubset (
340+ solution .executed
341+ )
298342 )
299343 ):
300- _del_chain_key ( solution , node )
344+ del solution [ node ]
301345
302346 # stop if no nodes left to schedule, exit out of the loop
303347 if not upnext :
@@ -308,36 +352,33 @@ def _execute_thread_pool_barrier_method(self, solution: ChainMap, executed):
308352 )
309353
310354 for op , outputs in done_iterator :
311- solution .maps .append (outputs )
312- executed .add (op )
355+ solution .operation_executed (op , outputs )
313356
def _execute_sequential_method(self, solution: Solution):
    """
    Run the steps of the plan one by one, in the calling thread.

    Operations are computed and their outputs reported to the `solution`;
    eviction instructions delete the respective data from it.

    :param solution:
        must contain the input values only, gets modified
    :raises AssertionError:
        if a step is neither an operation nor an eviction instruction
    """
    for step in self.steps:
        # Checked before every step, with the operations executed so far.
        self._check_if_aborted(solution.executed)

        if isinstance(step, Operation):
            log.debug("%sexecuting step: %s", "-" * 32, step.name)
            solution.operation_executed(step, self._call_operation(step, solution))
            continue

        if isinstance(step, _EvictInstruction):
            # Cache value may be missing if it is optional.
            if step in solution:
                del solution[step]
            continue

        raise AssertionError(f"Unrecognized instruction.{step}")
337380
338- def execute (
339- self , named_inputs , outputs = None , * , solution : ChainMap = None , method = None
340- ):
381+ def execute (self , named_inputs , outputs = None , * , method = None ) -> Solution :
341382 """
342383 :param named_inputs:
343384 A maping of names --> values that must contain at least
@@ -347,16 +388,10 @@ def execute(
347388 :param outputs:
348389 If not None, they are just checked if possible, based on :attr:`provides`,
349390 and scream if not.
350- :param solution:
351- If not None, it must be a :class:`collections.ChainMap`, which will
352- collect all results in a separate dictionary for each operation execution.
353- The 1st dictionary in its maplist will collect the inputs, but will endup
354- to the be last one when execution finishes.
355-
356- See :term:`solution`
357391
358392 :return:
359- the populates `solution`
393+ The :term:`solution` which contains the results of each operation executed
394+ +1 for inputs in separate dictionaries.
360395
361396 :raises ValueError:
362397 - If plan does not contain any operations, with msg:
@@ -382,25 +417,20 @@ def execute(
382417 else self ._execute_sequential_method
383418 )
384419
385- if solution is None :
386- solution = ChainMap ()
387- preload_layers = len (solution .maps ) # TODO: move to solution
388-
389420 # If certain outputs asked, put relevant-only inputs in solution,
390421 # otherwise, keep'em all.
391422 #
392423 # Note: clone and keep original `inputs` in the 1st chained-map.
393- solution .update (
424+ solution = Solution (
425+ self ,
394426 {k : v for k , v in named_inputs .items () if k in self .dag .nodes }
395427 if self .evict
396- else named_inputs
428+ else named_inputs ,
397429 )
398- executed = set ()
399- executor (solution , executed )
400-
401- # Invert solution so that last value wins
402- # TODO: move to solution
403- solution .maps = solution .maps [::- 1 ]
430+ try :
431+ executor (solution )
432+ finally :
433+ solution .finish ()
404434
405435 # Validate eviction was perfect
406436 #
@@ -411,20 +441,9 @@ def execute(
411441 or set (solution ).issubset (self .provides )
412442 ), f"Evictions left more data{ list (iset (solution ) - set (self .provides ))} than { self } !"
413443
414- # Validate solution layers match operations executed + 1(inputs)
415- # TODO: move to solution
416- #
417- assert len (solution .maps ) - preload_layers == sum (
418- 1 for i in yield_ops (self .steps )
419- ), (
420- f"Solution layers({ len (solution .maps )} , preloaded: { preload_layers } ) mismatched "
421- f"operations executed({ sum (1 for i in yield_ops (self .dag ))} )!"
422- f"\n { self } \n solution: { solution } "
423- )
424-
425444 return solution
426445 except Exception as ex :
427- jetsam (ex , locals (), "solution" , "executed" )
446+ jetsam (ex , locals (), "solution" )
428447
429448
430449class Network (Plotter ):
0 commit comments