|
2 | 2 | import os
|
3 | 3 | import re
|
4 | 4 | import shlex
|
| 5 | +import subprocess |
5 | 6 | import time
|
6 | 7 |
|
7 | 8 | import flux
|
8 | 9 | import flux.job
|
9 | 10 |
|
10 | 11 | from app.core.config import settings
|
11 | 12 |
|
| 13 | +root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| 14 | +submit_script = os.path.join(root, "scripts", "submit-job.py") |
12 | 15 |
|
13 |
| -def submit_job(handle, jobspec, user): |
| 16 | + |
| 17 | +class FakeJob: |
| 18 | + def __init__(self, jobid): |
| 19 | + self.jobid = jobid |
| 20 | + |
| 21 | + def get_id(self): |
| 22 | + return self.jobid |
| 23 | + |
| 24 | + |
| 25 | +def submit_job(handle, fluxjob, user): |
14 | 26 | """
|
15 |
| - Handle to submit a job, either with flux job submit or on behalf of user. |
| 27 | + Submit the job on behalf of user. |
16 | 28 | """
|
17 | 29 | if user and hasattr(user, "user_name"):
|
18 | 30 | print(f"User submitting job {user.user_name}")
|
| 31 | + user = user.user_name |
19 | 32 | elif user and isinstance(user, str):
|
20 | 33 | print(f"User submitting job {user}")
|
21 |
| - return flux.job.submit_async(handle, jobspec) |
| 34 | + |
| 35 | + # If we don't have auth enabled, submit in single-user mode |
| 36 | + if not settings.require_auth: |
| 37 | + print("Submit in single-user mode.") |
| 38 | + return flux.job.submit_async(handle, fluxjob) |
| 39 | + |
| 40 | + # Update the payload for the correct user |
| 41 | + # Use helper script to sign payload |
| 42 | + payload = json.dumps(fluxjob.jobspec) |
| 43 | + # payload['HOME'] = |
| 44 | + |
| 45 | + # We ideally need to pipe the payload into flux python |
| 46 | + try: |
| 47 | + ps = subprocess.Popen(("echo", payload), stdout=subprocess.PIPE) |
| 48 | + output = subprocess.check_output( |
| 49 | + ("sudo", "-E", "-u", user, "flux", "python", submit_script), |
| 50 | + stdin=ps.stdout, |
| 51 | + env=os.environ, |
| 52 | + ) |
| 53 | + ps.wait() |
| 54 | + |
| 55 | + # A flux start without sudo -u flux can cause this |
| 56 | + # This will be caught and returned to the user |
| 57 | + except PermissionError as e: |
| 58 | + raise ValueError( |
| 59 | + f"Permission error: {e}! Are you running the instance as the flux user?" |
| 60 | + ) |
| 61 | + |
| 62 | + jobid = output.decode("utf-8").strip() |
| 63 | + print("Submit job {jobid}") |
| 64 | + job = FakeJob(jobid) |
| 65 | + return job |
22 | 66 |
|
23 | 67 |
|
24 | 68 | def validate_submit_kwargs(kwargs, envars=None, runtime=None):
|
@@ -98,8 +142,9 @@ def prepare_job(user, kwargs, runtime=0, workdir=None, envars=None):
|
98 | 142 | # Set an attribute about the owning user
|
99 | 143 | if user and hasattr(user, "user_name"):
|
100 | 144 | fluxjob.setattr("user", user.user_name)
|
101 |
| - elif isinstance(user, str): |
102 |
| - fluxjob.setattr("user", user) |
| 145 | + user = user.user_name |
| 146 | + |
| 147 | + fluxjob.setattr("user", user) |
103 | 148 |
|
104 | 149 | # Set a provided working directory
|
105 | 150 | print(f"⭐️ Workdir provided: {workdir}")
|
|
0 commit comments