2929#include "opal/class/opal_free_list.h"
3030#include "opal/class/opal_hash_table.h"
3131#include "opal/threads/threads.h"
32- #include "opal/mca/btl/btl .h"
32+ #include "opal/util/output .h"
3333
3434#include "ompi/win/win.h"
35+ #include "ompi/info/info.h"
3536#include "ompi/communicator/communicator.h"
3637#include "ompi/datatype/ompi_datatype.h"
3738#include "ompi/request/request.h"
3839#include "ompi/mca/osc/osc.h"
3940#include "ompi/mca/osc/base/base.h"
40- #include "opal/mca/btl/btl.h"
41- #include "ompi/mca/bml/bml.h"
4241#include "ompi/memchecker.h"
4342
4443#include "osc_pt2pt_header.h"
44+ #include "osc_pt2pt_sync.h"
4545
4646BEGIN_C_DECLS
4747
@@ -85,6 +85,9 @@ struct ompi_osc_pt2pt_peer_t {
8585 /** make this an opal object */
8686 opal_object_t super ;
8787
88+ /** rank of this peer */
89+ int rank ;
90+
8891 /** pointer to the current send fragment for each outgoing target */
8992 struct ompi_osc_pt2pt_frag_t * active_frag ;
9093
@@ -94,25 +97,16 @@ struct ompi_osc_pt2pt_peer_t {
9497 /** fragments queued to this target */
9598 opal_list_t queued_frags ;
9699
97- /** number of acks pending. New requests can not be sent out if there are
98- * acks pending (to fulfill the ordering constraints of accumulate) */
99- uint32_t num_acks_pending ;
100-
101100 /** number of fragments incomming (negative - expected, positive - unsynchronized) */
102101 int32_t passive_incoming_frag_count ;
103102
104- /** peer is in an access epoch */
105- bool access_epoch ;
106-
107- /** eager sends are active to this peer */
108- bool eager_send_active ;
103+ /** unexpected post message arrived */
104+ bool unexpected_post ;
109105};
110106typedef struct ompi_osc_pt2pt_peer_t ompi_osc_pt2pt_peer_t ;
111107
112108OBJ_CLASS_DECLARATION (ompi_osc_pt2pt_peer_t );
113109
114- #define SEQ_INVALID 0xFFFFFFFFFFFFFFFFULL
115-
116110/** Module structure. Exactly one of these is associated with each
117111 PT2PT window */
118112struct ompi_osc_pt2pt_module_t {
@@ -122,6 +116,9 @@ struct ompi_osc_pt2pt_module_t {
122116 /** window should have accumulate ordering... */
123117 bool accumulate_ordering ;
124118
119+ /** no locks info key value */
120+ bool no_locks ;
121+
125122 /** pointer to free on cleanup (may be NULL) */
126123 void * free_after ;
127124
@@ -141,11 +138,11 @@ struct ompi_osc_pt2pt_module_t {
141138 /** condition variable associated with lock */
142139 opal_condition_t cond ;
143140
144- /** lock for atomic window updates from reductions */
145- opal_mutex_t acc_lock ;
141+ /** hash table of peer objects */
142+ opal_hash_table_t peer_hash ;
146143
147- /** peer data */
148- ompi_osc_pt2pt_peer_t * peers ;
144+ /** lock protecting peer_hash */
145+ opal_mutex_t peer_lock ;
149146
150147 /** Nmber of communication fragments started for this epoch, by
151148 peer. Not in peer data to make fence more manageable. */
@@ -166,29 +163,14 @@ struct ompi_osc_pt2pt_module_t {
166163 /* Next incoming buffer count at which we want a signal on cond */
167164 uint32_t active_incoming_frag_signal_count ;
168165
169- /* Number of flush ack requests send since beginning of time */
170- uint64_t flush_ack_requested_count ;
171- /* Number of flush ack replies received since beginning of
172- time. cond should be signalled on every flush reply
173- received. */
174- uint64_t flush_ack_received_count ;
175-
176166 /** Number of targets locked/being locked */
177167 unsigned int passive_target_access_epoch ;
178168
179- /** start sending data eagerly */
180- bool active_eager_send_active ;
181-
182- /** Indicates the window is in an all access epoch (fence, lock_all) */
183- bool all_access_epoch ;
169+ /** Indicates the window is in a pcsw or all access (fence, lock_all) epoch */
170+ ompi_osc_pt2pt_sync_t all_sync ;
184171
185172 /* ********************* PWSC data ************************ */
186173 struct ompi_group_t * pw_group ;
187- struct ompi_group_t * sc_group ;
188-
189- /** Number of "ping" messages from the remote post group we've
190- received */
191- int32_t num_post_msgs ;
192174
193175 /** Number of "count" messages from the remote complete group
194176 we've received */
@@ -207,9 +189,7 @@ struct ompi_osc_pt2pt_module_t {
207189 opal_list_t locks_pending ;
208190
209191 /** origin side list of locks currently outstanding */
210- opal_list_t outstanding_locks ;
211-
212- uint64_t lock_serial_number ;
192+ opal_hash_table_t outstanding_locks ;
213193
214194 unsigned char * incoming_buffer ;
215195 ompi_request_t * frag_request ;
@@ -218,10 +198,6 @@ struct ompi_osc_pt2pt_module_t {
218198 opal_atomic_lock_t accumulate_lock ;
219199 opal_list_t pending_acc ;
220200
221- /* enforce pscw matching */
222- /** list of unmatched post messages */
223- opal_list_t pending_posts ;
224-
225201 /** Lock for garbage collection lists */
226202 opal_mutex_t gc_lock ;
227203
@@ -234,6 +210,29 @@ struct ompi_osc_pt2pt_module_t {
234210typedef struct ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_t ;
235211OMPI_MODULE_DECLSPEC extern ompi_osc_pt2pt_component_t mca_osc_pt2pt_component ;
236212
213+ static inline ompi_osc_pt2pt_peer_t * ompi_osc_pt2pt_peer_lookup (ompi_osc_pt2pt_module_t * module ,
214+ int rank )
215+ {
216+ ompi_osc_pt2pt_peer_t * peer = NULL ;
217+ (void ) opal_hash_table_get_value_uint32 (& module -> peer_hash , rank , (void * * ) & peer );
218+
219+ if (OPAL_UNLIKELY (NULL == peer )) {
220+ OPAL_THREAD_LOCK (& module -> peer_lock );
221+ (void ) opal_hash_table_get_value_uint32 (& module -> peer_hash , rank , (void * * ) & peer );
222+
223+ if (NULL == peer ) {
224+ peer = OBJ_NEW (ompi_osc_pt2pt_peer_t );
225+ peer -> rank = rank ;
226+
227+ (void ) opal_hash_table_set_value_uint32 (& module -> peer_hash , rank , (void * ) peer );
228+ }
229+ OPAL_THREAD_UNLOCK (& module -> peer_lock );
230+ }
231+
232+ return peer ;
233+ }
234+
235+
237236struct ompi_osc_pt2pt_pending_t {
238237 opal_list_item_t super ;
239238 ompi_osc_pt2pt_module_t * module ;
@@ -262,23 +261,23 @@ int ompi_osc_pt2pt_put(const void *origin_addr,
262261 struct ompi_win_t * win );
263262
264263int ompi_osc_pt2pt_accumulate (const void * origin_addr ,
265- int origin_count ,
266- struct ompi_datatype_t * origin_dt ,
267- int target ,
268- OPAL_PTRDIFF_TYPE target_disp ,
269- int target_count ,
270- struct ompi_datatype_t * target_dt ,
271- struct ompi_op_t * op ,
272- struct ompi_win_t * win );
264+ int origin_count ,
265+ struct ompi_datatype_t * origin_dt ,
266+ int target ,
267+ OPAL_PTRDIFF_TYPE target_disp ,
268+ int target_count ,
269+ struct ompi_datatype_t * target_dt ,
270+ struct ompi_op_t * op ,
271+ struct ompi_win_t * win );
273272
274273int ompi_osc_pt2pt_get (void * origin_addr ,
275- int origin_count ,
276- struct ompi_datatype_t * origin_dt ,
277- int target ,
278- OPAL_PTRDIFF_TYPE target_disp ,
279- int target_count ,
280- struct ompi_datatype_t * target_dt ,
281- struct ompi_win_t * win );
274+ int origin_count ,
275+ struct ompi_datatype_t * origin_dt ,
276+ int target ,
277+ OPAL_PTRDIFF_TYPE target_disp ,
278+ int target_count ,
279+ struct ompi_datatype_t * target_dt ,
280+ struct ompi_win_t * win );
282281
283282int ompi_osc_pt2pt_compare_and_swap (const void * origin_addr ,
284283 const void * compare_addr ,
@@ -357,7 +356,10 @@ int ompi_osc_pt2pt_rget_accumulate(const void *origin_addr,
357356int ompi_osc_pt2pt_fence (int assert , struct ompi_win_t * win );
358357
359358/* received a post message */
360- int osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t * module , int source );
359+ void osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t * module , int source );
360+
361+ /* received a complete message */
362+ void osc_pt2pt_incoming_complete (ompi_osc_pt2pt_module_t * module , int source , int frag_count );
361363
362364int ompi_osc_pt2pt_start (struct ompi_group_t * group ,
363365 int assert ,
@@ -451,7 +453,8 @@ static inline void mark_incoming_completion (ompi_osc_pt2pt_module_t *module, in
451453 opal_condition_broadcast (& module -> cond );
452454 }
453455 } else {
454- ompi_osc_pt2pt_peer_t * peer = module -> peers + source ;
456+ ompi_osc_pt2pt_peer_t * peer = ompi_osc_pt2pt_peer_lookup (module , source );
457+
455458 OPAL_OUTPUT_VERBOSE ((50 , ompi_osc_base_framework .framework_output ,
456459 "mark_incoming_completion marking passive incoming complete. source = %d, count = %d" ,
457460 source , (int ) peer -> passive_incoming_frag_count + 1 ));
@@ -704,6 +707,16 @@ static inline int ompi_osc_pt2pt_accumulate_trylock (ompi_osc_pt2pt_module_t *mo
704707 return opal_atomic_trylock (& module -> accumulate_lock );
705708}
706709
710+ /**
711+ * @brief check if this process has this process is in a passive target access epoch
712+ *
713+ * @param[in] module osc pt2pt module
714+ */
715+ static inline bool ompi_osc_pt2pt_in_passive_epoch (ompi_osc_pt2pt_module_t * module )
716+ {
717+ return 0 != module -> passive_target_access_epoch ;
718+ }
719+
707720/**
708721 * ompi_osc_pt2pt_accumulate_unlock:
709722 *
@@ -722,9 +735,134 @@ static inline void ompi_osc_pt2pt_accumulate_unlock (ompi_osc_pt2pt_module_t *mo
722735 }
723736}
724737
725- static inline bool ompi_osc_pt2pt_check_access_epoch (ompi_osc_pt2pt_module_t * module , int rank )
738+ /**
739+ * Find the first outstanding lock of the target.
740+ *
741+ * @param[in] module osc pt2pt module
742+ * @param[in] target target rank
743+ * @param[out] peer peer object associated with the target
744+ *
745+ * @returns an outstanding lock on success
746+ *
747+ * This function looks for an outstanding lock to the target. If a lock exists it is returned.
748+ */
749+ static inline ompi_osc_pt2pt_sync_t * ompi_osc_pt2pt_module_lock_find (ompi_osc_pt2pt_module_t * module , int target ,
750+ ompi_osc_pt2pt_peer_t * * peer )
726751{
727- return module -> all_access_epoch || module -> peers [rank ].access_epoch ;
752+ ompi_osc_pt2pt_sync_t * outstanding_lock = NULL ;
753+
754+ (void ) opal_hash_table_get_value_uint32 (& module -> outstanding_locks , (uint32_t ) target , (void * * ) & outstanding_lock );
755+ if (NULL != outstanding_lock && peer ) {
756+ * peer = outstanding_lock -> peer_list .peer ;
757+ }
758+
759+ return outstanding_lock ;
760+ }
761+
762+ /**
763+ * Add an outstanding lock
764+ *
765+ * @param[in] module osc pt2pt module
766+ * @param[in] lock lock object
767+ *
768+ * This function inserts a lock object to the list of outstanding locks. The caller must be holding the module
769+ * lock.
770+ */
771+ static inline void ompi_osc_pt2pt_module_lock_insert (struct ompi_osc_pt2pt_module_t * module , ompi_osc_pt2pt_sync_t * lock )
772+ {
773+ (void ) opal_hash_table_set_value_uint32 (& module -> outstanding_locks , (uint32_t ) lock -> sync .lock .target , (void * ) lock );
774+ }
775+
776+
777+ /**
778+ * Remove an outstanding lock
779+ *
780+ * @param[in] module osc pt2pt module
781+ * @param[in] lock lock object
782+ *
783+ * This function removes a lock object to the list of outstanding locks. The caller must be holding the module
784+ * lock.
785+ */
786+ static inline void ompi_osc_pt2pt_module_lock_remove (struct ompi_osc_pt2pt_module_t * module , ompi_osc_pt2pt_sync_t * lock )
787+ {
788+
789+ (void ) opal_hash_table_remove_value_uint32 (& module -> outstanding_locks , (uint32_t ) lock -> sync .lock .target );
790+ }
791+
792+ /**
793+ * Lookup a synchronization object associated with the target
794+ *
795+ * @param[in] module osc pt2pt module
796+ * @param[in] target target rank
797+ * @param[out] peer peer object
798+ *
799+ * @returns NULL if the target is not locked, fenced, or part of a pscw sync
800+ * @returns synchronization object on success
801+ *
802+ * This function returns the synchronization object associated with an access epoch for
803+ * the target. If the target is not part of any current access epoch then NULL is returned.
804+ */
805+ static inline ompi_osc_pt2pt_sync_t * ompi_osc_pt2pt_module_sync_lookup (ompi_osc_pt2pt_module_t * module , int target ,
806+ struct ompi_osc_pt2pt_peer_t * * peer )
807+ {
808+ OPAL_OUTPUT_VERBOSE ((50 , ompi_osc_base_framework .framework_output ,
809+ "osc/pt2pt: looking for synchronization object for target %d" , target ));
810+
811+ switch (module -> all_sync .type ) {
812+ case OMPI_OSC_PT2PT_SYNC_TYPE_NONE :
813+ if (!module -> no_locks ) {
814+ return ompi_osc_pt2pt_module_lock_find (module , target , peer );
815+ }
816+
817+ return NULL ;
818+ case OMPI_OSC_PT2PT_SYNC_TYPE_FENCE :
819+ case OMPI_OSC_PT2PT_SYNC_TYPE_LOCK :
820+ OPAL_OUTPUT_VERBOSE ((50 , ompi_osc_base_framework .framework_output ,
821+ "osc/pt2pt: found fence/lock_all access epoch for target %d" , target ));
822+
823+ /* fence epoch is now active */
824+ module -> all_sync .epoch_active = true;
825+ if (peer ) {
826+ * peer = ompi_osc_pt2pt_peer_lookup (module , target );
827+ }
828+
829+ return & module -> all_sync ;
830+ case OMPI_OSC_PT2PT_SYNC_TYPE_PSCW :
831+ if (ompi_osc_pt2pt_sync_pscw_peer (module , target , peer )) {
832+ OPAL_OUTPUT_VERBOSE ((50 , ompi_osc_base_framework .framework_output ,
833+ "osc/pt2pt: found PSCW access epoch target for %d" , target ));
834+ return & module -> all_sync ;
835+ }
836+ }
837+
838+ return NULL ;
839+ }
840+
841+ /**
842+ * @brief check if an access epoch is active
843+ *
844+ * @param[in] module osc pt2pt module
845+ *
846+ * @returns true if any type of access epoch is active
847+ * @returns false otherwise
848+ *
849+ * This function is used to check for conflicting access epochs.
850+ */
851+ static inline bool ompi_osc_pt2pt_access_epoch_active (ompi_osc_pt2pt_module_t * module )
852+ {
853+ return (module -> all_sync .epoch_active || ompi_osc_pt2pt_in_passive_epoch (module ));
854+ }
855+
856+ static inline bool ompi_osc_pt2pt_peer_sends_active (ompi_osc_pt2pt_module_t * module , int rank )
857+ {
858+ ompi_osc_pt2pt_sync_t * sync ;
859+
860+ sync = ompi_osc_pt2pt_module_sync_lookup (module , rank , NULL );
861+ if (!sync ) {
862+ return false;
863+ }
864+
865+ return sync -> eager_send_active ;
728866}
729867
730868END_C_DECLS
0 commit comments