Skip to content

Commit 49c59b5

Browse files
authored
Merge pull request #5265 from chu11/issue5176_python_job_info_lookup
python: support convenience API for `job-info.lookup` RPC / "flux job info"
2 parents 1008094 + 6ce4b8e commit 49c59b5

File tree

8 files changed

+389
-10
lines changed

8 files changed

+389
-10
lines changed

src/bindings/python/flux/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ nobase_fluxpy_PYTHON = \
3131
job/kill.py \
3232
job/kvs.py \
3333
job/list.py \
34+
job/kvslookup.py \
3435
job/info.py \
3536
job/wait.py \
3637
job/submit.py \

src/bindings/python/flux/job/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from flux.job.submit import submit_async, submit, submit_get_id
1616
from flux.job.info import JobInfo, JobInfoFormat, job_fields_to_attrs
1717
from flux.job.list import job_list, job_list_inactive, job_list_id, JobList, get_job
18+
from flux.job.kvslookup import job_info_lookup, JobKVSLookup, job_kvs_lookup
1819
from flux.job.wait import wait_async, wait, wait_get_status, result_async, result
1920
from flux.job.event import (
2021
event_watch_async,
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
###############################################################
2+
# Copyright 2023 Lawrence Livermore National Security, LLC
3+
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
#
5+
# This file is part of the Flux resource manager framework.
6+
# For details, see https://github.com/flux-framework.
7+
#
8+
# SPDX-License-Identifier: LGPL-3.0
9+
###############################################################
10+
import errno
11+
import json
12+
13+
from flux.future import WaitAllFuture
14+
from flux.job import JobID
15+
from flux.rpc import RPC
16+
17+
18+
# a few keys are special, decode them into dicts if you can
19+
def decode_special_metadata(metadata):
20+
for key in ("jobspec", "R"):
21+
if key in metadata:
22+
try:
23+
tmp = json.loads(metadata[key])
24+
metadata[key] = tmp
25+
except json.decoder.JSONDecodeError:
26+
# Ignore if can't be decoded
27+
pass
28+
29+
30+
class JobInfoLookupRPC(RPC):
31+
def __init__(self, *args, **kwargs):
32+
super().__init__(*args, **kwargs)
33+
self.jobid = None
34+
35+
def get(self):
36+
return super().get()
37+
38+
def get_decode(self):
39+
metadata = super().get()
40+
decode_special_metadata(metadata)
41+
return metadata
42+
43+
44+
def job_info_lookup(flux_handle, jobid, keys=["jobspec"]):
45+
payload = {"id": int(jobid), "keys": keys, "flags": 0}
46+
rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload)
47+
rpc.jobid = jobid
48+
return rpc
49+
50+
51+
# jobs_kvs_lookup simple variant for one jobid
52+
def job_kvs_lookup(flux_handle, jobid, keys=["jobspec"], decode=True):
53+
"""
54+
Lookup job kvs data based on a jobid
55+
56+
:flux_handle: A Flux handle obtained from flux.Flux()
57+
:jobid: jobid to lookup info for
58+
:keys: Optional list of keys to fetch. (default is "jobspec")
59+
:decode: Optional flag to decode special data into Python data structures
60+
currently decodes "jobspec" and "R" into dicts
61+
(default True)
62+
"""
63+
payload = {"id": int(jobid), "keys": keys, "flags": 0}
64+
rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload)
65+
try:
66+
if decode:
67+
rsp = rpc.get_decode()
68+
else:
69+
rsp = rpc.get()
70+
# The job does not exist!
71+
except FileNotFoundError:
72+
return None
73+
return rsp
74+
75+
76+
class JobKVSLookupFuture(WaitAllFuture):
77+
"""Wrapper Future for multiple jobids"""
78+
79+
def __init__(self):
80+
super(JobKVSLookupFuture, self).__init__()
81+
self.errors = []
82+
83+
def _get(self, decode=True):
84+
jobs = []
85+
# Wait for all RPCs to complete
86+
self.wait_for()
87+
88+
# Get all successful jobs, accumulate errors in self.errors
89+
for child in self.children:
90+
try:
91+
if decode:
92+
rsp = child.get_decode()
93+
else:
94+
rsp = child.get()
95+
jobs.append(rsp)
96+
except EnvironmentError as err:
97+
if err.errno == errno.ENOENT:
98+
msg = f"JobID {child.jobid.orig} unknown"
99+
else:
100+
msg = f"rpc: {err.strerror}"
101+
self.errors.append(msg)
102+
return jobs
103+
104+
def get(self):
105+
"""get all successful results, appending errors into self.errors"""
106+
return self._get(False)
107+
108+
def get_decode(self):
109+
"""
110+
get all successful results, appending errors into self.errors. Decode
111+
special data into Python data structures
112+
"""
113+
return self._get(True)
114+
115+
116+
class JobKVSLookup:
117+
"""User friendly class to lookup job KVS data
118+
119+
:flux_handle: A Flux handle obtained from flux.Flux()
120+
:ids: List of jobids to get data for
121+
:keys: Optional list of keys to fetch. (default is "jobspec")
122+
:decode: Optional flag to decode special data into Python data structures
123+
currently decodes "jobspec" and "R" into dicts
124+
(default True)
125+
"""
126+
127+
def __init__(
128+
self,
129+
flux_handle,
130+
ids=[],
131+
keys=["jobspec"],
132+
decode=True,
133+
):
134+
self.handle = flux_handle
135+
self.keys = list(keys)
136+
self.ids = list(map(JobID, ids)) if ids else []
137+
self.decode = decode
138+
self.errors = []
139+
140+
def fetch_data(self):
141+
"""Initiate the job info lookup to the Flux job-info module
142+
143+
JobKVSLookup.fetch_data() returns a JobKVSLookupFuture,
144+
which will be fulfilled when the job data is available.
145+
146+
Once the Future has been fulfilled, a list of objects
147+
can be obtained via JobKVSLookup.data(). If
148+
JobKVSLookupFuture.errors is non-empty, then it will contain a
149+
list of errors returned via the query.
150+
"""
151+
listids = JobKVSLookupFuture()
152+
for jobid in self.ids:
153+
listids.push(job_info_lookup(self.handle, jobid, self.keys))
154+
return listids
155+
156+
def data(self):
157+
"""Synchronously fetch a list of data responses
158+
159+
If the Future object returned by JobKVSLookup.fetch_data has
160+
not yet been fulfilled (e.g. is_ready() returns False), then this call
161+
may block. Otherwise, returns a list of responses for all job ids
162+
returned.
163+
"""
164+
rpc = self.fetch_data()
165+
if self.decode:
166+
data = rpc.get_decode()
167+
else:
168+
data = rpc.get()
169+
if hasattr(rpc, "errors"):
170+
self.errors = rpc.errors
171+
return data

