Skip to content

Commit da25fb3

Browse files
committed
python: add API to get job info from the KVS
Problem: There is not a convenient API to get job information via the `job-info.lookup` RPC. Solution: Add a new "kvslookup.py" module to the Python bindings. It includes the "job_kvs_lookup()" function and the "JobKVSLookup" class for users to retrieve data via the `job-info.lookup` RPC. The module and interface are similar to the "list.py" module ("get_job()" function and "JobList" class). Fixes #5176
1 parent 1ee56af commit da25fb3

File tree

3 files changed

+173
-0
lines changed

3 files changed

+173
-0
lines changed

src/bindings/python/flux/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ nobase_fluxpy_PYTHON = \
3131
job/kill.py \
3232
job/kvs.py \
3333
job/list.py \
34+
job/kvslookup.py \
3435
job/info.py \
3536
job/wait.py \
3637
job/submit.py \

src/bindings/python/flux/job/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from flux.job.submit import submit_async, submit, submit_get_id
1616
from flux.job.info import JobInfo, JobInfoFormat, job_fields_to_attrs
1717
from flux.job.list import job_list, job_list_inactive, job_list_id, JobList, get_job
18+
from flux.job.kvslookup import job_info_lookup, JobKVSLookup, job_kvs_lookup
1819
from flux.job.wait import wait_async, wait, wait_get_status, result_async, result
1920
from flux.job.event import (
2021
event_watch_async,
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
###############################################################
2+
# Copyright 2023 Lawrence Livermore National Security, LLC
3+
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
#
5+
# This file is part of the Flux resource manager framework.
6+
# For details, see https://github.com/flux-framework.
7+
#
8+
# SPDX-License-Identifier: LGPL-3.0
9+
###############################################################
10+
import errno
11+
import json
12+
13+
from flux.future import WaitAllFuture
14+
from flux.job import JobID
15+
from flux.rpc import RPC
16+
17+
18+
# a few keys are special, decode them into dicts if you can
19+
def decode_special_metadata(metadata):
    """Decode the JSON-encoded special keys of a lookup response in place.

    The "jobspec" and "R" entries of ``metadata``, when present, are
    replaced by the result of ``json.loads()``.  Decoding is best effort:
    a value that is not valid JSON, or that is not a string at all (for
    example None, or a value that was already decoded), is left untouched.

    :metadata: dict of lookup results, modified in place
    """
    for key in ("jobspec", "R"):
        if key not in metadata:
            continue
        try:
            metadata[key] = json.loads(metadata[key])
        except (TypeError, ValueError):
            # json.loads() raises TypeError for non-string input and a
            # ValueError subclass (JSONDecodeError) for malformed text.
            # Ignore if can't be decoded
            pass
28+
29+
30+
class JobInfoLookupRPC(RPC):
    """RPC subclass used for ``job-info.lookup`` requests.

    Adds a ``jobid`` attribute, initially None, that the creator of the
    request may set, and a ``get_decode()`` accessor that decodes special
    keys of the response.
    """

    def __init__(self, *args, **kwargs):
        # All arguments are forwarded untouched to the parent constructor
        super().__init__(*args, **kwargs)
        self.jobid = None

    def get(self):
        """Return the response exactly as the parent class provides it."""
        response = super().get()
        return response

    def get_decode(self):
        """Return the response after decoding its special keys.

        The result of the parent ``get()`` is passed through
        ``decode_special_metadata()`` before being returned.
        """
        response = super().get()
        decode_special_metadata(response)
        return response
42+
43+
44+
def job_info_lookup(flux_handle, jobid, keys=("jobspec",)):
    """
    Create a ``job-info.lookup`` RPC for a single jobid

    :flux_handle: A Flux handle obtained from flux.Flux()
    :jobid: jobid to lookup info for (converted with int() for the payload)
    :keys: Optional iterable of keys to fetch. (default is "jobspec")

    Returns a JobInfoLookupRPC whose ``jobid`` attribute is set to the
    jobid exactly as it was passed in.
    """
    # Copy the keys so the payload never aliases the caller's sequence or
    # the shared default.
    payload = {"id": int(jobid), "keys": list(keys), "flags": 0}
    rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload)
    # Keep the jobid as given; JobKVSLookupFuture reports it on error
    rpc.jobid = jobid
    return rpc
