Skip to content

Commit 127fa70

Browse files
committed
python: add JobListConstraintParser
Problem: There is no user friendly way to create constraint objects that can be used with the job-list service. Add a JobListConstraintParser class which can be used to create RFC 31 constraing objects suitable for sending to the job-list service.
1 parent aa26d8f commit 127fa70

File tree

1 file changed

+166
-1
lines changed
  • src/bindings/python/flux/job

1 file changed

+166
-1
lines changed

src/bindings/python/flux/job/list.py

Lines changed: 166 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,17 @@
1010
import errno
1111
import os
1212
import pwd
13+
import sys
14+
from datetime import datetime
15+
from functools import reduce
1316

1417
import flux.constants
18+
from flux.constraint.parser import ConstraintLexer, ConstraintParser
1519
from flux.future import WaitAllFuture
1620
from flux.job import JobID
17-
from flux.job.info import JobInfo
21+
from flux.job.info import JobInfo, strtoresult, strtostate
1822
from flux.rpc import RPC
23+
from flux.util import parse_datetime
1924

2025

2126
class JobListRPC(RPC):
@@ -316,3 +321,163 @@ def jobs(self):
316321
if hasattr(rpc, "errors"):
317322
self.errors = rpc.errors
318323
return [JobInfo(job) for job in jobs]
324+
325+
326+
def job_list_filter_to_mask(args, conv):
327+
"""
328+
Convert all job state or result strings with conv() and combine into
329+
a single state or result mask as accepted by the job-list constraints.
330+
331+
This is a convenience function for the JobListConstraintParser class.
332+
333+
Args:
334+
args (list): list of values to convert
335+
conv (callable): function to call on each arg to convert to a state
336+
or result mask.
337+
"""
338+
return reduce(lambda x, y: x | y, map(conv, args))
339+
340+
341+
class JobListConstraintParser(ConstraintParser):
342+
operator_map = {
343+
None: "filter",
344+
"id": "jobid",
345+
"host": "hostlist",
346+
"hosts": "hostlist",
347+
"rank": "ranks",
348+
}
349+
split_values = {"states": ",", "results": ","}
350+
convert_values = {
351+
"userid": lambda args: [int(x) for x in args],
352+
"states": lambda args: [job_list_filter_to_mask(args, strtostate)],
353+
"results": lambda args: [job_list_filter_to_mask(args, strtoresult)],
354+
}
355+
valid_states = (
356+
"depend",
357+
"priority",
358+
"sched",
359+
"run",
360+
"cleanup",
361+
"inactive",
362+
"pending",
363+
"running",
364+
"active",
365+
)
366+
valid_results = ("completed", "failed", "canceled", "timeout")
367+
368+
def convert_filter(self, arg):
369+
#
370+
# This is a generic state/result filter for backwards compat with
371+
# --filter=. Split into separate states and results operators and
372+
# return the new term(s) (joined by 'or' since that preserves the
373+
# behavior of `--filter`).
374+
#
375+
states = []
376+
results = []
377+
for name in arg.split(","):
378+
name = name.lower()
379+
if name in self.valid_states:
380+
states.append(name)
381+
elif name in self.valid_results:
382+
results.append(name)
383+
else:
384+
raise ValueError(f"Invalid filter specified: {name}")
385+
arg = ""
386+
if states:
387+
arg += "states:" + ",".join(states) + " "
388+
if results:
389+
arg += "or "
390+
if results:
391+
arg += "results:" + ",".join(results) + " "
392+
return arg
393+
394+
@staticmethod
395+
def convert_user(arg):
396+
op, _, arg = arg.partition(":")
397+
users = []
398+
for user in arg.split(","):
399+
try:
400+
users.append(int(arg))
401+
except ValueError:
402+
users.append(str(pwd.getpwnam(user).pw_uid))
403+
return "userid:" + ",".join(users)
404+
405+
@staticmethod
406+
def convert_datetime(dt):
407+
if isinstance(dt, (float, int)):
408+
if dt == 0:
409+
# A datetime of zero indicates unset, or an arbitrary time
410+
# in the future. Return 12 months from now.
411+
return parse_datetime("+12m")
412+
dt = datetime.fromtimestamp(dt).astimezone()
413+
else:
414+
dt = parse_datetime(dt, assumeFuture=False)
415+
return dt.timestamp()
416+
417+
def convert_range(self, arg):
418+
arg = arg[1:]
419+
if ".." in arg:
420+
start, end = arg.split("..")
421+
arg = "(not ("
422+
if start:
423+
dt = self.convert_datetime(start)
424+
arg += f"'t_cleanup:<{dt}'"
425+
if start and end:
426+
arg += " or "
427+
if end:
428+
dt = self.convert_datetime(end)
429+
arg += f"'t_run:>{dt}'"
430+
arg += "))"
431+
else:
432+
dt = self.convert_datetime(arg)
433+
arg = f"(t_run:'<={dt}' and t_cleanup:'>={dt}')"
434+
return arg
435+
436+
def convert_timeop(self, arg):
437+
op, _, arg = arg.partition(":")
438+
prefix = ""
439+
if arg[0] in (">", "<"):
440+
if arg[1] == "=":
441+
prefix = arg[:2]
442+
arg = arg[2:]
443+
else:
444+
prefix = arg[0]
445+
arg = arg[1:]
446+
arg = self.convert_datetime(arg)
447+
return f"'{op}:{prefix}{arg}'"
448+
449+
def convert_token(self, arg):
450+
if arg.startswith("@"):
451+
return self.convert_range(arg)
452+
if arg.startswith("t_"):
453+
return self.convert_timeop(arg)
454+
if arg.startswith("user:"):
455+
return self.convert_user(arg)
456+
if ":" not in arg:
457+
return self.convert_filter(arg)
458+
return f"'{arg}'"
459+
460+
def parse(self, string, debug=False):
461+
# First pass: traverse all tokens and apply convenience conversions
462+
expression = ""
463+
lexer = ConstraintLexer()
464+
lexer.input(str(string))
465+
if debug:
466+
print(f"input: {string}", file=sys.stderr)
467+
while True:
468+
tok = lexer.token()
469+
if tok is None:
470+
break
471+
if debug:
472+
print(tok, file=sys.stderr)
473+
if tok.type != "TOKEN":
474+
if tok.type in ["LPAREN", "RPAREN", "NEGATE"]:
475+
expression += tok.value
476+
else:
477+
expression += f"{tok.value} "
478+
else:
479+
expression += self.convert_token(tok.value)
480+
481+
if debug:
482+
print(f"expression: {expression}", file=sys.stderr)
483+
return super().parse(expression)

0 commit comments

Comments
 (0)