Skip to content

Commit 2f95453

Browse files
authored
Merge pull request #84 from rkdarst/dev
Integration of run_command, exec_prefix, slurm script, and jinja2
2 parents 872e568 + c7b093c commit 2f95453

File tree

4 files changed

+430
-73
lines changed

4 files changed

+430
-73
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ Common attributes of batch submission / resource manager environments will inclu
3737
* job names instead of PIDs
3838

3939
`BatchSpawnerBase` provides several general mechanisms:
40-
* configurable traits `req_foo` that are exposed as `{foo}` in job template scripts
40+
* configurable traits `req_foo` that are exposed as `{foo}` in job template scripts. Templates (submit scripts in particular) may also use the full power of [jinja2](http://jinja.pocoo.org/). A template is automatically treated as jinja2 if it contains `{{` or `{%`; otherwise `str.format()` is used.
4141
* configurable command templates for submitting/querying/cancelling jobs
4242
* a generic concept of job-ID and ID-based job state tracking
4343
* overrideable hooks for subclasses to plug in logic at numerous points

batchspawner/batchspawner.py

Lines changed: 112 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import xml.etree.ElementTree as ET
2222

23+
from jinja2 import Template
24+
2325
from tornado import gen
2426
from tornado.process import Subprocess
2527
from subprocess import CalledProcessError
@@ -34,34 +36,20 @@
3436
from jupyterhub.spawner import set_user_setuid
3537
import jupyterhub
3638

37-
@gen.coroutine
38-
def run_command(cmd, input=None, env=None):
39-
proc = Subprocess(cmd, shell=True, env=env, stdin=Subprocess.STREAM, stdout=Subprocess.STREAM,stderr=Subprocess.STREAM)
40-
inbytes = None
41-
if input:
42-
inbytes = input.encode()
43-
try:
44-
yield proc.stdin.write(inbytes)
45-
except StreamClosedError as exp:
46-
# Apparently harmless
47-
pass
48-
proc.stdin.close()
49-
out = yield proc.stdout.read_until_close()
50-
eout = yield proc.stderr.read_until_close()
51-
proc.stdout.close()
52-
proc.stderr.close()
53-
eout = eout.decode().strip()
54-
try:
55-
err = yield proc.wait_for_exit()
56-
except CalledProcessError:
57-
#self.log.error("Subprocess returned exitcode %s" % proc.returncode)
58-
#self.log.error(eout)
59-
raise RuntimeError(eout)
60-
if err != 0:
61-
return err # exit error?
62-
else:
63-
out = out.decode().strip()
64-
return out
39+
40+
def format_template(template, *args, **kwargs):
    """Render a template with either jinja2 or str.format().

    A ``jinja2.Template`` instance, or a string containing '{{' or '{%',
    is rendered by jinja2.  Any other string goes through str.format()
    for backwards compatibility with old scripts.  The two syntaxes
    cannot be mixed within a single template.
    """
    if not isinstance(template, Template):
        looks_like_jinja = '{{' in template or '{%' in template
        if not looks_like_jinja:
            # Plain old-style template: no jinja2 markers present.
            return template.format(*args, **kwargs)
        template = Template(template)
    return template.render(*args, **kwargs)
52+
6553

6654
class BatchSpawnerBase(Spawner):
6755
"""Base class for spawners using resource manager batch job submission mechanisms
@@ -90,6 +78,10 @@ class BatchSpawnerBase(Spawner):
9078
# override default server ip since batch jobs normally running remotely
9179
ip = Unicode("0.0.0.0", help="Address for singleuser server to listen at").tag(config=True)
9280

81+
exec_prefix = Unicode('sudo -E -u {username}', \
82+
help="Standard executon prefix (e.g. the default sudo -E -u {username})"
83+
).tag(config=True)
84+
9385
# all these req_foo traits will be available as substvars for templated strings
9486
req_queue = Unicode('', \
9587
help="Queue name to submit job to resource manager"
@@ -127,6 +119,14 @@ class BatchSpawnerBase(Spawner):
127119
help="Other options to include into job submission script"
128120
).tag(config=True)
129121

122+
req_prologue = Unicode('', \
123+
help="Script to run before single user server starts."
124+
).tag(config=True)
125+
126+
req_epilogue = Unicode('', \
127+
help="Script to run after single user server ends."
128+
).tag(config=True)
129+
130130
req_username = Unicode()
131131
@default('req_username')
132132
def _req_username_default(self):
@@ -177,17 +177,53 @@ def parse_job_id(self, output):
177177
def cmd_formatted_for_batch(self):
178178
return ' '.join(self.cmd + self.get_args())
179179

180+
@gen.coroutine
def run_command(self, cmd, input=None, env=None):
    """Run ``cmd`` through the shell and return its stripped standard output.

    Parameters:
        cmd: command line string, executed with ``shell=True``.
        input: optional text that is encoded and written to the
            command's stdin.
        env: optional environment passed to the subprocess.

    Returns:
        The decoded, whitespace-stripped stdout of the command.

    Raises:
        RuntimeError: if waiting for the process raises
            CalledProcessError; the message is the stripped stderr text.
    """
    proc = Subprocess(cmd, shell=True, env=env,
                      stdin=Subprocess.STREAM,
                      stdout=Subprocess.STREAM,
                      stderr=Subprocess.STREAM)
    if input:
        try:
            yield proc.stdin.write(input.encode())
        except StreamClosedError:
            # The command closed its stdin before reading everything;
            # apparently harmless, so carry on and collect its output.
            pass
    proc.stdin.close()
    # Drain stdout and stderr concurrently.  Reading them one after the
    # other can stall if the command fills the stderr pipe while we are
    # still waiting for stdout to reach EOF.
    out, eout = yield [proc.stdout.read_until_close(),
                       proc.stderr.read_until_close()]
    proc.stdout.close()
    proc.stderr.close()
    eout = eout.decode().strip()
    try:
        err = yield proc.wait_for_exit()
    except CalledProcessError:
        self.log.error("Subprocess returned exitcode %s", proc.returncode)
        self.log.error(eout)
        raise RuntimeError(eout)
    if err != 0:
        # Defensive fallback: hand a non-zero status back to the caller
        # unchanged, as the original code did.
        return err
    return out.decode().strip()
208+
209+
@gen.coroutine
def _get_batch_script(self, **subvars):
    """Render ``self.batch_script`` with the given substitution variables."""
    # Could be overridden by subclasses, but mainly useful for testing.
    script = format_template(self.batch_script, **subvars)
    return script
214+
180215
@gen.coroutine
181216
def submit_batch_script(self):
182217
subvars = self.get_req_subvars()
183-
cmd = self.batch_submit_cmd.format(**subvars)
218+
cmd = self.exec_prefix + ' ' + self.batch_submit_cmd
219+
cmd = format_template(cmd, **subvars)
184220
subvars['cmd'] = self.cmd_formatted_for_batch()
185221
if hasattr(self, 'user_options'):
186222
subvars.update(self.user_options)
187-
script = self.batch_script.format(**subvars)
223+
script = yield self._get_batch_script(**subvars)
188224
self.log.info('Spawner submitting job using ' + cmd)
189225
self.log.info('Spawner submitted script:\n' + script)
190-
out = yield run_command(cmd, input=script, env=self.get_env())
226+
out = yield self.run_command(cmd, input=script, env=self.get_env())
191227
try:
192228
self.log.info('Job submitted. cmd: ' + cmd + ' output: ' + out)
193229
self.job_id = self.parse_job_id(out)
@@ -210,10 +246,11 @@ def read_job_state(self):
210246
return self.job_status
211247
subvars = self.get_req_subvars()
212248
subvars['job_id'] = self.job_id
213-
cmd = self.batch_query_cmd.format(**subvars)
249+
cmd = self.exec_prefix + ' ' + self.batch_query_cmd
250+
cmd = format_template(cmd, **subvars)
214251
self.log.debug('Spawner querying job: ' + cmd)
215252
try:
216-
out = yield run_command(cmd)
253+
out = yield self.run_command(cmd)
217254
self.job_status = out
218255
except Exception as e:
219256
self.log.error('Error querying job ' + self.job_id)
@@ -229,9 +266,10 @@ def read_job_state(self):
229266
def cancel_batch_job(self):
230267
subvars = self.get_req_subvars()
231268
subvars['job_id'] = self.job_id
232-
cmd = self.batch_cancel_cmd.format(**subvars)
269+
cmd = self.exec_prefix + ' ' + self.batch_cancel_cmd
270+
cmd = format_template(cmd, **subvars)
233271
self.log.info('Cancelling job ' + self.job_id + ': ' + cmd)
234-
yield run_command(cmd)
272+
yield self.run_command(cmd)
235273

236274
def load_state(self, state):
237275
"""load job_id from state"""
@@ -423,21 +461,21 @@ class TorqueSpawner(BatchSpawnerRegexStates):
423461
""").tag(config=True)
424462