49+
50+
51+
# job_kvs_lookup: simple variant for one jobid
52+
def job_kvs_lookup(flux_handle, jobid, keys=("jobspec",), decode=True):
    """
    Lookup job kvs data based on a jobid

    :flux_handle: A Flux handle obtained from flux.Flux()
    :jobid: jobid to lookup info for
    :keys: Optional list of keys to fetch. (default is "jobspec")
    :decode: Optional flag to decode special data into Python data structures
             currently decodes "jobspec" and "R" into dicts
             (default True)

    Returns the lookup response, or None if the lookup fails with
    FileNotFoundError (the job does not exist).  Other errors propagate.
    """
    # Copy the keys so the payload never aliases the caller's sequence or
    # the shared default.
    payload = {"id": int(jobid), "keys": list(keys), "flags": 0}
    rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload)
    try:
        return rpc.get_decode() if decode else rpc.get()
    except FileNotFoundError:
        # The job does not exist!
        return None
74+
75+
76+
class JobKVSLookupFuture(WaitAllFuture):
    """Wrapper Future for multiple jobids

    Each child is expected to provide ``get()``, ``get_decode()`` and a
    ``jobid`` attribute, as the RPCs returned by job_info_lookup() do.
    After get() or get_decode() returns, ``errors`` holds one message per
    child whose lookup failed during that call.
    """

    def __init__(self):
        super().__init__()
        self.errors = []

    def _get(self, decode=True):
        """Collect responses of all children, recording failures.

        :decode: if True use each child's get_decode(), otherwise get()

        Returns the list of responses of the children that succeeded.
        """
        responses = []
        errors = []
        # Wait for all RPCs to complete
        self.wait_for()

        # Get all successful jobs, accumulate errors for the failed ones
        for child in self.children:
            try:
                rsp = child.get_decode() if decode else child.get()
            except EnvironmentError as err:
                if err.errno == errno.ENOENT:
                    # jobid may be a JobID (which has .orig) or whatever
                    # the creator of the RPC supplied; never fail while
                    # building the error message.
                    jobid = getattr(child.jobid, "orig", child.jobid)
                    errors.append(f"JobID {jobid} unknown")
                else:
                    errors.append(f"rpc: {err.strerror}")
            else:
                responses.append(rsp)

        # Replace (rather than extend) the previous contents so repeated
        # calls do not report the same failure more than once.  Updating in
        # place keeps existing references to the list valid.
        self.errors[:] = errors
        return responses

    def get(self):
        """get all successful results, recording errors in self.errors"""
        return self._get(False)

    def get_decode(self):
        """
        get all successful results, recording errors in self.errors. Decode
        special data into Python data structures
        """
        return self._get(True)
114+
115+
116+
class JobKVSLookup:
    """User friendly class to lookup job KVS data

    :flux_handle: A Flux handle obtained from flux.Flux()
    :ids: List of jobids to get data for
    :keys: Optional list of keys to fetch. (default is "jobspec")
    :decode: Optional flag to decode special data into Python data structures
             currently decodes "jobspec" and "R" into dicts
             (default True)
    """

    def __init__(
        self,
        flux_handle,
        ids=(),
        keys=("jobspec",),
        decode=True,
    ):
        self.handle = flux_handle
        # Private copies: later changes to the caller's sequences do not
        # affect this object.
        self.keys = list(keys)
        # A falsy ids (empty sequence or None) means no jobs to look up
        self.ids = [JobID(jobid) for jobid in ids] if ids else []
        self.decode = decode
        # Error messages from the most recent data() call
        self.errors = []

    def fetch_data(self):
        """Initiate the job info lookup to the Flux job-info module

        JobKVSLookup.fetch_data() returns a JobKVSLookupFuture holding one
        lookup RPC per jobid in self.ids.

        The responses can be obtained from the returned Future via
        JobKVSLookupFuture.get() or JobKVSLookupFuture.get_decode(). If
        JobKVSLookupFuture.errors is non-empty afterwards, then it will
        contain a list of errors returned via the query.
        """
        future = JobKVSLookupFuture()
        for jobid in self.ids:
            future.push(job_info_lookup(self.handle, jobid, self.keys))
        return future

    def data(self):
        """Synchronously fetch a list of data responses

        Starts a new lookup with JobKVSLookup.fetch_data() and waits for it,
        so this call may block. Returns a list of responses for the job ids
        that were looked up successfully, decoded if self.decode is set.
        Errors reported by the Future are stored in self.errors.
        """
        future = self.fetch_data()
        data = future.get_decode() if self.decode else future.get()
        if hasattr(future, "errors"):
            self.errors = future.errors
        return data

0 commit comments

Comments
 (0)