Skip to content

Commit 4ffc8b3

Browse files
committed
job-list: use constraints to filter jobs
Problem: Using RFC31 constraints to match jobs would allow us to support many new filtering and query opportunities in job-list. Solution: Convert job-list queries to use constraints for filtering instead of the earlier solution. This change breaks the old filtering RPC protocol. The "userid", "states", "results", "name", and "queue" RPC fields are no longer supported. Update callers in libjob, flux-job, flux-top, job-archive, python JobList and in the testsuite.
1 parent bce7253 commit 4ffc8b3

File tree

7 files changed

+114
-128
lines changed

7 files changed

+114
-128
lines changed

src/bindings/python/flux/job/list.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,31 @@ def job_list(
4646
name=None,
4747
queue=None,
4848
):
49+
# N.B. an "and" operation with no values returns everything
50+
constraint = {"and": []}
51+
if userid != flux.constants.FLUX_USERID_UNKNOWN:
52+
constraint["and"].append({"userid": [userid]})
53+
if name:
54+
constraint["and"].append({"name": [name]})
55+
if queue:
56+
constraint["and"].append({"queue": [queue]})
57+
if states and results:
58+
if states & flux.constants.FLUX_JOB_STATE_INACTIVE:
59+
states &= ~flux.constants.FLUX_JOB_STATE_INACTIVE
60+
tmp = {"or": []}
61+
tmp["or"].append({"states": [states]})
62+
tmp["or"].append({"results": [results]})
63+
constraint["and"].append(tmp)
64+
elif states:
65+
constraint["and"].append({"states": [states]})
66+
elif results:
67+
constraint["and"].append({"results": [results]})
4968
payload = {
5069
"max_entries": int(max_entries),
5170
"attrs": attrs,
52-
"userid": int(userid),
53-
"states": states,
54-
"results": results,
5571
"since": since,
72+
"constraint": constraint,
5673
}
57-
if name:
58-
payload["name"] = name
59-
if queue:
60-
payload["queue"] = queue
6174
return JobListRPC(flux_handle, "job-list.list", payload)
6275

6376

@@ -240,8 +253,6 @@ def add_filter(self, fname):
240253
if fname in self.STATES:
241254
self.states |= self.STATES[fname]
242255
elif fname in self.RESULTS:
243-
# Must specify "inactive" to get results:
244-
self.states |= self.STATES["inactive"]
245256
self.results |= self.RESULTS[fname]
246257
else:
247258
raise ValueError(f"Invalid filter specified: {fname}")

src/cmd/flux-job.c

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1262,6 +1262,7 @@ int cmd_list (optparse_t *p, int argc, char **argv)
12621262
json_t *value;
12631263
uint32_t userid;
12641264
int states = 0;
1265+
json_t *c;
12651266

12661267
if (isatty (STDOUT_FILENO)) {
12671268
fprintf (stderr,
@@ -1290,16 +1291,20 @@ int cmd_list (optparse_t *p, int argc, char **argv)
12901291
else
12911292
userid = getuid ();
12921293

1294+
if (!(c = json_pack ("{ s:[ {s:[i]}, {s:[i]} ] }",
1295+
"and",
1296+
"userid", userid,
1297+
"states", states)))
1298+
log_msg_exit ("failed to construct constraint object");
1299+
12931300
if (!(f = flux_rpc_pack (h,
12941301
"job-list.list",
12951302
FLUX_NODEID_ANY,
12961303
0,
1297-
"{s:i s:[s] s:i s:i s:i}",
1304+
"{s:i s:[s] s:o}",
12981305
"max_entries", max_entries,
12991306
"attrs", "all",
1300-
"userid", userid,
1301-
"states", states,
1302-
"results", 0)))
1307+
"constraint", c)))
13031308
log_err_exit ("flux_rpc_pack");
13041309
if (flux_rpc_get_unpack (f, "{s:o}", "jobs", &jobs) < 0)
13051310
log_err_exit ("flux job-list.list");
@@ -1327,6 +1332,7 @@ int cmd_list_inactive (optparse_t *p, int argc, char **argv)
13271332
json_t *jobs;
13281333
size_t index;
13291334
json_t *value;
1335+
json_t *c;
13301336

13311337
if (isatty (STDOUT_FILENO)) {
13321338
fprintf (stderr,
@@ -1341,17 +1347,18 @@ int cmd_list_inactive (optparse_t *p, int argc, char **argv)
13411347
if (!(h = flux_open (NULL, 0)))
13421348
log_err_exit ("flux_open");
13431349

1350+
if (!(c = json_pack ("{s:[i]}", "states", FLUX_JOB_STATE_INACTIVE)))
1351+
log_msg_exit ("failed to construct constraint object");
1352+
13441353
if (!(f = flux_rpc_pack (h,
13451354
"job-list.list",
13461355
FLUX_NODEID_ANY,
13471356
0,
1348-
"{s:i s:f s:i s:i s:i s:[s]}",
1357+
"{s:i s:f s:[s] s:o}",
13491358
"max_entries", max_entries,
13501359
"since", since,
1351-
"userid", FLUX_USERID_UNKNOWN,
1352-
"states", FLUX_JOB_STATE_INACTIVE,
1353-
"results", 0,
1354-
"attrs", "all")))
1360+
"attrs", "all",
1361+
"constraint", c)))
13551362
log_err_exit ("flux_rpc_pack");
13561363
if (flux_rpc_get_unpack (f, "{s:o}", "jobs", &jobs) < 0)
13571364
log_err_exit ("flux job-list.list");

