17
17
#include "orte/mca/errmgr/errmgr.h"
18
18
19
19
static int rank , size ;
20
- static volatile int active ;
21
20
static volatile bool wait_for_release = true;
22
21
#define MEMPROBE_RELEASE 12345
23
22
@@ -27,7 +26,6 @@ static void _release_fn(int status,
27
26
opal_pmix_notification_complete_fn_t cbfunc ,
28
27
void * cbdata )
29
28
{
30
- fprintf (stderr , "Rank %d: Release recvd\n" , rank );
31
29
/* must let the notifier know we are done */
32
30
if (NULL != cbfunc ) {
33
31
cbfunc (OPAL_ERR_HANDLERS_COMPLETE , NULL , NULL , NULL , cbdata );
@@ -58,7 +56,6 @@ static void qcbfunc(int status,
58
56
opal_list_t * results = (opal_list_t * )cbdata ;
59
57
opal_value_t * kv ;
60
58
61
- fprintf (stderr , "Rank %d: Query returned status %d\n" , rank , status );
62
59
if (NULL != info ) {
63
60
while (NULL != (kv = (opal_value_t * )opal_list_remove_first (info ))) {
64
61
opal_list_append (results , & kv -> super );
@@ -70,61 +67,90 @@ static void qcbfunc(int status,
70
67
wait_for_release = false;
71
68
}
72
69
73
- int main (int argc , char * argv [] )
70
+ static void notifycbfunc (int status , void * cbdata )
74
71
{
75
- opal_list_t * codes ;
76
- opal_value_t * kv ;
77
- opal_pmix_query_t * q ;
78
- opal_list_t query , response ;
79
-
80
- MPI_Init (& argc , & argv );
81
- MPI_Comm_rank (MPI_COMM_WORLD , & rank );
82
- MPI_Comm_size (MPI_COMM_WORLD , & size );
72
+ volatile int * active = (volatile int * )cbdata ;
73
+ * active = status ;
74
+ }
83
75
84
- /* everyone registers their event handler */
85
- codes = OBJ_NEW (opal_list_t );
76
+ static void sample (void )
77
+ {
78
+ opal_value_t * kv , * ival ;
79
+ opal_pmix_query_t * q ;
80
+ opal_list_t query , response , * lt ;
81
+ volatile int active ;
82
+ char * * answer = NULL , * tmp , * msg ;
83
+
84
+ OBJ_CONSTRUCT (& query , opal_list_t );
85
+ OBJ_CONSTRUCT (& response , opal_list_t );
86
+ q = OBJ_NEW (opal_pmix_query_t );
87
+ opal_list_append (& query , & q -> super );
88
+ opal_argv_append_nosize (& q -> keys , OPAL_PMIX_QUERY_MEMORY_USAGE );
89
+ /* qualify that we just want local avg, min/max values reported */
86
90
kv = OBJ_NEW (opal_value_t );
87
- kv -> key = strdup ("errorcode" );
88
- kv -> type = OPAL_INT ;
89
- kv -> data .integer = MEMPROBE_RELEASE ;
90
- opal_list_append (codes , & kv -> super );
91
+ kv -> key = strdup (OPAL_PMIX_QUERY_LOCAL_ONLY );
92
+ kv -> type = OPAL_BOOL ;
93
+ kv -> data .flag = true;
94
+ opal_list_append (& q -> qualifiers , & kv -> super );
95
+ kv = OBJ_NEW (opal_value_t );
96
+ kv -> key = strdup (OPAL_PMIX_QUERY_REPORT_AVG );
97
+ kv -> type = OPAL_BOOL ;
98
+ kv -> data .flag = true;
99
+ opal_list_append (& q -> qualifiers , & kv -> super );
100
+ kv = OBJ_NEW (opal_value_t );
101
+ kv -> key = strdup (OPAL_PMIX_QUERY_REPORT_MINMAX );
102
+ kv -> type = OPAL_BOOL ;
103
+ kv -> data .flag = true;
104
+ opal_list_append (& q -> qualifiers , & kv -> super );
105
+ /* issue the request */
106
+ wait_for_release = true;
107
+ opal_pmix .query (& query , qcbfunc , (void * )& response );
108
+ /* wait for the query to complete */
109
+ while (wait_for_release ) {
110
+ usleep (10 );
111
+ }
112
+ wait_for_release = true;
113
+ /* log my own results as a single string so the output
114
+ * doesn't get garbled on the other end */
115
+ asprintf (& tmp , "Data for node %s" , orte_process_info .nodename );
116
+ opal_argv_append_nosize (& answer , tmp );
117
+ free (tmp );
118
+ OPAL_LIST_FOREACH (kv , & response , opal_value_t ) {
119
+ lt = (opal_list_t * )kv -> data .ptr ;
120
+ OPAL_LIST_FOREACH (ival , lt , opal_value_t ) {
121
+ if (0 == strcmp (ival -> key , OPAL_PMIX_DAEMON_MEMORY )) {
122
+ asprintf (& tmp , "\tDaemon: %f" , ival -> data .fval );
123
+ opal_argv_append_nosize (& answer , tmp );
124
+ free (tmp );
125
+ } else if (0 == strcmp (ival -> key , OPAL_PMIX_CLIENT_AVG_MEMORY )) {
126
+ asprintf (& tmp , "\tClient: %f" , ival -> data .fval );
127
+ opal_argv_append_nosize (& answer , tmp );
128
+ free (tmp );
129
+ } else {
130
+ fprintf (stderr , "\tUnknown key: %s" , ival -> key );
131
+ }
132
+ }
133
+ }
134
+ opal_argv_append_nosize (& answer , "\n" );
135
+ OPAL_LIST_DESTRUCT (& response );
91
136
137
+ /* construct the log output */
138
+ OBJ_CONSTRUCT (& response , opal_list_t );
139
+ kv = OBJ_NEW (opal_value_t );
140
+ kv -> key = strdup (OPAL_PMIX_LOG_STDOUT );
141
+ kv -> type = OPAL_STRING ;
142
+ kv -> data .string = opal_argv_join (answer , '\n' );
143
+ opal_list_append (& response , & kv -> super );
144
+ opal_argv_free (answer );
92
145
active = -1 ;
93
- opal_pmix .register_evhandler ( codes , NULL , _release_fn , _register_fn , (void * )& active );
146
+ opal_pmix .log ( & response , notifycbfunc , (void * )& active );
94
147
while (-1 == active ) {
95
148
usleep (10 );
96
149
}
150
+ OPAL_LIST_DESTRUCT (& response );
151
+
97
152
98
- /* rank 0 asks for memory to be sampled, while everyone else waits */
99
153
if (0 == rank ) {
100
- fprintf (stderr , "Sampling memory usage after MPI_Init\n" );
101
- OBJ_CONSTRUCT (& query , opal_list_t );
102
- OBJ_CONSTRUCT (& response , opal_list_t );
103
- q = OBJ_NEW (opal_pmix_query_t );
104
- opal_list_append (& query , & q -> super );
105
- opal_argv_append_nosize (& q -> keys , OPAL_PMIX_QUERY_MEMORY_USAGE );
106
- /* qualify that we just want avg, min/max values reported */
107
- kv = OBJ_NEW (opal_value_t );
108
- kv -> key = strdup (OPAL_PMIX_QUERY_REPORT_AVG );
109
- kv -> type = OPAL_BOOL ;
110
- kv -> data .flag = true;
111
- opal_list_append (& q -> qualifiers , & kv -> super );
112
- kv = OBJ_NEW (opal_value_t );
113
- kv -> key = strdup (OPAL_PMIX_QUERY_REPORT_MINMAX );
114
- kv -> type = OPAL_BOOL ;
115
- kv -> data .flag = true;
116
- opal_list_append (& q -> qualifiers , & kv -> super );
117
- /* issue the request */
118
- wait_for_release = true;
119
- opal_pmix .query (& query , qcbfunc , (void * )& response );
120
- while (wait_for_release ) {
121
- usleep (10 );
122
- }
123
- /* output the results */
124
- OPAL_LIST_FOREACH (kv , & response , opal_value_t ) {
125
- fprintf (stderr , "\tResults: %s\n" , kv -> key );
126
- }
127
- OPAL_LIST_DESTRUCT (& response );
128
154
/* send the notification to release the other procs */
129
155
wait_for_release = true;
130
156
OBJ_CONSTRUCT (& response , opal_list_t );
@@ -133,16 +159,58 @@ int main(int argc, char* argv[])
133
159
kv -> type = OPAL_BOOL ;
134
160
kv -> data .flag = true;
135
161
opal_list_append (& response , & kv -> super );
162
+ active = -1 ;
136
163
if (OPAL_SUCCESS != opal_pmix .notify_event (MEMPROBE_RELEASE , NULL ,
137
164
OPAL_PMIX_RANGE_GLOBAL , & response ,
138
- NULL , NULL )) {
165
+ notifycbfunc , ( void * ) & active )) {
139
166
fprintf (stderr , "Notify event failed\n" );
140
167
exit (1 );
141
168
}
142
- while (wait_for_release ) {
169
+ while (-1 == active ) {
143
170
usleep (10 );
144
171
}
145
172
OPAL_LIST_DESTRUCT (& response );
173
+ }
174
+
175
+ /* now wait for notification */
176
+ while (wait_for_release ) {
177
+ usleep (10 );
178
+ }
179
+ }
180
+
181
+ int main (int argc , char * argv [])
182
+ {
183
+ opal_list_t * codes ;
184
+ opal_value_t * kv ;
185
+ volatile int active ;
186
+
187
+ MPI_Init (& argc , & argv );
188
+ MPI_Comm_rank (MPI_COMM_WORLD , & rank );
189
+ MPI_Comm_size (MPI_COMM_WORLD , & size );
190
+
191
+ if (0 == rank ) {
192
+ fprintf (stderr , "Sampling memory usage after MPI_Init\n" );
193
+ }
194
+
195
+ /* everyone registers their event handler */
196
+ codes = OBJ_NEW (opal_list_t );
197
+ kv = OBJ_NEW (opal_value_t );
198
+ kv -> key = strdup ("errorcode" );
199
+ kv -> type = OPAL_INT ;
200
+ kv -> data .integer = MEMPROBE_RELEASE ;
201
+ opal_list_append (codes , & kv -> super );
202
+
203
+ active = -1 ;
204
+ opal_pmix .register_evhandler (codes , NULL , _release_fn , _register_fn , (void * )& active );
205
+ while (-1 == active ) {
206
+ usleep (10 );
207
+ }
208
+
209
+ /* if I am the local leader (i.e., local_rank=0), then I ask
210
+ * my daemon to report the local memory usage, and send it
211
+ * to rank=0 */
212
+ if (0 == orte_process_info .my_local_rank ) {
213
+ sample ();
146
214
} else {
147
215
/* now wait for notification */
148
216
while (wait_for_release ) {
@@ -157,60 +225,21 @@ int main(int argc, char* argv[])
157
225
158
226
if (0 == rank ) {
159
227
fprintf (stderr , "\n\nSampling memory usage after MPI_Barrier\n" );
160
- OBJ_CONSTRUCT (& query , opal_list_t );
161
- OBJ_CONSTRUCT (& response , opal_list_t );
162
- q = OBJ_NEW (opal_pmix_query_t );
163
- opal_list_append (& query , & q -> super );
164
- opal_argv_append_nosize (& q -> keys , OPAL_PMIX_QUERY_MEMORY_USAGE );
165
- /* qualify that we just want avg, min/max values reported */
166
- kv = OBJ_NEW (opal_value_t );
167
- kv -> key = strdup (OPAL_PMIX_QUERY_REPORT_AVG );
168
- kv -> type = OPAL_BOOL ;
169
- kv -> data .flag = true;
170
- opal_list_append (& q -> qualifiers , & kv -> super );
171
- kv = OBJ_NEW (opal_value_t );
172
- kv -> key = strdup (OPAL_PMIX_QUERY_REPORT_MINMAX );
173
- kv -> type = OPAL_BOOL ;
174
- kv -> data .flag = true;
175
- opal_list_append (& q -> qualifiers , & kv -> super );
176
- /* issue the request */
177
- wait_for_release = true;
178
- opal_pmix .query (& query , qcbfunc , (void * )& response );
179
- while (wait_for_release ) {
180
- usleep (10 );
181
- }
182
- /* output the results */
183
- OPAL_LIST_FOREACH (kv , & response , opal_value_t ) {
184
- fprintf (stderr , "\tResults: %s\n" , kv -> key );
185
- }
186
- OPAL_LIST_DESTRUCT (& response );
187
- /* send the notification to release the other procs */
188
- wait_for_release = true;
189
- OBJ_CONSTRUCT (& response , opal_list_t );
190
- kv = OBJ_NEW (opal_value_t );
191
- kv -> key = strdup (OPAL_PMIX_EVENT_NON_DEFAULT );
192
- kv -> type = OPAL_BOOL ;
193
- kv -> data .flag = true;
194
- opal_list_append (& response , & kv -> super );
195
- if (OPAL_SUCCESS != opal_pmix .notify_event (MEMPROBE_RELEASE , NULL ,
196
- OPAL_PMIX_RANGE_GLOBAL , & response ,
197
- NULL , NULL )) {
198
- fprintf (stderr , "Notify event failed\n" );
199
- exit (1 );
200
- }
201
- while (wait_for_release ) {
202
- usleep (10 );
228
+ }
229
+
230
+ if (0 == orte_process_info .my_local_rank ) {
231
+ if (0 != rank ) {
232
+ /* wait a little */
233
+ usleep (1000 );
203
234
}
204
- OPAL_LIST_DESTRUCT ( & response );
235
+ sample ( );
205
236
} else {
206
237
/* wait again while memory is sampled */
207
238
while (wait_for_release ) {
208
239
usleep (10 );
209
240
}
210
241
}
211
242
212
- fprintf (stderr , "%d: FINALIZING\n" , rank );
213
- fflush (stderr );
214
243
MPI_Finalize ();
215
244
return 0 ;
216
245
}
0 commit comments