 /*
  * Copyright (c) 2010-2012 Oak Ridge National Labs. All rights reserved.
- * Copyright (c) 2011-2020 The University of Tennessee and The University
+ * Copyright (c) 2011-2022 The University of Tennessee and The University
  *
  *                         of Tennessee Research Foundation. All rights
  *                         reserved.
@@ -30,7 +30,7 @@ ompi_comm_rank_failure_callback_t *ompi_rank_failure_cbfunc = NULL;
  * The handling of known failed processes is based on a two level process. On one
  * side the MPI library itself must know the failed processes (in order to be able
  * to correctly handle complex operations such as shrink). On the other side, the
- * failed processes acknowledged by the users shuould not be altered during any of
+ * failed processes acknowledged by the users should not be altered during any of
  * the internal calls, as they must only be updated upon user request.
  * Thus, the global list (ompi_group_all_failed_procs) is the list of all known
  * failed processes (by the MPI library internals), and it is allegedly updated
@@ -39,6 +39,7 @@ ompi_comm_rank_failure_callback_t *ompi_rank_failure_cbfunc = NULL;
  * in the context of a communicator. Thus, using a single index to know the user-level
  * acknowledged failure is the simplest solution.
  */
+/* deprecated ulfm v1 API */
 int ompi_comm_failure_ack_internal(ompi_communicator_t* comm)
 {
     opal_mutex_lock(&ompi_group_afp_mutex);
@@ -49,11 +50,13 @@ int ompi_comm_failure_ack_internal(ompi_communicator_t* comm)
     /* use the AFP lock implicit memory barrier to propagate the update to
      * any_source_enabled at the same time.
      */
+    comm->num_acked = -1; /* compat with v2 API: force recompute next time ack_failed is called */
     opal_mutex_unlock(&ompi_group_afp_mutex);

     return OMPI_SUCCESS;
 }
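For context, a minimal user-level sketch of this v1 acknowledgment flow, via the standard ULFM extension entry points (MPIX_Comm_failure_ack / MPIX_Comm_failure_get_acked) that funnel into these internals; the helper name and the omitted error handling are illustrative only:

#include <mpi.h>
#include <mpi-ext.h>

/* Acknowledge every failure currently known on comm (which re-enables
 * MPI_ANY_SOURCE) and report how many processes the ack covered. */
static int ack_known_failures(MPI_Comm comm)
{
    MPI_Group acked;
    int n;
    MPIX_Comm_failure_ack(comm);               /* -> ompi_comm_failure_ack_internal */
    MPIX_Comm_failure_get_acked(comm, &acked); /* -> ompi_comm_failure_get_acked_internal */
    MPI_Group_size(acked, &n);
    MPI_Group_free(&acked);
    return n;
}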

+/* deprecated ulfm v1 API; used internally in MPI_COMM_AGREE as well */
 int ompi_comm_failure_get_acked_internal(ompi_communicator_t* comm, ompi_group_t** group)
 {
     int ret, exit_status = OMPI_SUCCESS;
@@ -114,6 +117,105 @@ int ompi_comm_failure_get_acked_internal(ompi_communicator_t* comm, ompi_group_t
     return exit_status;
 }

+/* New v2 interface get_failed/ack_failed.
+ * This interface uses a cached value, comm->num_acked, to track how many
+ * processes in the group of this comm have been acknowledged in prior calls.
+ * For compatibility with the v1 interface (failure_get_acked), it still updates
+ * comm->any_source_offset, and the v1 interface failure_ack may reset the
+ * cached value comm->num_acked to -1 to force recomputing it in mixed-use
+ * cases (that is, calling failure_ack forces a full recomputation of
+ * comm->num_acked during the next ack_failed call). */
+int ompi_comm_ack_failed_internal(ompi_communicator_t* comm, int num_to_ack, int* num_acked)
+{
+    int ret, exit_status = OMPI_SUCCESS;
+    int nf = -1, na = -1;
+    ompi_group_t* c_group = OMPI_COMM_IS_INTER(comm)? comm->c_local_group: comm->c_remote_group;
+    ompi_group_t* failed_group = NULL;
+
+    opal_mutex_lock(&ompi_group_afp_mutex);
+
+    /* shortcut when reading only */
+    if (num_to_ack <= comm->num_acked)
+        goto return_num_acked;
+
+    /* shortcut when no new faults */
+    if (comm->any_source_offset == ompi_group_size(ompi_group_all_failed_procs)
+        && comm->num_acked >= 0 /* reset by a call to v1 API? */)
+        goto return_num_acked;
+
+    /* compute num_acked */
+    ret = ompi_group_intersection(ompi_group_all_failed_procs,
+                                  c_group, &failed_group);
+    if (OMPI_SUCCESS != ret) {
+        exit_status = ret;
+        goto cleanup;
+    }
+    nf = ompi_group_size(failed_group);
+    na = (num_to_ack < nf)? num_to_ack: nf; /* never ack more than requested */
+
+    if (comm->num_acked < 0) { /* reset by a call to the v1 API: recompute it */
+        if (0 == comm->any_source_offset) comm->num_acked = 0;
+        else {
+            int aso = comm->any_source_offset - 1;
+            ret = ompi_group_translate_ranks(ompi_group_all_failed_procs, 1, &aso,
+                                             failed_group, &comm->num_acked);
+            if (OMPI_SUCCESS != ret) {
+                exit_status = ret;
+                goto cleanup;
+            }
+            comm->num_acked++; /* make it a group size again */
+        }
+    }
+
+    if (comm->num_acked < na) { /* comm->num_acked needs to be updated during this call */
+        comm->num_acked = na;
+        if (nf == na) {
+            /* all faults on comm acknowledged, resume any source then */
+            comm->any_source_enabled = true;
+            comm->any_source_offset = ompi_group_size(ompi_group_all_failed_procs); /* compat with v1 interface */
+        }
+        else {
+            /* some faults not acknowledged, do not resume any source then, but
+             * still update any_source_offset */
+            assert(comm->num_acked > 0);
+            int cna = comm->num_acked - 1;
+            ret = ompi_group_translate_ranks(failed_group, 1, &cna,
+                                             ompi_group_all_failed_procs, &comm->any_source_offset); /* compat with v1 interface */
+            if (OMPI_SUCCESS != ret) {
+                exit_status = ret;
+                goto cleanup;
+            }
+            comm->any_source_offset++; /* make it a group size again */
+        }
+    }
+
+return_num_acked:
+    *num_acked = comm->num_acked;
+
+cleanup:
+    if (NULL != failed_group) OBJ_RELEASE(failed_group);
+    /* use the AFP lock implicit memory barrier to propagate the update to
+     * any_source_enabled, num_acked, etc. at the same time.
+     */
+    opal_mutex_unlock(&ompi_group_afp_mutex);
+
+    return exit_status;
+}
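A similar sketch for the v2 entry point, assuming the MPIX_Comm_ack_failed binding that resolves to the internal above; a second call with no new faults returns immediately through the read-only and no-new-faults shortcuts thanks to the cached comm->num_acked:

/* Acknowledge all failures currently known on comm; returns the total
 * number of acknowledged processes. num_to_ack can never exceed the
 * communicator size, so passing the size acks everything known. */
static int ack_all_failures(MPI_Comm comm)
{
    int size, num_acked;
    MPI_Comm_size(comm, &size);
    MPIX_Comm_ack_failed(comm, size, &num_acked); /* -> ompi_comm_ack_failed_internal */
    return num_acked;
}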
+
+int ompi_comm_get_failed_internal(ompi_communicator_t* comm, ompi_group_t** group)
+{
+    int ret, exit_status = OMPI_SUCCESS;
+    ompi_group_t* c_group = OMPI_COMM_IS_INTER(comm)? comm->c_local_group: comm->c_remote_group;
+    opal_mutex_lock(&ompi_group_afp_mutex);
+    ret = ompi_group_intersection(ompi_group_all_failed_procs,
+                                  c_group,
+                                  group);
+    opal_mutex_unlock(&ompi_group_afp_mutex);
+    if (OMPI_SUCCESS != ret) {
+        exit_status = ret;
+    }
+    return exit_status;
+}
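And the query side, assuming the MPIX_Comm_get_failed binding on top of this internal; a typical follow-up is to subtract the failed group from the communicator group to obtain the survivors:

/* Build the group of processes of comm not currently known to have failed. */
static void comm_survivors(MPI_Comm comm, MPI_Group* survivors)
{
    MPI_Group all, failed;
    MPI_Comm_group(comm, &all);
    MPIX_Comm_get_failed(comm, &failed); /* -> ompi_comm_get_failed_internal */
    MPI_Group_difference(all, failed, survivors);
    MPI_Group_free(&all);
    MPI_Group_free(&failed);
}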
+
 int ompi_comm_shrink_internal(ompi_communicator_t* comm, ompi_communicator_t** newcomm)
 {
     int ret, exit_status = OMPI_SUCCESS;