src/cmd/top/joblist_pane.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -357,11 +357,10 @@ void joblist_pane_query (struct joblist_pane *joblist)
357357
"job-list.list",
358358
0,
359359
0,
360-
"{s:i s:i s:i s:i s:[s,s,s,s,s,s,s,s]}",
360+
"{s:i s:{s:[i]} s:[s,s,s,s,s,s,s,s]}",
361361
"max_entries", win_dim.y_length - 1,
362-
"userid", FLUX_USERID_UNKNOWN,
362+
"constraint",
363363
"states", FLUX_JOB_STATE_RUNNING,
364-
"results", 0,
365364
"attrs",
366365
"annotations",
367366
"userid",

src/common/libjob/list.c

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ flux_future_t *flux_job_list (flux_t *h,
2525
{
2626
flux_future_t *f;
2727
json_t *o = NULL;
28+
json_t *c = NULL;
2829
int valid_states = (FLUX_JOB_STATE_PENDING
2930
| FLUX_JOB_STATE_RUNNING
3031
| FLUX_JOB_STATE_INACTIVE);
@@ -37,15 +38,22 @@ flux_future_t *flux_job_list (flux_t *h,
3738
errno = EINVAL;
3839
return NULL;
3940
}
41+
if (!(c = json_pack ("{ s:[ {s:[i]}, {s:[i]} ] }",
42+
"and",
43+
"userid", userid,
44+
"states", states ? states : valid_states))) {
45+
json_decref (o);
46+
errno = ENOMEM;
47+
return NULL;
48+
}
4049
if (!(f = flux_rpc_pack (h, "job-list.list", FLUX_NODEID_ANY, 0,
41-
"{s:i s:o s:i s:i s:i}",
50+
"{s:i s:o s:o}",
4251
"max_entries", max_entries,
4352
"attrs", o,
44-
"userid", userid,
45-
"states", states,
46-
"results", 0))) {
53+
"constraint", c))) {
4754
saved_errno = errno;
4855
json_decref (o);
56+
json_decref (c);
4957
errno = saved_errno;
5058
return NULL;
5159
}
@@ -59,23 +67,28 @@ flux_future_t *flux_job_list_inactive (flux_t *h,
5967
{
6068
flux_future_t *f;
6169
json_t *o = NULL;
70+
json_t *c = NULL;
6271
int saved_errno;
6372

6473
if (!h || max_entries < 0 || since < 0. || !attrs_json_str
6574
|| !(o = json_loads (attrs_json_str, 0, NULL))) {
6675
errno = EINVAL;
6776
return NULL;
6877
}
78+
if (!(c = json_pack ("{s:[i]}", "states", FLUX_JOB_STATE_INACTIVE))) {
79+
json_decref (o);
80+
errno = ENOMEM;
81+
return NULL;
82+
}
6983
if (!(f = flux_rpc_pack (h, "job-list.list", FLUX_NODEID_ANY, 0,
70-
"{s:i s:f s:i s:i s:i s:o}",
84+
"{s:i s:f s:o s:o}",
7185
"max_entries", max_entries,
7286
"since", since,
73-
"userid", FLUX_USERID_UNKNOWN,
74-
"states", FLUX_JOB_STATE_INACTIVE,
75-
"results", 0,
76-
"attrs", o))) {
87+
"attrs", o,
88+
"constraint", c))) {
7789
saved_errno = errno;
7890
json_decref (o);
91+
json_decref (c);
7992
errno = saved_errno;
8093
return NULL;
8194
}

src/modules/job-archive/job-archive.c

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -506,19 +506,18 @@ void job_archive_cb (flux_reactor_t *r,
506506
"job-list.list",
507507
FLUX_NODEID_ANY,
508508
0,
509-
"{s:i s:f s:i s:i s:i s:[ssssss]}",
509+
"{s:i s:f s:[ssssss] s:{s:[i]}}",
510510
"max_entries", 0,
511511
"since", ctx->since,
512-
"userid", FLUX_USERID_UNKNOWN,
513-
"states", FLUX_JOB_STATE_INACTIVE,
514-
"results", 0,
515512
"attrs",
516513
"userid",
517514
"ranks",
518515
"t_submit",
519516
"t_run",
520517
"t_cleanup",
521-
"t_inactive"))) {
518+
"t_inactive",
519+
"constraint",
520+
"states", FLUX_JOB_STATE_INACTIVE))) {
522521
flux_log_error (ctx->h, "%s: flux_rpc_pack", __FUNCTION__);
523522
return;
524523
}

0 commit comments

Comments
 (0)