Skip to content

Commit 95a817d

Browse files
committed
python: add JobListConstraintParser
Problem: There is no user friendly way to create constraint objects that can be used with the job-list service. Add a JobListConstraintParser class which can be used to create RFC 31 constraing objects suitable for sending to the job-list service.
1 parent 2ff0ec9 commit 95a817d

File tree

1 file changed

+179
-1
lines changed
  • src/bindings/python/flux/job

1 file changed

+179
-1
lines changed

src/bindings/python/flux/job/list.py

Lines changed: 179 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,18 @@
1010
import errno
1111
import os
1212
import pwd
13+
import sys
1314
from collections.abc import Iterable
15+
from datetime import datetime
16+
from functools import reduce
1417

1518
import flux.constants
19+
from flux.constraint.parser import ConstraintLexer, ConstraintParser
1620
from flux.future import WaitAllFuture
1721
from flux.job import JobID
18-
from flux.job.info import JobInfo
22+
from flux.job.info import JobInfo, strtoresult, strtostate
1923
from flux.rpc import RPC
24+
from flux.util import parse_datetime
2025

2126

2227
class JobListRPC(RPC):
@@ -341,3 +346,176 @@ def jobs(self):
341346
if hasattr(rpc, "errors"):
342347
self.errors = rpc.errors
343348
return [JobInfo(job) for job in jobs]
349+
350+
351+
def job_list_filter_to_mask(args, conv):
352+
"""
353+
Convert all job state or result strings with conv() and combine into
354+
a single state or result mask as accepted by the job-list constraints.
355+
356+
This is a convenience function for the JobListConstraintParser class.
357+
358+
Args:
359+
args (list): list of values to convert
360+
conv (callable): function to call on each arg to convert to a state
361+
or result mask.
362+
"""
363+
return reduce(lambda x, y: x | y, map(conv, args))
364+
365+
366+
class JobListConstraintParser(ConstraintParser):
367+
operator_map = {
368+
None: "filter",
369+
"id": "jobid",
370+
"host": "hostlist",
371+
"hosts": "hostlist",
372+
"rank": "ranks",
373+
}
374+
split_values = {"states": ",", "results": ",", "userid": ","}
375+
convert_values = {
376+
"userid": lambda args: [int(x) for x in args],
377+
"states": lambda args: [job_list_filter_to_mask(args, strtostate)],
378+
"results": lambda args: [job_list_filter_to_mask(args, strtoresult)],
379+
}
380+
valid_states = (
381+
"depend",
382+
"priority",
383+
"sched",
384+
"run",
385+
"cleanup",
386+
"inactive",
387+
"pending",
388+
"running",
389+
"active",
390+
)
391+
valid_results = ("completed", "failed", "canceled", "timeout")
392+
393+
def convert_filter(self, arg):
394+
#
395+
# This is a generic state/result filter for backwards compat with
396+
# --filter=. Split into separate states and results operators and
397+
# return the new term(s) (joined by 'or' since that preserves the
398+
# behavior of `--filter`).
399+
#
400+
states = []
401+
results = []
402+
for name in arg.split(","):
403+
name = name.lower()
404+
if name in self.valid_states:
405+
states.append(name)
406+
elif name in self.valid_results:
407+
results.append(name)
408+
else:
409+
raise ValueError(f"Invalid filter specified: {name}")
410+
arg = ""
411+
if states:
412+
arg += "states:" + ",".join(states) + " "
413+
if results:
414+
arg += "or "
415+
if results:
416+
arg += "results:" + ",".join(results)
417+
return arg.rstrip()
418+
419+
@staticmethod
420+
def convert_user(arg):
421+
op, _, arg = arg.partition(":")
422+
users = []
423+
for user in arg.split(","):
424+
try:
425+
users.append(str(int(user)))
426+
except ValueError:
427+
users.append(str(pwd.getpwnam(user).pw_uid))
428+
return "userid:" + ",".join(users)
429+
430+
@staticmethod
431+
def convert_datetime(dt):
432+
if isinstance(dt, (float, int)):
433+
if dt == 0:
434+
# A datetime of zero indicates unset, or an arbitrary time
435+
# in the future. Return 12 months from now.
436+
return parse_datetime("+12m")
437+
dt = datetime.fromtimestamp(dt).astimezone()
438+
else:
439+
dt = parse_datetime(dt, assumeFuture=False)
440+
return dt.timestamp()
441+
442+
def convert_range(self, arg):
443+
arg = arg[1:]
444+
if ".." in arg:
445+
start, end = arg.split("..")
446+
arg = "(not ("
447+
if start:
448+
dt = self.convert_datetime(start)
449+
arg += f"'t_cleanup:<{dt}'"
450+
if start and end:
451+
arg += " or "
452+
if end:
453+
dt = self.convert_datetime(end)
454+
arg += f"'t_run:>{dt}'"
455+
arg += "))"
456+
else:
457+
dt = self.convert_datetime(arg)
458+
arg = f"(t_run:'<={dt}' and t_cleanup:'>={dt}')"
459+
return arg
460+
461+
def convert_timeop(self, arg):
462+
op, _, arg = arg.partition(":")
463+
prefix = ""
464+
if arg[0] in (">", "<"):
465+
if arg[1] == "=":
466+
prefix = arg[:2]
467+
arg = arg[2:]
468+
else:
469+
prefix = arg[0]
470+
arg = arg[1:]
471+
arg = self.convert_datetime(arg)
472+
return f"'{op}:{prefix}{arg}'"
473+
474+
def convert_token(self, arg):
475+
if arg.startswith("@"):
476+
return self.convert_range(arg)
477+
if arg.startswith("t_"):
478+
return self.convert_timeop(arg)
479+
if arg.startswith("user:"):
480+
return self.convert_user(arg)
481+
if ":" not in arg:
482+
return self.convert_filter(arg)
483+
return f"'{arg}'"
484+
485+
def parse(self, string, debug=False):
486+
# First pass: traverse all tokens and apply convenience conversions
487+
expression = ""
488+
lexer = ConstraintLexer()
489+
lexer.input(str(string))
490+
if debug:
491+
print(f"input: {string}", file=sys.stderr)
492+
493+
# Get all tokens first so we can do lookahead in the next step for
494+
# proper use of whitespace:
495+
tokens = []
496+
while True:
497+
tok = lexer.token()
498+
if tok is None:
499+
break
500+
tokens.append(tok)
501+
502+
# Reconstruct expression while converting tokens:
503+
for i, tok in enumerate(tokens):
504+
next_tok = None
505+
if i < len(tokens) - 1:
506+
next_tok = tokens[i + 1]
507+
if debug:
508+
print(tok, file=sys.stderr)
509+
if tok.type != "TOKEN":
510+
expression += tok.value
511+
else:
512+
expression += self.convert_token(tok.value)
513+
if tok.type not in ("LPAREN", "NEGATE") and (
514+
next_tok and next_tok.type not in ("RPAREN")
515+
):
516+
expression += " "
517+
518+
if debug:
519+
print(f"expression: '{expression}'", file=sys.stderr)
520+
521+
return super().parse(expression)

0 commit comments

Comments
 (0)