Skip to content

Commit 949442a

Browse files
committed
job-list: use constraints to filter jobs
Problem: Using RFC31 constraints to match jobs would allow us to support many new filtering and query opportunities in job-list. Solution: Convert job-list queries to use constraints for filtering instead of the earlier solution. This change breaks the old filtering RPC protocol. The "userid", "states", "results", "name", and "queue" RPC fields are no longer supported. Update callers in libjob, flux-job, flux-top, job-archive, python JobList and in the testsuite.
1 parent d0ea55d commit 949442a

File tree

7 files changed

+114
-128
lines changed

7 files changed

+114
-128
lines changed

src/bindings/python/flux/job/list.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,31 @@ def job_list(
4646
name=None,
4747
queue=None,
4848
):
49+
# N.B. an "and" operation with no values returns everything
50+
constraint = {"and": []}
51+
if userid != flux.constants.FLUX_USERID_UNKNOWN:
52+
constraint["and"].append({"userid": [userid]})
53+
if name:
54+
constraint["and"].append({"name": [name]})
55+
if queue:
56+
constraint["and"].append({"queue": [queue]})
57+
if states and results:
58+
if states & flux.constants.FLUX_JOB_STATE_INACTIVE:
59+
states &= ~flux.constants.FLUX_JOB_STATE_INACTIVE
60+
tmp = {"or": []}
61+
tmp["or"].append({"states": [states]})
62+
tmp["or"].append({"results": [results]})
63+
constraint["and"].append(tmp)
64+
elif states:
65+
constraint["and"].append({"states": [states]})
66+
elif results:
67+
constraint["and"].append({"results": [results]})
4968
payload = {
5069
"max_entries": int(max_entries),
5170
"attrs": attrs,
52-
"userid": int(userid),
53-
"states": states,
54-
"results": results,
5571
"since": since,
72+
"constraint": constraint,
5673
}
57-
if name:
58-
payload["name"] = name
59-
if queue:
60-
payload["queue"] = queue
6174
return JobListRPC(flux_handle, "job-list.list", payload)
6275

6376

@@ -240,8 +253,6 @@ def add_filter(self, fname):
240253
if fname in self.STATES:
241254
self.states |= self.STATES[fname]
242255
elif fname in self.RESULTS:
243-
# Must specify "inactive" to get results:
244-
self.states |= self.STATES["inactive"]
245256
self.results |= self.RESULTS[fname]
246257
else:
247258
raise ValueError(f"Invalid filter specified: {fname}")

src/cmd/flux-job.c

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1263,6 +1263,7 @@ int cmd_list (optparse_t *p, int argc, char **argv)
12631263
json_t *value;
12641264
uint32_t userid;
12651265
int states = 0;
1266+
json_t *c;
12661267

12671268
if (isatty (STDOUT_FILENO)) {
12681269
fprintf (stderr,
@@ -1291,16 +1292,20 @@ int cmd_list (optparse_t *p, int argc, char **argv)
12911292
else
12921293
userid = getuid ();
12931294

1295+
if (!(c = json_pack ("{ s:[ {s:[i]}, {s:[i]} ] }",
1296+
"and",
1297+
"userid", userid,
1298+
"states", states)))
1299+
log_msg_exit ("failed to construct constraint object");
1300+
12941301
if (!(f = flux_rpc_pack (h,
12951302
"job-list.list",
12961303
FLUX_NODEID_ANY,
12971304
0,
1298-
"{s:i s:[s] s:i s:i s:i}",
1305+
"{s:i s:[s] s:o}",
12991306
"max_entries", max_entries,
13001307
"attrs", "all",
1301-
"userid", userid,
1302-
"states", states,
1303-
"results", 0)))
1308+
"constraint", c)))
13041309
log_err_exit ("flux_rpc_pack");
13051310
if (flux_rpc_get_unpack (f, "{s:o}", "jobs", &jobs) < 0)
13061311
log_err_exit ("flux job-list.list");
@@ -1328,6 +1333,7 @@ int cmd_list_inactive (optparse_t *p, int argc, char **argv)
13281333
json_t *jobs;
13291334
size_t index;
13301335
json_t *value;
1336+
json_t *c;
13311337

13321338
if (isatty (STDOUT_FILENO)) {
13331339
fprintf (stderr,
@@ -1342,17 +1348,18 @@ int cmd_list_inactive (optparse_t *p, int argc, char **argv)
13421348
if (!(h = flux_open (NULL, 0)))
13431349
log_err_exit ("flux_open");
13441350

1351+
if (!(c = json_pack ("{s:[i]}", "states", FLUX_JOB_STATE_INACTIVE)))
1352+
log_msg_exit ("failed to construct constraint object");
1353+
13451354
if (!(f = flux_rpc_pack (h,
13461355
"job-list.list",
13471356
FLUX_NODEID_ANY,
13481357
0,
1349-
"{s:i s:f s:i s:i s:i s:[s]}",
1358+
"{s:i s:f s:[s] s:o}",
13501359
"max_entries", max_entries,
13511360
"since", since,
1352-
"userid", FLUX_USERID_UNKNOWN,
1353-
"states", FLUX_JOB_STATE_INACTIVE,
1354-
"results", 0,
1355-
"attrs", "all")))
1361+
"attrs", "all",
1362+
"constraint", c)))
13561363
log_err_exit ("flux_rpc_pack");
13571364
if (flux_rpc_get_unpack (f, "{s:o}", "jobs", &jobs) < 0)
13581365
log_err_exit ("flux job-list.list");

