diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py index 866bf51..5d86c56 100644 --- a/batchspawner/batchspawner.py +++ b/batchspawner/batchspawner.py @@ -16,11 +16,13 @@ * job names instead of PIDs """ import asyncio +import json import os import pwd import re import xml.etree.ElementTree as ET from enum import Enum +from urllib.parse import urlparse from jinja2 import Template from jupyterhub.spawner import Spawner, set_user_setuid @@ -960,4 +962,50 @@ def state_gethost(self): return +class FluxSpawner(BatchSpawnerBase): + """A Spawner that uses Flux to launch notebooks.""" + + batch_script = Unicode( + """#!/bin/sh + +#flux: --nslots=1 +#flux: --job-name='spawner-jupyterhub' +#flux: --cwd={{homedir}} +#flux: --output='{{homedir}}/{% raw %}.flux-{{id}}-{{name}}.log{% endraw %}' +#flux: --error='{{homedir}}/{% raw %}.flux-{{id}}-{{name}}.error.log{% endraw %}' +{% if runtime %}#flux: --time-limit={{runtime}} +{% endif %}{% if queue %}#flux: --queue={{queue}} +{% endif %}{% if nprocs %}#flux: --cores-per-slot={{nprocs}} +{% endif %}{% if gres %}#flux: --gpus-per-slot={{gres}}{% endif %} + +set -eu + +{{prologue}} +{{cmd}} +{{epilogue}} +""" + ).tag(config=True) + batch_submit_cmd = Unicode("flux batch").tag(config=True) + batch_query_cmd = Unicode("flux jobs --json {job_id}").tag(config=True) + batch_cancel_cmd = Unicode("flux cancel {job_id}").tag(config=True) + + def state_ispending(self): + if not self.job_status: + return False + + status = json.loads(self.job_status) + return status["state"] in ("DEPEND", "PRIORITY", "SCHED") or "uri" not in status + + def state_isrunning(self): + if not self.job_status: + return False + + status = json.loads(self.job_status) + return status["state"] in ("RUN", "CLEANUP") and "uri" in status + + def state_gethost(self): + status = json.loads(self.job_status) + return urlparse(status["uri"]).netloc + + # vim: set ai expandtab softtabstop=4: diff --git a/batchspawner/tests/test_spawners.py b/batchspawner/tests/test_spawners.py index e5d43c0..3899555 100644 --- a/batchspawner/tests/test_spawners.py +++ b/batchspawner/tests/test_spawners.py @@ -625,6 +625,50 @@ async def test_lfs(db, event_loop): ) +async def test_flux(db, event_loop): + spawner_kwargs = { + "req_nprocs": "5", + "req_gres": "5", + "req_queue": "some_queue", + "req_prologue": "PROLOGUE", + "req_epilogue": "EPILOGUE", + } + batch_script_re_list = [ + re.compile( + r"^PROLOGUE.*^batchspawner-singleuser singleuser_command.*^EPILOGUE", + re.S | re.M, + ), + re.compile(r"#flux:\s+--queue=some_queue", re.M), + ] + script = [ + (re.compile(r"sudo.*flux batch"), str(testjob)), + (re.compile(r"sudo.*flux jobs --json"), '{"state": "SCHED"}'), + ( + re.compile(r"sudo.*flux jobs --json"), + f'{{"state": "RUN", "uri": "ssh://{testhost}/foo/bar"}}', + ), + ( + re.compile(r"sudo.*flux jobs --json"), + f'{{"state": "RUN", "uri": "ssh://{testhost}/foo/bar"}}', + ), + ( + re.compile(r"sudo.*flux jobs --json"), + f'{{"state": "RUN", "uri": "ssh://{testhost}/foo/bar"}}', + ), + (re.compile(r"sudo.*flux cancel"), ""), + (re.compile(r"sudo.*flux jobs --json"), '{"state": "INACTIVE"}'), + ] + from .. import FluxSpawner + + await run_spawner_script( + db, + FluxSpawner, + script, + batch_script_re_list=batch_script_re_list, + spawner_kwargs=spawner_kwargs, + ) + + async def test_keepvars(db, event_loop): # req_keepvars spawner_kwargs = {