@@ -159,63 +159,56 @@ __opal_attribute_always_inline__ static inline int
 mca_part_persist_progress(void)
 {
     mca_part_persist_list_t *current;
-    int err;
+    int err = OMPI_SUCCESS;
+    int completed = 0;
     size_t i;
 
     /* prevent re-entry, */
     int block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), 1);
-    if (1 < block_entry)
-    {
+    if (1 < block_entry) {
         block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+        return completed;
     }
 
     OPAL_THREAD_LOCK(&ompi_part_persist.lock);
 
     mca_part_persist_request_t *to_delete = NULL;
 
     /* Don't do anything till a function in the module is called. */
-    if (-1 == ompi_part_persist.init_world)
-    {
-        OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
-        block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+    if (-1 == ompi_part_persist.init_world) {
+        goto end_part_progress;
     }
 
     /* Can't do anything if we don't have world */
     if (0 == ompi_part_persist.init_world) {
         ompi_part_persist.my_world_rank = ompi_comm_rank(&ompi_mpi_comm_world.comm);
         err = ompi_comm_idup(&ompi_mpi_comm_world.comm, &ompi_part_persist.part_comm, &ompi_part_persist.part_comm_req);
-        if (err != OMPI_SUCCESS) {
-            exit(-1);
-        }
+        if (OMPI_SUCCESS != err) goto end_part_progress;
         ompi_part_persist.part_comm_ready = 0;
         err = ompi_comm_idup(&ompi_mpi_comm_world.comm, &ompi_part_persist.part_comm_setup, &ompi_part_persist.part_comm_sreq);
-        if (err != OMPI_SUCCESS) {
-            exit(-1);
-        }
+        if (OMPI_SUCCESS != err) goto end_part_progress;
         ompi_part_persist.part_comm_sready = 0;
         ompi_part_persist.init_world = 1;
 
-        OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
-        block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+        completed++;
+        goto end_part_progress;
     }
 
     /* Check to see if Comms are setup */
     if (0 == ompi_part_persist.init_comms) {
         if (0 == ompi_part_persist.part_comm_ready) {
-            ompi_request_test(&ompi_part_persist.part_comm_req, &ompi_part_persist.part_comm_ready, MPI_STATUS_IGNORE);
+            err = ompi_request_test(&ompi_part_persist.part_comm_req, &ompi_part_persist.part_comm_ready, MPI_STATUS_IGNORE);
+            if (OMPI_SUCCESS != err) goto end_part_progress;
         }
         if (0 == ompi_part_persist.part_comm_sready) {
-            ompi_request_test(&ompi_part_persist.part_comm_sreq, &ompi_part_persist.part_comm_sready, MPI_STATUS_IGNORE);
+            err = ompi_request_test(&ompi_part_persist.part_comm_sreq, &ompi_part_persist.part_comm_sready, MPI_STATUS_IGNORE);
+            if (OMPI_SUCCESS != err) goto end_part_progress;
         }
         if (0 != ompi_part_persist.part_comm_ready && 0 != ompi_part_persist.part_comm_sready) {
             ompi_part_persist.init_comms = 1;
         }
-        OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
-        block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+        completed++;
+        goto end_part_progress;
     }
 
     OPAL_LIST_FOREACH(current, ompi_part_persist.progress_list, mca_part_persist_list_t) {
@@ -227,10 +220,12 @@ mca_part_persist_progress(void)
 
             if (true == req->flag_post_setup_recv) {
                 err = MCA_PML_CALL(irecv(&(req->setup_info[1]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, OMPI_ANY_SOURCE, req->my_recv_tag, ompi_part_persist.part_comm_setup, &req->setup_req[1]));
+                if (OMPI_SUCCESS != err) goto end_part_progress;
                 req->flag_post_setup_recv = false;
             }
 
-            ompi_request_test(&(req->setup_req[1]), &done, MPI_STATUS_IGNORE);
+            err = ompi_request_test(&(req->setup_req[1]), &done, MPI_STATUS_IGNORE);
+            if (OMPI_SUCCESS != err) goto end_part_progress;
 
             if (done) {
                 size_t dt_size_;
@@ -241,15 +236,16 @@ mca_part_persist_progress(void)
                     req->world_peer = req->setup_info[1].world_rank;
 
                     err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_);
-                    if (OMPI_SUCCESS != err) return OMPI_ERROR;
+                    if (OMPI_SUCCESS != err) goto end_part_progress;
                     dt_size = (dt_size_ > (size_t) UINT_MAX) ? MPI_UNDEFINED : (uint32_t) dt_size_;
                     uint32_t bytes = req->real_count * dt_size;
 
                     /* Set up persistent sends */
                     req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*(req->real_parts));
                     for (i = 0; i < req->real_parts; i++) {
-                         void *buf = ((void*) (((char*)req->req_addr) + (bytes * i)));
-                         err = MCA_PML_CALL(isend_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag + i, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                        void *buf = ((void*) (((char*)req->req_addr) + (bytes * i)));
+                        err = MCA_PML_CALL(isend_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag + i, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                        if (OMPI_SUCCESS != err) goto end_part_progress;
                     }
                 } else {
                     /* parse message */
@@ -262,37 +258,38 @@ mca_part_persist_progress(void)
 
 
                     err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_);
-                    if (OMPI_SUCCESS != err) return OMPI_ERROR;
+                    if (OMPI_SUCCESS != err) goto end_part_progress;
                     dt_size = (dt_size_ > (size_t) UINT_MAX) ? MPI_UNDEFINED : (uint32_t) dt_size_;
                     uint32_t bytes = req->real_count * dt_size;
 
 
-
-                     /* Set up persistent sends */
+                    /* Set up persistent sends */
                     req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*(req->real_parts));
                     req->flags = (int*) calloc(req->real_parts,sizeof(int));
-
                     if (req->real_dt_size == dt_size) {
-
-                     for (i = 0; i < req->real_parts; i++) {
+                        for (i = 0; i < req->real_parts; i++) {
                             void *buf = ((void*) (((char*)req->req_addr) + (bytes * i)));
                             err = MCA_PML_CALL(irecv_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag + i, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                            if (OMPI_SUCCESS != err) goto end_part_progress;
                         }
                     } else {
                         for (i = 0; i < req->real_parts; i++) {
                             void *buf = ((void*) (((char*)req->req_addr) + (req->real_count * req->real_dt_size * i)));
                             err = MCA_PML_CALL(irecv_init(buf, req->real_count * req->real_dt_size, MPI_BYTE, req->world_peer, req->my_send_tag + i, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                            if (OMPI_SUCCESS != err) goto end_part_progress;
                         }
-                     }
-                     err = req->persist_reqs[0]->req_start(req->real_parts, (&(req->persist_reqs[0])));
+                    }
+                    err = req->persist_reqs[0]->req_start(req->real_parts, (&(req->persist_reqs[0])));
+                    if (OMPI_SUCCESS != err) goto end_part_progress;
 
                     /* Send back a message */
                     req->setup_info[0].world_rank = ompi_part_persist.my_world_rank;
                     err = MCA_PML_CALL(isend(&(req->setup_info[0]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, req->world_peer, req->my_recv_tag, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist.part_comm_setup, &req->setup_req[0]));
-                    if (OMPI_SUCCESS != err) return OMPI_ERROR;
+                    if (OMPI_SUCCESS != err) goto end_part_progress;
                 }
 
-                req->initialized = true;
+                completed++;
+                req->initialized = true;
             }
         } else {
             if (false == req->req_part_complete && REQUEST_COMPLETED != req->req_ompi.req_complete && OMPI_REQUEST_ACTIVE == req->req_ompi.req_state) {
@@ -301,21 +298,27 @@ mca_part_persist_progress(void)
                     /* Check to see if partition is queued for being started. Only applicable to sends. */
                     if (-2 == req->flags[i]) {
                         err = req->persist_reqs[i]->req_start(1, (&(req->persist_reqs[i])));
+                        if (OMPI_SUCCESS != err) goto end_part_progress;
                         req->flags[i] = 0;
                     }
 
                     if (0 == req->flags[i] && OMPI_REQUEST_ACTIVE == req->persist_reqs[i]->req_state) {
-                        ompi_request_test(&(req->persist_reqs[i]), &(req->flags[i]), MPI_STATUS_IGNORE);
-                        if (0 != req->flags[i]) req->done_count++;
+                        err = ompi_request_test(&(req->persist_reqs[i]), &(req->flags[i]), MPI_STATUS_IGNORE);
+                        if (OMPI_SUCCESS != err) goto end_part_progress;
+                        if (0 != req->flags[i]) {
+                            req->done_count++;
+                        }
                     }
                 }
 
                 /* Check for completion and complete the requests */
-                if (req->done_count == req->real_parts)
-                {
+                if (req->done_count == req->real_parts) {
                     req->first_send = false;
                     mca_part_persist_complete(req);
-                }
+                    completed++;
+                } else if (req->done_count > req->real_parts) {
+                    ompi_rte_abort(OMPI_ERR_FATAL, "internal part request done count is %d > %d", req->done_count, req->real_parts);
+                }
             }
 
             if (true == req->req_free_called && true == req->req_part_complete && REQUEST_COMPLETED == req->req_ompi.req_complete && OMPI_REQUEST_INACTIVE == req->req_ompi.req_state) {
@@ -328,10 +331,14 @@ mca_part_persist_progress(void)
         err = mca_part_persist_free_req(to_delete);
     }
 
+ end_part_progress:
+    if (OMPI_SUCCESS != err) {
+        ompi_rte_abort(err, "part progress internal failure");
+    }
     OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
     block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
 
-    return OMPI_SUCCESS;
+    return completed;
 }
 
 __opal_attribute_always_inline__ static inline int
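For readers outside the Open MPI tree, the control-flow pattern this commit converges on is easy to isolate: an atomic counter rejects re-entrant calls, every failure funnels through the single end_part_progress label so the lock and the re-entry guard are released on all exit paths, and the function now returns the number of completed events instead of an error code. OPAL progress callbacks conventionally report how many events they completed, so the old unconditional OMPI_SUCCESS (0) return under-reported progress. Below is a minimal standalone sketch of that shape; progress_guard, do_step, and progress_sketch are illustrative names and not part of the OMPI code.

#include <stdatomic.h>
#include <stdio.h>

static atomic_int progress_guard;   /* plays the role of ompi_part_persist.block_entry */

/* hypothetical unit of work: writes the number of events it finished, returns 0 on success */
static int do_step(int *events)
{
    *events = 1;
    return 0;
}

/* Returns the number of completed events, never an error code, like the patched function. */
static int progress_sketch(void)
{
    int err = 0;
    int completed = 0;

    /* prevent re-entry: only the first concurrent caller proceeds */
    if (atomic_fetch_add(&progress_guard, 1) + 1 > 1) {
        atomic_fetch_sub(&progress_guard, 1);
        return completed;                       /* 0 events, mirroring the patched early return */
    }

    int events = 0;
    err = do_step(&events);
    if (0 != err) goto end_progress;            /* every failure funnels to one label */
    completed += events;

end_progress:
    if (0 != err) {
        /* the patch calls ompi_rte_abort(err, ...) here; a plain library would just report */
        fprintf(stderr, "progress failed: %d\n", err);
    }
    atomic_fetch_sub(&progress_guard, 1);       /* guard dropped on every exit path */
    return completed;
}

int main(void)
{
    printf("completed %d event(s)\n", progress_sketch());
    return 0;
}

One design note: funneling errors to a label that still aborts (via ompi_rte_abort) keeps the behavior of the old exit(-1) calls while guaranteeing the guard and lock bookkeeping above it runs on the success paths; a library that must not abort would instead record the error and let the count return stand.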