Skip to content

Commit 1370ce1

Browse files
committed
job-list: use constraints to filter jobs
Problem: Using RFC31 constraints to match jobs would allow us to support many new filtering and query opportunities in job-list. Solution: Convert job-list queries to use constraints for filtering instead of the earlier solution. This change breaks the old filtering RPC protocol. Update callers in libjob, flux-job, flux-top, job-archive, python JobList and in the testsuite.
1 parent a91eac3 commit 1370ce1

File tree

7 files changed

+126
-160
lines changed

7 files changed

+126
-160
lines changed

src/bindings/python/flux/job/list.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,32 @@ def job_list(
4545
name=None,
4646
queue=None,
4747
):
48+
# N.B. an "and" operation with no values returns everything
49+
constraint = {"and": []}
50+
if userid != flux.constants.FLUX_USERID_UNKNOWN:
51+
constraint["and"].append({"userid": [userid]})
52+
if name:
53+
constraint["and"].append({"name": [name]})
54+
if queue:
55+
constraint["and"].append({"queue": [queue]})
56+
if since:
57+
constraint["and"].append({"since": [since]})
58+
if states and results:
59+
if states & flux.constants.FLUX_JOB_STATE_INACTIVE:
60+
states &= ~flux.constants.FLUX_JOB_STATE_INACTIVE
61+
tmp = {"or": []}
62+
tmp["or"].append({"states": [states]})
63+
tmp["or"].append({"results": [results]})
64+
constraint["and"].append(tmp)
65+
elif states:
66+
constraint["and"].append({"states": [states]})
67+
elif results:
68+
constraint["and"].append({"results": [results]})
4869
payload = {
4970
"max_entries": int(max_entries),
5071
"attrs": attrs,
51-
"userid": int(userid),
52-
"states": states,
53-
"results": results,
54-
"since": since,
72+
"constraint": constraint,
5573
}
56-
if name:
57-
payload["name"] = name
58-
if queue:
59-
payload["queue"] = queue
6074
return JobListRPC(flux_handle, "job-list.list", payload)
6175

6276

@@ -239,8 +253,6 @@ def add_filter(self, fname):
239253
if fname in self.STATES:
240254
self.states |= self.STATES[fname]
241255
elif fname in self.RESULTS:
242-
# Must specify "inactive" to get results:
243-
self.states |= self.STATES["inactive"]
244256
self.results |= self.RESULTS[fname]
245257
else:
246258
raise ValueError(f"Invalid filter specified: {fname}")

src/cmd/flux-job.c

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1329,6 +1329,7 @@ int cmd_list (optparse_t *p, int argc, char **argv)
13291329
json_t *value;
13301330
uint32_t userid;
13311331
int states = 0;
1332+
json_t *c;
13321333

13331334
if (isatty (STDOUT_FILENO)) {
13341335
fprintf (stderr,
@@ -1357,16 +1358,20 @@ int cmd_list (optparse_t *p, int argc, char **argv)
13571358
else
13581359
userid = getuid ();
13591360

1361+
if (!(c = json_pack ("{ s:[ {s:[i]}, {s:[i]} ] }",
1362+
"and",
1363+
"userid", userid,
1364+
"states", states)))
1365+
log_msg_exit ("failed to construct constraint object");
1366+
13601367
if (!(f = flux_rpc_pack (h,
13611368
"job-list.list",
13621369
FLUX_NODEID_ANY,
13631370
0,
1364-
"{s:i s:[s] s:i s:i s:i}",
1371+
"{s:i s:[s] s:o}",
13651372
"max_entries", max_entries,
13661373
"attrs", "all",
1367-
"userid", userid,
1368-
"states", states,
1369-
"results", 0)))
1374+
"constraint", c)))
13701375
log_err_exit ("flux_rpc_pack");
13711376
if (flux_rpc_get_unpack (f, "{s:o}", "jobs", &jobs) < 0)
13721377
log_err_exit ("flux job-list.list");
@@ -1394,6 +1399,7 @@ int cmd_list_inactive (optparse_t *p, int argc, char **argv)
13941399
json_t *jobs;
13951400
size_t index;
13961401
json_t *value;
1402+
json_t *c;
13971403

13981404
if (isatty (STDOUT_FILENO)) {
13991405
fprintf (stderr,
@@ -1408,17 +1414,20 @@ int cmd_list_inactive (optparse_t *p, int argc, char **argv)
14081414
if (!(h = flux_open (NULL, 0)))
14091415
log_err_exit ("flux_open");
14101416

1417+
if (!(c = json_pack ("{ s:[ {s:[f]}, {s:[i]} ] }",
1418+
"and",
1419+
"since", since,
1420+
"states", FLUX_JOB_STATE_INACTIVE)))
1421+
log_msg_exit ("failed to construct constraint object");
1422+
14111423
if (!(f = flux_rpc_pack (h,
14121424
"job-list.list",
14131425
FLUX_NODEID_ANY,
14141426
0,
1415-
"{s:i s:f s:i s:i s:i s:[s]}",
1427+
"{s:i s:[s] s:o}",
14161428
"max_entries", max_entries,
1417-
"since", since,
1418-
"userid", FLUX_USERID_UNKNOWN,
1419-
"states", FLUX_JOB_STATE_INACTIVE,
1420-
"results", 0,
1421-
"attrs", "all")))
1429+
"attrs", "all",
1430+
"constraint", c)))
14221431
log_err_exit ("flux_rpc_pack");
14231432
if (flux_rpc_get_unpack (f, "{s:o}", "jobs", &jobs) < 0)
14241433
log_err_exit ("flux job-list.list");

