Skip to content

Commit d5b38ff

Browse files
committed
python: remove manual construction of jobspec/R
Problem: In the kvslookup.py module, the current jobspecs and R values are done manually. They could instead take advantage of the job-info.lookup service and the recently added CURRENT flag. Update kvslookup.py to use job-info.lookup with the CURRENT flag. Remove code that manually constructs current jobspecs and R.
1 parent 4f7d5b1 commit d5b38ff

File tree

1 file changed

+24
-64
lines changed

1 file changed

+24
-64
lines changed

src/bindings/python/flux/job/kvslookup.py

Lines changed: 24 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,11 @@
1010
import errno
1111
import json
1212

13+
import flux.constants
1314
from _flux._core import ffi, lib
1415
from flux.future import WaitAllFuture
15-
from flux.job import JobID, JobspecV1
16-
from flux.job.event import EventLogEvent
16+
from flux.job import JobID
1717
from flux.rpc import RPC
18-
from flux.util import set_treedict
1918

2019

2120
def _decode_field(data, key):
@@ -48,26 +47,19 @@ def get_decode(self):
4847
return data
4948

5049

51-
def job_info_lookup(flux_handle, jobid, keys=["jobspec"]):
52-
payload = {"id": int(jobid), "keys": keys, "flags": 0}
50+
def job_info_lookup(flux_handle, jobid, keys=["jobspec"], flags=0):
51+
payload = {"id": int(jobid), "keys": keys, "flags": flags}
5352
rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload)
5453
rpc.jobid = jobid
5554
return rpc
5655

5756

58-
def _setup_lookup_keys(keys, original, base):
57+
def _setup_lookup_keys(keys, original):
5958
if "jobspec" in keys:
6059
if original:
6160
keys.remove("jobspec")
6261
if "J" not in keys:
6362
keys.append("J")
64-
elif not base:
65-
if "eventlog" not in keys:
66-
keys.append("eventlog")
67-
if "R" in keys:
68-
if not base:
69-
if "eventlog" not in keys:
70-
keys.append("eventlog")
7163

7264

7365
def _get_original_jobspec(job_data):
@@ -78,58 +70,14 @@ def _get_original_jobspec(job_data):
7870
return result.decode("utf-8")
7971

8072

81-
def _get_updated_jobspec(job_data):
82-
if isinstance(job_data["jobspec"], str):
83-
data = json.loads(job_data["jobspec"])
84-
else:
85-
data = job_data["jobspec"]
86-
jobspec = JobspecV1(**data)
87-
for entry in job_data["eventlog"].splitlines():
88-
event = EventLogEvent(entry)
89-
if event.name == "jobspec-update":
90-
for key, value in event.context.items():
91-
jobspec.setattr(key, value)
92-
return jobspec.dumps()
93-
94-
95-
def _get_updated_R(job_data):
96-
if isinstance(job_data["R"], str):
97-
R = json.loads(job_data["R"])
98-
else:
99-
R = job_data["R"]
100-
for entry in job_data["eventlog"].splitlines():
101-
event = EventLogEvent(entry)
102-
if event.name == "resource-update":
103-
for key, value in event.context.items():
104-
if key == "expiration":
105-
set_treedict(R, "execution.expiration", value)
106-
return json.dumps(R, ensure_ascii=False)
107-
108-
109-
def _update_keys(job_data, decode, keys, original, base):
110-
remove_eventlog = False
73+
def _update_keys(job_data, decode, keys, original):
11174
if "jobspec" in keys:
11275
if original:
11376
job_data["jobspec"] = _get_original_jobspec(job_data)
11477
if decode:
11578
_decode_field(job_data, "jobspec")
11679
if "J" not in keys:
11780
job_data.pop("J")
118-
elif not base:
119-
job_data["jobspec"] = _get_updated_jobspec(job_data)
120-
if decode:
121-
_decode_field(job_data, "jobspec")
122-
if "eventlog" not in keys:
123-
remove_eventlog = True
124-
if "R" in keys:
125-
if not base:
126-
job_data["R"] = _get_updated_R(job_data)
127-
if decode:
128-
_decode_field(job_data, "R")
129-
if "eventlog" not in keys:
130-
remove_eventlog = True
131-
if remove_eventlog:
132-
job_data.pop("eventlog")
13381

13482

13583
# jobs_kvs_lookup simple variant for one jobid
@@ -153,15 +101,21 @@ def job_kvs_lookup(
153101
:base: For 'jobspec' or 'R', get base value, do not apply updates from eventlog
154102
"""
155103
keyslookup = list(keys)
156-
_setup_lookup_keys(keyslookup, original, base)
157-
payload = {"id": int(jobid), "keys": keyslookup, "flags": 0}
104+
_setup_lookup_keys(keyslookup, original)
105+
# N.B. JobInfoLookupRPC() has a "get_decode()" member
106+
# function, so we will always get the non-decoded result from
107+
# job-info.
108+
flags = 0
109+
if not base:
110+
flags |= flux.constants.FLUX_JOB_LOOKUP_CURRENT
111+
payload = {"id": int(jobid), "keys": keyslookup, "flags": flags}
158112
rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload)
159113
try:
160114
if decode:
161115
rsp = rpc.get_decode()
162116
else:
163117
rsp = rpc.get()
164-
_update_keys(rsp, decode, keys, original, base)
118+
_update_keys(rsp, decode, keys, original)
165119
# The job does not exist!
166120
except FileNotFoundError:
167121
return None
@@ -242,7 +196,7 @@ def __init__(
242196
self.original = original
243197
self.base = base
244198
self.errors = []
245-
_setup_lookup_keys(self.keyslookup, self.original, self.base)
199+
_setup_lookup_keys(self.keyslookup, self.original)
246200

247201
def fetch_data(self):
248202
"""Initiate the job info lookup to the Flux job-info module
@@ -255,9 +209,15 @@ def fetch_data(self):
255209
JobKVSLookupFuture.errors is non-empty, then it will contain a
256210
list of errors returned via the query.
257211
"""
212+
flags = 0
213+
# N.B. JobInfoLookupRPC() has a "get_decode()" member
214+
# function, so we will always get the non-decoded result from
215+
# job-info.
216+
if not self.base:
217+
flags |= flux.constants.FLUX_JOB_LOOKUP_CURRENT
258218
listids = JobKVSLookupFuture()
259219
for jobid in self.ids:
260-
listids.push(job_info_lookup(self.handle, jobid, self.keyslookup))
220+
listids.push(job_info_lookup(self.handle, jobid, self.keyslookup, flags))
261221
return listids
262222

263223
def data(self):
@@ -276,5 +236,5 @@ def data(self):
276236
if hasattr(rpc, "errors"):
277237
self.errors = rpc.errors
278238
for job_data in data:
279-
_update_keys(job_data, self.decode, self.keys, self.original, self.base)
239+
_update_keys(job_data, self.decode, self.keys, self.original)
280240
return data

0 commit comments

Comments
 (0)