425463
# outputs job id string
426-
batch_submit_cmd = Unicode('sudo -E -u {username} qsub').tag(config=True)
464+
batch_submit_cmd = Unicode('qsub').tag(config=True)
427465
# outputs job data XML string
428-
batch_query_cmd = Unicode('sudo -E -u {username} qstat -x {job_id}').tag(config=True)
429-
batch_cancel_cmd = Unicode('sudo -E -u {username} qdel {job_id}').tag(config=True)
466+
batch_query_cmd = Unicode('qstat -x {job_id}').tag(config=True)
467+
batch_cancel_cmd = Unicode('qdel {job_id}').tag(config=True)
430468
# search XML string for job_state - [QH] = pending, R = running, [CE] = done
431469
state_pending_re = Unicode(r'<job_state>[QH]</job_state>').tag(config=True)
432470
state_running_re = Unicode(r'<job_state>R</job_state>').tag(config=True)
433471
state_exechost_re = Unicode(r'<exec_host>((?:[\w_-]+\.?)+)/\d+').tag(config=True)
434472

435473
class MoabSpawner(TorqueSpawner):
436474
# outputs job id string
437-
batch_submit_cmd = Unicode('sudo -E -u {username} msub').tag(config=True)
475+
batch_submit_cmd = Unicode('msub').tag(config=True)
438476
# outputs job data XML string
439-
batch_query_cmd = Unicode('sudo -E -u {username} mdiag -j {job_id} --xml').tag(config=True)
440-
batch_cancel_cmd = Unicode('sudo -E -u {username} mjobctl -c {job_id}').tag(config=True)
477+
batch_query_cmd = Unicode('mdiag -j {job_id} --xml').tag(config=True)
478+
batch_cancel_cmd = Unicode('mjobctl -c {job_id}').tag(config=True)
441479
state_pending_re = Unicode(r'State="Idle"').tag(config=True)
442480
state_running_re = Unicode(r'State="Running"').tag(config=True)
443481
state_exechost_re = Unicode(r'AllocNodeList="([^\r\n\t\f :"]*)').tag(config=True)
@@ -476,24 +514,29 @@ class SlurmSpawner(UserEnvMixin,BatchSpawnerRegexStates):
476514
).tag(config=True)
477515

