Skip to content

Commit b4088c3

Browse files
author
Ralph Castain
authored
Merge pull request #2662 from rhc54/topic/stuff
Variety of cleanups
2 parents 57c0c84 + 91d714f commit b4088c3

File tree

11 files changed

+201
-27
lines changed

11 files changed

+201
-27
lines changed

contrib/scaling/mpi_memprobe.c

Lines changed: 132 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "orte/runtime/orte_globals.h"
1717
#include "orte/mca/errmgr/errmgr.h"
1818

19+
static int rank, size;
1920
static volatile int active;
2021
static volatile bool wait_for_release = true;
2122
#define MEMPROBE_RELEASE 12345
@@ -26,9 +27,10 @@ static void _release_fn(int status,
2627
opal_pmix_notification_complete_fn_t cbfunc,
2728
void *cbdata)
2829
{
30+
fprintf(stderr, "Rank %d: Release recvd\n", rank);
2931
/* must let the notifier know we are done */
3032
if (NULL != cbfunc) {
31-
cbfunc(0, NULL, NULL, NULL, cbdata);
33+
cbfunc(OPAL_ERR_HANDLERS_COMPLETE, NULL, NULL, NULL, cbdata);
3234
}
3335
/* flag that the debugger is complete so we can exit */
3436
wait_for_release = false;
@@ -47,20 +49,39 @@ static void _register_fn(int status,
4749
*active = status;
4850
}
4951

52+
/* Completion callback handed to opal_pmix.query() by main().
 *
 * Logs the returned status, moves every item off of the returned info
 * list onto the caller-supplied results list, invokes the release
 * function (if one was given), and finally clears wait_for_release so
 * the polling loop in main() stops spinning.
 *
 * status          status code reported for the query (only printed here)
 * info            list of returned values; may be NULL
 * cbdata          opal_list_t* that receives the returned values
 * release_fn      if non-NULL, called with release_cbdata after the
 *                 items have been moved off of info
 * release_cbdata  opaque argument handed back to release_fn
 *
 * NOTE(review): cbdata is used as the destination list without a NULL
 * check, so callers must always pass a constructed opal_list_t.
 */
static void qcbfunc(int status,
53+
opal_list_t *info,
54+
void *cbdata,
55+
opal_pmix_release_cbfunc_t release_fn,
56+
void *release_cbdata)
57+
{
58+
opal_list_t *results = (opal_list_t*)cbdata;
59+
opal_value_t *kv;
60+
61+
fprintf(stderr, "Rank %d: Query returned status %d\n", rank, status);
62+
if (NULL != info) {
63+
while (NULL != (kv = (opal_value_t*)opal_list_remove_first(info))) { /* drain info from the front */
64+
opal_list_append(results, &kv->super); /* same item, now linked into results */
65+
}
66+
}
67+
if (NULL != release_fn) {
68+
release_fn(release_cbdata); /* called only after info has been emptied */
69+
}
70+
wait_for_release = false; /* same flag _release_fn clears; main() polls it */
71+
}
72+
5073
int main(int argc, char* argv[])
5174
{
52-
int rank, size;
5375
opal_list_t *codes;
5476
opal_value_t *kv;
77+
opal_pmix_query_t *q;
78+
opal_list_t query, response;
5579

5680
MPI_Init(&argc, &argv);
5781
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
5882
MPI_Comm_size(MPI_COMM_WORLD, &size);
5983

60-
if (0 == rank) {
61-
fprintf(stderr, "Sampling memory usage after MPI_Init\n");
62-
}
63-
84+
/* everyone registers their event handler */
6485
codes = OBJ_NEW(opal_list_t);
6586
kv = OBJ_NEW(opal_value_t);
6687
kv->key = strdup("errorcode");
@@ -74,9 +95,59 @@ int main(int argc, char* argv[])
7495
usleep(10);
7596
}
7697

77-
/* now wait for notification */
78-
while (wait_for_release) {
79-
usleep(10);
98+
/* rank 0 asks for memory to be sampled, while everyone else waits */
99+
if (0 == rank) {
100+
fprintf(stderr, "Sampling memory usage after MPI_Init\n");
101+
OBJ_CONSTRUCT(&query, opal_list_t);
102+
OBJ_CONSTRUCT(&response, opal_list_t);
103+
q = OBJ_NEW(opal_pmix_query_t);
104+
opal_list_append(&query, &q->super);
105+
opal_argv_append_nosize(&q->keys, OPAL_PMIX_QUERY_MEMORY_USAGE);
106+
/* qualify that we just want avg, min/max values reported */
107+
kv = OBJ_NEW(opal_value_t);
108+
kv->key = strdup(OPAL_PMIX_QUERY_REPORT_AVG);
109+
kv->type = OPAL_BOOL;
110+
kv->data.flag = true;
111+
opal_list_append(&q->qualifiers, &kv->super);
112+
kv = OBJ_NEW(opal_value_t);
113+
kv->key = strdup(OPAL_PMIX_QUERY_REPORT_MINMAX);
114+
kv->type = OPAL_BOOL;
115+
kv->data.flag = true;
116+
opal_list_append(&q->qualifiers, &kv->super);
117+
/* issue the request */
118+
wait_for_release = true;
119+
opal_pmix.query(&query, qcbfunc, (void*)&response);
120+
while (wait_for_release) {
121+
usleep(10);
122+
}
123+
/* output the results */
124+
OPAL_LIST_FOREACH(kv, &response, opal_value_t) {
125+
fprintf(stderr, "\tResults: %s\n", kv->key);
126+
}
127+
OPAL_LIST_DESTRUCT(&response);
128+
/* send the notification to release the other procs */
129+
wait_for_release = true;
130+
OBJ_CONSTRUCT(&response, opal_list_t);
131+
kv = OBJ_NEW(opal_value_t);
132+
kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT);
133+
kv->type = OPAL_BOOL;
134+
kv->data.flag = true;
135+
opal_list_append(&response, &kv->super);
136+
if (OPAL_SUCCESS != opal_pmix.notify_event(MEMPROBE_RELEASE, NULL,
137+
OPAL_PMIX_RANGE_GLOBAL, &response,
138+
NULL, NULL)) {
139+
fprintf(stderr, "Notify event failed\n");
140+
exit(1);
141+
}
142+
while (wait_for_release) {
143+
usleep(10);
144+
}
145+
OPAL_LIST_DESTRUCT(&response);
146+
} else {
147+
/* now wait for notification */
148+
while (wait_for_release) {
149+
usleep(10);
150+
}
80151
}
81152
wait_for_release = true;
82153

@@ -86,13 +157,60 @@ int main(int argc, char* argv[])
86157

87158
if (0 == rank) {
88159
fprintf(stderr, "\n\nSampling memory usage after MPI_Barrier\n");
160+
OBJ_CONSTRUCT(&query, opal_list_t);
161+
OBJ_CONSTRUCT(&response, opal_list_t);
162+
q = OBJ_NEW(opal_pmix_query_t);
163+
opal_list_append(&query, &q->super);
164+
opal_argv_append_nosize(&q->keys, OPAL_PMIX_QUERY_MEMORY_USAGE);
165+
/* qualify that we just want avg, min/max values reported */
166+
kv = OBJ_NEW(opal_value_t);
167+
kv->key = strdup(OPAL_PMIX_QUERY_REPORT_AVG);
168+
kv->type = OPAL_BOOL;
169+
kv->data.flag = true;
170+
opal_list_append(&q->qualifiers, &kv->super);
171+
kv = OBJ_NEW(opal_value_t);
172+
kv->key = strdup(OPAL_PMIX_QUERY_REPORT_MINMAX);
173+
kv->type = OPAL_BOOL;
174+
kv->data.flag = true;
175+
opal_list_append(&q->qualifiers, &kv->super);
176+
/* issue the request */
177+
wait_for_release = true;
178+
opal_pmix.query(&query, qcbfunc, (void*)&response);
179+
while (wait_for_release) {
180+
usleep(10);
181+
}
182+
/* output the results */
183+
OPAL_LIST_FOREACH(kv, &response, opal_value_t) {
184+
fprintf(stderr, "\tResults: %s\n", kv->key);
185+
}
186+
OPAL_LIST_DESTRUCT(&response);
187+
/* send the notification to release the other procs */
188+
wait_for_release = true;
189+
OBJ_CONSTRUCT(&response, opal_list_t);
190+
kv = OBJ_NEW(opal_value_t);
191+
kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT);
192+
kv->type = OPAL_BOOL;
193+
kv->data.flag = true;
194+
opal_list_append(&response, &kv->super);
195+
if (OPAL_SUCCESS != opal_pmix.notify_event(MEMPROBE_RELEASE, NULL,
196+
OPAL_PMIX_RANGE_GLOBAL, &response,
197+
NULL, NULL)) {
198+
fprintf(stderr, "Notify event failed\n");
199+
exit(1);
200+
}
201+
while (wait_for_release) {
202+
usleep(10);
203+
}
204+
OPAL_LIST_DESTRUCT(&response);
205+
} else {
206+
/* wait again while memory is sampled */
207+
while (wait_for_release) {
208+
usleep(10);
209+
}
89210
}
90211

