Skip to content

Commit 9a8811a

Browse files
author
Ralph Castain
committed
Ensure that data from a job that was stored in ompi-server is purged once that job completes. Cleanup a few typos. Silence a Coverity warning
Signed-off-by: Ralph Castain <[email protected]>
1 parent 2263183 commit 9a8811a

File tree

8 files changed

+137
-22
lines changed

8 files changed

+137
-22
lines changed

orte/mca/state/base/state_base_fns.c

Lines changed: 70 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
#include "opal/mca/event/event.h"
2525
#include "opal/mca/pmix/pmix.h"
2626

27+
#include "orte/orted/pmix/pmix_server_internal.h"
28+
#include "orte/runtime/orte_data_server.h"
2729
#include "orte/runtime/orte_globals.h"
2830
#include "orte/runtime/orte_wait.h"
2931
#include "orte/mca/errmgr/errmgr.h"
@@ -466,6 +468,50 @@ void orte_state_base_report_progress(int fd, short argc, void *cbdata)
466468
OBJ_RELEASE(caddy);
467469
}
468470

471+
void orte_state_base_notify_data_server(orte_process_name_t *target)
472+
{
473+
opal_buffer_t *buf;
474+
int rc, room = -1;
475+
uint8_t cmd = ORTE_PMIX_PURGE_PROC_CMD;
476+
477+
/* if nobody local to us published anything, then we can ignore this */
478+
if (ORTE_JOBID_INVALID == orte_pmix_server_globals.server.jobid) {
479+
return;
480+
}
481+
482+
buf = OBJ_NEW(opal_buffer_t);
483+
484+
/* pack the room number */
485+
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &room, 1, OPAL_INT))) {
486+
ORTE_ERROR_LOG(rc);
487+
OBJ_RELEASE(buf);
488+
return;
489+
}
490+
491+
/* load the command */
492+
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &cmd, 1, OPAL_UINT8))) {
493+
ORTE_ERROR_LOG(rc);
494+
OBJ_RELEASE(buf);
495+
return;
496+
}
497+
498+
/* provide the target */
499+
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, target, 1, ORTE_NAME))) {
500+
ORTE_ERROR_LOG(rc);
501+
OBJ_RELEASE(buf);
502+
return;
503+
}
504+
505+
/* send the request to the server */
506+
rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
507+
&orte_pmix_server_globals.server, buf,
508+
ORTE_RML_TAG_DATA_SERVER,
509+
orte_rml_send_callback, NULL);
510+
if (ORTE_SUCCESS != rc) {
511+
OBJ_RELEASE(buf);
512+
}
513+
}
514+
469515
static void _send_notification(int status,
470516
orte_proc_state_t state,
471517
orte_process_name_t *proc,
@@ -725,6 +771,13 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata)
725771
if (orte_state_base_run_fdcheck) {
726772
orte_state_base_check_fds(jdata);
727773
}
774+
/* if ompi-server is around, then notify it to purge
775+
* any session-related info */
776+
if (NULL != orte_data_server_uri) {
777+
target.jobid = jdata->jobid;
778+
target.vpid = ORTE_VPID_WILDCARD;
779+
orte_state_base_notify_data_server(&target);
780+
}
728781
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED);
729782
/* if they requested notification upon completion, provide it */
730783
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_NOTIFY_COMPLETION, NULL, OPAL_BOOL)) {
@@ -1035,6 +1088,7 @@ void orte_state_base_check_fds(orte_job_t *jdata)
10351088
char path[1024], info[256], **list=NULL, *status, *result, *r2;
10361089
ssize_t rc;
10371090
struct flock fl;
1091+
bool flk;
10381092
int cnt = 0;
10391093

10401094
/* get the number of available file descriptors
@@ -1066,7 +1120,11 @@ void orte_state_base_check_fds(orte_job_t *jdata)
10661120
fl.l_whence = 0;
10671121
fl.l_start = 0;
10681122
fl.l_len = 0;
1069-
fcntl(i, F_GETLK, &fl);
1123+
if (-1 == fcntl(i, F_GETLK, &fl)) {
1124+
flk = false;
1125+
} else {
1126+
flk = true;
1127+
}
10701128
/* construct the list of capabilities */
10711129
if (fdflags & FD_CLOEXEC) {
10721130
opal_argv_append_nosize(&list, "cloexec");
@@ -1077,14 +1135,18 @@ void orte_state_base_check_fds(orte_job_t *jdata)
10771135
if (flflags & O_NONBLOCK) {
10781136
opal_argv_append_nosize(&list, "nonblock");
10791137
}
1080-
if (flflags & O_RDONLY) {
1138+
/* from the man page:
1139+
* Unlike the other values that can be specified in flags,
1140+
* the access mode values O_RDONLY, O_WRONLY, and O_RDWR,
1141+
* do not specify individual bits. Rather, they define
1142+
* the low order two bits of flags, and defined respectively
1143+
* as 0, 1, and 2. */
1144+
if (O_RDONLY == (flflags & 3)) {
10811145
opal_argv_append_nosize(&list, "rdonly");
1082-
}
1083-
if (flflags & O_RDWR) {
1084-
opal_argv_append_nosize(&list, "rdwr");
1085-
}
1086-
if (flflags & O_WRONLY) {
1146+
} else if (O_WRONLY == (flflags & 3)) {
10871147
opal_argv_append_nosize(&list, "wronly");
1148+
} else {
1149+
opal_argv_append_nosize(&list, "rdwr");
10881150
}
10891151
if (flflags & O_DSYNC) {
10901152
opal_argv_append_nosize(&list, "dsync");
@@ -1095,7 +1157,7 @@ void orte_state_base_check_fds(orte_job_t *jdata)
10951157
if (flflags & O_SYNC) {
10961158
opal_argv_append_nosize(&list, "sync");
10971159
}
1098-
if (F_UNLCK != fl.l_type) {
1160+
if (flk && F_UNLCK != fl.l_type) {
10991161
if (F_WRLCK == fl.l_type) {
11001162
opal_argv_append_nosize(&list, "wrlock");
11011163
} else {

orte/mca/state/base/state_private.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ ORTE_DECLSPEC void orte_state_base_report_progress(int fd, short argc, void *cbd
7878
ORTE_DECLSPEC void orte_state_base_track_procs(int fd, short argc, void *cbdata);
7979
ORTE_DECLSPEC void orte_state_base_check_all_complete(int fd, short args, void *cbdata);
8080
ORTE_DECLSPEC void orte_state_base_check_fds(orte_job_t *jdata);
81+
ORTE_DECLSPEC void orte_state_base_notify_data_server(orte_process_name_t *target);
8182

8283
END_C_DECLS
8384
#endif

orte/mca/state/orted/state_orted.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
#include "orte/mca/rml/rml.h"
2828
#include "orte/mca/routed/routed.h"
2929
#include "orte/util/session_dir.h"
30+
#include "orte/orted/pmix/pmix_server_internal.h"
31+
#include "orte/runtime/orte_data_server.h"
3032
#include "orte/runtime/orte_quit.h"
3133

3234
#include "orte/mca/state/state.h"
@@ -260,6 +262,7 @@ static void track_procs(int fd, short argc, void *cbdata)
260262
orte_std_cntr_t index;
261263
orte_job_map_t *map;
262264
orte_node_t *node;
265+
orte_process_name_t target;
263266

264267
OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
265268
"%s state:orted:track_procs called for proc %s state %s",
@@ -489,6 +492,14 @@ static void track_procs(int fd, short argc, void *cbdata)
489492
orte_state_base_check_fds(jdata);
490493
}
491494

495+
/* if ompi-server is around, then notify it to purge
496+
* any session-related info */
497+
if (NULL != orte_data_server_uri) {
498+
target.jobid = jdata->jobid;
499+
target.vpid = ORTE_VPID_WILDCARD;
500+
orte_state_base_notify_data_server(&target);
501+
}
502+
492503
/* cleanup the job info */
493504
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL);
494505
OBJ_RELEASE(jdata);

orte/orted/pmix/pmix_server.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ int pmix_server_init(void)
220220
return rc;
221221
}
222222
OBJ_CONSTRUCT(&orte_pmix_server_globals.notifications, opal_list_t);
223+
orte_pmix_server_globals.server = *ORTE_NAME_INVALID;
223224

224225
/* setup recv for direct modex requests */
225226
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX,

orte/orted/pmix/pmix_server_internal.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@
4545
#include "opal/util/proc.h"
4646

4747
#include "orte/mca/grpcomm/base/base.h"
48+
#include "orte/runtime/orte_globals.h"
4849

49-
BEGIN_C_DECLS
50+
BEGIN_C_DECLS
5051

5152
#define ORTED_PMIX_MIN_DMX_TIMEOUT 10
5253
#define ORTE_ADJUST_TIMEOUT(a) \
@@ -252,7 +253,6 @@ typedef struct {
252253
opal_hotel_t reqs;
253254
int num_rooms;
254255
int timeout;
255-
char *server_uri;
256256
bool wait_for_server;
257257
orte_process_name_t server;
258258
opal_list_t notifications;

orte/orted/pmix/pmix_server_pub.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,38 +69,38 @@ static int init_server(void)
6969
if (NULL == filename) {
7070
/* filename is not correctly formatted */
7171
orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-bad", true,
72-
orte_basename, orte_pmix_server_globals.server_uri);
72+
orte_basename, orte_data_server_uri);
7373
return ORTE_ERR_BAD_PARAM;
7474
}
7575
++filename; /* space past the : */
7676

7777
if (0 >= strlen(filename)) {
7878
/* they forgot to give us the name! */
7979
orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-missing", true,
80-
orte_basename, orte_pmix_server_globals.server_uri);
80+
orte_basename, orte_data_server_uri);
8181
return ORTE_ERR_BAD_PARAM;
8282
}
8383

8484
/* open the file and extract the uri */
8585
fp = fopen(filename, "r");
8686
if (NULL == fp) { /* can't find or read file! */
8787
orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-access", true,
88-
orte_basename, orte_pmix_server_globals.server_uri);
88+
orte_basename, orte_data_server_uri);
8989
return ORTE_ERR_BAD_PARAM;
9090
}
9191
if (NULL == fgets(input, 1024, fp)) {
9292
/* something malformed about file */
9393
fclose(fp);
9494
orte_show_help("help-orterun.txt", "orterun:ompi-server-file-bad", true,
95-
orte_basename, orte_pmix_server_globals.server_uri,
95+
orte_basename, orte_data_server_uri,
9696
orte_basename);
9797
return ORTE_ERR_BAD_PARAM;
9898
}
9999
fclose(fp);
100100
input[strlen(input)-1] = '\0'; /* remove newline */
101101
server = strdup(input);
102102
} else {
103-
server = strdup(orte_pmix_server_globals.server_uri);
103+
server = strdup(orte_data_server_uri);
104104
}
105105
/* setup our route to the server */
106106
OBJ_CONSTRUCT(&buf, opal_buffer_t);
@@ -154,8 +154,8 @@ static void execute(int sd, short args, void *cbdata)
154154
/* we need to initialize our connection to the server */
155155
if (ORTE_SUCCESS != (rc = init_server())) {
156156
orte_show_help("help-orted.txt", "noserver", true,
157-
(NULL == orte_pmix_server_globals.server_uri) ?
158-
"NULL" : orte_pmix_server_globals.server_uri);
157+
(NULL == orte_data_server_uri) ?
158+
"NULL" : orte_data_server_uri);
159159
goto callback;
160160
}
161161
}