478516
batch_script = Unicode("""#!/bin/bash
479-
#SBATCH --partition={partition}
480-
#SBATCH --time={runtime}
481-
#SBATCH --output={homedir}/jupyterhub_slurmspawner_%j.log
517+
#SBATCH --output={{homedir}}/jupyterhub_slurmspawner_%j.log
482518
#SBATCH --job-name=spawner-jupyterhub
483-
#SBATCH --workdir={homedir}
484-
#SBATCH --mem={memory}
485-
#SBATCH --export={keepvars}
519+
#SBATCH --workdir={{homedir}}
520+
#SBATCH --export={{keepvars}}
486521
#SBATCH --get-user-env=L
487-
#SBATCH {options}
488-
522+
{% if partition %}#SBATCH --partition={{partition}}
523+
{% endif %}{% if runtime %}#SBATCH --time={{runtime}}
524+
{% endif %}{% if memory %}#SBATCH --mem={{memory}}
525+
{% endif %}{% if nprocs %}#SBATCH --cpus-per-task={{nprocs}}
526+
{% endif %}{% if options %}#SBATCH {{options}}{% endif %}
527+
528+
trap 'echo SIGTERM received' TERM
529+
{{prologue}}
489530
which jupyterhub-singleuser
490-
{cmd}
531+
srun {{cmd}}
532+
echo "jupyterhub-singleuser ended gracefully"
533+
{{epilogue}}
491534
""").tag(config=True)
492535
# outputs line like "Submitted batch job 209"
493-
batch_submit_cmd = Unicode('sudo -E -u {username} sbatch --parsable').tag(config=True)
536+
batch_submit_cmd = Unicode('sbatch --parsable').tag(config=True)
494537
# outputs status and exec node like "RUNNING hostname"
495-
batch_query_cmd = Unicode("sudo -E -u {username} squeue -h -j {job_id} -o '%T %B'").tag(config=True) #
496-
batch_cancel_cmd = Unicode('sudo -E -u {username} scancel {job_id}').tag(config=True)
538+
batch_query_cmd = Unicode("squeue -h -j {job_id} -o '%T %B'").tag(config=True) #
539+
batch_cancel_cmd = Unicode('scancel {job_id}').tag(config=True)
497540
# use long-form states: PENDING, CONFIGURING = pending
498541
# RUNNING, COMPLETING = running
499542
state_pending_re = Unicode(r'^(?:PENDING|CONFIGURING)').tag(config=True)
@@ -535,10 +578,10 @@ class GridengineSpawner(BatchSpawnerBase):
535578
""").tag(config=True)
536579

537580
# outputs job id string
538-
batch_submit_cmd = Unicode('sudo -E -u {username} qsub').tag(config=True)
581+
batch_submit_cmd = Unicode('qsub').tag(config=True)
539582
# outputs job data XML string
540-
batch_query_cmd = Unicode('sudo -E -u {username} qstat -xml').tag(config=True)
541-
batch_cancel_cmd = Unicode('sudo -E -u {username} qdel {job_id}').tag(config=True)
583+
batch_query_cmd = Unicode('qstat -xml').tag(config=True)
584+
batch_cancel_cmd = Unicode('qdel {job_id}').tag(config=True)
542585

543586
def parse_job_id(self, output):
544587
return output.split(' ')[2]
@@ -585,10 +628,10 @@ class CondorSpawner(UserEnvMixin,BatchSpawnerRegexStates):
585628
""").tag(config=True)
586629

