diff --git a/src/sophios/apis/python/api.py b/src/sophios/apis/python/api.py index bdb2ca82..e79b467b 100644 --- a/src/sophios/apis/python/api.py +++ b/src/sophios/apis/python/api.py @@ -13,7 +13,7 @@ from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator from sophios import compiler, input_output, plugins, utils_cwl -from sophios import run_local as run_local_module +from sophios import run_local as rl from sophios import post_compile as pc from sophios.cli import get_args, get_known_and_unknown_args from sophios.utils_graphs import get_graph_reps @@ -753,7 +753,7 @@ def get_cwl_workflow(self) -> Json: return workflow_json def run(self, run_args_dict: Dict[str, str] = default_values.default_run_args_dict, - user_env: Dict[str, str] = {}, basepath: str = 'autogenerated') -> None: + user_env_vars: Dict[str, str] = {}, basepath: str = 'autogenerated') -> None: """Run the built CWL workflow. Args: @@ -781,14 +781,15 @@ def run(self, run_args_dict: Dict[str, str] = default_values.default_run_args_di user_args = convert_args_dict_to_args_list(run_args_dict) # update the environment with user supplied env args - for k, v in user_env.items(): - os.environ[k] = v + os.environ.update(rl.sanitize_env_vars(user_env_vars)) + _, unknown_args = get_known_and_unknown_args( self.process_name, user_args) # Use mock CLI args + # if there are no unknown_args then unkown_args will be an empty list [] # so no need for a separate check of a particular flag! 
# Compiled once at import time instead of on every call.
# A valid Bash variable name: a letter or underscore, then letters/digits/underscores.
_VALID_ENV_KEY_PATTERN = re.compile(r'[a-zA-Z_][a-zA-Z0-9_]*')
# Characters removed from values to prevent command injection. NUL is included
# because os.environ rejects values containing an embedded null byte.
_DANGEROUS_ENV_CHARS_PATTERN = re.compile(r'[;`\'"$()|<>&!\n\r\x00]')


def sanitize_env_vars(env_vars: Dict[str, str]) -> Dict[str, str]:
    """Return a sanitized copy of user-defined environment variables.

    - Keys that are not valid Bash variable names are skipped with a warning.
    - Potentially dangerous characters (shell metacharacters, quotes, newlines,
      carriage returns and NUL bytes) are removed from values.

    The input dictionary is not modified.

    Args:
        env_vars (Dict[str, str]): A dictionary of string key-value pairs.

    Returns:
        Dict[str, str]: A new dictionary with sanitized key-value pairs.
    """
    sanitized: Dict[str, str] = {}

    for key, value in env_vars.items():
        # Step 1: Validate the key (fullmatch anchors the pattern at both ends).
        if not _VALID_ENV_KEY_PATTERN.fullmatch(key):
            print(f"Warning: Invalid environment variable key '{key}' skipped.")
            continue

        # Step 2: Sanitize the value.
        sanitized[key] = _DANGEROUS_ENV_CHARS_PATTERN.sub('', value)

    return sanitized


def create_safe_env(user_env: Dict[str, str]) -> dict:
    """Generate a sanitized environment dict without applying it.

    The result is a copy of the current process environment overlaid with the
    sanitized user variables; os.environ itself is left untouched.

    NOTE(review): user-supplied keys take precedence over existing entries, so
    any variable with a valid name (e.g. PATH) can be overridden here.

    Args:
        user_env (Dict[str, str]): User supplied environment variables

    Returns:
        dict: The merged environment
    """
    sanitized_user_env = sanitize_env_vars(user_env)
    return {**os.environ, **sanitized_user_env}


def run_compute(workflow_name: str, workflow: 'Json', workflow_inputs: 'Json',
                submit_url: str, user_env_vars: Optional[Dict[str, str]] = None) -> Optional[int]:
    """This function runs the compiled workflow through compute.

    The workflow is POSTed to submit_url, then the job outputs and logs are
    fetched from '<submit_url>/<jobid>/outputs/' and '<submit_url>/<jobid>/logs/'.
    The first log entry is printed and written to 'compute_logs_<jobid>.txt'
    in the current working directory.

    Args:
        workflow_name (str): The name of the workflow
        workflow (Json): The compiled CWL workflow
        workflow_inputs (Json): The inputs for compiled CWL workflow
        submit_url (str): URL of Compute where the job is to be submitted
        user_env_vars (Optional[Dict[str,str]]): User supplied environment variables

    Returns:
        retval (Optional[int]): The return value indicating if run succeeded (0) or not
    """
    # Local import: a module-level `import json` is not visible from this block.
    import json

    # sanity check if the string has the form of an URL; done first so that an
    # invalid URL does not leave the process environment modified
    if not utils.is_valid_url(submit_url):
        print("Ill-formed URL string detected! Please provide a valid URL")
        return 1

    # update the environment with user supplied env args
    # (None instead of a shared mutable `{}` default)
    os.environ.update(sanitize_env_vars(user_env_vars or {}))

    connect_timeout = 5  # in seconds
    read_timeout = 30  # in seconds
    timeout_tuple = (connect_timeout, read_timeout)

    # construct compute_workflow object to be submitted
    # append timestamp to the job/workflow name to create jobid
    date_time = datetime.now().strftime("%Y_%m_%d_%H.%M.%S")
    jobid = workflow_name + '__' + date_time + '__'
    compute_workflow = {
        'cwlWorkflow': workflow,
        'cwlJobInputs': workflow_inputs,
        'id': jobid,
        'jobs': {}
    }

    # The per-job endpoints are built by appending to the submit URL, so make
    # sure exactly one '/' separates the base from the job id.
    base_url = submit_url if submit_url.endswith('/') else submit_url + '/'

    print('Sending request to Compute')
    res = requests.post(submit_url, json=compute_workflow, timeout=timeout_tuple)
    print('Post response code: ' + str(res.status_code))

    res = requests.get(base_url + f'{jobid}/outputs/', timeout=timeout_tuple)
    print('Output response code: ' + str(res.status_code))
    retval = 0 if res.status_code == 200 else 1
    print('Toil output: ' + str(res.text))

    res = requests.get(base_url + f'{jobid}/logs/', timeout=timeout_tuple)
    # 1. Parse the JSON string into a Python dictionary. The logs are auxiliary:
    #    a malformed or empty response must not mask the already known retval.
    try:
        log_data = json.loads(res.text)
    except json.JSONDecodeError:
        log_data = None
    if not isinstance(log_data, dict) or not log_data:
        print('Toil logs: unavailable')
        return retval

    # 2. Extract the first key-value pair, which contains the main log content.
    #    The key is the filename, and the value is the log text.
    log_content = next(iter(log_data.values()))

    print('Toil logs: ')
    pprint(log_content, indent=4)

    with open(f'compute_logs_{jobid}.txt', 'w', encoding='utf-8') as f:
        # f.write requires a string; serialize anything else instead of crashing
        f.write(log_content if isinstance(log_content, str)
                else json.dumps(log_content, indent=4))
    return retval
def is_valid_url(url: str) -> bool:
    """A simple utility that tells if the string is a valid http(s) url

    A string is accepted when it parses with an 'http' or 'https' scheme,
    a non-empty host name and, if a port is given, a valid port number.

    Args:
        url(str): A string that is supposed to be an URL

    Returns:
        bool: True if it is an URL
    """
    if not isinstance(url, str):
        return False
    try:
        result = urlparse(url)
        if result.scheme not in ('http', 'https'):
            return False
        # Accessing .port raises ValueError for a non-numeric or out-of-range
        # port, which a bare netloc truthiness check would let through.
        _ = result.port
        # hostname is None for netlocs such as ':8080' or 'user@' that carry
        # no actual host, even though netloc itself is non-empty.
        return bool(result.hostname)
    except ValueError:
        return False