@@ -25,7 +25,7 @@ mca_btl_base_descriptor_t *mca_btl_uct_alloc (mca_btl_base_module_t *btl, mca_bt
2525 mca_btl_uct_module_t * uct_btl = (mca_btl_uct_module_t * ) btl ;
2626 mca_btl_uct_base_frag_t * frag = NULL ;
2727
28- if (( size + 8 ) <= (size_t ) MCA_BTL_UCT_TL_ATTR (uct_btl -> am_tl , 0 ).cap .am .max_short ) {
28+ if (size <= (size_t ) MCA_BTL_UCT_TL_ATTR (uct_btl -> am_tl , 0 ).cap .am .max_short ) {
2929 frag = mca_btl_uct_frag_alloc_short (uct_btl , endpoint );
3030 } else if (size <= uct_btl -> super .btl_eager_limit ) {
3131 frag = mca_btl_uct_frag_alloc_eager (uct_btl , endpoint );
@@ -40,6 +40,10 @@ mca_btl_base_descriptor_t *mca_btl_uct_alloc (mca_btl_base_module_t *btl, mca_bt
4040 frag -> base .des_flags = flags ;
4141 frag -> base .order = order ;
4242 frag -> uct_iov .length = size ;
43+ if (NULL != frag -> base .super .registration ) {
44+ /* zero-copy fragments will need callbacks */
45+ frag -> base .des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK ;
46+ }
4347 }
4448
4549 return (mca_btl_base_descriptor_t * ) frag ;
@@ -95,14 +99,18 @@ struct mca_btl_base_descriptor_t *mca_btl_uct_prepare_src (mca_btl_base_module_t
9599 return NULL ;
96100 }
97101
102+ frag -> uct_iov .length = total_size ;
98103 frag -> base .order = order ;
99104 frag -> base .des_flags = flags ;
100105 if (total_size > (size_t ) MCA_BTL_UCT_TL_ATTR (uct_btl -> am_tl , 0 ).cap .am .max_short ) {
106+ frag -> segments [0 ].seg_len = reserve ;
101107 frag -> segments [1 ].seg_len = * size ;
102108 frag -> segments [1 ].seg_addr .pval = data_ptr ;
103109 frag -> base .des_segment_count = 2 ;
104110 } else {
111+ frag -> segments [0 ].seg_len = total_size ;
105112 memcpy ((void * )((intptr_t ) frag -> segments [1 ].seg_addr .pval + reserve ), data_ptr , * size );
113+ frag -> base .des_segment_count = 1 ;
106114 }
107115 }
108116
@@ -130,7 +138,7 @@ static size_t mca_btl_uct_send_frag_pack (void *data, void *arg)
130138 data = (void * )((intptr_t ) data + 8 );
131139
132140 /* this function should only ever get called with fragments with two segments */
133- for (size_t i = 0 ; i < 2 ; ++ i ) {
141+ for (size_t i = 0 ; i < frag -> base . des_segment_count ; ++ i ) {
134142 const size_t seg_len = frag -> segments [i ].seg_len ;
135143 memcpy (data , frag -> segments [i ].seg_addr .pval , seg_len );
136144 data = (void * )((intptr_t ) data + seg_len );
@@ -140,57 +148,84 @@ static size_t mca_btl_uct_send_frag_pack (void *data, void *arg)
140148 return length ;
141149}
142150
143- int mca_btl_uct_send_frag (mca_btl_uct_module_t * uct_btl , mca_btl_base_endpoint_t * endpoint , mca_btl_uct_base_frag_t * frag ,
144- int32_t flags , mca_btl_uct_device_context_t * context , uct_ep_h ep_handle )
151+ static void mca_btl_uct_append_pending_frag (mca_btl_uct_module_t * uct_btl , mca_btl_uct_base_frag_t * frag ,
152+ mca_btl_uct_device_context_t * context , bool ready )
145153{
154+ frag -> ready = ready ;
155+ frag -> base .des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK ;
156+ opal_atomic_wmb ();
157+
158+ opal_list_append (& uct_btl -> pending_frags , (opal_list_item_t * ) frag );
159+ }
160+
161+ int mca_btl_uct_send_frag (mca_btl_uct_module_t * uct_btl , mca_btl_uct_base_frag_t * frag , bool append )
162+ {
163+ mca_btl_uct_device_context_t * context = frag -> context ;
164+ const ssize_t msg_size = frag -> uct_iov .length + 8 ;
165+ ssize_t size ;
146166 ucs_status_t ucs_status ;
167+ uct_ep_h ep_handle = NULL ;
147168
148- mca_btl_uct_context_lock (context );
169+ /* if we get here then we must have an endpoint handle for this context/endpoint pair */
170+ (void ) mca_btl_uct_endpoint_test_am (uct_btl , frag -> endpoint , frag -> context , & ep_handle );
171+ assert (NULL != ep_handle );
149172
150- do {
173+ /* if another thread set this we really don't care too much as this flag is only meant
174+ * to protect against deep recursion */
175+ if (!context -> in_am_callback ) {
176+ mca_btl_uct_context_lock (context );
177+ /* attempt to post the fragment */
151178 if (NULL != frag -> base .super .registration ) {
152179 frag -> comp .dev_context = context ;
153-
154180 ucs_status = uct_ep_am_zcopy (ep_handle , MCA_BTL_UCT_FRAG , & frag -> header , sizeof (frag -> header ),
155181 & frag -> uct_iov , 1 , 0 , & frag -> comp .uct_comp );
182+
183+ if (OPAL_LIKELY (UCS_INPROGRESS == ucs_status )) {
184+ uct_worker_progress (context -> uct_worker );
185+ mca_btl_uct_context_unlock (context );
186+ return OPAL_SUCCESS ;
187+ }
156188 } else {
157189 /* short message */
158- /* restore original flags */
159- frag -> base .des_flags = flags ;
160-
161- if (1 == frag -> base .des_segment_count ) {
190+ if (1 == frag -> base .des_segment_count && (frag -> uct_iov .length + 8 ) < MCA_BTL_UCT_TL_ATTR (uct_btl -> am_tl , 0 ).cap .am .max_short ) {
162191 ucs_status = uct_ep_am_short (ep_handle , MCA_BTL_UCT_FRAG , frag -> header .value , frag -> uct_iov .buffer ,
163192 frag -> uct_iov .length );
164- } else {
165- ucs_status = uct_ep_am_bcopy (ep_handle , MCA_BTL_UCT_FRAG , mca_btl_uct_send_frag_pack , frag , 0 );
193+
194+ if (OPAL_LIKELY (UCS_OK == ucs_status )) {
195+ uct_worker_progress (context -> uct_worker );
196+ mca_btl_uct_context_unlock (context );
197+ /* send is complete */
198+ mca_btl_uct_frag_complete (frag , OPAL_SUCCESS );
199+ return 1 ;
200+ }
166201 }
167- }
168202
169- if (UCS_ERR_NO_RESOURCE != ucs_status ) {
170- /* go ahead and progress the worker while we have the lock */
171- (void ) uct_worker_progress (context -> uct_worker );
172- break ;
203+ size = uct_ep_am_bcopy (ep_handle , MCA_BTL_UCT_FRAG , mca_btl_uct_send_frag_pack , frag , 0 );
204+ if (OPAL_LIKELY (size == msg_size )) {
205+ uct_worker_progress (context -> uct_worker );
206+ mca_btl_uct_context_unlock (context );
207+ /* send is complete */
208+ mca_btl_uct_frag_complete (frag , OPAL_SUCCESS );
209+ return 1 ;
210+ }
173211 }
174212
175- /* wait for something to complete before trying again */
176- while (!uct_worker_progress (context -> uct_worker ));
177- } while (1 );
178-
179- mca_btl_uct_context_unlock (context );
213+ /* wait for something to happen */
214+ uct_worker_progress (context -> uct_worker );
215+ mca_btl_uct_context_unlock (context );
180216
181- if (UCS_OK == ucs_status ) {
182- /* restore original flags */
183- frag -> base .des_flags = flags ;
184- /* send is complete */
185- mca_btl_uct_frag_complete (frag , OPAL_SUCCESS );
186- return 1 ;
217+ mca_btl_uct_device_handle_completions (context );
187218 }
188219
189- if (OPAL_UNLIKELY ( UCS_INPROGRESS != ucs_status ) ) {
220+ if (! append ) {
190221 return OPAL_ERR_OUT_OF_RESOURCE ;
191222 }
192223
193- return 0 ;
224+ OPAL_THREAD_LOCK (& uct_btl -> lock );
225+ mca_btl_uct_append_pending_frag (uct_btl , frag , context , true);
226+ OPAL_THREAD_UNLOCK (& uct_btl -> lock );
227+
228+ return OPAL_SUCCESS ;
194229}
195230
196231int mca_btl_uct_send (mca_btl_base_module_t * btl , mca_btl_base_endpoint_t * endpoint , mca_btl_base_descriptor_t * descriptor ,
@@ -199,7 +234,6 @@ int mca_btl_uct_send (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpo
199234 mca_btl_uct_module_t * uct_btl = (mca_btl_uct_module_t * ) btl ;
200235 mca_btl_uct_device_context_t * context = mca_btl_uct_module_get_am_context (uct_btl );
201236 mca_btl_uct_base_frag_t * frag = (mca_btl_uct_base_frag_t * ) descriptor ;
202- int flags = frag -> base .des_flags ;
203237 uct_ep_h ep_handle ;
204238 int rc ;
205239
@@ -208,28 +242,21 @@ int mca_btl_uct_send (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpo
208242
209243
210244 frag -> header .data .tag = tag ;
211-
212- /* add the callback flag before posting to avoid potential races with other threads */
213- frag -> base .des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK ;
245+ frag -> context = context ;
214246
215247 rc = mca_btl_uct_endpoint_check_am (uct_btl , endpoint , context , & ep_handle );
216248 if (OPAL_UNLIKELY (OPAL_SUCCESS != rc )) {
217- OPAL_THREAD_LOCK (& endpoint -> ep_lock );
249+ OPAL_THREAD_LOCK (& uct_btl -> lock );
218250 /* check one more time in case another thread is completing the connection now */
219251 if (OPAL_SUCCESS != mca_btl_uct_endpoint_test_am (uct_btl , endpoint , context , & ep_handle )) {
220- frag -> context_id = context -> context_id ;
221- frag -> ready = false;
222- OPAL_THREAD_LOCK (& uct_btl -> lock );
223- opal_list_append (& uct_btl -> pending_frags , (opal_list_item_t * ) frag );
224- OPAL_THREAD_UNLOCK (& endpoint -> ep_lock );
252+ mca_btl_uct_append_pending_frag (uct_btl , frag , context , false);
225253 OPAL_THREAD_UNLOCK (& uct_btl -> lock );
226-
227254 return OPAL_SUCCESS ;
228255 }
229- OPAL_THREAD_UNLOCK (& endpoint -> ep_lock );
256+ OPAL_THREAD_UNLOCK (& uct_btl -> lock );
230257 }
231258
232- return mca_btl_uct_send_frag (uct_btl , endpoint , frag , flags , context , ep_handle );
259+ return mca_btl_uct_send_frag (uct_btl , frag , true );
233260}
234261
235262struct mca_btl_uct_sendi_pack_args_t {
@@ -255,9 +282,7 @@ static size_t mca_btl_uct_sendi_pack (void *data, void *arg)
255282
256283static inline size_t mca_btl_uct_max_sendi (mca_btl_uct_module_t * uct_btl , int context_id )
257284{
258- const mca_btl_uct_tl_t * tl = uct_btl -> am_tl ;
259- return (MCA_BTL_UCT_TL_ATTR (tl , context_id ).cap .am .max_short > MCA_BTL_UCT_TL_ATTR (tl , context_id ).cap .am .max_bcopy ) ?
260- MCA_BTL_UCT_TL_ATTR (tl , context_id ).cap .am .max_short : MCA_BTL_UCT_TL_ATTR (tl , context_id ).cap .am .max_bcopy ;
285+ return MCA_BTL_UCT_TL_ATTR (uct_btl -> am_tl , context_id ).cap .am .max_bcopy ;
261286}
262287
263288int mca_btl_uct_sendi (mca_btl_base_module_t * btl , mca_btl_base_endpoint_t * endpoint , opal_convertor_t * convertor ,
@@ -270,7 +295,7 @@ int mca_btl_uct_sendi (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endp
270295 /* message with header */
271296 const size_t msg_size = total_size + 8 ;
272297 mca_btl_uct_am_header_t am_header ;
273- ucs_status_t ucs_status = UCS_OK ;
298+ ucs_status_t ucs_status = UCS_ERR_NO_RESOURCE ;
274299 uct_ep_h ep_handle ;
275300 int rc ;
276301
0 commit comments