77#include "common_ucx_int.h"
88#include "common_ucx_request.h"
99#include <stdint.h>
10+ #include <string.h>
1011
1112#include <ucp/api/ucp.h>
1213#include <pthread.h>
@@ -84,6 +85,9 @@ typedef struct {
8485 ucp_worker_h worker ;
8586 ucp_ep_h * endpoints ;
8687 size_t comm_size ;
88+ short * inflight_ops ;
89+ short global_inflight_ops ;
90+ ucs_status_ptr_t inflight_req ;
8791} opal_common_ucx_winfo_t ;
8892
8993typedef struct {
@@ -101,6 +105,12 @@ typedef enum {
101105 OPAL_COMMON_UCX_SCOPE_WORKER
102106} opal_common_ucx_flush_scope_t ;
103107
108+ typedef enum {
109+ OPAL_COMMON_UCX_FLUSH_NB ,
110+ OPAL_COMMON_UCX_FLUSH_B ,
111+ OPAL_COMMON_UCX_FLUSH_NB_PREFERRED
112+ } opal_common_ucx_flush_type_t ;
113+
104114typedef enum {
105115 OPAL_COMMON_UCX_MEM_ALLOCATE_MAP ,
106116 OPAL_COMMON_UCX_MEM_MAP
@@ -236,6 +246,58 @@ OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
236246 int target );
237247OPAL_DECLSPEC int opal_common_ucx_wpmem_fence (opal_common_ucx_wpmem_t * mem );
238248
249+ OPAL_DECLSPEC int opal_common_ucx_flush (ucp_ep_h ep , ucp_worker_h worker ,
250+ opal_common_ucx_flush_type_t type ,
251+ opal_common_ucx_flush_scope_t scope ,
252+ ucs_status_ptr_t * req_ptr );
253+
254+ static inline int _periodical_flush_nb (opal_common_ucx_wpmem_t * mem ,
255+ opal_common_ucx_winfo_t * winfo ,
256+ int target ) {
257+ int rc = OPAL_SUCCESS ;
258+
259+ winfo -> inflight_ops [target ]++ ;
260+ winfo -> global_inflight_ops ++ ;
261+
262+ if (OPAL_UNLIKELY (winfo -> inflight_ops [target ] >= MCA_COMMON_UCX_PER_TARGET_OPS_THRESHOLD ) ||
263+ OPAL_UNLIKELY (winfo -> global_inflight_ops >= MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD )) {
264+ opal_common_ucx_flush_scope_t scope ;
265+
266+ if (winfo -> inflight_req != UCS_OK ) {
267+ rc = opal_common_ucx_wait_request (winfo -> inflight_req , winfo -> worker ,
268+ "opal_common_ucx_flush_nb" );
269+ if (OPAL_UNLIKELY (OPAL_SUCCESS != rc )){
270+ MCA_COMMON_UCX_VERBOSE (1 , "opal_common_ucx_wait_request failed: %d" , rc );
271+ return rc ;
272+ }
273+ winfo -> inflight_req = UCS_OK ;
274+ }
275+
276+ if (winfo -> global_inflight_ops >= MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD ) {
277+ scope = OPAL_COMMON_UCX_SCOPE_WORKER ;
278+ winfo -> global_inflight_ops = 0 ;
279+ memset (winfo -> inflight_ops , 0 , winfo -> comm_size * sizeof (short ));
280+ } else {
281+ scope = OPAL_COMMON_UCX_SCOPE_EP ;
282+ winfo -> global_inflight_ops -= winfo -> inflight_ops [target ];
283+ winfo -> inflight_ops [target ] = 0 ;
284+ }
285+
286+ rc = opal_common_ucx_flush (winfo -> endpoints [target ], winfo -> worker ,
287+ OPAL_COMMON_UCX_FLUSH_NB_PREFERRED , scope ,
288+ & winfo -> inflight_req );
289+ if (OPAL_UNLIKELY (OPAL_SUCCESS != rc )){
290+ MCA_COMMON_UCX_VERBOSE (1 , "opal_common_ucx_flush failed: %d" , rc );
291+ return rc ;
292+ }
293+ } else if (OPAL_UNLIKELY (winfo -> inflight_req != UCS_OK )) {
294+ int ret ;
295+ do {
296+ ret = ucp_worker_progress (winfo -> worker );
297+ } while (ret );
298+ }
299+ return rc ;
300+ }
239301
240302static inline int
241303opal_common_ucx_wpmem_putget (opal_common_ucx_wpmem_t * mem , opal_common_ucx_op_t op ,
@@ -269,7 +331,6 @@ opal_common_ucx_wpmem_putget(opal_common_ucx_wpmem_t *mem, opal_common_ucx_op_t
269331 called_func = "ucp_get_nbi" ;
270332 break ;
271333 }
272- opal_mutex_unlock (& winfo -> mutex );
273334
274335 if (OPAL_UNLIKELY (status != UCS_OK && status != UCS_INPROGRESS )) {
275336 MCA_COMMON_UCX_ERROR ("%s failed: %d" , called_func , status );
@@ -278,6 +339,15 @@ opal_common_ucx_wpmem_putget(opal_common_ucx_wpmem_t *mem, opal_common_ucx_op_t
278339 WPOOL_DBG_OUT (_dbg_mem ,"ep = %p, rkey = %p\n" ,
279340 (void * )ep , (void * )rkey );
280341 }
342+
343+ rc = _periodical_flush_nb (mem , winfo , target );
344+ if (OPAL_UNLIKELY (OPAL_SUCCESS != rc )){
345+ MCA_COMMON_UCX_VERBOSE (1 , "_incr_and_check_inflight_ops failed: %d" , rc );
346+ return rc ;
347+ }
348+
349+ opal_mutex_unlock (& winfo -> mutex );
350+
281351 return rc ;
282352}
283353
@@ -314,6 +384,13 @@ opal_common_ucx_wpmem_cmpswp(opal_common_ucx_wpmem_t *mem, uint64_t compare,
314384 WPOOL_DBG_OUT (_dbg_mem , "ep = %p, rkey = %p\n" ,
315385 (void * )ep , (void * )rkey );
316386 }
387+
388+ rc = _periodical_flush_nb (mem , winfo , target );
389+ if (OPAL_UNLIKELY (OPAL_SUCCESS != rc )){
390+ MCA_COMMON_UCX_VERBOSE (1 , "_incr_and_check_inflight_ops failed: %d" , rc );
391+ return rc ;
392+ }
393+
317394 opal_mutex_unlock (& winfo -> mutex );
318395
319396 return rc ;
@@ -349,6 +426,13 @@ opal_common_ucx_wpmem_post(opal_common_ucx_wpmem_t *mem, ucp_atomic_post_op_t op
349426 WPOOL_DBG_OUT (_dbg_mem , "ep = %p, rkey = %p\n" ,
350427 (void * )ep , (void * )rkey );
351428 }
429+
430+ rc = _periodical_flush_nb (mem , winfo , target );
431+ if (OPAL_UNLIKELY (OPAL_SUCCESS != rc )){
432+ MCA_COMMON_UCX_VERBOSE (1 , "_incr_and_check_inflight_ops failed: %d" , rc );
433+ return rc ;
434+ }
435+
352436 opal_mutex_unlock (& winfo -> mutex );
353437 return rc ;
354438}
@@ -386,6 +470,13 @@ opal_common_ucx_wpmem_fetch(opal_common_ucx_wpmem_t *mem,
386470 WPOOL_DBG_OUT (_dbg_mem , "ep = %p, rkey = %p\n" ,
387471 (void * )ep , (void * )rkey );
388472 }
473+
474+ rc = _periodical_flush_nb (mem , winfo , target );
475+ if (OPAL_UNLIKELY (OPAL_SUCCESS != rc )){
476+ MCA_COMMON_UCX_VERBOSE (1 , "_incr_and_check_inflight_ops failed: %d" , rc );
477+ return rc ;
478+ }
479+
389480 opal_mutex_unlock (& winfo -> mutex );
390481
391482 return rc ;
@@ -416,8 +507,6 @@ opal_common_ucx_wpmem_fetch_nb(opal_common_ucx_wpmem_t *mem,
416507 req = opal_common_ucx_atomic_fetch_nb (ep , opcode , value , buffer , len ,
417508 rem_addr , rkey , opal_common_ucx_req_completion ,
418509 winfo -> worker );
419- opal_mutex_unlock (& winfo -> mutex );
420-
421510 if (UCS_PTR_IS_PTR (req )) {
422511 req -> ext_req = user_req_ptr ;
423512 req -> ext_cb = user_req_cb ;
@@ -427,6 +516,14 @@ opal_common_ucx_wpmem_fetch_nb(opal_common_ucx_wpmem_t *mem,
427516 }
428517 }
429518
519+ rc = _periodical_flush_nb (mem , winfo , target );
520+ if (OPAL_UNLIKELY (OPAL_SUCCESS != rc )){
521+ MCA_COMMON_UCX_VERBOSE (1 , "_incr_and_check_inflight_ops failed: %d" , rc );
522+ return rc ;
523+ }
524+
525+ opal_mutex_unlock (& winfo -> mutex );
526+
430527 return rc ;
431528}
432529
0 commit comments