587630
# outputs job id string
588-
batch_submit_cmd = Unicode('sudo -E -u {username} condor_submit').tag(config=True)
631+
batch_submit_cmd = Unicode('condor_submit').tag(config=True)
589632
# outputs job data XML string
590633
batch_query_cmd = Unicode('condor_q {job_id} -format "%s, " JobStatus -format "%s" RemoteHost -format "\n" True').tag(config=True)
591-
batch_cancel_cmd = Unicode('sudo -E -u {username} condor_rm {job_id}').tag(config=True)
634+
batch_cancel_cmd = Unicode('condor_rm {job_id}').tag(config=True)
592635
# job status: 1 = pending, 2 = running
593636
state_pending_re = Unicode(r'^1,').tag(config=True)
594637
state_running_re = Unicode(r'^2,').tag(config=True)
@@ -610,20 +653,20 @@ class LsfSpawner(BatchSpawnerBase):
610653
'''A Spawner that uses IBM's Platform Load Sharing Facility (LSF) to launch notebooks.'''
611654

612655
batch_script = Unicode('''#!/bin/sh
613-
#BSUB -R "select[type==any]" # Allow spawning on non-uniform hardware
614-
#BSUB -R "span[hosts=1]" # Only spawn job on one server
615-
#BSUB -q {queue}
616-
#BSUB -J spawner-jupyterhub
617-
#BSUB -o {homedir}/.jupyterhub.lsf.out
618-
#BSUB -e {homedir}/.jupyterhub.lsf.err
656+
#BSUB -R "select[type==any]" # Allow spawning on non-uniform hardware
657+
#BSUB -R "span[hosts=1]" # Only spawn job on one server
658+
#BSUB -q {queue}
659+
#BSUB -J spawner-jupyterhub
660+
#BSUB -o {homedir}/.jupyterhub.lsf.out
661+
#BSUB -e {homedir}/.jupyterhub.lsf.err
619662
620-
{cmd}
621-
''').tag(config=True)
663+
{cmd}
664+
''').tag(config=True)
622665

623666

624-
batch_submit_cmd = Unicode('sudo -E -u {username} bsub').tag(config=True)
625-
batch_query_cmd = Unicode('sudo -E -u {username} bjobs -a -noheader -o "STAT EXEC_HOST" {job_id}').tag(config=True)
626-
batch_cancel_cmd = Unicode('sudo -E -u {username} bkill {job_id}').tag(config=True)
667+
batch_submit_cmd = Unicode('bsub').tag(config=True)
668+
batch_query_cmd = Unicode('bjobs -a -noheader -o "STAT EXEC_HOST" {job_id}').tag(config=True)
669+
batch_cancel_cmd = Unicode('bkill {job_id}').tag(config=True)
627670

628671
def get_env(self):
629672
env = super().get_env()

0 commit comments

Comments
 (0)