Skip to content

Commit 7d4bb80

Browse files
authored
add run_local_async with desired async behavior (PolusAI#336)
1 parent eb8a93f commit 7d4bb80

File tree

5 files changed

+129
-91
lines changed

5 files changed

+129
-91
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ repos:
6969
- types-setuptools
7070
- types-six
7171
- types-urllib3
72+
- types-aiofiles
7273
- repo: https://github.com/pre-commit/pygrep-hooks
7374
rev: v1.10.0
7475
hooks:

install/system_deps.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,3 @@ dependencies:
3939
# Needs binary PyQt5 dependencies.
4040
- kubernetes-helm
4141
- zstandard
42-
# # Needed for orjson wheels
43-
# - orjson

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ mypy-types = [
9696
"types-setuptools",
9797
"types-six",
9898
"types-urllib3",
99+
"types-aiofiles"
99100
]
100101
# NOTE: toil and cwltool have a somewhat continuous release model. However,
101102
# there can be updates and bugfixes in main that have not yet been released.

src/sophios/run_local.py

Lines changed: 2 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,8 @@
99
import shutil
1010
import platform
1111
import traceback
12-
import yaml
1312
from typing import Dict, List, Optional
1413
from datetime import datetime
15-
import sophios.post_compile as pc
16-
from sophios.wic_types import Json
1714

1815
try:
1916
import cwltool.main
@@ -304,7 +301,7 @@ def copy_output_files(yaml_stem: str, basepath: str = '') -> None:
304301
Args:
305302
yaml_stem (str): The --yaml filename (without .extension)
306303
"""
307-
output_json_file = Path(f'output_{yaml_stem}.json')
304+
output_json_file = Path(f'{basepath}/output_{yaml_stem}.json')
308305
if output_json_file.exists():
309306
pass # TODO
310307

@@ -386,7 +383,7 @@ def build_cmd(workflow_name: str, basepath: str, cwl_runner: str, container_cmd:
386383
elif cwl_runner == 'toil-cwl-runner':
387384
container_pull = []
388385
now = datetime.now()
389-
date_time = now.strftime("%Y%m%d%H%M%S")
386+
date_time = now.strftime("%Y_%m_%d_%H.%M.%S")
390387
cmd = [script] + container_pull + provenance + container_cmd_ + path_check
391388
cmd += ['--outdir', f'{basepath}/outdir_toil_{date_time}',
392389
'--jobStore', f'file:{basepath}/jobStore_{workflow_name}', # NOTE: This is the equivalent of --cachedir
@@ -401,90 +398,6 @@ def build_cmd(workflow_name: str, basepath: str, cwl_runner: str, container_cmd:
401398
return cmd
402399

403400

404-
def run_cwl_workflow(workflow_name: str, basepath: str, cwl_runner: str, container_cmd: str, use_subprocess: bool, env_commands: List[str] = []) -> int:
405-
"""Run the CWL workflow in an environment
406-
407-
Args:
408-
workflow_name (str): Name of the .cwl workflow file to be executed
409-
basepath (str): The path at which the workflow to be executed
410-
cwl_runner (str): The CWL runner used to execute the workflow
411-
container_cmd (str): The container engine command
412-
use_subprocess (bool): When using cwltool, determines whether to use subprocess.run(...)
413-
or use the cwltool python api.
414-
env_commands (List[str]): environment variables and commands needed to be run before running the workflow
415-
Returns:
416-
retval: The return value
417-
"""
418-
cmd = build_cmd(workflow_name, basepath, cwl_runner, container_cmd)
419-
cmdline = ' '.join(cmd)
420-
421-
retval = 1 # overwrite on success
422-
print('Running ' + cmdline)
423-
if use_subprocess:
424-
# To run in parallel (i.e. pytest ... --workers 8 ...), we need to
425-
# use separate processes. Otherwise:
426-
# "signal only works in main thread or with __pypy__.thread.enable_signals()"
427-
proc = sub.run(cmd, check=False)
428-
retval = proc.returncode
429-
else:
430-
print('via cwltool.main.main python API')
431-
try:
432-
if cwl_runner == 'cwltool':
433-
retval = cwltool.main.main(cmd[1:])
434-
elif cwl_runner == 'toil-cwl-runner':
435-
_ = sub.run(env_commands, shell=True, check=False, executable="/bin/bash")
436-
retval = toil.cwl.cwltoil.main(cmd[1:])
437-
else:
438-
raise Exception("Invalid cwl_runner!")
439-
440-
print(f'Final output json metadata blob is in output_{workflow_name}.json')
441-
except Exception as e:
442-
print('Failed to execute', workflow_name)
443-
print(f'See error_{workflow_name}.txt for detailed technical information.')
444-
# Do not display a nasty stack trace to the user; hide it in a file.
445-
with open(f'error_{workflow_name}.txt', mode='w', encoding='utf-8') as f:
446-
# https://mypy.readthedocs.io/en/stable/common_issues.html#python-version-and-system-platform-checks
447-
if sys.version_info >= (3, 10):
448-
traceback.print_exception(type(e), value=e, tb=None, file=f)
449-
print(e) # we are always running this on CI
450-
# only copy output files if using cwltool
451-
if cwl_runner == 'cwltool':
452-
copy_output_files(workflow_name, basepath=basepath)
453-
return retval
454-
455-
456-
async def run_cwl_serialized_async(workflow: Json, basepath: str,
457-
cwl_runner: str, container_cmd: str,
458-
env_commands: List[str] = []) -> None:
459-
"""Prepare and run compiled and serialized CWL workflow asynchronously
460-
461-
Args:
462-
workflow_json (Json): Compiled and serialized CWL workflow
463-
basepath (str): The path at which the workflow to be executed
464-
cwl_runner (str): The CWL runner used to execute the workflow
465-
container_cmd (str): The container engine command
466-
env_commands (List[str]): environment variables and commands needed to be run before running the workflow
467-
"""
468-
workflow_name = workflow['name']
469-
basepath = basepath.rstrip("/") if basepath != "/" else basepath
470-
output_dirs = pc.find_output_dirs(workflow)
471-
pc.create_output_dirs(output_dirs, basepath)
472-
compiled_cwl = workflow_name + '.cwl'
473-
inputs_yml = workflow_name + '_inputs.yml'
474-
# write _input.yml file
475-
with open(Path(basepath) / inputs_yml, 'w', encoding='utf-8') as f:
476-
yaml.dump(workflow['yaml_inputs'], f)
477-
workflow.pop('retval', None)
478-
workflow.pop('yaml_inputs', None)
479-
workflow.pop('name', None)
480-
# write compiled .cwl file
481-
with open(Path(basepath) / compiled_cwl, 'w', encoding='utf-8') as f:
482-
yaml.dump(workflow, f)
483-
retval = run_cwl_workflow(workflow_name, basepath,
484-
cwl_runner, container_cmd, False, env_commands=env_commands)
485-
assert retval == 0
486-
487-
488401
def stage_input_files(yml_inputs: Yaml, root_yml_dir_abs: Path,
489402
relative_run_path: bool = True, throw: bool = True) -> None:
490403
"""Copies the input files in yml_inputs to the working directory.

src/sophios/run_local_async.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
from pathlib import Path
2+
import traceback
3+
import os
4+
from typing import Optional, Dict, Any
5+
import asyncio
6+
import aiofiles
7+
import yaml
8+
# we are already using fastapi elsewhere in this project
9+
# so use the run_in_threadpool to run sequential functions
10+
# without blocking the main event loop
11+
from fastapi.concurrency import run_in_threadpool
12+
13+
import sophios.post_compile as pc
14+
from sophios.wic_types import Json
15+
from .run_local import build_cmd, copy_output_files
16+
17+
18+
def create_safe_env(user_env: Dict[str, str]) -> dict:
19+
"""Generate a sanitized environment dict without applying it"""
20+
forbidden = {"PATH", "LD_", "PYTHON", "SECRET_", "BASH_ENV"}
21+
for key in user_env:
22+
if any(key.startswith(prefix) for prefix in forbidden):
23+
raise ValueError(f"Prohibited key: {key}")
24+
return {**os.environ, **user_env}
25+
26+
27+
async def run_cwl_workflow(workflow_name: str, basepath: str,
28+
cwl_runner: str, container_cmd: str,
29+
user_env: Dict[str, str]) -> Optional[int]:
30+
"""Run the CWL workflow in an environment
31+
32+
Args:
33+
workflow_name (str): Name of the .cwl workflow file to be executed
34+
basepath (str): The path at which the workflow to be executed
35+
cwl_runner (str): The CWL runner used to execute the workflow
36+
container_cmd (str): The container engine command
37+
use_subprocess (bool): When using cwltool, determines whether to use subprocess.run(...)
38+
or use the cwltool python api.
39+
env_commands (List[str]): environment variables and commands needed to be run before running the workflow
40+
Returns:
41+
retval: The return value
42+
"""
43+
cmd = await run_in_threadpool(build_cmd, workflow_name, basepath, cwl_runner, container_cmd)
44+
45+
retval = 1 # overwrite on success
46+
print('Running ' + (' '.join(cmd)))
47+
print('via command line')
48+
runner_cmnds = ['cwltool', 'toil-cwl-runner']
49+
try:
50+
if cwl_runner in runner_cmnds:
51+
print(f'Setting env vars : {user_env}')
52+
exec_env = create_safe_env(user_env)
53+
54+
proc = await asyncio.create_subprocess_exec(*cmd,
55+
env=exec_env,
56+
stdout=asyncio.subprocess.PIPE,
57+
stderr=asyncio.subprocess.PIPE)
58+
59+
async def stream_to_file(stream: Any, filename: Path) -> None:
60+
filename.parent.mkdir(parents=True, exist_ok=True)
61+
async with aiofiles.open(filename, mode='wb') as f:
62+
while True:
63+
data = await stream.read(1024) # 1KB chunks
64+
if not data:
65+
break
66+
await f.write(data)
67+
68+
await asyncio.gather(
69+
stream_to_file(proc.stdout, Path(basepath) / 'LOGS' / 'stdout.txt'),
70+
stream_to_file(proc.stderr, Path(basepath) / 'LOGS' / 'stderr.txt')
71+
)
72+
retval = await proc.wait()
73+
else:
74+
raise ValueError(
75+
f'Invalid or Unsupported cwl_runner command! Only these are the supported runners {runner_cmnds}')
76+
77+
except Exception as e:
78+
print('Failed to execute', workflow_name)
79+
print(
80+
f'See error_{workflow_name}.txt for detailed technical information.')
81+
# Do not display a nasty stack trace to the user; hide it in a file.
82+
with open(f'error_{workflow_name}.txt', mode='w', encoding='utf-8') as f:
83+
traceback.print_exception(type(e), value=e, tb=None, file=f)
84+
print(e) # we are always running this on CI
85+
# only copy output files if using cwltool
86+
if cwl_runner == 'cwltool':
87+
await run_in_threadpool(copy_output_files, workflow_name, basepath=basepath)
88+
return retval
89+
90+
91+
async def run_cwl_serialized(workflow: Json, basepath: str,
92+
cwl_runner: str, container_cmd: str,
93+
user_env: Dict[str, str]) -> None:
94+
"""Prepare and run compiled and serialized CWL workflow asynchronously
95+
96+
Args:
97+
workflow_json (Json): Compiled and serialized CWL workflow
98+
basepath (str): The path at which the workflow to be executed
99+
cwl_runner (str): The CWL runner used to execute the workflow
100+
container_cmd (str): The container engine command
101+
env_commands (List[str]): environment variables and commands
102+
needed to be run before running the workflow
103+
"""
104+
workflow_name = workflow['name']
105+
basepath = basepath.rstrip("/") if basepath != "/" else basepath
106+
output_dirs = pc.find_output_dirs(workflow)
107+
pc.create_output_dirs(output_dirs, basepath)
108+
compiled_cwl = workflow_name + '.cwl'
109+
inputs_yml = workflow_name + '_inputs.yml'
110+
# write _input.yml file
111+
await run_in_threadpool(yaml.dump, workflow['yaml_inputs'],
112+
open(Path(basepath) / inputs_yml, 'w', encoding='utf-8'))
113+
114+
# clean up the object of tags and data that we don't need anymore
115+
workflow.pop('retval', None)
116+
workflow.pop('yaml_inputs', None)
117+
workflow.pop('name', None)
118+
119+
# write compiled .cwl file
120+
await run_in_threadpool(yaml.dump, workflow,
121+
open(Path(basepath) / compiled_cwl, 'w', encoding='utf-8'))
122+
123+
retval = await run_cwl_workflow(workflow_name, basepath,
124+
cwl_runner, container_cmd, user_env=user_env)
125+
assert retval == 0

0 commit comments

Comments
 (0)