Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions src/sophios/apis/python/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator

from sophios import compiler, input_output, plugins, utils_cwl
from sophios import run_local as run_local_module
from sophios import run_local as rl
from sophios import post_compile as pc
from sophios.cli import get_args, get_known_and_unknown_args
from sophios.utils_graphs import get_graph_reps
Expand Down Expand Up @@ -753,7 +753,7 @@ def get_cwl_workflow(self) -> Json:
return workflow_json

def run(self, run_args_dict: Dict[str, str] = default_values.default_run_args_dict,
user_env: Dict[str, str] = {}, basepath: str = 'autogenerated') -> None:
user_env_vars: Dict[str, str] = {}, basepath: str = 'autogenerated') -> None:
"""Run the built CWL workflow.

Args:
Expand Down Expand Up @@ -781,14 +781,15 @@ def run(self, run_args_dict: Dict[str, str] = default_values.default_run_args_di
user_args = convert_args_dict_to_args_list(run_args_dict)

# update the environment with user supplied env args
for k, v in user_env.items():
os.environ[k] = v
os.environ.update(rl.sanitize_env_vars(user_env_vars))

_, unknown_args = get_known_and_unknown_args(
self.process_name, user_args) # Use mock CLI args

# if there are no unknown_args then unknown_args will be an empty list []
# so no need for a separate check of a particular flag!
run_local_module.run_local(run_args_dict, False,
workflow_name=self.process_name,
basepath=basepath, passthrough_args=unknown_args)
rl.run_local(run_args_dict, False,
workflow_name=self.process_name,
basepath=basepath, passthrough_args=unknown_args)

# Process = Union[Step, Workflow]
110 changes: 109 additions & 1 deletion src/sophios/run_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
import subprocess as sub
import sys
import os
import re
import stat
from pathlib import Path
from pprint import pprint
import shutil
import traceback
from typing import List, Optional, Dict
from datetime import datetime
from typing import List, Optional, Dict
import requests
from sophios.wic_types import Json

try:
import cwltool.main
Expand All @@ -30,6 +34,47 @@
from .plugins import logging_filters


def sanitize_env_vars(env_vars: Dict[str, str]) -> Dict[str, str]:
    """Return a sanitized copy of user-defined environment variables.

    - Keys that are not valid Bash variable names are skipped with a warning.
    - Entries whose key or value is not a string are skipped with a warning
      rather than raising ``TypeError`` from the regex engine.
    - Shell metacharacters, newlines and NUL bytes are removed from values.

    The input dictionary is never modified.

    Args:
        env_vars (Dict[str, str]): A dictionary of string key-value pairs.

    Returns:
        Dict[str, str]: A new dictionary with sanitized key-value pairs.
    """
    sanitized: Dict[str, str] = {}

    # Regex for a valid Bash variable name
    valid_key_pattern = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')

    # Characters to remove from values to prevent command injection.
    # NUL (\x00) is included because os.environ refuses values containing it
    # ("embedded null byte"), so leaving it in would make the sanitized
    # result unusable for its main purpose.
    dangerous_chars_pattern = re.compile(r'[;`\'"$()|<>&!\n\r\x00]')

    for key, value in env_vars.items():
        # Step 1: Validate the key (non-string keys can never be valid names).
        if not isinstance(key, str) or not valid_key_pattern.fullmatch(key):
            print(f"Warning: Invalid environment variable key '{key}' skipped.")
            continue

        # Step 2: Environment values must be strings; skip anything else
        # instead of guessing at a conversion.
        if not isinstance(value, str):
            print(f"Warning: Non-string value for environment variable '{key}' skipped.")
            continue

        # Step 3: Sanitize the value.
        sanitized[key] = dangerous_chars_pattern.sub('', value)

    return sanitized


def create_safe_env(user_env: Dict[str, str]) -> dict:
    """Build a merged environment mapping without applying it.

    The user-supplied entries are passed through ``sanitize_env_vars`` and
    layered over a copy of the current process environment, so a sanitized
    user entry wins when both define the same key. ``os.environ`` itself is
    only read.
    """
    overrides = sanitize_env_vars(user_env)
    merged = dict(os.environ)
    merged.update(overrides)
    return merged


def generate_run_script(cmdline: str) -> None:
"""Writes the command used to invoke the cwl-runner to run.sh
Does not actually invoke ./run.sh
Expand Down Expand Up @@ -189,6 +234,69 @@ def run_local(run_args_dict: Dict[str, str], use_subprocess: bool,
return retval


def run_compute(workflow_name: str, workflow: Json, workflow_inputs: Json,
                submit_url: str, user_env_vars: Dict[str, str] = {}) -> Optional[int]:
    """This function runs the compiled workflow through compute.

    The workflow is POSTed to ``submit_url``; the job outputs and logs are
    then fetched from ``<submit_url>/<jobid>/outputs/`` and
    ``<submit_url>/<jobid>/logs/``. When logs are available they are printed
    and written to ``compute_logs_<jobid>.txt`` in the current directory.

    Args:
        workflow_name (str): The name of the workflow
        workflow (Json): The compiled CWL workflow
        workflow_inputs (Json): The inputs for compiled CWL workflow
        submit_url (str): URL of Compute where the job is to be submitted
        user_env_vars (Dict[str,str]): User supplied environment variables

    Returns:
        retval (Optional[int]): 0 if the outputs request returned HTTP 200,
        otherwise 1. Also 1 when submit_url is rejected, in which case no
        request is sent and the environment is left untouched.
    """
    # sanity check if the string has the form of an URL.
    # Done first so that a bad URL has no side effects at all.
    if not utils.is_valid_url(submit_url):
        print("Ill-formed URL string detected! Please provide a valid URL")
        return 1

    # update the environment with user supplied env args
    # (user_env_vars is only read here, so the shared default dict is never mutated)
    os.environ.update(sanitize_env_vars(user_env_vars))

    connect_timeout = 5  # in seconds
    read_timeout = 30  # in seconds
    timeout_tuple = (connect_timeout, read_timeout)

    # construct compute_workflow object to be submitted
    # append timestamp to the job/workflow name to create jobid
    date_time = datetime.now().strftime("%Y_%m_%d_%H.%M.%S")
    jobid = workflow_name + '__' + str(date_time) + '__'
    compute_workflow = {
        'cwlWorkflow': workflow,
        'cwlJobInputs': workflow_inputs,
        'id': jobid,
        'jobs': {}
    }

    # The per-job endpoints are formed by appending to the submit URL, so
    # make sure exactly one '/' separates it from the job id.
    base_url = submit_url if submit_url.endswith('/') else submit_url + '/'

    print('Sending request to Compute')
    res = requests.post(submit_url, json=compute_workflow, timeout=timeout_tuple)
    print('Post response code: ' + str(res.status_code))

    res = requests.get(base_url + f'{jobid}/outputs/', timeout=timeout_tuple)
    print('Output response code: ' + str(res.status_code))
    retval = 0 if res.status_code == 200 else 1
    print('Toil output: ' + str(res.text))

    res = requests.get(base_url + f'{jobid}/logs/', timeout=timeout_tuple)
    # 1. Parse the JSON body. A non-JSON body (e.g. an error page) must not
    #    turn an otherwise finished run into a crash.
    try:
        log_data = res.json()
    except ValueError:
        log_data = None

    if not isinstance(log_data, dict) or not log_data:
        print('Toil logs: not available (logs response code: ' + str(res.status_code) + ')')
        return retval

    # 2. Extract the first value, which contains the main log content.
    #    The key is the filename, and the value is the log text.
    log_content = next(iter(log_data.values()))

    print('Toil logs: ')
    pprint(log_content, indent=4)

    with open(f'compute_logs_{jobid}.txt', 'w', encoding='utf-8') as f:
        # write() only accepts str; fall back to the text form otherwise
        f.write(log_content if isinstance(log_content, str) else str(log_content))
    return retval


def copy_output_files(yaml_stem: str, basepath: str = '') -> None:
"""Copies output files from the cachedir to outdir/

Expand Down
12 changes: 1 addition & 11 deletions src/sophios/run_local_async.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from pathlib import Path
import traceback
import os
from typing import Optional, Dict, Any
import asyncio
import aiofiles
Expand All @@ -12,16 +11,7 @@

import sophios.post_compile as pc
from sophios.wic_types import Json
from .run_local import build_cmd, copy_output_files


def create_safe_env(user_env: Dict[str, str]) -> dict:
    """Return the process environment overlaid with ``user_env``, unapplied.

    Raises:
        ValueError: if any key in ``user_env`` starts with one of the
            prohibited prefixes; the first such key (in dict order) is named.
    """
    blocked_prefixes = ("PATH", "LD_", "PYTHON", "SECRET_", "BASH_ENV")

    # str.startswith accepts a tuple, so one call covers every prefix
    offending = next(
        (name for name in user_env if name.startswith(blocked_prefixes)),
        None,
    )
    if offending is not None:
        raise ValueError(f"Prohibited key: {offending}")

    merged = dict(os.environ)
    merged.update(user_env)
    return merged
from .run_local import build_cmd, copy_output_files, create_safe_env


async def run_cwl_workflow(
Expand Down
17 changes: 17 additions & 0 deletions src/sophios/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import copy
from pathlib import Path
from urllib.parse import urlparse
from typing import Any, Dict, List, Optional, Tuple

import yaml
Expand Down Expand Up @@ -522,3 +523,19 @@ def convert_args_dict_to_args_list(args_dict: Dict[str, str]) -> List[str]:
for arg_name, arg_value in args_dict.items():
args_list += ['--' + arg_name, arg_value]
return args_list


def is_valid_url(url: str) -> bool:
    """A simple utility that tells if the string is a valid http(s) URL

    A URL is accepted only when it parses, its scheme is ``http`` or
    ``https`` and it names a host. A netloc without a host part (such as
    ``http://:8080/`` or ``http://user@/``) is rejected.

    Args:
        url (str): A string that is supposed to be a URL

    Returns:
        bool: True if it is a URL, False otherwise (including non-string input)
    """
    if not isinstance(url, str):
        return False
    try:
        result = urlparse(url)
        # hostname is None when the netloc carries only a port or userinfo
        return result.scheme in ('http', 'https') and bool(result.hostname)
    except ValueError:
        # urlparse raises ValueError for malformed input, e.g. a bad IPv6 literal
        return False