22"""CLT utilities."""
33import logging
44from pathlib import Path
5- from typing import Any , ClassVar , Optional , TypeVar , Union
5+ from typing import Any , ClassVar , Optional , TypeVar , Union , Dict , List
66
77import cwl_utils .parser as cu_parser
88import yaml
@@ -465,6 +465,10 @@ def _save_cwl(self, path: Path) -> None:
465465 ) as file :
466466 file .write (yaml .dump (self .yaml ))
467467
468+ def get_inp_attr (self , __name : str ) -> Any :
469+ """Returns the input object of the given name"""
470+ return self .inputs [self ._input_names .index (__name )]
471+
468472
469473def extract_tools_paths_NONPORTABLE (steps : list [Step ]) -> Tools :
470474 """extract a Tools global configuration object from the NONPORTABLE paths hardcoded in the Steps.
@@ -488,7 +492,6 @@ class Workflow(BaseModel):
488492
489493 steps : list # list[Process] # and cannot use Process defined after Workflow within a Workflow
490494 process_name : str
491- user_args : list [str ]
492495 inputs : list [ProcessInput ] = []
493496 outputs : list [ProcessOutput ] = []
494497 _input_names : list [str ] = PrivateAttr (default_factory = list )
@@ -499,11 +502,10 @@ class Workflow(BaseModel):
499502 # field(default=None, init=False, repr=False)
500503 # TypeError: 'ModelPrivateAttr' object is not iterable
501504
502- def __init__ (self , steps : list , workflow_name : str , user_args : list [ str ] = [] ):
505+ def __init__ (self , steps : list , workflow_name : str ):
503506 data = {
504507 "process_name" : workflow_name ,
505- "steps" : steps ,
506- "user_args" : user_args
508+ "steps" : steps
507509 }
508510 super ().__init__ (** data )
509511
@@ -713,7 +715,21 @@ def flatten_subworkflows(self) -> list:
713715 subworkflows += step .flatten_subworkflows ()
714716 return subworkflows
715717
716- def compile (self , write_to_disk : bool = False ) -> CompilerInfo :
718+ def _convert_args_dict_to_args_list (self , args_dict : Dict [str , str ]) -> List [str ]:
719+ """ A simple utility converting a dict whose keys are CLI flag/args and
720+ values are CLI flag values
721+ Args:
722+ args_dict: A dictionary containing args and values
723+
724+ Returns:
725+ List[str]: A syntactically correct list of arguments (CLI flags) and values
726+ """
727+ args_list : List [str ] = []
728+ for arg_name , arg_value in args_dict .items ():
729+ args_list += ['--' + arg_name , arg_value ]
730+ return args_list
731+
732+ def compile (self , write_to_disk : bool = False , args_dict : Dict [str , str ] = {}) -> CompilerInfo :
717733 """Compile Workflow using WIC.
718734
719735 Args:
@@ -726,7 +742,8 @@ def compile(self, write_to_disk: bool = False) -> CompilerInfo:
726742 """
727743 global global_config
728744 self ._validate ()
729- args = get_args (self .process_name , self .user_args ) # Use mock CLI args
745+ user_args = self ._convert_args_dict_to_args_list (args_dict )
746+ args = get_args (self .process_name , user_args ) # Use mock CLI args
730747
731748 graph = get_graph_reps (self .process_name )
732749 yaml_tree = YamlTree (StepId (self .process_name , 'global' ), self .yaml )
@@ -746,13 +763,14 @@ def compile(self, write_to_disk: bool = False) -> CompilerInfo:
746763
747764 return compiler_info
748765
749- def get_cwl_workflow (self ) -> Json :
766+ def get_cwl_workflow (self , args_dict : Dict [ str , str ] = {} ) -> Json :
750767 """Return the compiled Cwl and its inputs in one payload Json
751768 Returns:
752769 Json: Contains the compiled CWL and yaml inputs to the workflow.
753770 """
754- compiler_info = self .compile (write_to_disk = False )
755- args = get_args (self .process_name , self .user_args )
771+ user_args = self ._convert_args_dict_to_args_list (args_dict )
772+ args = get_args (self .process_name , user_args )
773+ compiler_info = self .compile (args_dict = args_dict , write_to_disk = False )
756774 rose_tree = compiler_info .rose
757775
758776 # this is unfortunately necessary for now
@@ -780,12 +798,13 @@ def get_cwl_workflow(self) -> Json:
780798 }
781799 return workflow_json
782800
783- def run (self ) -> None :
801+ def run (self , compile_args_dict : Dict [ str , str ] = {}, run_args_dict : Dict [ str , str ] = {} ) -> None :
784802 """Run compiled workflow."""
785803 logger .info (f"Running { self .process_name } " )
786804 plugins .logging_filters ()
787- compiler_info = self .compile (write_to_disk = True )
788- args , unknown_args = get_known_and_unknown_args (self .process_name , self .user_args ) # Use mock CLI args
805+ compiler_info = self .compile (args_dict = compile_args_dict , write_to_disk = True )
806+ user_args = self ._convert_args_dict_to_args_list ({** compile_args_dict , ** run_args_dict })
807+ args , unknown_args = get_known_and_unknown_args (self .process_name , user_args ) # Use mock CLI args
789808 rose_tree : RoseTree = compiler_info .rose
790809
791810 post_compile .cwl_docker_extract (args .container_engine , args .pull_dir , self .process_name )
0 commit comments