Skip to content

Commit 4b769ba

Browse files
committed
Batch scheduler executors and the local executor now export PSIJ_NODEFILE,
which contains a list of nodes allocated to the job.
1 parent 9e1a777 commit 4b769ba

File tree

11 files changed

+125
-6
lines changed

11 files changed

+125
-6
lines changed

src/psij/executors/batch/batch_scheduler_executor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ class BatchSchedulerExecutor(JobExecutor):
152152
2. store the exit code of the launch command in the *exit code file* named
153153
`<native_id>.ec`, also inside `<script_dir>`.
154154
155+
Additionally, where appropriate, the submit script should set the environment variable named
156+
``PSIJ_NODEFILE`` to point to a file containing a list of nodes that are allocated for the job,
157+
one per line, with a total number of lines matching the process count of the job.
158+
155159
Once the submit script is generated, the executor renders the submit command using
156160
:func:`~get_submit_command` and executes it. Its output is then parsed using
157161
:func:`~job_id_from_submit_output` to retrieve the `native_id` of the job. Subsequently, the

src/psij/executors/batch/cobalt/cobalt.mustache

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ only results in empty files that are not cleaned up}}
3737
#COBALT -e /dev/null
3838
#COBALT -o /dev/null
3939

40+
{{!like PBS, this is also cheap and there is no need to check the setting}}
41+
PSIJ_NODEFILE="$COBALT_NODEFILE"
42+
export PSIJ_NODEFILE
4043

4144
{{!redirect output here instead of through #COBALT directive since COBALT_JOB_ID is not available
4245
when the directives are evaluated; the reason for using the job id in the first place being the

src/psij/executors/batch/lsf/lsf.mustache

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ only results in empty files that are not cleaned up}}
6464
#BSUB -e /dev/null
6565
#BSUB -o /dev/null
6666

67+
{{!LSB_HOSTS is a space-separated list of host names, not a path; LSB_DJOB_HOSTFILE
is the file with one host per process slot, which is what PSIJ_NODEFILE is
documented to point to}}
PSIJ_NODEFILE="$LSB_DJOB_HOSTFILE"
export PSIJ_NODEFILE
69+
6770
{{!redirect output here instead of through #BSUB directive since LSB_JOBID is not available
6871
when the directives are evaluated; the reason for using the job id in the first place being the
6972
same as for the exit code file.}}

src/psij/executors/batch/pbspro/pbspro.mustache

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ only results in empty files that are not cleaned up}}
4040
#PBS -v {{name}}={{value}}
4141
{{/env}}
4242

43+
PSIJ_NODEFILE="$PBS_NODEFILE"
44+
export PSIJ_NODEFILE
45+
4346