src/cmd/top/joblist_pane.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -357,11 +357,10 @@ void joblist_pane_query (struct joblist_pane *joblist)
357357
"job-list.list",
358358
0,
359359
0,
360-
"{s:i s:i s:i s:i s:[s,s,s,s,s,s,s,s]}",
360+
"{s:i s:{s:[i]} s:[s,s,s,s,s,s,s,s]}",
361361
"max_entries", win_dim.y_length - 1,
362-
"userid", FLUX_USERID_UNKNOWN,
362+
"constraint",
363363
"states", FLUX_JOB_STATE_RUNNING,
364-
"results", 0,
365364
"attrs",
366365
"annotations",
367366
"userid",

src/common/libjob/list.c

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ flux_future_t *flux_job_list (flux_t *h,
2525
{
2626
flux_future_t *f;
2727
json_t *o = NULL;
28+
json_t *c = NULL;
2829
int valid_states = (FLUX_JOB_STATE_PENDING
2930
| FLUX_JOB_STATE_RUNNING
3031
| FLUX_JOB_STATE_INACTIVE);
@@ -37,15 +38,22 @@ flux_future_t *flux_job_list (flux_t *h,
3738
errno = EINVAL;
3839
return NULL;
3940
}
41+
if (!(c = json_pack ("{ s:[ {s:[i]}, {s:[i]} ] }",
42+
"and",
43+
"userid", userid,
44+
"states", states ? states : valid_states))) {
45+
json_decref (o);
46+
errno = ENOMEM;
47+
return NULL;
48+
}
4049
if (!(f = flux_rpc_pack (h, "job-list.list", FLUX_NODEID_ANY, 0,
41-
"{s:i s:o s:i s:i s:i}",
50+
"{s:i s:o s:o}",
4251
"max_entries", max_entries,
4352
"attrs", o,
44-
"userid", userid,
45-
"states", states,
46-
"results", 0))) {
53+
"constraint", c))) {
4754
saved_errno = errno;
4855
json_decref (o);
56+
json_decref (c);
4957
errno = saved_errno;
5058
return NULL;
5159
}
@@ -59,23 +67,28 @@ flux_future_t *flux_job_list_inactive (flux_t *h,
5967
{
6068
flux_future_t *f;
6169
json_t *o = NULL;
70+
json_t *c = NULL;
6271
int saved_errno;
6372

6473
if (!h || max_entries < 0 || since < 0. || !attrs_json_str
6574
|| !(o = json_loads (attrs_json_str, 0, NULL))) {
6675
errno = EINVAL;
6776
return NULL;
6877
}
78+
if (!(c = json_pack ("{s:[i]}", "states", FLUX_JOB_STATE_INACTIVE))) {
79+
json_decref (o);
80+
errno = ENOMEM;
81+
return NULL;
82+
}
6983
if (!(f = flux_rpc_pack (h, "job-list.list", FLUX_NODEID_ANY, 0,
70-
"{s:i s:f s:i s:i s:i s:o}",
84+
"{s:i s:f s:o s:o}",
7185
"max_entries", max_entries,
7286
"since", since,
73-
"userid", FLUX_USERID_UNKNOWN,
74-
"states", FLUX_JOB_STATE_INACTIVE,
75-
"results", 0,
76-
"attrs", o))) {
87+
"attrs", o,
88+
"constraint", c))) {
7789
saved_errno = errno;
7890
json_decref (o);
91+
json_decref (c);
7992
errno = saved_errno;
8093
return NULL;
8194
}

src/modules/job-archive/job-archive.c

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -506,19 +506,18 @@ void job_archive_cb (flux_reactor_t *r,
506506
"job-list.list",
507507
FLUX_NODEID_ANY,
508508
0,
509-
"{s:i s:f s:i s:i s:i s:[ssssss]}",
509+
"{s:i s:f s:[ssssss] s:{s:[i]}}",
510510
"max_entries", 0,
511511
"since", ctx->since,
512-
"userid", FLUX_USERID_UNKNOWN,
513-
"states", FLUX_JOB_STATE_INACTIVE,
514-
"results", 0,
515512
"attrs",
516513
"userid",
517514
"ranks",
518515
"t_submit",
519516
"t_run",
520517
"t_cleanup",
521-
"t_inactive"))) {
518+
"t_inactive",
519+
"constraint",
520+
"states", FLUX_JOB_STATE_INACTIVE))) {
522521
flux_log_error (ctx->h, "%s: flux_rpc_pack", __FUNCTION__);
523522
return;
524523
}

0 commit comments

Comments
 (0)