91-
/* wait again while memory is sampled */
92-
while (wait_for_release) {
93-
usleep(10);
94-
}
95-
212+
fprintf(stderr, "%d: FINALIZING\n", rank);
213+
fflush(stderr);
96214
MPI_Finalize();
97215
return 0;
98216
}

opal/mca/pmix/pmix2x/pmix/include/pmix_common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ typedef uint32_t pmix_rank_t;
134134
/* attributes for the USOCK rendezvous socket */
135135
#define PMIX_USOCK_DISABLE "pmix.usock.disable" // (bool) disable legacy usock support
136136
#define PMIX_SOCKET_MODE "pmix.sockmode" // (uint32_t) POSIX mode_t (9 bits valid)
137+
#define PMIX_SINGLE_LISTENER "pmix.sing.listnr" // (bool) use only one rendezvous socket, letting priorities and/or
138+
// MCA param select the active transport
137139

138140
/* attributes for TCP connections */
139141
#define PMIX_TCP_URI "pmix.tcp.uri" // (char*) URI of server to connect to

opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_frame.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* Copyright (c) 2004-2005 The Regents of the University of California.
1212
* All rights reserved.
1313
* Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved.
14-
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
14+
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
1515
* Copyright (c) 2015 Research Organization for Information Science
1616
* and Technology (RIST). All rights reserved.
1717
* $COPYRIGHT$
@@ -39,6 +39,7 @@
3939
#include "src/mca/base/pmix_mca_base_var.h"
4040
#include "src/mca/base/pmix_mca_base_framework.h"
4141
#include "src/class/pmix_list.h"
42+
#include "src/client/pmix_client_ops.h"
4243
#include "src/mca/ptl/base/base.h"
4344

