55 * Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
66 * Copyright (c) 2011-2013 Los Alamos National Security, LLC. All
77 * rights reserved.
8- * Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
8+ * Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
99 * Copyright (c) 2014 Research Organization for Information Science
1010 * and Technology (RIST). All rights reserved.
1111 * $COPYRIGHT$
@@ -124,7 +124,7 @@ static int xcast(orte_vpid_t *vpids,
124124static int allgather (orte_grpcomm_coll_t * coll ,
125125 opal_buffer_t * buf )
126126{
127- int rc , ret ;
127+ int rc ;
128128 opal_buffer_t * relay ;
129129
130130 OPAL_OUTPUT_VERBOSE ((1 , orte_grpcomm_base_framework .framework_output ,
@@ -143,35 +143,16 @@ static int allgather(orte_grpcomm_coll_t *coll,
143143 return rc ;
144144 }
145145
146- /* if we are the HNP and nobody else is participating,
147- * then just execute the xcast */
148- if (ORTE_PROC_IS_HNP && 1 == coll -> ndmns ) {
149- /* pack the status - success since the allgather completed. This
150- * would be an error if we timeout instead */
151- ret = ORTE_SUCCESS ;
152- if (OPAL_SUCCESS != (rc = opal_dss .pack (relay , & ret , 1 , OPAL_INT ))) {
153- ORTE_ERROR_LOG (rc );
154- OBJ_RELEASE (relay );
155- return rc ;
156- }
157- /* pass along the payload */
158- opal_dss .copy_payload (relay , buf );
159- orte_grpcomm .xcast (coll -> sig , ORTE_RML_TAG_COLL_RELEASE , relay );
160- OBJ_RELEASE (relay );
161- return ORTE_SUCCESS ;
162- }
163-
164146 /* pass along the payload */
165147 opal_dss .copy_payload (relay , buf );
166148
167- /* otherwise, we need to send this to the HNP for
168- * processing */
149+ /* send this to ourselves for processing */
169150 OPAL_OUTPUT_VERBOSE ((1 , orte_grpcomm_base_framework .framework_output ,
170- "%s grpcomm:direct:allgather sending to HNP " ,
151+ "%s grpcomm:direct:allgather sending to ourself " ,
171152 ORTE_NAME_PRINT (ORTE_PROC_MY_NAME )));
172153
173- /* send the info to the HNP for tracking */
174- rc = orte_rml .send_buffer_nb (ORTE_PROC_MY_HNP , relay ,
154+ /* send the info to ourselves for tracking */
155+ rc = orte_rml .send_buffer_nb (ORTE_PROC_MY_NAME , relay ,
175156 ORTE_RML_TAG_ALLGATHER_DIRECT ,
176157 orte_rml_send_callback , NULL );
177158 return rc ;
@@ -212,35 +193,60 @@ static void allgather_recv(int status, orte_process_name_t* sender,
212193 opal_dss .copy_payload (& coll -> bucket , buffer );
213194
214195 OPAL_OUTPUT_VERBOSE ((1 , orte_grpcomm_base_framework .framework_output ,
215- "%s grpcomm:direct allgather recv ndmns %d nrep %d" ,
196+ "%s grpcomm:direct allgather recv nexpected %d nrep %d" ,
216197 ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ),
217- (int )coll -> ndmns , (int )coll -> nreported ));
218-
219- /* if all participating daemons have reported */
220- if (coll -> ndmns == coll -> nreported ) {
221- reply = OBJ_NEW (opal_buffer_t );
222- /* pack the signature */
223- if (OPAL_SUCCESS != (rc = opal_dss .pack (reply , & sig , 1 , ORTE_SIGNATURE ))) {
224- ORTE_ERROR_LOG (rc );
225- OBJ_RELEASE (reply );
226- OBJ_RELEASE (sig );
227- return ;
228- }
229- /* pack the status - success since the allgather completed. This
230- * would be an error if we timeout instead */
231- ret = ORTE_SUCCESS ;
232- if (OPAL_SUCCESS != (rc = opal_dss .pack (reply , & ret , 1 , OPAL_INT ))) {
233- ORTE_ERROR_LOG (rc );
198+ (int )coll -> nexpected , (int )coll -> nreported ));
199+
200+ /* see if everyone has reported */
201+ if (coll -> nreported == coll -> nexpected ) {
202+ if (ORTE_PROC_IS_HNP ) {
203+ OPAL_OUTPUT_VERBOSE ((1 , orte_grpcomm_base_framework .framework_output ,
204+ "%s grpcomm:direct allgather HNP reports complete" ,
205+ ORTE_NAME_PRINT (ORTE_PROC_MY_NAME )));
206+ /* the allgather is complete - send the xcast */
207+ reply = OBJ_NEW (opal_buffer_t );
208+ /* pack the signature */
209+ if (OPAL_SUCCESS != (rc = opal_dss .pack (reply , & sig , 1 , ORTE_SIGNATURE ))) {
210+ ORTE_ERROR_LOG (rc );
211+ OBJ_RELEASE (reply );
212+ OBJ_RELEASE (sig );
213+ return ;
214+ }
215+ /* pack the status - success since the allgather completed. This
216+ * would be an error if we timeout instead */
217+ ret = ORTE_SUCCESS ;
218+ if (OPAL_SUCCESS != (rc = opal_dss .pack (reply , & ret , 1 , OPAL_INT ))) {
219+ ORTE_ERROR_LOG (rc );
220+ OBJ_RELEASE (reply );
221+ OBJ_RELEASE (sig );
222+ return ;
223+ }
224+ /* transfer the collected bucket */
225+ opal_dss .copy_payload (reply , & coll -> bucket );
226+ /* send the release via xcast */
227+ (void )orte_grpcomm .xcast (sig , ORTE_RML_TAG_COLL_RELEASE , reply );
234228 OBJ_RELEASE (reply );
235- OBJ_RELEASE (sig );
236- return ;
229+ } else {
230+ OPAL_OUTPUT_VERBOSE ((1 , orte_grpcomm_base_framework .framework_output ,
231+ "%s grpcomm:direct allgather rollup complete - sending to %s" ,
232+ ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ),
233+ ORTE_NAME_PRINT (ORTE_PROC_MY_PARENT )));
234+ /* relay the bucket upward */
235+ reply = OBJ_NEW (opal_buffer_t );
236+ /* pack the signature */
237+ if (OPAL_SUCCESS != (rc = opal_dss .pack (reply , & sig , 1 , ORTE_SIGNATURE ))) {
238+ ORTE_ERROR_LOG (rc );
239+ OBJ_RELEASE (reply );
240+ OBJ_RELEASE (sig );
241+ return ;
242+ }
243+ /* transfer the collected bucket */
244+ opal_dss .copy_payload (reply , & coll -> bucket );
245+ /* send the info to our parent */
246+ rc = orte_rml .send_buffer_nb (ORTE_PROC_MY_PARENT , reply ,
247+ ORTE_RML_TAG_ALLGATHER_DIRECT ,
248+ orte_rml_send_callback , NULL );
237249 }
238- /* transfer the collected bucket */
239- opal_dss .copy_payload (reply , & coll -> bucket );
240-
241- /* send the release via xcast */
242- (void )orte_grpcomm .xcast (sig , ORTE_RML_TAG_COLL_RELEASE , reply );
243- OBJ_RELEASE (reply );
244250 }
245251 OBJ_RELEASE (sig );
246252}
0 commit comments