Skip to content

Commit c4732d7

Browse files
vjaganat90Vasu Jaganath
andauthored
Add run_compute (#352)
* add run_compute and improve user_env sanitization * sanity_check URL string --------- Co-authored-by: Vasu Jaganath <[email protected]>
1 parent 661cb18 commit c4732d7

File tree

4 files changed

+135
-19
lines changed

4 files changed

+135
-19
lines changed

src/sophios/apis/python/api.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
1414

1515
from sophios import compiler, input_output, plugins, utils_cwl
16-
from sophios import run_local as run_local_module
16+
from sophios import run_local as rl
1717
from sophios import post_compile as pc
1818
from sophios.cli import get_args, get_known_and_unknown_args
1919
from sophios.utils_graphs import get_graph_reps
@@ -753,7 +753,7 @@ def get_cwl_workflow(self) -> Json:
753753
return workflow_json
754754

755755
def run(self, run_args_dict: Dict[str, str] = default_values.default_run_args_dict,
756-
user_env: Dict[str, str] = {}, basepath: str = 'autogenerated') -> None:
756+
user_env_vars: Dict[str, str] = {}, basepath: str = 'autogenerated') -> None:
757757
"""Run the built CWL workflow.
758758
759759
Args:
@@ -781,14 +781,15 @@ def run(self, run_args_dict: Dict[str, str] = default_values.default_run_args_di
781781
user_args = convert_args_dict_to_args_list(run_args_dict)
782782

783783
# update the environment with user supplied env args
784-
for k, v in user_env.items():
785-
os.environ[k] = v
784+
os.environ.update(rl.sanitize_env_vars(user_env_vars))
785+
786786
_, unknown_args = get_known_and_unknown_args(
787787
self.process_name, user_args) # Use mock CLI args
788+
788789
# if there are no unknown_args then unkown_args will be an empty list []
789790
# so no need for a separate check of a particular flag!
790-
run_local_module.run_local(run_args_dict, False,
791-
workflow_name=self.process_name,
792-
basepath=basepath, passthrough_args=unknown_args)
791+
rl.run_local(run_args_dict, False,
792+
workflow_name=self.process_name,
793+
basepath=basepath, passthrough_args=unknown_args)
793794

794795
# Process = Union[Step, Workflow]

src/sophios/run_local.py

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,16 @@
33
import subprocess as sub
44
import sys
55
import os
6+
import re
67
import stat
78
from pathlib import Path
9+
from pprint import pprint
810
import shutil
911
import traceback
10-
from typing import List, Optional, Dict
1112
from datetime import datetime
13+
from typing import List, Optional, Dict
14+
import requests
15+
from sophios.wic_types import Json
1216

1317
try:
1418
import cwltool.main
@@ -30,6 +34,47 @@
3034
from .plugins import logging_filters
3135

3236

37+
def sanitize_env_vars(env_vars: Dict[str, str]) -> Dict[str, str]:
38+
"""
39+
Sanitizes a dictionary of user-defined environment variables, assuming all
40+
values are strings.
41+
42+
- Ensures keys are valid Bash variable names.
43+
- Removes potentially dangerous characters from string values.
44+
45+
Args:
46+
env_vars (Dict[str, str]): A dictionary of string key-value pairs.
47+
48+
Returns:
49+
Dict[str, str]: A new dictionary with sanitized key-value pairs.
50+
"""
51+
sanitized = {}
52+
53+
# Regex for a valid Bash variable name
54+
valid_key_pattern = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')
55+
56+
# Characters to remove from values to prevent command injection
57+
dangerous_chars_pattern = re.compile(r'[;`\'"$()|<>&!\n\r]')
58+
59+
for key, value in env_vars.items():
60+
# Step 1: Validate the key.
61+
if not valid_key_pattern.fullmatch(key):
62+
print(f"Warning: Invalid environment variable key '{key}' skipped.")
63+
continue
64+
65+
# Step 2: Sanitize the value.
66+
sanitized_value = dangerous_chars_pattern.sub('', value)
67+
sanitized[key] = sanitized_value
68+
69+
return sanitized
70+
71+
72+
def create_safe_env(user_env: Dict[str, str]) -> dict:
73+
"""Generate a sanitized environment dict without applying it"""
74+
sanitized_user_env = sanitize_env_vars(user_env)
75+
return {**os.environ, **sanitized_user_env}
76+
77+
3378
def generate_run_script(cmdline: str) -> None:
3479
"""Writes the command used to invoke the cwl-runner to run.sh
3580
Does not actually invoke ./run.sh
@@ -189,6 +234,69 @@ def run_local(run_args_dict: Dict[str, str], use_subprocess: bool,
189234
return retval
190235

191236

237+
def run_compute(workflow_name: str, workflow: Json, workflow_inputs: Json,
238+
submit_url: str, user_env_vars: Dict[str, str] = {}) -> Optional[int]:
239+
"""This function runs the compiled workflow through compute.
240+
241+
Args:
242+
workflow_name (str): The name of the workflow
243+
workflow (Json): The compiled CWL workflow
244+
workflow_inputs (Json): The inputs for compiled CWL workflow
245+
submit_url (str): URL of Compute where the job is to be submitted
246+
user_env_vars (Dict[str,str]): User supplied environment variables
247+
248+
Returns:
249+
retval (Optional[int]): The return value indicating if run succeeded (0) or not
250+
"""
251+
# update the environment with user supplied env args
252+
os.environ.update(sanitize_env_vars(user_env_vars))
253+
254+
connect_timeout = 5 # in seconds
255+
read_timeout = 30 # in seconds
256+
timeout_tuple = (connect_timeout, read_timeout)
257+
# construct compute_workflow object to be submitted
258+
# append timestamp to the job/workflow name to create jobid
259+
now = datetime.now()
260+
date_time = now.strftime("%Y_%m_%d_%H.%M.%S")
261+
jobid = workflow_name + '__' + str(date_time) + '__'
262+
compute_workflow = {
263+
'cwlWorkflow': workflow,
264+
'cwlJobInputs': workflow_inputs,
265+
'id': jobid,
266+
'jobs': {}
267+
}
268+
269+
# sanity check if the string has the form of an URL
270+
if not utils.is_valid_url(submit_url):
271+
print("Ill-formed URL string detected! Please provide a valid URL")
272+
return 1
273+
274+
print('Sending request to Compute')
275+
res = requests.post(submit_url, json=compute_workflow, timeout=timeout_tuple)
276+
print('Post response code: ' + str(res.status_code))
277+
278+
res = requests.get(submit_url + f'{jobid}/outputs/', timeout=timeout_tuple)
279+
print('Output response code: ' + str(res.status_code))
280+
retval = 0 if res.status_code == 200 else 1
281+
print('Toil output: ' + str(res.text))
282+
283+
res = requests.get(submit_url + f'{jobid}/logs/', timeout=timeout_tuple)
284+
# 1. Parse the JSON string into a Python dictionary
285+
log_data = json.loads(res.text)
286+
287+
# 2. Extract the first key-value pair, which contains the main log content.
288+
# The key is the filename, and the value is the log text.
289+
first_key = list(log_data.keys())[0]
290+
log_content = log_data[first_key]
291+
292+
print('Toil logs: ')
293+
pprint(log_content, indent=4)
294+
295+
with open(f'compute_logs_{jobid}.txt', 'w', encoding='utf-8') as f:
296+
f.write(log_content)
297+
return retval
298+
299+
192300
def copy_output_files(yaml_stem: str, basepath: str = '') -> None:
193301
"""Copies output files from the cachedir to outdir/
194302

src/sophios/run_local_async.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from pathlib import Path
22
import traceback
3-
import os
43
from typing import Optional, Dict, Any
54
import asyncio
65
import aiofiles
@@ -12,16 +11,7 @@
1211

1312
import sophios.post_compile as pc
1413
from sophios.wic_types import Json
15-
from .run_local import build_cmd, copy_output_files
16-
17-
18-
def create_safe_env(user_env: Dict[str, str]) -> dict:
19-
"""Generate a sanitized environment dict without applying it"""
20-
forbidden = {"PATH", "LD_", "PYTHON", "SECRET_", "BASH_ENV"}
21-
for key in user_env:
22-
if any(key.startswith(prefix) for prefix in forbidden):
23-
raise ValueError(f"Prohibited key: {key}")
24-
return {**os.environ, **user_env}
14+
from .run_local import build_cmd, copy_output_files, create_safe_env
2515

2616

2717
async def run_cwl_workflow(

src/sophios/utils.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import copy
22
from pathlib import Path
3+
from urllib.parse import urlparse
34
from typing import Any, Dict, List, Optional, Tuple
45

56
import yaml
@@ -522,3 +523,19 @@ def convert_args_dict_to_args_list(args_dict: Dict[str, str]) -> List[str]:
522523
for arg_name, arg_value in args_dict.items():
523524
args_list += ['--' + arg_name, arg_value]
524525
return args_list
526+
527+
528+
def is_valid_url(url: str) -> bool:
529+
"""A simple utility that tells if the string is a valid url
530+
531+
Args:
532+
url(str): A string that is supposed to be an URL
533+
534+
Returns:
535+
bool: True if it is an URL
536+
"""
537+
try:
538+
result = urlparse(url)
539+
return all([result.scheme, result.netloc]) and result.scheme in ('http', 'https')
540+
except ValueError:
541+
return False

0 commit comments

Comments
 (0)