Skip to content

Commit cef87be

Browse files
authored
PBSProvider to report stdout/err paths when jobs fail (#3094)
* Updated PBSProvider to report job stdout/err via JobStatus when a job reaches a terminal state * Added a simple test to confirm that stdout/err paths are included in the internal table
1 parent ab06b95 commit cef87be

File tree

3 files changed

+48
-6
lines changed

3 files changed

+48
-6
lines changed

parsl/providers/pbspro/pbspro.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,17 @@ def _status(self):
119119

120120
job_state = job.get('job_state', JobState.UNKNOWN)
121121
state = translate_table.get(job_state, JobState.UNKNOWN)
122-
self.resources[job_id]['status'] = JobStatus(state)
122+
self.resources[job_id]['status'] = JobStatus(state,
123+
stdout_path=self.resources[job_id]['job_stdout_path'],
124+
stderr_path=self.resources[job_id]['job_stderr_path'])
123125
jobs_missing.remove(job_id)
124126

125127
# squeue does not report on jobs that are not running. So we are filling in the
126128
# blanks for missing jobs, we might lose some information about why the jobs failed.
127129
for missing_job in jobs_missing:
128-
self.resources[missing_job]['status'] = JobStatus(JobState.COMPLETED)
130+
self.resources[missing_job]['status'] = JobStatus(JobState.COMPLETED,
131+
stdout_path=self.resources[missing_job]['job_stdout_path'],
132+
stderr_path=self.resources[missing_job]['job_stderr_path'])
129133

130134
def submit(self, command, tasks_per_node, job_name="parsl"):
131135
"""Submits the command job.
@@ -149,7 +153,11 @@ def submit(self, command, tasks_per_node, job_name="parsl"):
149153

150154
job_name = "{0}.{1}".format(job_name, time.time())
151155

152-
script_path = os.path.abspath("{0}/{1}.submit".format(self.script_dir, job_name))
156+
assert self.script_dir, "Expected script_dir to be set"
157+
script_path = os.path.join(self.script_dir, job_name)
158+
script_path = os.path.abspath(script_path)
159+
job_stdout_path = script_path + ".stdout"
160+
job_stderr_path = script_path + ".stderr"
153161

154162
logger.debug("Requesting {} nodes_per_block, {} tasks_per_node".format(
155163
self.nodes_per_block, tasks_per_node)
@@ -163,6 +171,8 @@ def submit(self, command, tasks_per_node, job_name="parsl"):
163171
job_config["scheduler_options"] = self.scheduler_options
164172
job_config["worker_init"] = self.worker_init
165173
job_config["user_script"] = command
174+
job_config["job_stdout_path"] = job_stdout_path
175+
job_config["job_stderr_path"] = job_stderr_path
166176

167177
# Add a colon to select_options if one isn't included
168178
if self.select_options and not self.select_options.startswith(":"):
@@ -194,7 +204,11 @@ def submit(self, command, tasks_per_node, job_name="parsl"):
194204
for line in stdout.split('\n'):
195205
if line.strip():
196206
job_id = line.strip()
197-
self.resources[job_id] = {'job_id': job_id, 'status': JobStatus(JobState.PENDING)}
207+
self.resources[job_id] = {'job_id': job_id,
208+
'status': JobStatus(JobState.PENDING),
209+
'job_stdout_path': job_stdout_path,
210+
'job_stderr_path': job_stderr_path,
211+
}
198212
else:
199213
message = "Command '{}' failed with return code {}".format(launch_cmd, retcode)
200214
if (stdout is not None) and (stderr is not None):

parsl/providers/pbspro/template.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
#PBS -m n
66
#PBS -l walltime=$walltime
77
#PBS -l select=${nodes_per_block}:ncpus=${ncpus}${select_options}
8-
#PBS -o ${submit_script_dir}/${jobname}.submit.stdout
9-
#PBS -e ${submit_script_dir}/${jobname}.submit.stderr
8+
#PBS -o ${job_stdout_path}
9+
#PBS -e ${job_stderr_path}
1010
${scheduler_options}
1111
1212
${worker_init}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import random
2+
3+
from unittest import mock
4+
import pytest
5+
6+
from parsl.channels import LocalChannel
7+
from parsl.providers import PBSProProvider
8+
9+
10+
@pytest.mark.local
11+
def test_submit_script_basic(tmp_path):
12+
"""Test PBSPro resources table"""
13+
14+
provider = PBSProProvider(
15+
queue="debug", channel=LocalChannel(script_dir=tmp_path)
16+
)
17+
provider.script_dir = tmp_path
18+
job_id = str(random.randint(55000, 59000))
19+
provider.execute_wait = mock.Mock(spec=PBSProProvider.execute_wait)
20+
provider.execute_wait.return_value = (0, job_id, "")
21+
result_job_id = provider.submit("test", tasks_per_node=1)
22+
assert job_id == result_job_id
23+
provider.execute_wait.assert_called()
24+
assert job_id in provider.resources
25+
26+
job_info = provider.resources[job_id]
27+
assert "job_stdout_path" in job_info
28+
assert "job_stderr_path" in job_info

0 commit comments

Comments
 (0)