3434#include "mtl_portals4_recv_short.h"
3535#include "mtl_portals4_message.h"
3636
37+
38+ static int
39+ ompi_mtl_portals4_recv_progress (ptl_event_t * ev ,
40+ ompi_mtl_portals4_base_request_t * ptl_base_request );
41+ static int
42+ ompi_mtl_portals4_rndv_get_frag_progress (ptl_event_t * ev ,
43+ ompi_mtl_portals4_rndv_get_frag_t * rndv_get_frag );
44+
3745static int
3846read_msg (void * start , ptl_size_t length , ptl_process_t target ,
3947 ptl_match_bits_t match_bits , ptl_size_t remote_offset ,
4048 ompi_mtl_portals4_recv_request_t * request )
4149{
4250 int ret , i ;
43- ptl_size_t rest = length , asked = 0 , frag_size ;
44- int32_t pending_reply ;
51+ ptl_size_t rest = length , asked = 0 ;
52+ int32_t frag_count ;
4553
4654#if OMPI_MTL_PORTALS4_FLOW_CONTROL
4755 while (OPAL_UNLIKELY (OPAL_THREAD_ADD32 (& ompi_mtl_portals4 .flowctl .send_slots , -1 ) < 0 )) {
@@ -50,29 +58,49 @@ read_msg(void *start, ptl_size_t length, ptl_process_t target,
5058 }
5159#endif
5260
53- request -> pending_reply = (length + ompi_mtl_portals4 .max_msg_size_mtl - 1 ) / ompi_mtl_portals4 .max_msg_size_mtl ;
54- pending_reply = request -> pending_reply ;
61+ frag_count = (length + ompi_mtl_portals4 .max_msg_size_mtl - 1 ) / ompi_mtl_portals4 .max_msg_size_mtl ;
62+ ret = OPAL_THREAD_ADD32 (& (request -> pending_reply ), frag_count );
63+
64+ for (i = 0 ; i < frag_count ; i ++ ) {
65+ opal_free_list_item_t * tmp ;
66+ ompi_mtl_portals4_rndv_get_frag_t * frag ;
67+
68+ tmp = opal_free_list_get (& ompi_mtl_portals4 .fl_rndv_get_frag );
69+ if (NULL == tmp ) return OMPI_ERR_OUT_OF_RESOURCE ;
70+
71+ frag = (ompi_mtl_portals4_rndv_get_frag_t * ) tmp ;
72+
73+ frag -> request = request ;
74+ #if OPAL_ENABLE_DEBUG
75+ frag -> frag_num = i ;
76+ #endif
77+ frag -> frag_start = (char * )start + i * ompi_mtl_portals4 .max_msg_size_mtl ;
78+ frag -> frag_length = (OPAL_UNLIKELY (rest > ompi_mtl_portals4 .max_msg_size_mtl )) ? ompi_mtl_portals4 .max_msg_size_mtl : rest ;
79+ frag -> frag_target = target ;
80+ frag -> frag_match_bits = match_bits ;
81+ frag -> frag_remote_offset = remote_offset + i * ompi_mtl_portals4 .max_msg_size_mtl ;
82+
83+ frag -> event_callback = ompi_mtl_portals4_rndv_get_frag_progress ;
84+
85+ OPAL_OUTPUT_VERBOSE ((90 , ompi_mtl_base_framework .framework_output , "GET (fragment %d/%d, size %ld) send" ,
86+ i + 1 , frag_count , frag -> frag_length ));
5587
56- for (i = 0 ; i < pending_reply ; i ++ ) {
57- OPAL_OUTPUT_VERBOSE ((90 , ompi_mtl_base_framework .framework_output , "GET (fragment %d/%d) send" ,
58- i + 1 , pending_reply ));
59- frag_size = (OPAL_UNLIKELY (rest > ompi_mtl_portals4 .max_msg_size_mtl )) ? ompi_mtl_portals4 .max_msg_size_mtl : rest ;
6088 ret = PtlGet (ompi_mtl_portals4 .send_md_h ,
61- (ptl_size_t ) start + i * ompi_mtl_portals4 . max_msg_size_mtl ,
62- frag_size ,
63- target ,
89+ (ptl_size_t ) frag -> frag_start ,
90+ frag -> frag_length ,
91+ frag -> frag_target ,
6492 ompi_mtl_portals4 .read_idx ,
65- match_bits ,
66- remote_offset + i * ompi_mtl_portals4 . max_msg_size_mtl ,
67- request );
93+ frag -> frag_match_bits ,
94+ frag -> frag_remote_offset ,
95+ frag );
6896 if (OPAL_UNLIKELY (PTL_OK != ret )) {
6997 opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
7098 "%s:%d: PtlGet failed: %d" ,
7199 __FILE__ , __LINE__ , ret );
72100 return OMPI_ERR_OUT_OF_RESOURCE ;
73101 }
74- rest -= frag_size ;
75- asked += frag_size ;
102+ rest -= frag -> frag_length ;
103+ asked += frag -> frag_length ;
76104 }
77105
78106 return OMPI_SUCCESS ;
@@ -134,9 +162,8 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
134162 /* If it's not a short message and we're doing rndv and the message is not complete, we
135163 only have the first part of the message. Issue the get
136164 to pull the second part of the message. */
137- ret = read_msg ((char * ) ptl_request -> delivery_ptr + ev -> mlength ,
138- ((msg_length > ptl_request -> delivery_len ) ?
139- ptl_request -> delivery_len : msg_length ) - ev -> mlength ,
165+ ret = read_msg ((char * )ptl_request -> delivery_ptr + ev -> mlength ,
166+ ((msg_length > ptl_request -> delivery_len ) ? ptl_request -> delivery_len : msg_length ) - ev -> mlength ,
140167 ev -> initiator ,
141168 ev -> hdr_data ,
142169 ev -> mlength ,
@@ -165,54 +192,6 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
165192 }
166193 break ;
167194
168- case PTL_EVENT_REPLY :
169- OPAL_OUTPUT_VERBOSE ((50 , ompi_mtl_base_framework .framework_output ,
170- "Recv %lu (0x%lx) got reply event" ,
171- ptl_request -> opcount , ptl_request -> hdr_data ));
172-
173- if (OPAL_UNLIKELY (ev -> ni_fail_type != PTL_NI_OK )) {
174- opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
175- "%s:%d: PTL_EVENT_REPLY with ni_fail_type: %d" ,
176- __FILE__ , __LINE__ , ev -> ni_fail_type );
177- ret = PTL_FAIL ;
178- goto callback_error ;
179- }
180-
181- /* set the received length in the status, now that we know
182- exactly how much data was sent. */
183- ptl_request -> super .super .ompi_req -> req_status ._ucount += ev -> mlength ;
184-
185- ret = OPAL_THREAD_ADD32 (& (ptl_request -> pending_reply ), -1 );
186- if (ret > 0 ) {
187- return OMPI_SUCCESS ;
188- }
189- assert (ptl_request -> pending_reply == 0 );
190-
191- #if OMPI_MTL_PORTALS4_FLOW_CONTROL
192- OPAL_THREAD_ADD32 (& ompi_mtl_portals4 .flowctl .send_slots , 1 );
193- #endif
194-
195- /* make sure the data is in the right place. Use _ucount for
196- the total length because it will be set correctly for all
197- three protocols. mlength is only correct for eager, and
198- delivery_len is the length of the buffer, not the length of
199- the send. */
200- ret = ompi_mtl_datatype_unpack (ptl_request -> convertor ,
201- ptl_request -> delivery_ptr ,
202- ptl_request -> super .super .ompi_req -> req_status ._ucount );
203- if (OPAL_UNLIKELY (OMPI_SUCCESS != ret )) {
204- opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
205- "%s:%d: ompi_mtl_datatype_unpack failed: %d" ,
206- __FILE__ , __LINE__ , ret );
207- ptl_request -> super .super .ompi_req -> req_status .MPI_ERROR = ret ;
208- }
209-
210- OPAL_OUTPUT_VERBOSE ((50 , ompi_mtl_base_framework .framework_output ,
211- "Recv %lu (0x%lx) completed , reply (pending_reply: %d)" ,
212- ptl_request -> opcount , ptl_request -> hdr_data , ptl_request -> pending_reply ));
213- ptl_request -> super .super .completion_callback (& ptl_request -> super .super );
214- break ;
215-
216195 case PTL_EVENT_PUT_OVERFLOW :
217196 OPAL_OUTPUT_VERBOSE ((50 , ompi_mtl_base_framework .framework_output ,
218197 "Recv %lu (0x%lx) got put_overflow event" ,
@@ -301,9 +280,8 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
301280 /* For long messages in the overflow list, ev->mlength = 0 */
302281 ptl_request -> super .super .ompi_req -> req_status ._ucount = 0 ;
303282
304- ret = read_msg ((char * ) ptl_request -> delivery_ptr ,
305- (msg_length > ptl_request -> delivery_len ) ?
306- ptl_request -> delivery_len : msg_length ,
283+ ret = read_msg ((char * )ptl_request -> delivery_ptr ,
284+ (msg_length > ptl_request -> delivery_len ) ? ptl_request -> delivery_len : msg_length ,
307285 ev -> initiator ,
308286 ev -> hdr_data ,
309287 0 ,
@@ -336,6 +314,91 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
336314}
337315
338316
317+ static int
318+ ompi_mtl_portals4_rndv_get_frag_progress (ptl_event_t * ev ,
319+ ompi_mtl_portals4_rndv_get_frag_t * rndv_get_frag )
320+ {
321+ int ret ;
322+ ompi_mtl_portals4_recv_request_t * ptl_request =
323+ (ompi_mtl_portals4_recv_request_t * ) rndv_get_frag -> request ;
324+
325+ assert (ev -> type == PTL_EVENT_REPLY );
326+
327+ OPAL_OUTPUT_VERBOSE ((50 , ompi_mtl_base_framework .framework_output ,
328+ "Recv %lu (0x%lx) got reply event" ,
329+ ptl_request -> opcount , ptl_request -> hdr_data ));
330+
331+ if (OPAL_UNLIKELY (ev -> ni_fail_type != PTL_NI_OK )) {
332+ opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
333+ "%s:%d: PTL_EVENT_REPLY with ni_fail_type: %d" ,
334+ __FILE__ , __LINE__ , ev -> ni_fail_type );
335+
336+ OPAL_OUTPUT_VERBOSE ((50 , ompi_mtl_base_framework .framework_output ,
337+ "Rendezvous Get Failed: Reissuing frag #%u" , rndv_get_frag -> frag_num ));
338+
339+ ret = PtlGet (ompi_mtl_portals4 .send_md_h ,
340+ (ptl_size_t ) rndv_get_frag -> frag_start ,
341+ rndv_get_frag -> frag_length ,
342+ rndv_get_frag -> frag_target ,
343+ ompi_mtl_portals4 .read_idx ,
344+ rndv_get_frag -> frag_match_bits ,
345+ rndv_get_frag -> frag_remote_offset ,
346+ rndv_get_frag );
347+ if (OPAL_UNLIKELY (PTL_OK != ret )) {
348+ if (NULL != ptl_request -> buffer_ptr ) free (ptl_request -> buffer_ptr );
349+ goto callback_error ;
350+ }
351+ return OMPI_SUCCESS ;
352+ }
353+
354+ /* set the received length in the status, now that we know
355+ exactly how much data was sent. */
356+ ptl_request -> super .super .ompi_req -> req_status ._ucount += ev -> mlength ;
357+
358+ /* this frag is complete. return to freelist. */
359+ opal_free_list_return (& ompi_mtl_portals4 .fl_rndv_get_frag ,
360+ & rndv_get_frag -> super );
361+
362+ ret = OPAL_THREAD_ADD32 (& (ptl_request -> pending_reply ), -1 );
363+ if (ret > 0 ) {
364+ return OMPI_SUCCESS ;
365+ }
366+ assert (ptl_request -> pending_reply == 0 );
367+
368+ #if OMPI_MTL_PORTALS4_FLOW_CONTROL
369+ OPAL_THREAD_ADD32 (& ompi_mtl_portals4 .flowctl .send_slots , 1 );
370+ #endif
371+
372+ /* make sure the data is in the right place. Use _ucount for
373+ the total length because it will be set correctly for all
374+ three protocols. mlength is only correct for eager, and
375+ delivery_len is the length of the buffer, not the length of
376+ the send. */
377+ ret = ompi_mtl_datatype_unpack (ptl_request -> convertor ,
378+ ptl_request -> delivery_ptr ,
379+ ptl_request -> super .super .ompi_req -> req_status ._ucount );
380+ if (OPAL_UNLIKELY (OMPI_SUCCESS != ret )) {
381+ opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
382+ "%s:%d: ompi_mtl_datatype_unpack failed: %d" ,
383+ __FILE__ , __LINE__ , ret );
384+ ptl_request -> super .super .ompi_req -> req_status .MPI_ERROR = ret ;
385+ }
386+
387+ OPAL_OUTPUT_VERBOSE ((50 , ompi_mtl_base_framework .framework_output ,
388+ "Recv %lu (0x%lx) completed , reply (pending_reply: %d)" ,
389+ ptl_request -> opcount , ptl_request -> hdr_data , ptl_request -> pending_reply ));
390+ ptl_request -> super .super .completion_callback (& ptl_request -> super .super );
391+
392+ return OMPI_SUCCESS ;
393+
394+ callback_error :
395+ ptl_request -> super .super .ompi_req -> req_status .MPI_ERROR =
396+ ompi_mtl_portals4_get_error (ret );
397+ ptl_request -> super .super .completion_callback (& ptl_request -> super .super );
398+ return OMPI_SUCCESS ;
399+ }
400+
401+
339402int
340403ompi_mtl_portals4_irecv (struct mca_mtl_base_module_t * mtl ,
341404 struct ompi_communicator_t * comm ,
0 commit comments