4445
/*
@@ -76,6 +77,11 @@ static pmix_status_t pmix_ptl_close(void)
7677
/* ensure the listen thread has been shut down */
7778
pmix_ptl.stop_listening();
7879

80+
if (0 <= pmix_client_globals.myserver.sd) {
81+
CLOSE_THE_SOCKET(pmix_client_globals.myserver.sd);
82+
pmix_client_globals.myserver.sd = -1;
83+
}
84+
7985
/* the components will cleanup when closed */
8086
PMIX_DESTRUCT(&pmix_ptl_globals.actives);
8187
PMIX_LIST_DESTRUCT(&pmix_ptl_globals.posted_recvs);
@@ -92,6 +98,7 @@ static pmix_status_t pmix_ptl_open(pmix_mca_base_open_flag_t flags)
9298
PMIX_CONSTRUCT(&pmix_ptl_globals.posted_recvs, pmix_list_t);
9399
pmix_ptl_globals.listen_thread_active = false;
94100
PMIX_CONSTRUCT(&pmix_ptl_globals.listeners, pmix_list_t);
101+
pmix_client_globals.myserver.sd = -1;
95102

96103
/* Open up all available components */
97104
return pmix_mca_base_framework_components_open(&pmix_ptl_base_framework, flags);

opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_listener.c

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
22
/*
3-
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
3+
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
44
* Copyright (c) 2014-2016 Research Organization for Information Science
55
* and Technology (RIST). All rights reserved.
66
* Copyright (c) 2014-2015 Artem Y. Polyakov <artpol84@gmail.com>.
@@ -69,17 +69,33 @@ static pmix_status_t setup_listeners(pmix_info_t *info, size_t ninfo, bool *need
6969
{
7070
pmix_ptl_base_active_t *active;
7171
pmix_status_t rc;
72+
size_t n;
73+
bool single = false;
7274

7375
if (!pmix_ptl_globals.initialized) {
7476
return PMIX_ERR_INIT;
7577
}
7678

79+
/* scan the directives to see if they want only one listener setup */
80+
if (NULL != info) {
81+
for (n=0; n < ninfo; n++) {
82+
if (0 == strncmp(info[n].key, PMIX_SINGLE_LISTENER, PMIX_MAX_KEYLEN) &&
83+
(PMIX_UNDEF == info[n].value.type || info[n].value.data.flag)) {
84+
single = true;
85+
break;
86+
}
87+
}
88+
}
89+
7790
PMIX_LIST_FOREACH(active, &pmix_ptl_globals.actives, pmix_ptl_base_active_t) {
7891
if (NULL != active->component->setup_listener) {
7992
rc = active->component->setup_listener(info, ninfo, need_listener);
8093
if (PMIX_SUCCESS != rc && PMIX_ERR_NOT_AVAILABLE != rc) {
8194
return rc;
8295
}
96+
if (single) {
97+
return PMIX_SUCCESS;
98+
}
8399
}
84100
}
85101
/* we must have at least one listener */

opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
2+
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
33
* Copyright (c) 2014 Artem Y. Polyakov <artpol84@gmail.com>.
44
* All rights reserved.
55
* Copyright (c) 2015-2016 Research Organization for Information Science
@@ -478,10 +478,17 @@ void pmix_ptl_base_send(int sd, short args, void *cbdata)
478478
pmix_ptl_queue_t *queue = (pmix_ptl_queue_t*)cbdata;
479479
pmix_ptl_send_t *snd;
480480
pmix_output_verbose(2, pmix_globals.debug_output,
481-
"[%s:%d] queue callback called: reply to %s:%d on tag %d",
481+
"[%s:%d] send to %s:%d on tag %d",
482482
__FILE__, __LINE__,
483483
(queue->peer)->info->nptr->nspace,
484484
(queue->peer)->info->rank, (queue->tag));
485+
486+
if (queue->peer->sd < 0) {
487+
/* this peer's socket has been closed */
488+
PMIX_RELEASE(queue);
489+
return;
490+
}
491+
485492
snd = PMIX_NEW(pmix_ptl_send_t);
486493
snd->hdr.pindex = htonl(pmix_globals.pindex);
487494
snd->hdr.tag = htonl(queue->tag);
@@ -513,6 +520,12 @@ void pmix_ptl_base_send_recv(int fd, short args, void *cbdata)
513520
pmix_ptl_send_t *snd;
514521
uint32_t tag;
515522

523+
if (ms->peer->sd < 0) {
524+
/* this peer's socket has been closed */
525+
PMIX_RELEASE(ms);
526+
return;
527+
}
528+
516529
/* set the tag */
517530
tag = current_tag++;
518531

opal/mca/pmix/pmix2x/pmix/src/mca/ptl/ptl_types.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
* All rights reserved.
1313
* Copyright (c) 2007-2011 Cisco Systems, Inc. All rights reserved.
1414
* Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved.
15-
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
15+
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
1616
* $COPYRIGHT$
1717
*
1818
* Additional copyrights may follow
@@ -236,7 +236,7 @@ PMIX_CLASS_DECLARATION(pmix_listener_t);
236236
pmix_list_append(&(p)->send_queue, &snd->super); \
237237
} \
238238
/* ensure the send event is active */ \
239-
if (!(p)->send_ev_active) { \
239+
if (!(p)->send_ev_active && 0 <= (p)->sd) { \
240240
event_add(&(p)->send_event, 0); \
241241
(p)->send_ev_active = true; \
242242
} \

opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* Copyright (c) 2011-2014 Cisco Systems, Inc. All rights reserved.
1414
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
1515
* reserved.
16-
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
16+
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
1717
* $COPYRIGHT$
1818
*
1919
* Additional copyrights may follow
@@ -270,13 +270,15 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer,
270270
if (PMIX_SUCCESS != (rc = send_connect_ack(sd))) {
271271
PMIX_ERROR_LOG(rc);
272272
CLOSE_THE_SOCKET(sd);
273+
pmix_client_globals.myserver.sd = -1;
273274
return rc;
274275
}
275276

276277
/* do whatever handshake is required */
277278
if (PMIX_SUCCESS != (rc = recv_connect_ack(sd))) {
278279
PMIX_ERROR_LOG(rc);
279280
CLOSE_THE_SOCKET(sd);
281+
pmix_client_globals.myserver.sd = -1;
280282
return rc;
281283
}
282284

opal/mca/pmix/pmix2x/pmix/src/server/pmix_server.c

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
22
/*
3-
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
3+
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
44
* Copyright (c) 2014-2016 Research Organization for Information Science
55
* and Technology (RIST). All rights reserved.
66
* Copyright (c) 2014-2015 Artem Y. Polyakov <artpol84@gmail.com>.
@@ -802,7 +802,7 @@ static void _deregister_client(int sd, short args, void *cbdata)
802802
/* nothing to do */
803803
goto cleanup;
804804
}
805-
/* find an remove this client */
805+
/* find and remove this client */
806806
PMIX_LIST_FOREACH(info, &nptr->server->ranks, pmix_rank_info_t) {
807807
if (info->rank == cd->proc.rank) {
808808
pmix_list_remove_item(&nptr->server->ranks, &info->super);
@@ -1442,6 +1442,7 @@ static void op_cbfunc(pmix_status_t status, void *cbdata)
14421442
PMIX_RELEASE(cd);
14431443
return;
14441444
}
1445+
14451446
/* the function that created the server_caddy did a
14461447
* retain on the peer, so we don't have to worry about
14471448
* it still being present - send a copy to the originator */
@@ -2078,6 +2079,10 @@ static pmix_status_t server_switchyard(pmix_peer_t *peer, uint32_t tag,
20782079
if (PMIX_SUCCESS != (rc = pmix_host_server.client_finalized(&proc, peer->info->server_object,
20792080
op_cbfunc, cd))) {
20802081
PMIX_RELEASE(cd);
2082+
} else {
2083+
/* don't reply to them ourselves - we will do so when the host
2084+
* server calls us back */
2085+
return rc;
20812086
}
20822087
}
20832088
/* turn off the recv event - we shouldn't hear anything

0 commit comments

Comments
 (0)