Skip to content

Commit 979b544

Browse files
authored
add support for launcher (#33)
* add support for launcher now that we have flux executors within workflow managers, launching from a single job (e.g., in the docker container) is likely not going to work. We introduce here the idea of a launcher, or a known command to run on the server, e.g., "nextflow" that is able to launch flux jobs that are equivalently owned by the instance. Currently the one issue with this approach is the launcher writing to output file instead of stdout/stderr, but I have an issue open to figure out how to handle this. This PR will also fix a current set of bugs with parsing the checkbox (boolean) form fields - the name attribute was in the wrong spot, doh. Signed-off-by: vsoch <[email protected]>
1 parent da6f92c commit 979b544

File tree

17 files changed

+175
-65
lines changed

17 files changed

+175
-65
lines changed

.github/workflows/tests.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,25 @@ on:
44
pull_request: []
55

66
jobs:
7+
prepare-container:
8+
runs-on: ubuntu-latest
9+
outputs:
10+
branch: ${{ steps.extract_branch.outputs.branch }}
11+
steps:
12+
- name: Extract branch name
13+
run: echo "branch=$(echo ${GITHUB_REF#refs/heads/})" >> $GITHUB_OUTPUT
14+
id: extract_branch
15+
716
test:
817
runs-on: ubuntu-latest
18+
needs: [prepare-container]
919
container:
1020
image: ghcr.io/flux-framework/flux-restful-api:latest
1121
ports:
1222
- 5000
23+
env:
24+
INSTALL_BRANCH: ${{ needs.prepare-container.outputs.branch }}
25+
INSTALL_REPO: ${{ github.repository }}
1326
steps:
1427
- uses: actions/checkout@v3
1528
- name: Install Dependencies (in case changes)

app/core/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,9 @@ class Settings(BaseSettings):
4141
flux_token: str = os.environ.get("FLUX_TOKEN")
4242
require_auth: bool = get_bool_envar("FLUX_REQUIRE_AUTH")
4343

44+
# If the user requests a launcher, be strict.
45+
# We only allow nextflow and snakemake, sorry
46+
known_launchers: list = ["nextflow", "snakemake"]
47+
4448

4549
settings = Settings()

app/forms.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ def __init__(self, request: Request):
1717
self.cores_per_task: Optional[int] = None
1818
self.gpus_per_task: Optional[int] = None
1919
self.exclusive: Optional[bool] = False
20+
self.is_launcher: Optional[bool] = False
21+
self.exclusive: Optional[bool] = False
2022

2123
# STOPPED HERE - serialize in jquery from form, submit as application/json.
2224
async def load_data(self):
@@ -28,28 +30,31 @@ async def load_data(self):
2830
self.runtime = form.get("runtime") or 0
2931
self.cores_per_task = form.get("cores_per_task")
3032
self.gpus_per_task = form.get("gpus_per_task")
31-
self.exclusive = form.get("exclusive")
33+
self.exclusive = True if form.get("exclusive") == "on" else False
34+
self.is_launcher = True if form.get("is_launcher") == "on" else False
3235

3336
@property
3437
def kwargs(self):
3538
"""
3639
Prepared key value dictionary of items.
3740
"""
3841
kwargs = {}
39-
for key in [
40-
"command",
41-
"num_tasks",
42-
"num_nodes",
43-
"cores_per_task",
44-
"gpus_per_task",
45-
"exclusive",
46-
]:
42+
as_int = ["num_tasks", "num_nodes", "cores_per_task", "gpus_per_task"]
43+
as_bool = ["exclusive", "is_launcher"]
44+
for key in as_int + as_bool + ["command"]:
4745
if getattr(self, key, None) is not None:
4846
value = getattr(self, key)
4947
# Form could submit an empty value
5048
if value == "":
5149
continue
52-
kwargs[key] = value
50+
51+
# Parse as integer
52+
if key in as_int:
53+
kwargs[key] = int(value)
54+
elif key in as_bool:
55+
kwargs[key] = True
56+
else:
57+
kwargs[key] = value
5358
return kwargs
5459

5560
def is_valid(self):

app/library/flux.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def validate_submit_kwargs(kwargs, envars=None, runtime=None):
2323

2424
# We can't ask for more nodes than available!
2525
num_nodes = kwargs.get("num_nodes")
26-
if num_nodes and num_nodes > settings.flux_nodes:
26+
if num_nodes and int(num_nodes) > settings.flux_nodes:
2727
errors.append(
2828
f"The server only has {settings.flux_nodes} nodes, you requested {num_nodes}"
2929
)

app/library/launcher.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import os
2+
import shlex
3+
import subprocess
4+
5+
from app.core.config import settings
6+
7+
8+
def launch(kwargs, workdir=None, envars=None):
9+
"""
10+
Launch a job with a known launcher
11+
"""
12+
envars = envars or {}
13+
14+
# Generate the flux job
15+
command = kwargs["command"]
16+
if isinstance(command, str):
17+
command = shlex.split(command)
18+
print(f"⭐️ Command being submit: {command}")
19+
20+
# We don't allow commands willy nilly
21+
if command[0] not in settings.known_launchers:
22+
return f"{command[0]} is not a known launcher. "
23+
24+
# Delete command from the kwargs (we added because is required and validated that way)
25+
del kwargs["command"]
26+
27+
# Additional envars in the payload?
28+
environment = dict(os.environ)
29+
environment.update(envars)
30+
31+
print(f"⭐️ Workdir provided: {workdir}")
32+
33+
# Submit using subprocess (we can see output in terminal, if any)
34+
try:
35+
subprocess.Popen(
36+
command, cwd=workdir, env=environment, stdout=None, stderr=None, stdin=None
37+
)
38+
except Exception as e:
39+
return str(e)
40+
return "Job submit, see jobs table for spawned jobs."

app/routers/api.py

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import app.library.flux as flux_cli
1212
import app.library.helpers as helpers
13+
import app.library.launcher as launcher
1314
from app.core.config import settings
1415
from app.library.auth import alert_auth, check_auth
1516

@@ -152,15 +153,17 @@ async def submit_job(request: Request):
152153
kwargs[required] = payload.get(required)
153154

154155
# Optional arguments
155-
for optional in [
156-
"num_tasks",
157-
"cores_per_task",
158-
"gpus_per_task",
159-
"num_nodes",
160-
"exclusive",
161-
]:
156+
as_int = ["num_tasks", "cores_per_task", "gpus_per_task", "num_nodes"]
157+
as_bool = ["exclusive"]
158+
159+
for optional in as_int + as_bool:
162160
if optional in payload and payload[optional]:
163-
kwargs[optional] = payload[optional]
161+
if optional in as_bool:
162+
kwargs[optional] = bool(payload[optional])
163+
elif optional in as_int:
164+
kwargs[optional] = int(payload[optional])
165+
else:
166+
kwargs[optional] = payload[optional]
164167

165168
# One off args not provided to JobspecV1
166169
envars = payload.get("envars", {})
@@ -177,22 +180,28 @@ async def submit_job(request: Request):
177180
status_code=400,
178181
)
179182

