@@ -395,12 +395,91 @@ def _complete_prev_state(self, prev_state=None):
395
395
prev_state = prev_state [0 ]
396
396
397
397
if isinstance (prev_state , list ):
398
- prev_state , self .other_states = hlpst .removed_previous_repeated (
399
- prev_state , self .other_states
400
- )
401
-
398
+ prev_state = self ._removed_repeated (prev_state )
402
399
return prev_state
403
400
401
+ def _removed_repeated (self , previous_splitters ):
402
+ """removing states from previous tasks that are repeated either directly or indirectly"""
403
+ for el in previous_splitters :
404
+ if el [1 :] not in self .other_states :
405
+ raise hlpst .PydraStateError (
406
+ f"can't ask for splitter from { el [1 :]} , other nodes that are connected: { self .other_states } "
407
+ )
408
+
409
+ repeated = set (
410
+ [
411
+ (el , previous_splitters .count (el ))
412
+ for el in previous_splitters
413
+ if previous_splitters .count (el ) > 1
414
+ ]
415
+ )
416
+ if repeated :
417
+ # assuming that I want to remove fro right
418
+ previous_splitters .reverse ()
419
+ for el , cnt in repeated :
420
+ for ii in range (cnt ):
421
+ previous_splitters .remove (el )
422
+ previous_splitters .reverse ()
423
+
424
+ el_state = []
425
+ el_connect = []
426
+ el_state_connect = []
427
+ for el in previous_splitters :
428
+ nm = el [1 :]
429
+ st = self .other_states [nm ][0 ]
430
+ if not st .other_states :
431
+ # states that has no other connections
432
+ el_state .append (el )
433
+ else : # element has previous_connection
434
+ if st .current_splitter : # final?
435
+ # states that has previous connections and it's own splitter
436
+ el_state_connect .append ((el , st .prev_state_splitter ))
437
+ else :
438
+ # states with previous connections but no additional splitter
439
+ el_connect .append ((el , st .prev_state_splitter ))
440
+
441
+ for el in el_connect :
442
+ nm = el [0 ][1 :]
443
+ repeated_prev = set (ensure_list (el [1 ])).intersection (el_state )
444
+ if repeated_prev :
445
+ for r_el in repeated_prev :
446
+ r_nm = r_el [1 :]
447
+ self .other_states [r_nm ] = (
448
+ self .other_states [r_nm ][0 ],
449
+ self .other_states [r_nm ][1 ] + self .other_states [nm ][1 ],
450
+ )
451
+ new_st = set (ensure_list (el [1 ])) - set (el_state )
452
+ if not new_st :
453
+ previous_splitters .remove (el [0 ])
454
+ else :
455
+ for n_el in new_st :
456
+ n_nm = n_el [1 :]
457
+ self .other_states [n_nm ] = (
458
+ self .other_states [nm ][0 ].other_states [n_nm ][0 ],
459
+ self .other_states [nm ][1 ],
460
+ )
461
+ # removing el of the splitter and adding new_st instead
462
+ ind = previous_splitters .index (el [0 ])
463
+ if ind == len (previous_splitters ) - 1 :
464
+ previous_splitters = previous_splitters [:- 1 ] + list (new_st )
465
+ else :
466
+ previous_splitters = (
467
+ previous_splitters [:ind ]
468
+ + list (new_st )
469
+ + previous_splitters [ind + 1 :]
470
+ )
471
+ # TODO: this part is not tested, needs more work
472
+ for el in el_state_connect :
473
+ repeated_prev = set (ensure_list (el [1 ])).intersection (el_state )
474
+ if repeated_prev :
475
+ for r_el in repeated_prev :
476
+ previous_splitters .remove (r_el )
477
+
478
+ if len (previous_splitters ) == 1 :
479
+ return previous_splitters [0 ]
480
+ else :
481
+ return previous_splitters
482
+
404
483
def _prevst_current_check (self , splitter_part , check_nested = True ):
405
484
"""
406
485
Check if splitter_part is purely prev-state part, the current part,
0 commit comments