orte/runtime/orte_data_server.c

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,46 @@ void orte_data_server(int status, orte_process_name_t* sender,
653653
goto SEND_ANSWER;
654654
break;
655655

656+
case ORTE_PMIX_PURGE_PROC_CMD:
657+
/* unpack the proc whose data is to be purged - session
658+
* data is purged by providing a requestor whose rank
659+
* is wildcard */
660+
count = 1;
661+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &requestor, &count, OPAL_NAME))) {
662+
ORTE_ERROR_LOG(rc);
663+
goto SEND_ERROR;
664+
}
665+
666+
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
667+
"%s data server: purge data from %s",
668+
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
669+
ORTE_NAME_PRINT(&requestor)));
670+
671+
/* cycle across the stored data, looking for a match */
672+
for (k=0; k < orte_data_server_store.size; k++) {
673+
data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, k);
674+
if (NULL == data) {
675+
continue;
676+
}
677+
/* check if data posted by the same process */
678+
if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &data->owner, &requestor)) {
679+
continue;
680+
}
681+
/* check persistence - if it is intended to persist beyond the
682+
* proc itself, then we only delete it if rank=wildcard*/
683+
if ((data->persistence == OPAL_PMIX_PERSIST_APP ||
684+
data->persistence == OPAL_PMIX_PERSIST_SESSION) &&
685+
ORTE_VPID_WILDCARD != requestor.vpid) {
686+
continue;
687+
}
688+
/* remove the object */
689+
opal_pointer_array_set_item(&orte_data_server_store, k, NULL);
690+
OBJ_RELEASE(data);
691+
}
692+
/* no response is required */
693+
OBJ_RELEASE(answer);
694+
return;
695+
656696
default:
657697
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
658698
rc = ORTE_ERR_BAD_PARAM;

orte/runtime/orte_data_server.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* All rights reserved.
1212
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
1313
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
14-
* Copyright (c) 2015 Intel, Inc. All rights reserved.
14+
* Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
1515
* $COPYRIGHT$
1616
*
1717
* Additional copyrights may follow
@@ -35,10 +35,10 @@
3535

3636
BEGIN_C_DECLS
3737

38-
#define ORTE_PMIX_PUBLISH_CMD 0x01
39-
#define ORTE_PMIX_LOOKUP_CMD 0x02
40-
#define ORTE_PMIX_UNPUBLISH_CMD 0x03
41-
38+
#define ORTE_PMIX_PUBLISH_CMD 0x01
39+
#define ORTE_PMIX_LOOKUP_CMD 0x02
40+
#define ORTE_PMIX_UNPUBLISH_CMD 0x03
41+
#define ORTE_PMIX_PURGE_PROC_CMD 0x04
4242

4343
/* provide hooks to startup and finalize the data server */
4444
ORTE_DECLSPEC int orte_data_server_init(void);

0 commit comments

Comments
 (0)