2222
2323"""Workflow."""
2424
25+ from __future__ import annotations
26+
2527from enum import Enum
2628import logging
2729import os
@@ -714,7 +716,12 @@ def output_names(self):
714716 return out
715717
716718 @version_requires ("3.0" )
717- def connect_with (self , left_workflow , output_input_names = None ):
719+ def connect_with (
720+ self ,
721+ left_workflow : Workflow ,
722+ output_input_names : Union [tuple [str , str ], dict [str , str ]] = None ,
723+ permissive : bool = True ,
724+ ):
718725 """Prepend a given workflow to the current workflow.
719726
720727 Updates the current workflow to include all the operators of the workflow given as argument.
@@ -724,15 +731,18 @@ def connect_with(self, left_workflow, output_input_names=None):
724731
725732 Parameters
726733 ----------
727- left_workflow : core.Workflow
734+ left_workflow:
728735 The given workflow's outputs are chained with the current workflow's inputs.
729- output_input_names : str tuple, str dict optional
736+ output_input_names:
730737 Map used to connect the outputs of the given workflow to the inputs of the current
731738 workflow.
732739 Check the names of available inputs and outputs for each workflow using
733740 `Workflow.input_names` and `Workflow.output_names`.
734741 The default is ``None``, in which case it tries to connect each output of the
735742 left_workflow with an input of the current workflow with the same name.
743+ permissive:
744+ Whether to filter 'output_input_names' to only keep available connections.
745+ Otherwise raise an error if 'output_input_names' contains unavailable inputs or outputs.
736746
737747 Examples
738748 --------
@@ -791,24 +801,40 @@ def connect_with(self, left_workflow, output_input_names=None):
791801
792802 """
793803 if output_input_names :
794- core_api = self ._server .get_api_for_type (
795- capi = data_processing_capi .DataProcessingCAPI ,
796- grpcapi = data_processing_grpcapi .DataProcessingGRPCAPI ,
797- )
798- map = object_handler .ObjHandler (
799- data_processing_api = core_api ,
800- internal_obj = self ._api .workflow_create_connection_map_for_object (self ),
801- )
802804 if isinstance (output_input_names , tuple ):
803- self ._api .workflow_add_entry_connection_map (
804- map , output_input_names [0 ], output_input_names [1 ]
805+ output_input_names = {output_input_names [0 ]: output_input_names [1 ]}
806+ if isinstance (output_input_names , dict ):
807+ core_api = self ._server .get_api_for_type (
808+ capi = data_processing_capi .DataProcessingCAPI ,
809+ grpcapi = data_processing_grpcapi .DataProcessingGRPCAPI ,
810+ )
811+ map = object_handler .ObjHandler (
812+ data_processing_api = core_api ,
813+ internal_obj = self ._api .workflow_create_connection_map_for_object (self ),
805814 )
806- elif isinstance (output_input_names , dict ):
807- for key in output_input_names :
808- self ._api .workflow_add_entry_connection_map (map , key , output_input_names [key ])
815+ output_names = left_workflow .output_names
816+ input_names = self .input_names
817+ if permissive :
818+ output_input_names = dict (
819+ filter (
820+ lambda item : item [0 ] in left_workflow .output_names
821+ and item [1 ] in self .input_names ,
822+ output_input_names .items (),
823+ )
824+ )
825+ for output_name , input_name in output_input_names .items ():
826+ if output_name not in output_names :
827+ raise ValueError (
828+ f"Cannot connect workflow output '{ output_name } '. Exposed outputs are:\n { output_names } "
829+ )
830+ elif input_name not in input_names :
831+ raise ValueError (
832+ f"Cannot connect workflow input '{ input_name } '. Exposed inputs are:\n { input_names } "
833+ )
834+ self ._api .workflow_add_entry_connection_map (map , output_name , input_name )
809835 else :
810836 raise TypeError (
811- "output_input_names argument is expect" " to be either a str tuple or a str dict"
837+ "output_input_names argument is expected to be either a str tuple or a str dict"
812838 )
813839 self ._api .work_flow_connect_with_specified_names (self , left_workflow , map )
814840 else :
0 commit comments