180-
# Prepare the flux job!
181-
fluxjob = flux_cli.prepare_job(
182-
kwargs, runtime=runtime, workdir=workdir, envars=envars
183-
)
184-
185-
# Submit the job and return the ID, but allow for error
186-
try:
187-
flux_future = flux.job.submit_async(app.handle, fluxjob)
188-
except Exception as e:
189-
result = jsonable_encoder(
190-
{"Message": "There was an issue submitting that job.", "Error": str(e)}
183+
# Are we using a launcher instead?
184+
is_launcher = payload.get("is_launcher", False)
185+
if is_launcher:
186+
message = launcher.launch(kwargs, workdir=workdir, envars=envars)
187+
result = jsonable_encoder({"Message": message, "id": "MANY"})
188+
else:
189+
# Prepare the flux job!
190+
fluxjob = flux_cli.prepare_job(
191+
kwargs, runtime=runtime, workdir=workdir, envars=envars
191192
)
192-
return JSONResponse(content=result, status_code=400)
193-
194-
jobid = flux_future.get_id()
195-
result = jsonable_encoder({"Message": "Job submit.", "id": jobid})
193+
# Submit the job and return the ID, but allow for error
194+
try:
195+
flux_future = flux.job.submit_async(app.handle, fluxjob)
196+
except Exception as e:
197+
result = jsonable_encoder(
198+
{"Message": "There was an issue submitting that job.", "Error": str(e)}
199+
)
200+
return JSONResponse(content=result, status_code=400)
201+
jobid = flux_future.get_id()
202+
result = jsonable_encoder({"Message": "Job submit.", "id": jobid})
203+
204+
# If we get down here, either launcher derived or submit
196205
return JSONResponse(content=result, status_code=200)
197206

198207

app/routers/views.py

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import app.library.flux as flux_cli
88
import app.library.helpers as helpers
9+
import app.library.launcher as launcher
910
from app.core.config import settings
1011
from app.forms import SubmitForm
1112
from app.library.auth import check_auth
@@ -41,7 +42,6 @@ async def home(request: Request):
4142
@auth_views_router.get("/jobs", response_class=HTMLResponse)
4243
async def jobs_table(request: Request):
4344
jobs = list(flux_cli.list_jobs_detailed().values())
44-
print(jobs)
4545
return templates.TemplateResponse(
4646
"jobs/jobs.html",
4747
{
@@ -71,7 +71,6 @@ async def job_info(request: Request, jobid, msg=None):
7171
# Otherwise ensure we get all the logs!
7272
else:
7373
info = flux_cli.get_job_output(jobid, delay=1)
74-
7574
return templates.TemplateResponse(
7675
"jobs/job.html",
7776
{
@@ -111,36 +110,58 @@ async def submit_job_post(request: Request):
111110
"""
112111
from app.main import app
113112

113+
messages = []
114114
form = SubmitForm(request)
115115
await form.load_data()
116116
if form.is_valid():
117117
print("🍦 Submit form is valid!")
118118
print(form.kwargs)
119119

120-
# Prepare the flux job! We don't support envars here yet
121-
fluxjob = flux_cli.prepare_job(
122-
form.kwargs, runtime=form.runtime, workdir=form.workdir
123-
)
124-
125-
# Submit the job and return the ID, but allow for error
126-
try:
127-
flux_future = flux.job.submit_async(app.handle, fluxjob)
128-
jobid = flux_future.get_id()
129-
intid = int(jobid)
130-
return templates.TemplateResponse(
131-
"jobs/submit.html",
132-
context={
133-
"request": request,
134-
"form": form,
135-
"messages": [
136-
f"Your job was successfully submit! 🦊 <a target='_blank' style='color:magenta' href='/job/{intid}'>{jobid}</a>"
137-
],
138-
},
139-
)
140-
except Exception as e:
141-
form.errors.append("There was an issue submitting that job: %s" % str(e))
120+
if form.kwargs.get("is_launcher") is True:
121+
messages.append(launcher.launch(form.kwargs, workdir=form.workdir))
122+
else:
123+
return submit_job_helper(request, app, form)
142124
else:
143125
print("🍒 Submit form is NOT valid!")
126+
return templates.TemplateResponse(
127+
"jobs/submit.html",
128+
context={
129+
"request": request,
130+
"form": form,
131+
"messages": messages,
132+
"has_gpus": settings.has_gpus,
133+
**form.__dict__,
134+
},
135+
)
136+
137+
138+
def submit_job_helper(request, app, form):
139+
"""
140+
A helper to submit a flux job (not a launcher)
141+
"""
142+
143+
# Prepare the flux job! We don't support envars here yet
144+
fluxjob = flux_cli.prepare_job(
145+
form.kwargs, runtime=form.runtime, workdir=form.workdir
146+
)
147+
148+
# Submit the job and return the ID, but allow for error
149+
try:
150+
flux_future = flux.job.submit_async(app.handle, fluxjob)
151+
jobid = flux_future.get_id()
152+
intid = int(jobid)
153+
message = f"Your job was successfully submit! 🦊 <a target='_blank' style='color:magenta' href='/job/{intid}'>{jobid}</a>"
154+
return templates.TemplateResponse(
155+
"jobs/submit.html",
156+
context={
157+
"request": request,
158+
"form": form,
159+
"messages": [message],
160+
},
161+
)
162+
except Exception as e:
163+
form.errors.append("There was an issue submitting that job: %s" % str(e))
164+
144165
return templates.TemplateResponse(
145166
"jobs/submit.html",
146167
context={

clients/python/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ and **Merged pull requests**. Critical items to know are:
1414
The versions coincide with releases on pip. Only major versions will be released as tags on Github.
1515

1616
## [0.0.x](https://github.com/flux-framework/flux-restful-api/tree/main) (0.0.x)
17+
- support for `is_launcher` parameter to indicate a launcher should be used instead (0.0.14)
1718
- support for streaming job output (0.0.13)
1819
- ensure logs end with one newline! (0.0.12)
1920
- support for job info and logs (0.0.11)

clients/python/flux_restful_client/main/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ def submit(self, command, **kwargs):
214214
gpus_per_task (int): Number of gpus per task (defaults to None)
215215
num_nodes (int): Number of nodes (defaults to None)
216216
exclusive (bool): is the job exclusive? (defaults to False)
217+
is_launcher (bool): the command should be submit to a launcher.
218+
This is currently supported for snakemake and nextflow.
217219
"""
218220
# Allow the user to provide a list (and stringify everything)
219221
if isinstance(command, list):
@@ -225,6 +227,7 @@ def submit(self, command, **kwargs):
225227
"gpus_per_task",
226228
"num_nodes",
227229
"exclusive",
230+
"is_launcher",
228231
"workdir",
229232
"envars",
230233
]:

clients/python/flux_restful_client/main/schemas.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@
4646
"type": ["boolean", "null"],
4747
"description": "ask for exclusive nodes for the job.",
4848
},
49+
"is_launcher": {
50+
"type": ["boolean", "null"],
51+
"description": "indicate the command is for a launcher (e.g., nextflow, snakemake)",
52+
},
4953
}
5054

5155
job_submit_schema = {

0 commit comments

Comments
 (0)