1
1
import json
2
2
import os
3
+ import pwd
3
4
import re
4
5
import shlex
5
6
import time
6
7
7
8
import flux
8
9
import flux .job
9
10
11
+ import app .library .terminal as terminal
10
12
from app .core .config import settings
11
13
14
+ # Faux user environment (filtered set of application environment)
15
+ # We could likely find a way to better do this, but likely the users won't have customized environments
16
+ user_env = {
17
+ "SHELL" : "/bin/bash" ,
18
+ "PATH" : "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin" ,
19
+ "XDG_RUNTIME_DIR" : "/tmp/user/0" ,
20
+ "DISPLAY" : ":0" ,
21
+ "COLORTERM" : "truecolor" ,
22
+ "SHLVL" : "2" ,
23
+ "DEBIAN_FRONTEND" : "noninteractive" ,
24
+ "MAKE_TERMERR" : "/dev/pts/1" ,
25
+ "LANG" : "C.UTF-8" ,
26
+ "TERM" : "xterm-256color" ,
27
+ }
28
+
29
+
30
+ def submit_job (handle , jobspec , user ):
31
+ """
32
+ Handle to submit a job, either with flux job submit or on behalf of user.
33
+ """
34
+ # We've enabled PAM auth
35
+ if settings .enable_pam :
36
+ return terminal .submit_job (jobspec , user )
37
+ return flux .job .submit_async (handle , jobspec )
38
+
12
39
13
40
def validate_submit_kwargs (kwargs , envars = None , runtime = None ):
14
41
"""
@@ -68,6 +95,7 @@ def prepare_job(kwargs, runtime=0, workdir=None, envars=None):
68
95
command = kwargs ["command" ]
69
96
if isinstance (command , str ):
70
97
command = shlex .split (command )
98
+
71
99
print (f"⭐️ Command being submit: { command } " )
72
100
73
101
# Delete command from the kwargs (we added because is required and validated that way)
@@ -90,8 +118,14 @@ def prepare_job(kwargs, runtime=0, workdir=None, envars=None):
90
118
# A duration of zero (the default) means unlimited
91
119
fluxjob .duration = runtime
92
120
121
+ # If we are running as the user, we don't want the current (root) environment
122
+ # This isn't perfect because it's artifically created, but it ensures we have paths
123
+ if settings .enable_pam :
124
+ environment = user_env
125
+ else :
126
+ environment = dict (os .environ )
127
+
93
128
# Additional envars in the payload?
94
- environment = dict (os .environ )
95
129
environment .update (envars )
96
130
fluxjob .environment = environment
97
131
return fluxjob
@@ -131,12 +165,15 @@ def stream_job_output(jobid):
131
165
pass
132
166
133
167
134
- def cancel_job (jobid ):
168
+ def cancel_job (jobid , user ):
135
169
"""
136
170
Request a job to be cancelled by id.
137
171
138
172
Returns a message to the user and a return code.
139
173
"""
174
+ if settings .enable_pam :
175
+ return terminal .cancel_job (jobid , user )
176
+
140
177
from app .main import app
141
178
142
179
try :
@@ -147,12 +184,16 @@ def cancel_job(jobid):
147
184
return "Job is requested to cancel." , 200
148
185
149
186
150
- def get_job_output (jobid , delay = None ):
187
+ def get_job_output (jobid , user = None , delay = None ):
151
188
"""
152
189
Given a jobid, get the output.
153
190
154
191
If there is a delay, we are requesting on demand, so we want to return early.
155
192
"""
193
+ # We've enabled PAM auth
194
+ if settings .enable_pam :
195
+ return terminal .get_job_output (jobid , user , delay = delay )
196
+
156
197
lines = []
157
198
start = time .time ()
158
199
from app .main import app
@@ -171,38 +212,48 @@ def get_job_output(jobid, delay=None):
171
212
return lines
172
213
173
214
174
- def list_jobs_detailed (limit = None , query = None ):
215
+ def list_jobs_detailed (user = None , limit = None , query = None ):
175
216
"""
176
217
Get a detailed listing of jobs.
177
218
"""
178
- listing = list_jobs ()
219
+ listing = list_jobs (user = user )
179
220
ids = listing .get ()["jobs" ]
180
221
jobs = {}
181
222
for job in ids :
182
-
183
223
# Stop if a limit is defined and we have hit it!
184
224
if limit is not None and len (jobs ) >= limit :
185
225
break
186
226
187
227
try :
188
- jobinfo = get_job (job ["id" ])
228
+ jobinfo = get_job (job ["id" ], user = user )
189
229
190
230
# Best effort hack to do a query
191
231
if query and not query_job (jobinfo , query ):
192
232
continue
233
+
234
+ # This will trigger a data table warning
235
+ for needed in ["ranks" , "expiration" ]:
236
+ if needed not in jobinfo :
237
+ jobinfo [needed ] = ""
238
+
193
239
jobs [job ["id" ]] = jobinfo
240
+
194
241
except Exception :
195
242
pass
196
243
return jobs
197
244
198
245
199
- def list_jobs ():
246
+ def list_jobs (user = None ):
200
247
"""
201
248
Get a simple listing of jobs (just the ids)
202
249
"""
203
250
from app .main import app
204
251
205
- return flux .job .job_list (app .handle )
252
+ if user is None or not settings .enable_pam :
253
+ return flux .job .job_list (app .handle )
254
+ pw_record = pwd .getpwnam (user )
255
+ user_uid = pw_record .pw_uid
256
+ return flux .job .job_list (app .handle , userid = user_uid )
206
257
207
258
208
259
def get_simple_job (jobid ):
@@ -215,13 +266,17 @@ def get_simple_job(jobid):
215
266
return json .loads (info .get_str ())["job" ]
216
267
217
268
218
- def get_job (jobid ):
269
+ def get_job (jobid , user ):
219
270
"""
220
271
Get details for a job
221
272
"""
222
273
from app .main import app
223
274
224
- payload = {"id" : int (jobid ), "attrs" : ["all" ]}
275
+ jobid = flux .job .JobID (jobid )
276
+
277
+ payload = {"id" : jobid , "attrs" : ["all" ]}
278
+ if settings .enable_pam :
279
+ payload ["user" ] = user
225
280
rpc = flux .job .list .JobListIdRPC (app .handle , "job-list.list-id" , payload )
226
281
try :
227
282
jobinfo = rpc .get ()
0 commit comments