src/modules/job-info/lookup.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ static void info_lookup_continuation (flux_future_t *fall, void *arg)
160160
size_t index;
161161
json_t *key;
162162
json_t *o = NULL;
163+
json_t *tmp = NULL;
163164
char *data = NULL;
164165

165166
if (!l->allow) {
@@ -184,6 +185,12 @@ static void info_lookup_continuation (flux_future_t *fall, void *arg)
184185
if (!(o = json_object ()))
185186
goto enomem;
186187

188+
tmp = json_integer (l->id);
189+
if (json_object_set_new (o, "id", tmp) < 0) {
190+
json_decref (tmp);
191+
goto enomem;
192+
}
193+
187194
json_array_foreach(l->keys, index, key) {
188195
flux_future_t *f;
189196
const char *keystr;

t/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ TESTSCRIPTS = \
260260
python/t0010-job.py \
261261
python/t0012-futures.py \
262262
python/t0013-job-list.py \
263+
python/t0014-job-kvslookup.py \
263264
python/t0020-hostlist.py \
264265
python/t0021-idset.py \
265266
python/t0022-resource-set.py \

t/python/t0010-job.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -689,8 +689,8 @@ def cancel_on_start(future, jobid):
689689
def test_33_get_job(self):
690690
self.sleep_jobspec = JobspecV1.from_command(["sleep", "5"])
691691
jobid = job.submit(self.fh, self.sleep_jobspec)
692-
meta = job.get_job(self.fh, jobid)
693-
self.assertIsInstance(meta, dict)
692+
info = job.get_job(self.fh, jobid)
693+
self.assertIsInstance(info, dict)
694694
for key in [
695695
"id",
696696
"userid",
@@ -712,17 +712,17 @@ def test_33_get_job(self):
712712
"nodelist",
713713
"exception",
714714
]:
715-
self.assertIn(key, meta)
715+
self.assertIn(key, info)
716716

717-
self.assertEqual(meta["id"], jobid)
718-
self.assertEqual(meta["name"], "sleep")
719-
self.assertTrue(meta["state"] in ["SCHED", "DEPEND", "RUN"])
720-
self.assertEqual(meta["ntasks"], 1)
721-
self.assertEqual(meta["ncores"], 1)
717+
self.assertEqual(info["id"], jobid)
718+
self.assertEqual(info["name"], "sleep")
719+
self.assertTrue(info["state"] in ["SCHED", "DEPEND", "RUN"])
720+
self.assertEqual(info["ntasks"], 1)
721+
self.assertEqual(info["ncores"], 1)
722722

723723
# Test a job that does not exist
724-
meta = job.get_job(self.fh, 123456)
725-
self.assertIsNone(meta)
724+
info = job.get_job(self.fh, 123456)
725+
self.assertIsNone(info)
726726

727727
def test_34_timeleft(self):
728728
spec = JobspecV1.from_command(

0 commit comments

Comments
 (0)