4447
{{#job.spec.directory}}
4548
cd "{{.}}"

src/psij/executors/batch/slurm.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,3 +185,7 @@ def _format_duration(self, d: timedelta) -> str:
185185
if d.days > 0:
186186
days = str(d.days) + '-'
187187
return days + "%s:%s:%s" % (d.seconds // 3600, (d.seconds // 60) % 60, d.seconds % 60)
188+
189+
def _clean_submit_script(self, job: Job) -> None:
    """Remove auxiliary files belonging to the job.

    Extends the superclass cleanup to also delete the job's '.nodefile' aux
    file, which the Slurm submit script writes via ``scontrol show hostnames``.
    """
    super()._clean_submit_script(job)
    self._delete_aux_file(job, '.nodefile')

src/psij/executors/batch/slurm/slurm.mustache

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ only results in empty files that are not cleaned up}}
7373
#SBATCH -e /dev/null
7474
#SBATCH -o /dev/null
7575

76+
PSIJ_NODEFILE="{{psij.script_dir}}/$SLURM_JOB_ID.nodefile"
77+
scontrol show hostnames >"$PSIJ_NODEFILE"
78+
export PSIJ_NODEFILE
79+
80+
7681
{{#env}}
7782
#SBATCH --export={{name}}={{value}}
7883
{{/env}}

src/psij/executors/local.py

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@
66
import threading
77
import time
88
from abc import ABC, abstractmethod
9+
from tempfile import mkstemp
910
from types import FrameType
1011
from typing import Optional, Dict, List, Tuple, Type, cast
1112

1213
import psutil
1314

14-
from psij import InvalidJobException, SubmitException, Launcher
15+
from psij import InvalidJobException, SubmitException, Launcher, ResourceSpecV1
1516
from psij import Job, JobSpec, JobExecutorConfig, JobState, JobStatus
1617
from psij import JobExecutor
1718
from psij.utils import SingletonThread
@@ -67,6 +68,7 @@ class _ChildProcessEntry(_ProcessEntry):
6768
def __init__(self, job: Job, executor: 'LocalJobExecutor',
             launcher: Optional[Launcher]) -> None:
    """Initialize an entry tracking a locally spawned child process.

    :param job: the job backed by this process.
    :param executor: the local executor that owns this entry.
    :param launcher: an optional launcher used to run the job.
    """
    super().__init__(job, executor, launcher)
    # Path to the generated nodefile, if any; unlinked when the process exits.
    self.nodefile: Optional[str] = None
7072

7173
def kill(self) -> None:
7274
super().kill()
@@ -75,6 +77,8 @@ def poll(self) -> Tuple[Optional[int], Optional[str]]:
7577
assert self.process is not None
7678
exit_code = self.process.poll()
7779
if exit_code is not None:
80+
if self.nodefile:
81+
os.unlink(self.nodefile)
7882
if self.process.stdout:
7983
return exit_code, self.process.stdout.read().decode('utf-8')
8084
else:
@@ -103,19 +107,30 @@ def poll(self) -> Tuple[Optional[int], Optional[str]]:
103107
return None, None
104108

105109

106-
def _get_env(spec: JobSpec) -> Optional[Dict[str, str]]:
110+
def _get_env(spec: JobSpec, nodefile: Optional[str]) -> Optional[Dict[str, str]]:
111+
env: Optional[Dict[str, str]] = None
107112
if spec.inherit_environment:
108-
if not spec.environment:
113+
if spec.environment is None and nodefile is None:
109114
# if env is none in Popen, it inherits env from parent
110115
return None
111116
else:
112117
# merge current env with spec env
113118
env = os.environ.copy()
114-
env.update(spec.environment)
119+
if spec.environment:
120+
env.update(spec.environment)
121+
if nodefile is not None:
122+
env['PSIJ_NODEFILE'] = nodefile
115123
return env
116124
else:
117125
# only spec env
118-
return spec.environment
126+
if nodefile is None:
127+
env = spec.environment
128+
else:
129+
env = {'PSIJ_NODEFILE': nodefile}
130+
if spec.environment:
131+
env.update(spec.environment)
132+
133+
return env
119134

120135

121136
class _ProcessReaper(SingletonThread):
@@ -222,6 +237,26 @@ def __init__(self, url: Optional[str] = None,
222237
super().__init__(url=url, config=config if config else JobExecutorConfig())
223238
self._reaper = _ProcessReaper.get_instance()
224239

240+
def _generate_nodefile(self, job: Job, p: _ChildProcessEntry) -> Optional[str]:
241+
assert job.spec is not None
242+
if job.spec.resources is None:
243+
return None
244+
if job.spec.resources.version == 1:
245+
assert isinstance(job.spec.resources, ResourceSpecV1)
246+
n = job.spec.resources.computed_process_count
247+
if n == 1:
248+
# as a bit of an optimization, we don't generate a nodefile when doing "single
249+
# node" jobs on local.
250+
return None
251+
(file, p.nodefile) = mkstemp(suffix='.nodelist')
252+
for i in range(n):
253+
os.write(file, 'localhost\n'.encode())
254+
os.close(file)
255+
return p.nodefile
256+
else:
257+
raise SubmitException('Cannot handle resource specification with version %s'
258+
% job.spec.resources.version)
259+
225260
def submit(self, job: Job) -> None:
226261
"""
227262
Submits the specified :class:`~psij.Job` to be run locally.
@@ -245,8 +280,10 @@ def submit(self, job: Job) -> None:
245280
if job.status.state == JobState.CANCELED:
246281
raise SubmitException('Job canceled')
247282
logger.debug('Running %s, out=%s, err=%s', args, spec.stdout_path, spec.stderr_path)
283+
nodefile = self._generate_nodefile(job, p)
284+
env = _get_env(spec, nodefile)
248285
p.process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
249-
close_fds=True, cwd=spec.directory, env=_get_env(spec))
286+
close_fds=True, cwd=spec.directory, env=env)
250287
self._reaper.register(p)
251288
job._native_id = p.process.pid
252289
self._set_job_status(job, JobStatus(JobState.QUEUED, time=time.time(),

tests/plugins1/_batch_test/_batch_test.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ def _get_state(self, state: str) -> JobState:
9191
assert state in _TestJobExecutor._STATE_MAP
9292
return _TestJobExecutor._STATE_MAP[state]
9393

94+
def _clean_submit_script(self, job: Job) -> None:
    """Remove auxiliary files belonging to the job.

    Extends the superclass cleanup to also delete the job's '.nodefile' aux
    file written by the test submit script.
    """
    super()._clean_submit_script(job)
    self._delete_aux_file(job, '.nodefile')
97+
9498

9599
class _TestLauncher(MultipleLauncher):
96100
def __init__(self, config: Optional[JobExecutorConfig] = None):

tests/plugins1/_batch_test/test/test.mustache

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ export {{key}}="{{value}}"
2222
{{/custom_attributes.test}}
2323
{{/job.spec.attributes}}
2424

25+
HOSTNAME=`hostname`
26+
PSIJ_NODEFILE="{{psij.script_dir}}/$PSIJ_BATCH_TEST_JOB_ID.nodefile"
27+
rm -f "$PSIJ_NODEFILE"
28+
for NODE in $(seq 1 1 "$PSIJ_TEST_BATCH_EXEC_COUNT"); do
29+
echo "$HOSTNAME-$NODE" >> "$PSIJ_NODEFILE"
30+
done
31+
32+
export PSIJ_NODEFILE
2533
{{#job.spec.inherit_environment}}env \{{/job.spec.inherit_environment}}{{^job.spec.inherit_environment}}env --ignore-environment \{{/job.spec.inherit_environment}}{{#env}}
2634
{{name}}="{{value}}" \
2735
{{/env}}{{#psij.launch_command}}{{.}} {{/psij.launch_command}}

tests/test_nodefile.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import os
2+
from pathlib import Path
3+
from tempfile import TemporaryDirectory
4+
5+
import pytest
6+
7+
from _test_tools import assert_completed
8+
from executor_test_params import ExecutorTestParams
9+
from psij import Job, JobSpec, JobExecutor, ResourceSpecV1
10+
11+
NOT_TESTED = set(['rp', 'flux'])
12+
13+
14+
def test_nodefile(execparams: ExecutorTestParams) -> None:
    """Run a multi-process job whose script checks PSIJ_NODEFILE and assert completion."""
    if execparams.executor in NOT_TESTED:
        pytest.skip('This test does not work with %s' % execparams.executor)

    here = os.path.dirname(os.path.realpath(__file__))
    n_proc = 4

    with TemporaryDirectory(dir=Path.home() / '.psij' / 'test') as work_dir:
        stdout = Path(work_dir, 'stdout.txt')
        spec = JobSpec('/bin/bash', [os.path.join(here, 'test_nodefile.sh'), str(n_proc)],
                       stdout_path=stdout)
        job = Job(spec)
        spec.resources = ResourceSpecV1(process_count=n_proc)
        executor = JobExecutor.get_instance(execparams.executor)
        executor.submit(job)
        assert_completed(job, job.wait())

0 commit comments

Comments
 (0)