Skip to content

Commit 22128a0

Browse files
authored
Make parsl.AUTO_LOGNAME generation pluggable (#3388)
A generation function can now be specified at configuration time, with the previous behaviour invoked as default.
1 parent 83ed013 commit 22128a0

File tree

4 files changed

+173
-22
lines changed

4 files changed

+173
-22
lines changed

docs/userguide/plugins.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@ be added in the workflow configuration, in the ``storage`` parameter of the
4949
relevant `ParslExecutor`. Each provider should subclass the `Staging` class.
5050

5151

52+
Default stdout/stderr name generation
53+
-------------------------------------
54+
Parsl can choose names for your bash apps' stdout and stderr streams
55+
automatically, with the parsl.AUTO_LOGNAME parameter. The choice of path is
56+
made by a function which can be configured with the ``std_autopath``
57+
parameter of Parsl `Config`. By default, ``DataFlowKernel.default_std_autopath``
58+
will be used.
59+
60+
5261
Memoization/checkpointing
5362
-------------------------
5463

parsl/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ class Config(RepresentationMixin, UsageInformation):
5151
of 1.
5252
run_dir : str, optional
5353
Path to run directory. Default is 'runinfo'.
54+
std_autopath : function, optional
55+
Sets the function used to generate stdout/stderr specifications when parsl.AUTO_LOGNAME is used. If no function
56+
is specified, generates paths that look like: ``rundir/NNN/task_logs/X/task_{id}_{name}{label}.{out/err}``
5457
strategy : str, optional
5558
Strategy to use for scaling blocks according to workflow needs. Can be 'simple', 'htex_auto_scale', 'none'
5659
or `None`.
@@ -90,6 +93,7 @@ def __init__(self,
9093
retries: int = 0,
9194
retry_handler: Optional[Callable[[Exception, TaskRecord], float]] = None,
9295
run_dir: str = 'runinfo',
96+
std_autopath: Optional[Callable] = None,
9397
strategy: Optional[str] = 'simple',
9498
strategy_period: Union[float, int] = 5,
9599
max_idletime: float = 120.0,
@@ -130,6 +134,7 @@ def __init__(self,
130134
self.usage_tracking = usage_tracking
131135
self.initialize_logging = initialize_logging
132136
self.monitoring = monitoring
137+
self.std_autopath: Optional[Callable] = std_autopath
133138

134139
@property
135140
def executors(self) -> Sequence[ParslExecutor]:

parsl/dataflow/dflow.py

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -995,32 +995,16 @@ def submit(self,
995995
executor = random.choice(choices)
996996
logger.debug("Task {} will be sent to executor {}".format(task_id, executor))
997997

998-
# The below uses func.__name__ before it has been wrapped by any staging code.
999-
1000-
label = app_kwargs.get('label')
1001-
for kw in ['stdout', 'stderr']:
1002-
if kw in app_kwargs:
1003-
if app_kwargs[kw] == parsl.AUTO_LOGNAME:
1004-
if kw not in ignore_for_cache:
1005-
ignore_for_cache += [kw]
1006-
app_kwargs[kw] = os.path.join(
1007-
self.run_dir,
1008-
'task_logs',
1009-
str(int(task_id / 10000)).zfill(4), # limit logs to 10k entries per directory
1010-
'task_{}_{}{}.{}'.format(
1011-
str(task_id).zfill(4),
1012-
func.__name__,
1013-
'' if label is None else '_{}'.format(label),
1014-
kw)
1015-
)
1016-
1017998
resource_specification = app_kwargs.get('parsl_resource_specification', {})
1018999

10191000
task_record: TaskRecord
1020-
task_record = {'depends': [],
1001+
task_record = {'args': app_args,
1002+
'depends': [],
10211003
'dfk': self,
10221004
'executor': executor,
1005+
'func': func,
10231006
'func_name': func.__name__,
1007+
'kwargs': app_kwargs,
10241008
'memoize': cache,
10251009
'hashsum': None,
10261010
'exec_fu': None,
@@ -1042,18 +1026,30 @@ def submit(self,
10421026

10431027
self.update_task_state(task_record, States.unsched)
10441028

1029+
for kw in ['stdout', 'stderr']:
1030+
if kw in app_kwargs:
1031+
if app_kwargs[kw] == parsl.AUTO_LOGNAME:
1032+
if kw not in ignore_for_cache:
1033+
ignore_for_cache += [kw]
1034+
if self.config.std_autopath is None:
1035+
app_kwargs[kw] = self.default_std_autopath(task_record, kw)
1036+
else:
1037+
app_kwargs[kw] = self.config.std_autopath(task_record, kw)
1038+
10451039
app_fu = AppFuture(task_record)
1040+
task_record['app_fu'] = app_fu
10461041

10471042
# Transform remote input files to data futures
10481043
app_args, app_kwargs, func = self._add_input_deps(executor, app_args, app_kwargs, func)
10491044

10501045
func = self._add_output_deps(executor, app_args, app_kwargs, app_fu, func)
10511046

1047+
# Replace the function invocation in the TaskRecord with whatever file-staging
1048+
# substitutions have been made.
10521049
task_record.update({
10531050
'args': app_args,
10541051
'func': func,
1055-
'kwargs': app_kwargs,
1056-
'app_fu': app_fu})
1052+
'kwargs': app_kwargs})
10571053

10581054
assert task_id not in self.tasks
10591055

@@ -1441,6 +1437,19 @@ def log_std_stream(name: str, target) -> None:
14411437
log_std_stream("Standard out", task_record['app_fu'].stdout)
14421438
log_std_stream("Standard error", task_record['app_fu'].stderr)
14431439

1440+
def default_std_autopath(self, taskrecord, kw):
1441+
label = taskrecord['kwargs'].get('label')
1442+
task_id = taskrecord['id']
1443+
return os.path.join(
1444+
self.run_dir,
1445+
'task_logs',
1446+
str(int(task_id / 10000)).zfill(4), # limit logs to 10k entries per directory
1447+
'task_{}_{}{}.{}'.format(
1448+
str(task_id).zfill(4),
1449+
taskrecord['func_name'],
1450+
'' if label is None else '_{}'.format(label),
1451+
kw))
1452+
14441453

14451454
class DataFlowKernelLoader:
14461455
"""Manage which DataFlowKernel is active.
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import logging
2+
import parsl
3+
import pytest
4+
import zipfile
5+
6+
from functools import partial
7+
from parsl.app.futures import DataFuture
8+
from parsl.data_provider.files import File
9+
from parsl.executors import ThreadPoolExecutor
10+
11+
12+
@parsl.bash_app
13+
def app_stdout(stdout=parsl.AUTO_LOGNAME):
14+
return "echo hello"
15+
16+
17+
def const_str(cpath, task_record, err_or_out):
18+
return cpath
19+
20+
21+
def const_with_cpath(autopath_specifier, content_path, caplog):
22+
with parsl.load(parsl.Config(std_autopath=partial(const_str, autopath_specifier))):
23+
fut = app_stdout()
24+
25+
    # we don't have to wait for a result to check this attribute
26+
assert fut.stdout is autopath_specifier
27+
28+
# there is no DataFuture to wait for in the str case: the model is that
29+
# the stdout will be immediately available on task completion.
30+
fut.result()
31+
32+
with open(content_path, "r") as file:
33+
assert file.readlines() == ["hello\n"]
34+
35+
for record in caplog.records:
36+
assert record.levelno < logging.ERROR
37+
38+
parsl.clear()
39+
40+
41+
@pytest.mark.local
42+
def test_std_autopath_const_str(caplog, tmpd_cwd):
43+
"""Tests str and tuple mode autopaths with constant autopath, which should
44+
all be passed through unmodified.
45+
"""
46+
cpath = str(tmpd_cwd / "CONST")
47+
const_with_cpath(cpath, cpath, caplog)
48+
49+
50+
@pytest.mark.local
51+
def test_std_autopath_const_pathlike(caplog, tmpd_cwd):
52+
cpath = tmpd_cwd / "CONST"
53+
const_with_cpath(cpath, cpath, caplog)
54+
55+
56+
@pytest.mark.local
57+
def test_std_autopath_const_tuples(caplog, tmpd_cwd):
58+
file = tmpd_cwd / "CONST"
59+
cpath = (file, "w")
60+
const_with_cpath(cpath, file, caplog)
61+
62+
63+
class URIFailError(Exception):
64+
pass
65+
66+
67+
def fail_uri(task_record, err_or_out):
68+
raise URIFailError("Deliberate failure in std stream filename generation")
69+
70+
71+
@pytest.mark.local
72+
def test_std_autopath_fail(caplog):
73+
with parsl.load(parsl.Config(std_autopath=fail_uri)):
74+
with pytest.raises(URIFailError):
75+
app_stdout()
76+
77+
parsl.clear()
78+
79+
80+
@parsl.bash_app
81+
def app_both(stdout=parsl.AUTO_LOGNAME, stderr=parsl.AUTO_LOGNAME):
82+
return "echo hello; echo goodbye >&2"
83+
84+
85+
def zip_uri(base, task_record, err_or_out):
86+
    """Should generate Files in base.zip like app_both.0.0.stdout or app_both.123.0.stderr"""
87+
zip_path = base / "base.zip"
88+
file = f"{task_record['func_name']}.{task_record['id']}.{task_record['try_id']}.{err_or_out}"
89+
return File(f"zip:{zip_path}/{file}")
90+
91+
92+
@pytest.mark.local
93+
def test_std_autopath_zip(caplog, tmpd_cwd):
94+
with parsl.load(parsl.Config(run_dir=str(tmpd_cwd),
95+
executors=[ThreadPoolExecutor(working_dir=str(tmpd_cwd))],
96+
std_autopath=partial(zip_uri, tmpd_cwd))):
97+
futs = []
98+
99+
for _ in range(10):
100+
fut = app_both()
101+
102+
# assertions that should hold after submission
103+
assert isinstance(fut.stdout, DataFuture)
104+
assert fut.stdout.file_obj.url.startswith("zip")
105+
106+
futs.append(fut)
107+
108+
# Barrier for all the stageouts to complete so that we can
109+
# poke at the zip file.
110+
[(fut.stdout.result(), fut.stderr.result()) for fut in futs]
111+
112+
with zipfile.ZipFile(tmpd_cwd / "base.zip") as z:
113+
for fut in futs:
114+
115+
assert fut.done(), "AppFuture should be done if stageout is done"
116+
117+
stdout_relative_path = f"app_both.{fut.tid}.0.stdout"
118+
with z.open(stdout_relative_path) as f:
119+
assert f.readlines() == [b'hello\n']
120+
121+
stderr_relative_path = f"app_both.{fut.tid}.0.stderr"
122+
with z.open(stderr_relative_path) as f:
123+
assert f.readlines()[-1] == b'goodbye\n'
124+
125+
for record in caplog.records:
126+
assert record.levelno < logging.ERROR
127+
128+
parsl.clear()

0 commit comments

Comments
 (0)