1- /* -*- Mode: C; c-basic-offset:4 ; -*- */
1+ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
22/*
33 * Copyright (c) 2007 The Trustees of Indiana University.
44 * All rights reserved.
55 * Copyright (c) 2011-2015 Cisco Systems, Inc. All rights reserved.
6- * Copyright (c) 2011-2013 Los Alamos National Security, LLC. All
7- * rights reserved.
6+ * Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
7+ * reserved.
88 * Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
99 * Copyright (c) 2014 Mellanox Technologies, Inc.
1010 * All rights reserved.
@@ -74,8 +74,6 @@ static void finalize(void)
7474{
7575 /* cancel the recv */
7676 orte_rml .recv_cancel (ORTE_NAME_WILDCARD , ORTE_RML_TAG_ALLGATHER_BRKS );
77-
78- return ;
7977}
8078
8179static int allgather (orte_grpcomm_coll_t * coll ,
@@ -84,18 +82,37 @@ static int allgather(orte_grpcomm_coll_t *coll,
8482 OPAL_OUTPUT_VERBOSE ((5 , orte_grpcomm_base_framework .framework_output ,
8583 "%s grpcomm:coll:bruck algo employed for %d processes" ,
8684 ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ), (int )coll -> ndmns ));
85+ /* get my own rank */
86+ coll -> my_rank = ORTE_VPID_INVALID ;
87+ for (orte_vpid_t nv = 0 ; nv < coll -> ndmns ; nv ++ ) {
88+ if (coll -> dmns [nv ] == ORTE_PROC_MY_NAME -> vpid ) {
89+ coll -> my_rank = nv ;
90+ break ;
91+ }
92+ }
93+
94+ /* check for bozo case */
95+ if (ORTE_VPID_INVALID == coll -> my_rank ) {
96+ OPAL_OUTPUT ((orte_grpcomm_base_framework .framework_output ,
97+ "Peer not found" ));
98+ ORTE_ERROR_LOG (ORTE_ERR_NOT_FOUND );
99+ brks_finalize_coll (coll , ORTE_ERR_NOT_FOUND );
100+ return ORTE_ERR_NOT_FOUND ;
101+ }
87102
88103 /* record that we contributed */
89104 coll -> nreported = 1 ;
90105
91106 /* mark local data received */
92- coll -> distance_mask_recv = (uint32_t * )calloc (sizeof (uint32_t ), (coll -> ndmns - 1 ));
107+ if (coll -> ndmns > 1 ) {
108+ opal_bitmap_init (& coll -> distance_mask_recv , (uint32_t ) log2 (coll -> ndmns ) + 1 );
109+ }
93110
94111 /* start by seeding the collection with our own data */
95112 opal_dss .copy_payload (& coll -> bucket , sendbuf );
96113
97114 /* process data */
98- brks_allgather_process_data (coll , 1 );
115+ brks_allgather_process_data (coll , 0 );
99116
100117 return ORTE_SUCCESS ;
101118}
@@ -118,6 +135,12 @@ static int brks_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_process_name
118135 OBJ_RELEASE (send_buf );
119136 return rc ;
120137 }
138+ /* pack the number of daemons included in the payload */
139+ if (OPAL_SUCCESS != (rc = opal_dss .pack (send_buf , & coll -> nreported , 1 , OPAL_SIZE ))) {
140+ ORTE_ERROR_LOG (rc );
141+ OBJ_RELEASE (send_buf );
142+ return rc ;
143+ }
121144 /* pack the data */
122145 if (OPAL_SUCCESS != (rc = opal_dss .copy_payload (send_buf , & coll -> bucket ))) {
123146 ORTE_ERROR_LOG (rc );
@@ -142,76 +165,116 @@ static int brks_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_process_name
142165 return ORTE_SUCCESS ;
143166}
144167
168+ static int brks_allgather_process_buffered (orte_grpcomm_coll_t * coll , uint32_t distance ) {
169+ opal_buffer_t * buffer ;
170+ size_t nreceived ;
171+ int32_t cnt = 1 ;
172+ int rc ;
173+
174+ /* check whether data for next distance is available*/
175+ if (NULL == coll -> buffers || NULL == coll -> buffers [distance ]) {
176+ return 0 ;
177+ }
178+
179+ buffer = coll -> buffers [distance ];
180+ coll -> buffers [distance ] = NULL ;
181+
182+ OPAL_OUTPUT_VERBOSE ((80 , orte_grpcomm_base_framework .framework_output ,
183+ "%s grpcomm:coll:brks %u distance data found" ,
184+ ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ), distance ));
185+ rc = opal_dss .unpack (buffer , & nreceived , & cnt , OPAL_SIZE );
186+ if (OPAL_SUCCESS != rc ) {
187+ ORTE_ERROR_LOG (rc );
188+ brks_finalize_coll (coll , rc );
189+ return rc ;
190+ }
191+
192+ if (OPAL_SUCCESS != (rc = opal_dss .copy_payload (& coll -> bucket , buffer ))) {
193+ ORTE_ERROR_LOG (rc );
194+ brks_finalize_coll (coll , rc );
195+ return rc ;
196+ }
197+
198+ coll -> nreported += nreceived ;
199+ orte_grpcomm_base_mark_distance_recv (coll , distance );
200+ OBJ_RELEASE (buffer );
201+
202+ return 1 ;
203+ }
204+
145205static void brks_allgather_process_data (orte_grpcomm_coll_t * coll , uint32_t distance ) {
146206 /* Communication step:
147207 At every step i, rank r:
148208 - doubles the distance
149209 - sends message containing all data collected so far to rank r - distance
150210 - receives message containing all data collected so far from rank (r + distance)
151211 */
212+ uint32_t log2ndmns = (uint32_t ) log2 (coll -> ndmns );
213+ uint32_t last_round , remainder ;
152214 orte_process_name_t peer ;
153- orte_vpid_t nv , rank ;
215+ orte_vpid_t nv ;
154216 int rc ;
155217
156- peer .jobid = ORTE_PROC_MY_NAME -> jobid ;
218+ /* NTH: calculate in which round we should send the final data. this is the first
219+ * round in which we have data from at least (coll->ndmns - (1 << log2ndmns))
220+ * daemons. alternatively we could just send when distance reaches log2ndmns but
221+ * that could end up sending more data than needed */
222+ last_round = (uint32_t ) ceil (log2 ((double ) (coll -> ndmns - (1 << log2ndmns ))));
157223
158- /* get my own rank */
159- rank = ORTE_VPID_INVALID ;
160- for (orte_vpid_t nv = 0 ; nv < coll -> ndmns ; nv ++ ) {
161- if (coll -> dmns [nv ] == ORTE_PROC_MY_NAME -> vpid ) {
162- rank = nv ;
163- break ;
164- }
165- }
166- /* check for bozo case */
167- if (ORTE_VPID_INVALID == rank ) {
168- OPAL_OUTPUT ((orte_grpcomm_base_framework .framework_output ,
169- "Peer not found" ));
170- ORTE_ERROR_LOG (ORTE_ERR_NOT_FOUND );
171- brks_finalize_coll (coll , ORTE_ERR_NOT_FOUND );
172- return ;
173- }
224+ peer .jobid = ORTE_PROC_MY_NAME -> jobid ;
174225
175- while (distance < coll -> ndmns ) {
226+ while (distance < log2ndmns ) {
176227 OPAL_OUTPUT_VERBOSE ((80 , orte_grpcomm_base_framework .framework_output ,
177228 "%s grpcomm:coll:brks process distance %u)" ,
178229 ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ), distance ));
179230
180231 /* first send my current contents */
181- nv = (coll -> ndmns + rank - distance ) % coll -> ndmns ;
232+ nv = (coll -> ndmns + coll -> my_rank - ( 1 << distance ) ) % coll -> ndmns ;
182233 peer .vpid = coll -> dmns [nv ];
183234
184235 brks_allgather_send_dist (coll , & peer , distance );
185236
186- /* check whether data for next distance is available*/
187- if ((NULL != coll -> buffers ) && (coll -> buffers [distance - 1 ] != NULL )) {
188- OPAL_OUTPUT_VERBOSE ((80 , orte_grpcomm_base_framework .framework_output ,
189- "%s grpcomm:coll:brks %u distance data found" ,
190- ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ), distance ));
191- if (OPAL_SUCCESS != (rc = opal_dss .copy_payload (& coll -> bucket , coll -> buffers [distance - 1 ]))) {
192- ORTE_ERROR_LOG (rc );
193- brks_finalize_coll (coll , rc );
194- return ;
195- }
196- coll -> nreported += distance ;
197- orte_grpcomm_base_mark_distance_recv (coll , distance );
198- OBJ_RELEASE (coll -> buffers [distance - 1 ]);
199- coll -> buffers [distance - 1 ] = NULL ;
200- distance = distance << 1 ;
201- continue ;
237+ if (distance == last_round ) {
238+ /* have enough data to send the final round now */
239+ nv = (coll -> ndmns + coll -> my_rank - (1 << log2ndmns )) % coll -> ndmns ;
240+ peer .vpid = coll -> dmns [nv ];
241+ brks_allgather_send_dist (coll , & peer , log2ndmns );
242+ }
243+
244+ rc = brks_allgather_process_buffered (coll , distance );
245+ if (!rc ) {
246+ break ;
247+ } else if (rc < 0 ) {
248+ return ;
249+ }
250+
251+ ++ distance ;
252+ }
253+
254+ if (distance == log2ndmns ) {
255+ if (distance == last_round ) {
256+ /* need to send the final round now */
257+ nv = (coll -> ndmns + coll -> my_rank - (1 << log2ndmns )) % coll -> ndmns ;
258+ peer .vpid = coll -> dmns [nv ];
259+ brks_allgather_send_dist (coll , & peer , log2ndmns );
260+ }
261+
262+ /* check if the final message is already queued */
263+ rc = brks_allgather_process_buffered (coll , distance );
264+ if (rc < 0 ) {
265+ return ;
202266 }
203- break ;
204267 }
268+
205269 OPAL_OUTPUT_VERBOSE ((80 , orte_grpcomm_base_framework .framework_output ,
206270 "%s grpcomm:coll:brks reported %lu process from %lu" ,
207271 ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ), (unsigned long )coll -> nreported ,
208272 (unsigned long )coll -> ndmns ));
209273
210- /* if we are done, then complete things */
274+ /* if we are done, then complete things. we may get data from more daemons than expected */
211275 if (coll -> nreported >= coll -> ndmns ){
212276 brks_finalize_coll (coll , ORTE_SUCCESS );
213277 }
214- return ;
215278}
216279
217280static void brks_allgather_recv_dist (int status , orte_process_name_t * sender ,
@@ -253,43 +316,51 @@ static void brks_allgather_recv_dist(int status, orte_process_name_t* sender,
253316 assert (0 == orte_grpcomm_base_check_distance_recv (coll , distance ));
254317
255318 /* Check whether we can process next distance */
256- if (orte_grpcomm_base_check_distance_recv (coll , (distance >> 1 ))) {
319+ if (coll -> nreported && (!distance || orte_grpcomm_base_check_distance_recv (coll , distance - 1 ))) {
320+ size_t nreceived ;
257321 OPAL_OUTPUT_VERBOSE ((80 , orte_grpcomm_base_framework .framework_output ,
258322 "%s grpcomm:coll:brks data from %d distance received, "
259323 "Process the next distance." ,
260324 ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ), distance ));
261325 /* capture any provided content */
326+ rc = opal_dss .unpack (buffer , & nreceived , & cnt , OPAL_SIZE );
327+ if (OPAL_SUCCESS != rc ) {
328+ OBJ_RELEASE (sig );
329+ ORTE_ERROR_LOG (rc );
330+ brks_finalize_coll (coll , rc );
331+ return ;
332+ }
262333 if (OPAL_SUCCESS != (rc = opal_dss .copy_payload (& coll -> bucket , buffer ))) {
263334 OBJ_RELEASE (sig );
264335 ORTE_ERROR_LOG (rc );
265336 brks_finalize_coll (coll , rc );
266337 return ;
267338 }
268- coll -> nreported += distance ;
339+ coll -> nreported += nreceived ;
269340 orte_grpcomm_base_mark_distance_recv (coll , distance );
270- brks_allgather_process_data (coll , ( uint32_t )( distance << 1 ) );
341+ brks_allgather_process_data (coll , distance + 1 );
271342 } else {
272343 OPAL_OUTPUT_VERBOSE ((80 , orte_grpcomm_base_framework .framework_output ,
273344 "%s grpcomm:coll:brks data from %d distance received, "
274345 "still waiting for data." ,
275346 ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ), distance ));
276347 if (NULL == coll -> buffers ) {
277- if (NULL == (coll -> buffers = (opal_buffer_t * * )calloc ( sizeof ( opal_buffer_t * ), coll -> ndmns - 1 ))) {
348+ if (NULL == (coll -> buffers = (opal_buffer_t * * ) calloc (( uint32_t ) log2 ( coll -> ndmns ) + 1 , sizeof ( opal_buffer_t * ) ))) {
278349 rc = OPAL_ERR_OUT_OF_RESOURCE ;
279350 OBJ_RELEASE (sig );
280351 ORTE_ERROR_LOG (rc );
281352 brks_finalize_coll (coll , rc );
282353 return ;
283354 }
284355 }
285- if (NULL == (coll -> buffers [distance - 1 ] = OBJ_NEW (opal_buffer_t ))) {
356+ if (NULL == (coll -> buffers [distance ] = OBJ_NEW (opal_buffer_t ))) {
286357 rc = OPAL_ERR_OUT_OF_RESOURCE ;
287358 OBJ_RELEASE (sig );
288359 ORTE_ERROR_LOG (rc );
289360 brks_finalize_coll (coll , rc );
290361 return ;
291362 }
292- if (OPAL_SUCCESS != (rc = opal_dss .copy_payload (coll -> buffers [distance - 1 ], buffer ))) {
363+ if (OPAL_SUCCESS != (rc = opal_dss .copy_payload (coll -> buffers [distance ], buffer ))) {
293364 OBJ_RELEASE (sig );
294365 ORTE_ERROR_LOG (rc );
295366 brks_finalize_coll (coll , rc );
@@ -298,8 +369,6 @@ static void brks_allgather_recv_dist(int status, orte_process_name_t* sender,
298369 }
299370
300371 OBJ_RELEASE (sig );
301-
302- return ;
303372}
304373
305374static int brks_finalize_coll (orte_grpcomm_coll_t * coll , int ret )
0 commit comments