src/cmd/top/joblist_pane.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -356,11 +356,10 @@ void joblist_pane_query (struct joblist_pane *joblist)
356356
"job-list.list",
357357
0,
358358
0,
359-
"{s:i s:i s:i s:i s:[s,s,s,s,s,s,s,s]}",
359+
"{s:i s:{s:[i]} s:[s,s,s,s,s,s,s,s]}",
360360
"max_entries", win_dim.y_length - 1,
361-
"userid", FLUX_USERID_UNKNOWN,
361+
"constraint",
362362
"states", FLUX_JOB_STATE_RUNNING,
363-
"results", 0,
364363
"attrs",
365364
"annotations",
366365
"userid",

src/common/libjob/list.c

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ flux_future_t *flux_job_list (flux_t *h,
2525
{
2626
flux_future_t *f;
2727
json_t *o = NULL;
28+
json_t *c = NULL;
2829
int valid_states = (FLUX_JOB_STATE_PENDING
2930
| FLUX_JOB_STATE_RUNNING
3031
| FLUX_JOB_STATE_INACTIVE);
@@ -37,15 +38,22 @@ flux_future_t *flux_job_list (flux_t *h,
3738
errno = EINVAL;
3839
return NULL;
3940
}
41+
if (!(c = json_pack ("{ s:[ {s:[i]}, {s:[i]} ] }",
42+
"and",
43+
"userid", userid,
44+
"states", states ? states : valid_states))) {
45+
json_decref (o);
46+
errno = ENOMEM;
47+
return NULL;
48+
}
4049
if (!(f = flux_rpc_pack (h, "job-list.list", FLUX_NODEID_ANY, 0,
41-
"{s:i s:o s:i s:i s:i}",
50+
"{s:i s:o s:o}",
4251
"max_entries", max_entries,
4352
"attrs", o,
44-
"userid", userid,
45-
"states", states,
46-
"results", 0))) {
53+
"constraints", c))) {
4754
saved_errno = errno;
4855
json_decref (o);
56+
json_decref (c);
4957
errno = saved_errno;
5058
return NULL;
5159
}
@@ -59,23 +67,30 @@ flux_future_t *flux_job_list_inactive (flux_t *h,
5967
{
6068
flux_future_t *f;
6169
json_t *o = NULL;
70+
json_t *c = NULL;
6271
int saved_errno;
6372

6473
if (!h || max_entries < 0 || since < 0. || !attrs_json_str
6574
|| !(o = json_loads (attrs_json_str, 0, NULL))) {
6675
errno = EINVAL;
6776
return NULL;
6877
}
78+
if (!(c = json_pack ("{ s:[ {s:[f]}, {s:[i]} ] }",
79+
"and",
80+
"since", since,
81+
"states", FLUX_JOB_STATE_INACTIVE))) {
82+
json_decref (o);
83+
errno = ENOMEM;
84+
return NULL;
85+
}
6986
if (!(f = flux_rpc_pack (h, "job-list.list", FLUX_NODEID_ANY, 0,
70-
"{s:i s:f s:i s:i s:i s:o}",
87+
"{s:i s:o s:o}",
7188
"max_entries", max_entries,
72-
"since", since,
73-
"userid", FLUX_USERID_UNKNOWN,
74-
"states", FLUX_JOB_STATE_INACTIVE,
75-
"results", 0,
76-
"attrs", o))) {
89+
"attrs", o,
90+
"constraints", c))) {
7791
saved_errno = errno;
7892
json_decref (o);
93+
json_decref (c);
7994
errno = saved_errno;
8095
return NULL;
8196
}

src/modules/job-archive/job-archive.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -505,19 +505,19 @@ void job_archive_cb (flux_reactor_t *r,
505505
"job-list.list",
506506
FLUX_NODEID_ANY,
507507
0,
508-
"{s:i s:f s:i s:i s:i s:[ssssss]}",
508+
"{s:i s:[ssssss] s:{s:[ {s:[i]}, {s:[f]} ]}}",
509509
"max_entries", 0,
510-
"since", ctx->since,
511-
"userid", FLUX_USERID_UNKNOWN,
512-
"states", FLUX_JOB_STATE_INACTIVE,
513-
"results", 0,
514510
"attrs",
515511
"userid",
516512
"ranks",
517513
"t_submit",
518514
"t_run",
519515
"t_cleanup",
520-
"t_inactive"))) {
516+
"t_inactive",
517+
"constraint",
518+
"and",
519+
"states", FLUX_JOB_STATE_INACTIVE,
520+
"since", ctx->since))) {
521521
flux_log_error (ctx->h, "%s: flux_rpc_pack", __FUNCTION__);
522522
return;
523523
}

0 commit comments

Comments
 (0)