From b17567333a66d69b7e8b6eda5d2f9e1114d4afd3 Mon Sep 17 00:00:00 2001 From: Alma Mastbaum Date: Mon, 8 Dec 2025 18:41:51 +0200 Subject: [PATCH 01/14] UCP/RNDV: Throttle rndv fragment requests (both pipeline and standalone mtype requests) --- src/ucp/core/ucp_context.c | 10 +++++ src/ucp/core/ucp_context.h | 4 ++ src/ucp/core/ucp_request.h | 5 ++- src/ucp/core/ucp_worker.c | 4 ++ src/ucp/core/ucp_worker.h | 6 +++ src/ucp/rndv/rndv_get.c | 11 +++++ src/ucp/rndv/rndv_mtype.inl | 80 +++++++++++++++++++++++++++++++++++++ src/ucp/rndv/rndv_ppln.c | 3 +- src/ucp/rndv/rndv_put.c | 13 ++++-- src/ucp/rndv/rndv_rtr.c | 11 +++++ 10 files changed, 142 insertions(+), 5 deletions(-) diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index eaee414dd8c..b6730ef1c5c 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -383,6 +383,16 @@ static ucs_config_field_t ucp_context_config_table[] = { "even if invalidation workflow isn't supported", ucs_offsetof(ucp_context_config_t, rndv_errh_ppln_enable), UCS_CONFIG_TYPE_BOOL}, + {"RNDV_PIPELINE_WORKER_FC_ENABLE", "n", + "Enable worker-level flow control to limit total concurrent pipeline fragments\n" + "across all requests, preventing memory exhaustion", + ucs_offsetof(ucp_context_config_t, rndv_ppln_worker_fc_enable), UCS_CONFIG_TYPE_BOOL}, + + {"RNDV_PIPELINE_WORKER_MAX_FRAGS", "5000", + "Maximum number of concurrent pipeline fragments per worker\n" + "(only applies when RNDV_PIPELINE_WORKER_FC_ENABLE=y)", + ucs_offsetof(ucp_context_config_t, rndv_ppln_worker_max_frags), UCS_CONFIG_TYPE_ULUNITS}, + {"FLUSH_WORKER_EPS", "y", "Enable flushing the worker by flushing its endpoints. 
Allows completing\n" "the flush operation in a bounded time even if there are new requests on\n" diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h index b6891cf4480..1b16e4d3ea6 100644 --- a/src/ucp/core/ucp_context.h +++ b/src/ucp/core/ucp_context.h @@ -98,6 +98,10 @@ typedef struct ucp_context_config { int rndv_shm_ppln_enable; /** Enable error handling for rndv pipeline protocol */ int rndv_errh_ppln_enable; + /** Enable flow control for rndv pipeline fragments at worker level */ + int rndv_ppln_worker_fc_enable; + /** Maximum number of concurrent pipeline fragments per worker */ + size_t rndv_ppln_worker_max_frags; /** Threshold for using tag matching offload capabilities. Smaller buffers * will not be posted to the transport. */ size_t tm_thresh; diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h index 8787b747133..d157a861597 100644 --- a/src/ucp/core/ucp_request.h +++ b/src/ucp/core/ucp_request.h @@ -318,7 +318,10 @@ struct ucp_request { /* Used by rndv/send/ppln and rndv/recv/ppln */ struct { /* Size to send in ack message */ - ssize_t ack_data_size; + ssize_t ack_data_size; + /* Element in worker-level pending queue + * for throttled ppln requests */ + ucs_queue_elem_t queue_elem; } ppln; /* Used by rndv/rkey_ptr */ diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index bbd53bdb7c8..1687c397a06 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -2519,6 +2519,10 @@ ucs_status_t ucp_worker_create(ucp_context_h context, worker->counters.ep_closures = 0; worker->counters.ep_failures = 0; + /* Initialize RNDV pipeline flow control */ + worker->rndv_ppln_fc.active_frags = 0; + ucs_queue_head_init(&worker->rndv_ppln_fc.pending_q); + /* Copy user flags, and mask-out unsupported flags for compatibility */ worker->flags = UCP_PARAM_VALUE(WORKER, params, flags, FLAGS, 0) & UCS_MASK(UCP_WORKER_INTERNAL_FLAGS_SHIFT); diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index 
561f0ec8043..2a1266ab56e 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -393,6 +393,12 @@ typedef struct ucp_worker { uint64_t ep_failures; } counters; + struct { + /* Worker-level ppln fragment flow control */ + size_t active_frags; /* Current active fragments */ + ucs_queue_head_t pending_q; /* Queue of throttled ppln requests */ + } rndv_ppln_fc; + struct { /* Usage tracker handle */ ucs_usage_tracker_h handle; diff --git a/src/ucp/rndv/rndv_get.c b/src/ucp/rndv/rndv_get.c index 9a8fb8c28f4..a23ea33bade 100644 --- a/src/ucp/rndv/rndv_get.c +++ b/src/ucp/rndv/rndv_get.c @@ -262,6 +262,8 @@ ucp_proto_rndv_get_mtype_unpack_completion(uct_completion_t *uct_comp) send.state.uct_comp); ucs_mpool_put_inline(req->send.rndv.mdesc); + ucp_proto_rndv_mtype_fc_decrement(req); + if (ucp_proto_rndv_request_is_ppln_frag(req)) { ucp_proto_rndv_ppln_recv_frag_complete(req, 1, 0); } else { @@ -292,6 +294,12 @@ ucp_proto_rndv_get_mtype_fetch_progress(uct_pending_req_t *uct_req) rpriv = req->send.proto_config->priv; if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) { + /* Check throttling. If no resource at the moment, queue the request + * in the throttle pending queue and return UCS_OK. 
*/ + if (ucp_proto_rndv_mtype_fc_check(req) == UCS_ERR_NO_RESOURCE) { + return UCS_OK; + } + status = ucp_proto_rndv_mtype_request_init(req, rpriv->frag_mem_type, rpriv->frag_sys_dev); if (status != UCS_OK) { @@ -299,6 +307,7 @@ ucp_proto_rndv_get_mtype_fetch_progress(uct_pending_req_t *uct_req) return UCS_OK; } + ucp_proto_rndv_mtype_fc_increment(req); ucp_proto_rndv_get_common_request_init(req); ucp_proto_completion_init(&req->send.state.uct_comp, ucp_proto_rndv_get_mtype_fetch_completion); @@ -364,6 +373,8 @@ static ucs_status_t ucp_proto_rndv_get_mtype_reset(ucp_request_t *req) req->send.rndv.mdesc = NULL; req->flags &= ~UCP_REQUEST_FLAG_PROTO_INITIALIZED; + ucp_proto_rndv_mtype_fc_decrement(req); + if ((req->send.proto_stage != UCP_PROTO_RNDV_GET_STAGE_FETCH) && (req->send.proto_stage != UCP_PROTO_RNDV_GET_STAGE_ATS)) { ucp_proto_fatal_invalid_stage(req, "reset"); diff --git a/src/ucp/rndv/rndv_mtype.inl b/src/ucp/rndv/rndv_mtype.inl index 93a958be7ac..3a1661f44da 100644 --- a/src/ucp/rndv/rndv_mtype.inl +++ b/src/ucp/rndv/rndv_mtype.inl @@ -169,6 +169,86 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_proto_rndv_mtype_copy( return status; } +/* Reschedule callback for throttled mtype requests */ +static unsigned ucp_proto_rndv_mtype_fc_reschedule_cb(void *arg) +{ + ucp_request_t *req = arg; + ucp_request_send(req); + return 1; +} + +/** + * Check if request should be throttled due to flow control limit. + * If throttled, the request is queued to pending_q. + * + * @return UCS_OK if not throttled (caller should continue), + * UCS_ERR_NO_RESOURCE if throttled and queued (caller should return UCS_OK). 
+ */ +static UCS_F_ALWAYS_INLINE ucs_status_t +ucp_proto_rndv_mtype_fc_check(ucp_request_t *req) +{ + ucp_worker_h worker = req->send.ep->worker; + ucp_context_h context = worker->context; + + if (!context->config.ext.rndv_ppln_worker_fc_enable) { + return UCS_OK; + } + + if (worker->rndv_ppln_fc.active_frags >= + context->config.ext.rndv_ppln_worker_max_frags) { + ucs_trace_req("mtype_fc: throttle limit reached active_frags=%zu max=%zu", + worker->rndv_ppln_fc.active_frags, + context->config.ext.rndv_ppln_worker_max_frags); + ucs_queue_push(&worker->rndv_ppln_fc.pending_q, + &req->send.rndv.ppln.queue_elem); + return UCS_ERR_NO_RESOURCE; + } + + return UCS_OK; +} + +/** + * Increment active_frags counter after successful mtype allocation. + */ +static UCS_F_ALWAYS_INLINE void +ucp_proto_rndv_mtype_fc_increment(ucp_request_t *req) +{ + ucp_worker_h worker = req->send.ep->worker; + + if (worker->context->config.ext.rndv_ppln_worker_fc_enable) { + worker->rndv_ppln_fc.active_frags++; + } +} + +/** + * Decrement active_frags counter and reschedule pending request if any. 
+ */ +static UCS_F_ALWAYS_INLINE void +ucp_proto_rndv_mtype_fc_decrement(ucp_request_t *req) +{ + ucp_worker_h worker = req->send.ep->worker; + ucp_context_h context = worker->context; + + if (!context->config.ext.rndv_ppln_worker_fc_enable) { + return; + } + + ucs_assert(worker->rndv_ppln_fc.active_frags > 0); + worker->rndv_ppln_fc.active_frags--; + + if (!ucs_queue_is_empty(&worker->rndv_ppln_fc.pending_q)) { + ucp_request_t *pending_req; + ucs_queue_elem_t *elem; + + elem = ucs_queue_pull(&worker->rndv_ppln_fc.pending_q); + pending_req = ucs_container_of(elem, ucp_request_t, + send.rndv.ppln.queue_elem); + ucs_callbackq_add_oneshot(&worker->uct->progress_q, pending_req, + ucp_proto_rndv_mtype_fc_reschedule_cb, + pending_req); + } +} + static UCS_F_ALWAYS_INLINE ucs_status_t ucp_proto_rndv_mdesc_mtype_copy(ucp_request_t *req, uct_ep_put_zcopy_func_t copy_func, diff --git a/src/ucp/rndv/rndv_ppln.c b/src/ucp/rndv/rndv_ppln.c index 17fc9e8b16b..982cb7e9548 100644 --- a/src/ucp/rndv/rndv_ppln.c +++ b/src/ucp/rndv/rndv_ppln.c @@ -13,6 +13,7 @@ #include #include #include +#include enum { @@ -252,7 +253,7 @@ void ucp_proto_rndv_ppln_recv_frag_complete(ucp_request_t *freq, int send_ack, static ucs_status_t ucp_proto_rndv_ppln_progress(uct_pending_req_t *uct_req) { - ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct); + ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct); ucp_worker_h worker = req->send.ep->worker; const ucp_proto_rndv_ppln_priv_t *rpriv; ucp_datatype_iter_t next_iter; diff --git a/src/ucp/rndv/rndv_put.c b/src/ucp/rndv/rndv_put.c index 28948386ffb..e0c78c76a9a 100644 --- a/src/ucp/rndv/rndv_put.c +++ b/src/ucp/rndv/rndv_put.c @@ -517,14 +517,18 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_proto_rndv_put_mtype_send_func( static ucs_status_t ucp_proto_rndv_put_mtype_copy_progress(uct_pending_req_t *uct_req) { - ucp_request_t *req = ucs_container_of(uct_req, - ucp_request_t, - send.uct); + ucp_request_t *req = 
ucs_container_of(uct_req, ucp_request_t, send.uct); const ucp_proto_rndv_put_priv_t *rpriv = req->send.proto_config->priv; ucs_status_t status; ucs_assert(!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)); + /* Check throttling. If no resource at the moment, queue the request + * in the throttle pending queue and return UCS_OK. */ + if (ucp_proto_rndv_mtype_fc_check(req) == UCS_ERR_NO_RESOURCE) { + return UCS_OK; + } + status = ucp_proto_rndv_mtype_request_init(req, rpriv->bulk.frag_mem_type, rpriv->bulk.frag_sys_dev); if (status != UCS_OK) { @@ -532,6 +536,7 @@ ucp_proto_rndv_put_mtype_copy_progress(uct_pending_req_t *uct_req) return UCS_OK; } + ucp_proto_rndv_mtype_fc_increment(req); ucp_proto_rndv_put_common_request_init(req); req->flags |= UCP_REQUEST_FLAG_PROTO_INITIALIZED; ucp_proto_rndv_mdesc_mtype_copy(req, uct_ep_get_zcopy, @@ -563,6 +568,7 @@ static void ucp_proto_rndv_put_mtype_completion(uct_completion_t *uct_comp) ucp_trace_req(req, "rndv_put_mtype_completion"); ucs_mpool_put(req->send.rndv.mdesc); + ucp_proto_rndv_mtype_fc_decrement(req); ucp_proto_rndv_put_common_complete(req); } @@ -573,6 +579,7 @@ static void ucp_proto_rndv_put_mtype_frag_completion(uct_completion_t *uct_comp) ucp_trace_req(req, "rndv_put_mtype_frag_completion"); ucs_mpool_put(req->send.rndv.mdesc); + ucp_proto_rndv_mtype_fc_decrement(req); ucp_proto_rndv_ppln_send_frag_complete(req, 1); } diff --git a/src/ucp/rndv/rndv_rtr.c b/src/ucp/rndv/rndv_rtr.c index 11493a511c9..15e0bd09c3b 100644 --- a/src/ucp/rndv/rndv_rtr.c +++ b/src/ucp/rndv/rndv_rtr.c @@ -286,6 +286,9 @@ ucp_proto_rndv_rtr_mtype_complete(ucp_request_t *req, int abort) if (!abort || (req->send.rndv.mdesc != NULL)) { ucs_mpool_put_inline(req->send.rndv.mdesc); } + + ucp_proto_rndv_mtype_fc_decrement(req); + if (ucp_proto_rndv_request_is_ppln_frag(req)) { ucp_proto_rndv_ppln_recv_frag_complete(req, 0, abort); } else { @@ -316,6 +319,7 @@ static ucs_status_t ucp_proto_rndv_rtr_mtype_reset(ucp_request_t *req) if 
(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED) { ucs_mpool_put_inline(req->send.rndv.mdesc); req->send.rndv.mdesc = NULL; + ucp_proto_rndv_mtype_fc_decrement(req); } return ucp_proto_request_zcopy_id_reset(req); @@ -352,6 +356,12 @@ static ucs_status_t ucp_proto_rndv_rtr_mtype_progress(uct_pending_req_t *self) ucs_status_t status; if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) { + /* Check throttling. If no resource at the moment, queue the request + * in the throttle pending queue and return UCS_OK. */ + if (ucp_proto_rndv_mtype_fc_check(req) == UCS_ERR_NO_RESOURCE) { + return UCS_OK; + } + status = ucp_proto_rndv_mtype_request_init(req, rpriv->frag_mem_type, rpriv->frag_sys_dev); if (status != UCS_OK) { @@ -359,6 +369,7 @@ static ucs_status_t ucp_proto_rndv_rtr_mtype_progress(uct_pending_req_t *self) return UCS_OK; } + ucp_proto_rndv_mtype_fc_increment(req); ucp_proto_rtr_common_request_init(req); req->flags |= UCP_REQUEST_FLAG_PROTO_INITIALIZED; } From 2ab04e4b1ac08ef252684eeb6f9cb794ef1273ea Mon Sep 17 00:00:00 2001 From: Alma Mastbaum Date: Mon, 15 Dec 2025 16:59:18 +0200 Subject: [PATCH 02/14] UCP/RNDV: change 'ppln' prefix to 'mtype' --- src/ucp/core/ucp_context.c | 14 +++++++------- src/ucp/core/ucp_context.h | 8 ++++---- src/ucp/core/ucp_worker.c | 4 ++-- src/ucp/core/ucp_worker.h | 6 +++--- src/ucp/rndv/rndv_mtype.inl | 26 +++++++++++++------------- 5 files changed, 29 insertions(+), 29 deletions(-) diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index b6730ef1c5c..df80f6528be 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -383,15 +383,15 @@ static ucs_config_field_t ucp_context_config_table[] = { "even if invalidation workflow isn't supported", ucs_offsetof(ucp_context_config_t, rndv_errh_ppln_enable), UCS_CONFIG_TYPE_BOOL}, - {"RNDV_PIPELINE_WORKER_FC_ENABLE", "n", - "Enable worker-level flow control to limit total concurrent pipeline fragments\n" + {"RNDV_MTYPE_WORKER_FC_ENABLE", "n", + 
"Enable worker-level flow control to limit total concurrent mtype fragments\n" "across all requests, preventing memory exhaustion", - ucs_offsetof(ucp_context_config_t, rndv_ppln_worker_fc_enable), UCS_CONFIG_TYPE_BOOL}, + ucs_offsetof(ucp_context_config_t, rndv_mtype_worker_fc_enable), UCS_CONFIG_TYPE_BOOL}, - {"RNDV_PIPELINE_WORKER_MAX_FRAGS", "5000", - "Maximum number of concurrent pipeline fragments per worker\n" - "(only applies when RNDV_PIPELINE_WORKER_FC_ENABLE=y)", - ucs_offsetof(ucp_context_config_t, rndv_ppln_worker_max_frags), UCS_CONFIG_TYPE_ULUNITS}, + {"RNDV_MTYPE_WORKER_MAX_FRAGS", "1024", + "Maximum number of concurrent mtype fragments per worker\n" + "(only applies when RNDV_MTYPE_WORKER_FC_ENABLE=y)", + ucs_offsetof(ucp_context_config_t, rndv_mtype_worker_max_frags), UCS_CONFIG_TYPE_ULUNITS}, {"FLUSH_WORKER_EPS", "y", "Enable flushing the worker by flushing its endpoints. Allows completing\n" diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h index 1b16e4d3ea6..28edb3e8a07 100644 --- a/src/ucp/core/ucp_context.h +++ b/src/ucp/core/ucp_context.h @@ -98,10 +98,10 @@ typedef struct ucp_context_config { int rndv_shm_ppln_enable; /** Enable error handling for rndv pipeline protocol */ int rndv_errh_ppln_enable; - /** Enable flow control for rndv pipeline fragments at worker level */ - int rndv_ppln_worker_fc_enable; - /** Maximum number of concurrent pipeline fragments per worker */ - size_t rndv_ppln_worker_max_frags; + /** Enable flow control for rndv mtype fragments at worker level */ + int rndv_mtype_worker_fc_enable; + /** Maximum number of concurrent rndv mtype fragments per worker */ + size_t rndv_mtype_worker_max_frags; /** Threshold for using tag matching offload capabilities. Smaller buffers * will not be posted to the transport. 
*/ size_t tm_thresh; diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 1687c397a06..3cb2eb5adf0 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -2520,8 +2520,8 @@ ucs_status_t ucp_worker_create(ucp_context_h context, worker->counters.ep_failures = 0; /* Initialize RNDV pipeline flow control */ - worker->rndv_ppln_fc.active_frags = 0; - ucs_queue_head_init(&worker->rndv_ppln_fc.pending_q); + worker->rndv_mtype_fc.active_frags = 0; + ucs_queue_head_init(&worker->rndv_mtype_fc.pending_q); /* Copy user flags, and mask-out unsupported flags for compatibility */ worker->flags = UCP_PARAM_VALUE(WORKER, params, flags, FLAGS, 0) & diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index 2a1266ab56e..ec6b1d35946 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -394,10 +394,10 @@ typedef struct ucp_worker { } counters; struct { - /* Worker-level ppln fragment flow control */ + /* Worker-level mtype fragment flow control */ size_t active_frags; /* Current active fragments */ - ucs_queue_head_t pending_q; /* Queue of throttled ppln requests */ - } rndv_ppln_fc; + ucs_queue_head_t pending_q; /* Queue of throttled mtype requests */ + } rndv_mtype_fc; struct { /* Usage tracker handle */ diff --git a/src/ucp/rndv/rndv_mtype.inl b/src/ucp/rndv/rndv_mtype.inl index 3a1661f44da..68fcd4904fa 100644 --- a/src/ucp/rndv/rndv_mtype.inl +++ b/src/ucp/rndv/rndv_mtype.inl @@ -190,16 +190,16 @@ ucp_proto_rndv_mtype_fc_check(ucp_request_t *req) ucp_worker_h worker = req->send.ep->worker; ucp_context_h context = worker->context; - if (!context->config.ext.rndv_ppln_worker_fc_enable) { + if (!context->config.ext.rndv_mtype_worker_fc_enable) { return UCS_OK; } - if (worker->rndv_ppln_fc.active_frags >= - context->config.ext.rndv_ppln_worker_max_frags) { + if (worker->rndv_mtype_fc.active_frags >= + context->config.ext.rndv_mtype_worker_max_frags) { ucs_trace_req("mtype_fc: throttle limit reached 
active_frags=%zu max=%zu", - worker->rndv_ppln_fc.active_frags, - context->config.ext.rndv_ppln_worker_max_frags); - ucs_queue_push(&worker->rndv_ppln_fc.pending_q, + worker->rndv_mtype_fc.active_frags, + context->config.ext.rndv_mtype_worker_max_frags); + ucs_queue_push(&worker->rndv_mtype_fc.pending_q, &req->send.rndv.ppln.queue_elem); return UCS_ERR_NO_RESOURCE; } @@ -215,8 +215,8 @@ ucp_proto_rndv_mtype_fc_increment(ucp_request_t *req) { ucp_worker_h worker = req->send.ep->worker; - if (worker->context->config.ext.rndv_ppln_worker_fc_enable) { - worker->rndv_ppln_fc.active_frags++; + if (worker->context->config.ext.rndv_mtype_worker_fc_enable) { + worker->rndv_mtype_fc.active_frags++; } } @@ -229,18 +229,18 @@ ucp_proto_rndv_mtype_fc_decrement(ucp_request_t *req) ucp_worker_h worker = req->send.ep->worker; ucp_context_h context = worker->context; - if (!context->config.ext.rndv_ppln_worker_fc_enable) { + if (!context->config.ext.rndv_mtype_worker_fc_enable) { return; } - ucs_assert(worker->rndv_ppln_fc.active_frags > 0); - worker->rndv_ppln_fc.active_frags--; + ucs_assert(worker->rndv_mtype_fc.active_frags > 0); + worker->rndv_mtype_fc.active_frags--; - if (!ucs_queue_is_empty(&worker->rndv_ppln_fc.pending_q)) { + if (!ucs_queue_is_empty(&worker->rndv_mtype_fc.pending_q)) { ucp_request_t *pending_req; ucs_queue_elem_t *elem; - elem = ucs_queue_pull(&worker->rndv_ppln_fc.pending_q); + elem = ucs_queue_pull(&worker->rndv_mtype_fc.pending_q); pending_req = ucs_container_of(elem, ucp_request_t, send.rndv.ppln.queue_elem); ucs_callbackq_add_oneshot(&worker->uct->progress_q, pending_req, From cc2f67f29e727521d7b8e34f4813c7cd7ba0312a Mon Sep 17 00:00:00 2001 From: Alma Mastbaum Date: Tue, 23 Dec 2025 17:52:32 +0200 Subject: [PATCH 03/14] UCP/RNDV: prioritize PUT ops over RTR --- src/ucp/core/ucp_worker.c | 8 ++-- src/ucp/core/ucp_worker.h | 5 ++- src/ucp/rndv/rndv_mtype.inl | 76 +++++++++++++++++++++++++++++-------- 3 files changed, 70 insertions(+), 19 deletions(-) 
diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 3cb2eb5adf0..187abc3adfe 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -2519,9 +2519,11 @@ ucs_status_t ucp_worker_create(ucp_context_h context, worker->counters.ep_closures = 0; worker->counters.ep_failures = 0; - /* Initialize RNDV pipeline flow control */ - worker->rndv_mtype_fc.active_frags = 0; - ucs_queue_head_init(&worker->rndv_mtype_fc.pending_q); + /* Initialize RNDV mtype flow control with separate priority queues */ + worker->rndv_mtype_fc.active_frags = 0; + ucs_queue_head_init(&worker->rndv_mtype_fc.put_pending_q); + ucs_queue_head_init(&worker->rndv_mtype_fc.get_pending_q); + ucs_queue_head_init(&worker->rndv_mtype_fc.rtr_pending_q); /* Copy user flags, and mask-out unsupported flags for compatibility */ worker->flags = UCP_PARAM_VALUE(WORKER, params, flags, FLAGS, 0) & diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index ec6b1d35946..6d330181044 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -396,7 +396,10 @@ typedef struct ucp_worker { struct { /* Worker-level mtype fragment flow control */ size_t active_frags; /* Current active fragments */ - ucs_queue_head_t pending_q; /* Queue of throttled mtype requests */ + /* Separate pending queues by priority (PUT > GET > RTR) */ + ucs_queue_head_t put_pending_q; /* Throttled PUT (RNDV_SEND) requests */ + ucs_queue_head_t get_pending_q; /* Throttled GET requests */ + ucs_queue_head_t rtr_pending_q; /* Throttled RTR requests */ } rndv_mtype_fc; struct { diff --git a/src/ucp/rndv/rndv_mtype.inl b/src/ucp/rndv/rndv_mtype.inl index 68fcd4904fa..7a907d1dbee 100644 --- a/src/ucp/rndv/rndv_mtype.inl +++ b/src/ucp/rndv/rndv_mtype.inl @@ -10,6 +10,7 @@ #include "rndv.h" #include +#include static ucp_ep_h ucp_proto_rndv_mtype_ep(ucp_worker_t *worker, @@ -177,9 +178,25 @@ static unsigned ucp_proto_rndv_mtype_fc_reschedule_cb(void *arg) return 1; } +/** + * Check if this is 
an RTR protocol (vs GET protocol). + * Both are UCP_OP_ID_RNDV_RECV, but RTR holds memory longer (waiting for PUT). + */ +static UCS_F_ALWAYS_INLINE int +ucp_proto_rndv_mtype_is_rtr(ucp_request_t *req) +{ + const char *proto_name = req->send.proto_config->proto->name; + return (strstr(proto_name, "rtr") != NULL); +} + /** * Check if request should be throttled due to flow control limit. - * If throttled, the request is queued to pending_q. + * If throttled, the request is queued to the appropriate priority queue. + * + * Priority and quota allocation: + * - PUT (RNDV_SEND): Full quota, highest dequeue priority (releases both sides' memory) + * - GET (RNDV_RECV): 75% quota, medium priority (releases receiver memory) + * - RTR (RNDV_RECV): 50% quota, lowest priority (holds memory until remote PUT) * * @return UCS_OK if not throttled (caller should continue), * UCS_ERR_NO_RESOURCE if throttled and queued (caller should return UCS_OK). @@ -187,20 +204,38 @@ static unsigned ucp_proto_rndv_mtype_fc_reschedule_cb(void *arg) static UCS_F_ALWAYS_INLINE ucs_status_t ucp_proto_rndv_mtype_fc_check(ucp_request_t *req) { - ucp_worker_h worker = req->send.ep->worker; - ucp_context_h context = worker->context; + ucp_worker_h worker = req->send.ep->worker; + ucp_context_h context = worker->context; + size_t max_frags = context->config.ext.rndv_mtype_worker_max_frags; + ucp_operation_id_t op_id; + ucs_queue_head_t *pending_q; + size_t effective_max; if (!context->config.ext.rndv_mtype_worker_fc_enable) { return UCS_OK; } - if (worker->rndv_mtype_fc.active_frags >= - context->config.ext.rndv_mtype_worker_max_frags) { - ucs_trace_req("mtype_fc: throttle limit reached active_frags=%zu max=%zu", - worker->rndv_mtype_fc.active_frags, - context->config.ext.rndv_mtype_worker_max_frags); - ucs_queue_push(&worker->rndv_mtype_fc.pending_q, - &req->send.rndv.ppln.queue_elem); + op_id = ucp_proto_select_op_id(&req->send.proto_config->select_param); + + if (op_id == UCP_OP_ID_RNDV_SEND) { + /* 
PUT - can use full quota (highest priority, releases both sides) */ + effective_max = max_frags; + pending_q = &worker->rndv_mtype_fc.put_pending_q; + } else if (ucp_proto_rndv_mtype_is_rtr(req)) { + /* RTR - reserve 50% for PUT/GET (holds memory waiting for remote PUT) */ + effective_max = max_frags / 2; + pending_q = &worker->rndv_mtype_fc.rtr_pending_q; + } else { + /* GET - reserve 25% for PUT (releases receiver memory on completion) */ + effective_max = (max_frags * 3) / 4; + pending_q = &worker->rndv_mtype_fc.get_pending_q; + } + + if (worker->rndv_mtype_fc.active_frags >= effective_max) { + ucs_trace_req("mtype_fc: throttle op=%d active=%zu max=%zu effective=%zu", + op_id, worker->rndv_mtype_fc.active_frags, + max_frags, effective_max); + ucs_queue_push(pending_q, &req->send.rndv.ppln.queue_elem); return UCS_ERR_NO_RESOURCE; } @@ -221,13 +256,19 @@ ucp_proto_rndv_mtype_fc_increment(ucp_request_t *req) } /** - * Decrement active_frags counter and reschedule pending request if any. + * Decrement active_frags counter and reschedule pending request. 
+ * Dequeue priority: PUT > GET > RTR (by memory release impact) + * - PUT completion releases memory on both sender and receiver + * - GET completion releases memory on receiver + * - RTR completion doesn't release memory (waiting for PUT) */ static UCS_F_ALWAYS_INLINE void ucp_proto_rndv_mtype_fc_decrement(ucp_request_t *req) { ucp_worker_h worker = req->send.ep->worker; ucp_context_h context = worker->context; + ucs_queue_elem_t *elem = NULL; + ucp_request_t *pending_req; if (!context->config.ext.rndv_mtype_worker_fc_enable) { return; @@ -236,11 +277,16 @@ ucp_proto_rndv_mtype_fc_decrement(ucp_request_t *req) ucs_assert(worker->rndv_mtype_fc.active_frags > 0); worker->rndv_mtype_fc.active_frags--; - if (!ucs_queue_is_empty(&worker->rndv_mtype_fc.pending_q)) { - ucp_request_t *pending_req; - ucs_queue_elem_t *elem; + /* Dequeue with priority: PUT > GET > RTR */ + if (!ucs_queue_is_empty(&worker->rndv_mtype_fc.put_pending_q)) { + elem = ucs_queue_pull(&worker->rndv_mtype_fc.put_pending_q); + } else if (!ucs_queue_is_empty(&worker->rndv_mtype_fc.get_pending_q)) { + elem = ucs_queue_pull(&worker->rndv_mtype_fc.get_pending_q); + } else if (!ucs_queue_is_empty(&worker->rndv_mtype_fc.rtr_pending_q)) { + elem = ucs_queue_pull(&worker->rndv_mtype_fc.rtr_pending_q); + } - elem = ucs_queue_pull(&worker->rndv_mtype_fc.pending_q); + if (elem != NULL) { pending_req = ucs_container_of(elem, ucp_request_t, send.rndv.ppln.queue_elem); ucs_callbackq_add_oneshot(&worker->uct->progress_q, pending_req, From 0243bab22975666f34d7513eb6285a17e9e134de Mon Sep 17 00:00:00 2001 From: Alma Mastbaum Date: Wed, 31 Dec 2025 10:02:48 +0200 Subject: [PATCH 04/14] UCP/RNDV: unite RTR, PUT and GET in a generic ucp_proto_rndv_mtype_fc_check --- src/ucp/rndv/rndv_get.c | 14 +++++++++---- src/ucp/rndv/rndv_mtype.inl | 39 +++++++++---------------------------- src/ucp/rndv/rndv_put.c | 14 +++++++++---- src/ucp/rndv/rndv_rtr.c | 11 ++++++++--- 4 files changed, 37 insertions(+), 41 deletions(-) diff 
--git a/src/ucp/rndv/rndv_get.c b/src/ucp/rndv/rndv_get.c index a23ea33bade..fa862e2896f 100644 --- a/src/ucp/rndv/rndv_get.c +++ b/src/ucp/rndv/rndv_get.c @@ -286,17 +286,23 @@ ucp_proto_rndv_get_mtype_fetch_completion(uct_completion_t *uct_comp) static ucs_status_t ucp_proto_rndv_get_mtype_fetch_progress(uct_pending_req_t *uct_req) { - ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct); + ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct); + ucp_context_h context = req->send.ep->worker->context; const ucp_proto_rndv_bulk_priv_t *rpriv; ucs_status_t status; + size_t max_frags; + ucs_queue_head_t *pending_q; /* coverity[tainted_data_downcast] */ rpriv = req->send.proto_config->priv; if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) { - /* Check throttling. If no resource at the moment, queue the request - * in the throttle pending queue and return UCS_OK. */ - if (ucp_proto_rndv_mtype_fc_check(req) == UCS_ERR_NO_RESOURCE) { + /* Check throttling limit. If no resource at the moment, queue the request + * in GET pending queue and return UCS_OK. */ + max_frags = context->config.ext.rndv_mtype_worker_max_frags * 3 / 4; + pending_q = &req->send.ep->worker->rndv_mtype_fc.get_pending_q; + if (ucp_proto_rndv_mtype_fc_check( + req, max_frags, pending_q) == UCS_ERR_NO_RESOURCE) { return UCS_OK; } diff --git a/src/ucp/rndv/rndv_mtype.inl b/src/ucp/rndv/rndv_mtype.inl index 7a907d1dbee..3aa1f298582 100644 --- a/src/ucp/rndv/rndv_mtype.inl +++ b/src/ucp/rndv/rndv_mtype.inl @@ -180,7 +180,6 @@ static unsigned ucp_proto_rndv_mtype_fc_reschedule_cb(void *arg) /** * Check if this is an RTR protocol (vs GET protocol). - * Both are UCP_OP_ID_RNDV_RECV, but RTR holds memory longer (waiting for PUT). */ static UCS_F_ALWAYS_INLINE int ucp_proto_rndv_mtype_is_rtr(ucp_request_t *req) @@ -202,39 +201,19 @@ ucp_proto_rndv_mtype_is_rtr(ucp_request_t *req) * UCS_ERR_NO_RESOURCE if throttled and queued (caller should return UCS_OK). 
*/ static UCS_F_ALWAYS_INLINE ucs_status_t -ucp_proto_rndv_mtype_fc_check(ucp_request_t *req) +ucp_proto_rndv_mtype_fc_check(ucp_request_t *req, size_t max_frags, + ucs_queue_head_t *pending_q) { - ucp_worker_h worker = req->send.ep->worker; - ucp_context_h context = worker->context; - size_t max_frags = context->config.ext.rndv_mtype_worker_max_frags; - ucp_operation_id_t op_id; - ucs_queue_head_t *pending_q; - size_t effective_max; + ucp_worker_h worker = req->send.ep->worker; + ucp_context_h context = worker->context; if (!context->config.ext.rndv_mtype_worker_fc_enable) { return UCS_OK; } - op_id = ucp_proto_select_op_id(&req->send.proto_config->select_param); - - if (op_id == UCP_OP_ID_RNDV_SEND) { - /* PUT - can use full quota (highest priority, releases both sides) */ - effective_max = max_frags; - pending_q = &worker->rndv_mtype_fc.put_pending_q; - } else if (ucp_proto_rndv_mtype_is_rtr(req)) { - /* RTR - reserve 50% for PUT/GET (holds memory waiting for remote PUT) */ - effective_max = max_frags / 2; - pending_q = &worker->rndv_mtype_fc.rtr_pending_q; - } else { - /* GET - reserve 25% for PUT (releases receiver memory on completion) */ - effective_max = (max_frags * 3) / 4; - pending_q = &worker->rndv_mtype_fc.get_pending_q; - } - - if (worker->rndv_mtype_fc.active_frags >= effective_max) { - ucs_trace_req("mtype_fc: throttle op=%d active=%zu max=%zu effective=%zu", - op_id, worker->rndv_mtype_fc.active_frags, - max_frags, effective_max); + if (worker->rndv_mtype_fc.active_frags >= max_frags) { + ucs_trace_req("mtype_fc: fragments throttle limit reached (%zu/%zu)", + worker->rndv_mtype_fc.active_frags, max_frags); ucs_queue_push(pending_q, &req->send.rndv.ppln.queue_elem); return UCS_ERR_NO_RESOURCE; } @@ -265,8 +244,8 @@ ucp_proto_rndv_mtype_fc_increment(ucp_request_t *req) static UCS_F_ALWAYS_INLINE void ucp_proto_rndv_mtype_fc_decrement(ucp_request_t *req) { - ucp_worker_h worker = req->send.ep->worker; - ucp_context_h context = worker->context; + 
ucp_worker_h worker = req->send.ep->worker; + ucp_context_h context = worker->context; ucs_queue_elem_t *elem = NULL; ucp_request_t *pending_req; diff --git a/src/ucp/rndv/rndv_put.c b/src/ucp/rndv/rndv_put.c index e0c78c76a9a..e46f3c91957 100644 --- a/src/ucp/rndv/rndv_put.c +++ b/src/ucp/rndv/rndv_put.c @@ -517,15 +517,21 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_proto_rndv_put_mtype_send_func( static ucs_status_t ucp_proto_rndv_put_mtype_copy_progress(uct_pending_req_t *uct_req) { - ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct); + ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct); + ucp_context_h context = req->send.ep->worker->context; const ucp_proto_rndv_put_priv_t *rpriv = req->send.proto_config->priv; ucs_status_t status; + size_t max_frags; + ucs_queue_head_t *pending_q; ucs_assert(!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)); - /* Check throttling. If no resource at the moment, queue the request - * in the throttle pending queue and return UCS_OK. */ - if (ucp_proto_rndv_mtype_fc_check(req) == UCS_ERR_NO_RESOURCE) { + /* Check throttling limit. If no resource at the moment, queue the request + * in PUT pending queue and return UCS_OK. 
*/ + max_frags = context->config.ext.rndv_mtype_worker_max_frags; + pending_q = &req->send.ep->worker->rndv_mtype_fc.put_pending_q; + if (ucp_proto_rndv_mtype_fc_check( + req, max_frags, pending_q) == UCS_ERR_NO_RESOURCE) { return UCS_OK; } diff --git a/src/ucp/rndv/rndv_rtr.c b/src/ucp/rndv/rndv_rtr.c index 15e0bd09c3b..fca2c62e8cf 100644 --- a/src/ucp/rndv/rndv_rtr.c +++ b/src/ucp/rndv/rndv_rtr.c @@ -353,12 +353,17 @@ static ucs_status_t ucp_proto_rndv_rtr_mtype_progress(uct_pending_req_t *self) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); const ucp_proto_rndv_rtr_mtype_priv_t *rpriv = req->send.proto_config->priv; + size_t max_frags; + ucs_queue_head_t *pending_q; ucs_status_t status; if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) { - /* Check throttling. If no resource at the moment, queue the request - * in the throttle pending queue and return UCS_OK. */ - if (ucp_proto_rndv_mtype_fc_check(req) == UCS_ERR_NO_RESOURCE) { + /* Check throttling limit. If no resource at the moment, queue the + * request in RTR pending queue and return UCS_OK. 
*/ + max_frags = context->config.ext.rndv_mtype_worker_max_frags / 2; + pending_q = &req->send.ep->worker->rndv_mtype_fc.rtr_pending_q; + if (ucp_proto_rndv_mtype_fc_check( + req, max_frags, pending_q) == UCS_ERR_NO_RESOURCE) { return UCS_OK; } From 88f8981d5c94c0dabe87f6091872a1886541f0d0 Mon Sep 17 00:00:00 2001 From: Alma Mastbaum Date: Wed, 31 Dec 2025 10:21:47 +0200 Subject: [PATCH 05/14] UCP/RNDV: some fixes --- src/ucp/rndv/rndv_mtype.inl | 25 ++++++++++++++++--------- src/ucp/rndv/rndv_put.c | 3 ++- src/ucp/rndv/rndv_rtr.c | 6 ++++-- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/src/ucp/rndv/rndv_mtype.inl b/src/ucp/rndv/rndv_mtype.inl index 3aa1f298582..0e67a135861 100644 --- a/src/ucp/rndv/rndv_mtype.inl +++ b/src/ucp/rndv/rndv_mtype.inl @@ -192,13 +192,15 @@ ucp_proto_rndv_mtype_is_rtr(ucp_request_t *req) * Check if request should be throttled due to flow control limit. * If throttled, the request is queued to the appropriate priority queue. * - * Priority and quota allocation: - * - PUT (RNDV_SEND): Full quota, highest dequeue priority (releases both sides' memory) - * - GET (RNDV_RECV): 75% quota, medium priority (releases receiver memory) - * - RTR (RNDV_RECV): 50% quota, lowest priority (holds memory until remote PUT) + * Priority and quota allocation (based on memory release and system impact): + * - PUT: Full quota, remote already sent RTR and is waiting. Completing PUT + * unblocks remote. + * - GET: 75% quota, self-contained operation, no remote allocation triggered. + * - RTR: 50% quota, scheduling RTR causes additional allocation on remote + * (RNDV PUT). * * @return UCS_OK if not throttled (caller should continue), - * UCS_ERR_NO_RESOURCE if throttled and queued (caller should return UCS_OK). + * UCS_ERR_NO_RESOURCE if throttled and queued. 
*/ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_proto_rndv_mtype_fc_check(ucp_request_t *req, size_t max_frags, @@ -236,10 +238,15 @@ ucp_proto_rndv_mtype_fc_increment(ucp_request_t *req) /** * Decrement active_frags counter and reschedule pending request. - * Dequeue priority: PUT > GET > RTR (by memory release impact) - * - PUT completion releases memory on both sender and receiver - * - GET completion releases memory on receiver - * - RTR completion doesn't release memory (waiting for PUT) + * Dequeue priority: PUT > GET > RTR + * + * Priority rationale: + * PUT - Remote is blocked waiting for our data. Scheduling PUT unblocks remote + * immediately. + * GET - Self-contained fetch operation. Completes without causing remote + * allocations. + * RTR - Scheduling RTR triggers a remote PUT allocation, increasing total + * memory pressure. */ static UCS_F_ALWAYS_INLINE void ucp_proto_rndv_mtype_fc_decrement(ucp_request_t *req) diff --git a/src/ucp/rndv/rndv_put.c b/src/ucp/rndv/rndv_put.c index e46f3c91957..7f212443cf4 100644 --- a/src/ucp/rndv/rndv_put.c +++ b/src/ucp/rndv/rndv_put.c @@ -519,7 +519,7 @@ ucp_proto_rndv_put_mtype_copy_progress(uct_pending_req_t *uct_req) { ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct); ucp_context_h context = req->send.ep->worker->context; - const ucp_proto_rndv_put_priv_t *rpriv = req->send.proto_config->priv; + const ucp_proto_rndv_put_priv_t *rpriv; ucs_status_t status; size_t max_frags; ucs_queue_head_t *pending_q; @@ -535,6 +535,7 @@ ucp_proto_rndv_put_mtype_copy_progress(uct_pending_req_t *uct_req) return UCS_OK; } + rpriv = req->send.proto_config->priv; status = ucp_proto_rndv_mtype_request_init(req, rpriv->bulk.frag_mem_type, rpriv->bulk.frag_sys_dev); if (status != UCS_OK) { diff --git a/src/ucp/rndv/rndv_rtr.c b/src/ucp/rndv/rndv_rtr.c index fca2c62e8cf..bd0e5b0e145 100644 --- a/src/ucp/rndv/rndv_rtr.c +++ b/src/ucp/rndv/rndv_rtr.c @@ -351,8 +351,9 @@ 
ucp_proto_rndv_rtr_mtype_data_received(ucp_request_t *req, int in_buffer) static ucs_status_t ucp_proto_rndv_rtr_mtype_progress(uct_pending_req_t *self) { - ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); - const ucp_proto_rndv_rtr_mtype_priv_t *rpriv = req->send.proto_config->priv; + ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); + ucp_context_h context = req->send.ep->worker->context; + const ucp_proto_rndv_rtr_mtype_priv_t *rpriv; size_t max_frags; ucs_queue_head_t *pending_q; ucs_status_t status; @@ -367,6 +368,7 @@ static ucs_status_t ucp_proto_rndv_rtr_mtype_progress(uct_pending_req_t *self) return UCS_OK; } + rpriv = req->send.proto_config->priv; status = ucp_proto_rndv_mtype_request_init(req, rpriv->frag_mem_type, rpriv->frag_sys_dev); if (status != UCS_OK) { From 2abf694470f9a6d87ca4c50b957db9bc6ea8fc3e Mon Sep 17 00:00:00 2001 From: Alma Mastbaum Date: Wed, 31 Dec 2025 12:03:52 +0200 Subject: [PATCH 06/14] UCP/RNDV: clang --- src/ucp/rndv/rndv_get.c | 4 ++-- src/ucp/rndv/rndv_ppln.c | 2 +- src/ucp/rndv/rndv_put.c | 6 +++--- src/ucp/rndv/rndv_rtr.c | 6 +++--- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/ucp/rndv/rndv_get.c b/src/ucp/rndv/rndv_get.c index fa862e2896f..5f4eaf6cf6b 100644 --- a/src/ucp/rndv/rndv_get.c +++ b/src/ucp/rndv/rndv_get.c @@ -301,8 +301,8 @@ ucp_proto_rndv_get_mtype_fetch_progress(uct_pending_req_t *uct_req) * in GET pending queue and return UCS_OK. 
*/ max_frags = context->config.ext.rndv_mtype_worker_max_frags * 3 / 4; pending_q = &req->send.ep->worker->rndv_mtype_fc.get_pending_q; - if (ucp_proto_rndv_mtype_fc_check( - req, max_frags, pending_q) == UCS_ERR_NO_RESOURCE) { + if (ucp_proto_rndv_mtype_fc_check(req, max_frags, pending_q) == + UCS_ERR_NO_RESOURCE) { return UCS_OK; } diff --git a/src/ucp/rndv/rndv_ppln.c b/src/ucp/rndv/rndv_ppln.c index 982cb7e9548..26282fae8a0 100644 --- a/src/ucp/rndv/rndv_ppln.c +++ b/src/ucp/rndv/rndv_ppln.c @@ -253,7 +253,7 @@ void ucp_proto_rndv_ppln_recv_frag_complete(ucp_request_t *freq, int send_ack, static ucs_status_t ucp_proto_rndv_ppln_progress(uct_pending_req_t *uct_req) { - ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct); + ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct); ucp_worker_h worker = req->send.ep->worker; const ucp_proto_rndv_ppln_priv_t *rpriv; ucp_datatype_iter_t next_iter; diff --git a/src/ucp/rndv/rndv_put.c b/src/ucp/rndv/rndv_put.c index 7f212443cf4..d84721bd532 100644 --- a/src/ucp/rndv/rndv_put.c +++ b/src/ucp/rndv/rndv_put.c @@ -530,12 +530,12 @@ ucp_proto_rndv_put_mtype_copy_progress(uct_pending_req_t *uct_req) * in PUT pending queue and return UCS_OK. 
*/ max_frags = context->config.ext.rndv_mtype_worker_max_frags; pending_q = &req->send.ep->worker->rndv_mtype_fc.put_pending_q; - if (ucp_proto_rndv_mtype_fc_check( - req, max_frags, pending_q) == UCS_ERR_NO_RESOURCE) { + if (ucp_proto_rndv_mtype_fc_check(req, max_frags, pending_q) == + UCS_ERR_NO_RESOURCE) { return UCS_OK; } - rpriv = req->send.proto_config->priv; + rpriv = req->send.proto_config->priv; status = ucp_proto_rndv_mtype_request_init(req, rpriv->bulk.frag_mem_type, rpriv->bulk.frag_sys_dev); if (status != UCS_OK) { diff --git a/src/ucp/rndv/rndv_rtr.c b/src/ucp/rndv/rndv_rtr.c index bd0e5b0e145..2e7a19742e2 100644 --- a/src/ucp/rndv/rndv_rtr.c +++ b/src/ucp/rndv/rndv_rtr.c @@ -363,12 +363,12 @@ static ucs_status_t ucp_proto_rndv_rtr_mtype_progress(uct_pending_req_t *self) * request in RTR pending queue and return UCS_OK. */ max_frags = context->config.ext.rndv_mtype_worker_max_frags / 2; pending_q = &req->send.ep->worker->rndv_mtype_fc.rtr_pending_q; - if (ucp_proto_rndv_mtype_fc_check( - req, max_frags, pending_q) == UCS_ERR_NO_RESOURCE) { + if (ucp_proto_rndv_mtype_fc_check(req, max_frags, pending_q) == + UCS_ERR_NO_RESOURCE) { return UCS_OK; } - rpriv = req->send.proto_config->priv; + rpriv = req->send.proto_config->priv; status = ucp_proto_rndv_mtype_request_init(req, rpriv->frag_mem_type, rpriv->frag_sys_dev); if (status != UCS_OK) { From 07d39e403274b838c4e88f3ad6e614fd3c4acbd1 Mon Sep 17 00:00:00 2001 From: Alma Mastbaum Date: Wed, 31 Dec 2025 13:50:01 +0200 Subject: [PATCH 07/14] UCP/RNDV: some fixes --- src/ucp/rndv/rndv_get.c | 4 ++-- src/ucp/rndv/rndv_mtype.inl | 26 ++++++-------------------- 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/src/ucp/rndv/rndv_get.c b/src/ucp/rndv/rndv_get.c index 5f4eaf6cf6b..a95562c27d3 100644 --- a/src/ucp/rndv/rndv_get.c +++ b/src/ucp/rndv/rndv_get.c @@ -297,8 +297,8 @@ ucp_proto_rndv_get_mtype_fetch_progress(uct_pending_req_t *uct_req) rpriv = req->send.proto_config->priv; if (!(req->flags & 
UCP_REQUEST_FLAG_PROTO_INITIALIZED)) { - /* Check throttling limit. If no resource at the moment, queue the request - * in GET pending queue and return UCS_OK. */ + /* Check throttling limit. If no resource at the moment, queue the + * request in GET pending queue and return UCS_OK. */ max_frags = context->config.ext.rndv_mtype_worker_max_frags * 3 / 4; pending_q = &req->send.ep->worker->rndv_mtype_fc.get_pending_q; if (ucp_proto_rndv_mtype_fc_check(req, max_frags, pending_q) == diff --git a/src/ucp/rndv/rndv_mtype.inl b/src/ucp/rndv/rndv_mtype.inl index 0e67a135861..94c8744df76 100644 --- a/src/ucp/rndv/rndv_mtype.inl +++ b/src/ucp/rndv/rndv_mtype.inl @@ -178,29 +178,15 @@ static unsigned ucp_proto_rndv_mtype_fc_reschedule_cb(void *arg) return 1; } -/** - * Check if this is an RTR protocol (vs GET protocol). - */ -static UCS_F_ALWAYS_INLINE int -ucp_proto_rndv_mtype_is_rtr(ucp_request_t *req) -{ - const char *proto_name = req->send.proto_config->proto->name; - return (strstr(proto_name, "rtr") != NULL); -} - /** * Check if request should be throttled due to flow control limit. * If throttled, the request is queued to the appropriate priority queue. * - * Priority and quota allocation (based on memory release and system impact): - * - PUT: Full quota, remote already sent RTR and is waiting. Completing PUT - * unblocks remote. - * - GET: 75% quota, self-contained operation, no remote allocation triggered. - * - RTR: 50% quota, scheduling RTR causes additional allocation on remote - * (RNDV PUT). + * @param req The request to check. + * @param max_frags The maximum number of fragments allowed. + * @param pending_q The queue to add the request to if it is throttled. * - * @return UCS_OK if not throttled (caller should continue), - * UCS_ERR_NO_RESOURCE if throttled and queued. + * @return UCS_OK if not throttled, UCS_ERR_NO_RESOURCE if throttled and queued. 
*/ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_proto_rndv_mtype_fc_check(ucp_request_t *req, size_t max_frags, @@ -242,9 +228,9 @@ ucp_proto_rndv_mtype_fc_increment(ucp_request_t *req) * * Priority rationale: * PUT - Remote is blocked waiting for our data. Scheduling PUT unblocks remote - * immediately. + * as well. * GET - Self-contained fetch operation. Completes without causing remote - * allocations. + * allocations, but scheduling it doesn't unblock another buffer. * RTR - Scheduling RTR triggers a remote PUT allocation, increasing total * memory pressure. */ From f539944c9f068794c9053bd79518a5f7f4309d4b Mon Sep 17 00:00:00 2001 From: Alma Mastbaum Date: Wed, 31 Dec 2025 13:53:09 +0200 Subject: [PATCH 08/14] UCP/RNDV: change fragments distribution across different protos --- src/ucp/rndv/rndv_get.c | 2 +- src/ucp/rndv/rndv_rtr.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ucp/rndv/rndv_get.c b/src/ucp/rndv/rndv_get.c index a95562c27d3..fd5f2fcab18 100644 --- a/src/ucp/rndv/rndv_get.c +++ b/src/ucp/rndv/rndv_get.c @@ -299,7 +299,7 @@ ucp_proto_rndv_get_mtype_fetch_progress(uct_pending_req_t *uct_req) if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) { /* Check throttling limit. If no resource at the moment, queue the * request in GET pending queue and return UCS_OK. */ - max_frags = context->config.ext.rndv_mtype_worker_max_frags * 3 / 4; + max_frags = context->config.ext.rndv_mtype_worker_max_frags * 4 / 5; pending_q = &req->send.ep->worker->rndv_mtype_fc.get_pending_q; if (ucp_proto_rndv_mtype_fc_check(req, max_frags, pending_q) == UCS_ERR_NO_RESOURCE) { diff --git a/src/ucp/rndv/rndv_rtr.c b/src/ucp/rndv/rndv_rtr.c index 2e7a19742e2..390afae9a4b 100644 --- a/src/ucp/rndv/rndv_rtr.c +++ b/src/ucp/rndv/rndv_rtr.c @@ -361,7 +361,7 @@ static ucs_status_t ucp_proto_rndv_rtr_mtype_progress(uct_pending_req_t *self) if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) { /* Check throttling limit.
If no resource at the moment, queue the * request in RTR pending queue and return UCS_OK. */ - max_frags = context->config.ext.rndv_mtype_worker_max_frags / 2; + max_frags = context->config.ext.rndv_mtype_worker_max_frags * 3 / 5; pending_q = &req->send.ep->worker->rndv_mtype_fc.rtr_pending_q; if (ucp_proto_rndv_mtype_fc_check(req, max_frags, pending_q) == UCS_ERR_NO_RESOURCE) { From 208c3134d673abe9cdb8214f0f500d1ba1f0194e Mon Sep 17 00:00:00 2001 From: Alma Mastbaum Date: Wed, 31 Dec 2025 17:26:09 +0200 Subject: [PATCH 09/14] UCP/RNDV: some changes --- src/ucp/core/ucp_worker.c | 2 +- src/ucp/rndv/rndv_get.c | 2 +- src/ucp/rndv/rndv_mtype.inl | 1 - src/ucp/rndv/rndv_rtr.c | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 36d67cdd814..516c2702ef2 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -2524,7 +2524,7 @@ ucs_status_t ucp_worker_create(ucp_context_h context, worker->counters.ep_closures = 0; worker->counters.ep_failures = 0; - /* Initialize RNDV mtype flow control with separate priority queues */ + /* Initialize RNDV mtype flow control */ worker->rndv_mtype_fc.active_frags = 0; ucs_queue_head_init(&worker->rndv_mtype_fc.put_pending_q); ucs_queue_head_init(&worker->rndv_mtype_fc.get_pending_q); diff --git a/src/ucp/rndv/rndv_get.c b/src/ucp/rndv/rndv_get.c index fd5f2fcab18..2965840e255 100644 --- a/src/ucp/rndv/rndv_get.c +++ b/src/ucp/rndv/rndv_get.c @@ -299,7 +299,7 @@ ucp_proto_rndv_get_mtype_fetch_progress(uct_pending_req_t *uct_req) if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) { /* Check throttling limit. If no resource at the moment, queue the * request in GET pending queue and return UCS_OK. 
*/ - max_frags = context->config.ext.rndv_mtype_worker_max_frags * 4 / 5; + max_frags = context->config.ext.rndv_mtype_worker_max_frags / 5 * 4; pending_q = &req->send.ep->worker->rndv_mtype_fc.get_pending_q; if (ucp_proto_rndv_mtype_fc_check(req, max_frags, pending_q) == UCS_ERR_NO_RESOURCE) { diff --git a/src/ucp/rndv/rndv_mtype.inl b/src/ucp/rndv/rndv_mtype.inl index 94c8744df76..4dd5f981883 100644 --- a/src/ucp/rndv/rndv_mtype.inl +++ b/src/ucp/rndv/rndv_mtype.inl @@ -10,7 +10,6 @@ #include "rndv.h" #include -#include static ucp_ep_h ucp_proto_rndv_mtype_ep(ucp_worker_t *worker, diff --git a/src/ucp/rndv/rndv_rtr.c b/src/ucp/rndv/rndv_rtr.c index 390afae9a4b..38bfc9b0bef 100644 --- a/src/ucp/rndv/rndv_rtr.c +++ b/src/ucp/rndv/rndv_rtr.c @@ -361,7 +361,7 @@ static ucs_status_t ucp_proto_rndv_rtr_mtype_progress(uct_pending_req_t *self) if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) { /* Check throttling limit. If no resource at the moment, queue the * request in RTR pending queue and return UCS_OK. 
*/ - max_frags = context->config.ext.rndv_mtype_worker_max_frags * 3 / 5; + max_frags = context->config.ext.rndv_mtype_worker_max_frags / 5 * 3; pending_q = &req->send.ep->worker->rndv_mtype_fc.rtr_pending_q; if (ucp_proto_rndv_mtype_fc_check(req, max_frags, pending_q) == UCS_ERR_NO_RESOURCE) { From bfa55cd7605343a01a6bae037a91b6b20dba4f5c Mon Sep 17 00:00:00 2001 From: Alma Mastbaum Date: Wed, 31 Dec 2025 17:26:41 +0200 Subject: [PATCH 10/14] UCP/RNDV: Added gtest --- src/ucp/core/ucp_worker.c | 26 ++++--- src/ucp/core/ucp_worker.h | 2 + src/ucp/rndv/rndv_mtype.inl | 4 + test/gtest/ucp/test_ucp_am.cc | 142 ++++++++++++++++++++++++++++++++++ 4 files changed, 162 insertions(+), 12 deletions(-) diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 516c2702ef2..02d3d8ad250 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -87,18 +87,20 @@ static ucs_stats_class_t ucp_worker_stats_class = { .num_counters = UCP_WORKER_STAT_LAST, .class_id = UCS_STATS_CLASS_ID_INVALID, .counter_names = { - [UCP_WORKER_STAT_TAG_RX_EAGER_MSG] = "tag_rx_eager_msg", - [UCP_WORKER_STAT_TAG_RX_EAGER_SYNC_MSG] = "tag_rx_sync_msg", - [UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_EXP] = "tag_rx_eager_chunk_exp", - [UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_UNEXP] = "tag_rx_eager_chunk_unexp", - [UCP_WORKER_STAT_RNDV_RX_EXP] = "rndv_rx_exp", - [UCP_WORKER_STAT_RNDV_RX_UNEXP] = "rndv_rx_unexp", - [UCP_WORKER_STAT_RNDV_PUT_ZCOPY] = "rndv_put_zcopy", - [UCP_WORKER_STAT_RNDV_PUT_MTYPE_ZCOPY] = "rndv_put_mtype_zcopy", - [UCP_WORKER_STAT_RNDV_GET_ZCOPY] = "rndv_get_zcopy", - [UCP_WORKER_STAT_RNDV_RTR] = "rndv_rtr", - [UCP_WORKER_STAT_RNDV_RTR_MTYPE] = "rndv_rtr_mtype", - [UCP_WORKER_STAT_RNDV_RKEY_PTR] = "rndv_rkey_ptr" + [UCP_WORKER_STAT_TAG_RX_EAGER_MSG] = "tag_rx_eager_msg", + [UCP_WORKER_STAT_TAG_RX_EAGER_SYNC_MSG] = "tag_rx_sync_msg", + [UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_EXP] = "tag_rx_eager_chunk_exp", + [UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_UNEXP] = 
"tag_rx_eager_chunk_unexp", + [UCP_WORKER_STAT_RNDV_RX_EXP] = "rndv_rx_exp", + [UCP_WORKER_STAT_RNDV_RX_UNEXP] = "rndv_rx_unexp", + [UCP_WORKER_STAT_RNDV_PUT_ZCOPY] = "rndv_put_zcopy", + [UCP_WORKER_STAT_RNDV_PUT_MTYPE_ZCOPY] = "rndv_put_mtype_zcopy", + [UCP_WORKER_STAT_RNDV_GET_ZCOPY] = "rndv_get_zcopy", + [UCP_WORKER_STAT_RNDV_RTR] = "rndv_rtr", + [UCP_WORKER_STAT_RNDV_RTR_MTYPE] = "rndv_rtr_mtype", + [UCP_WORKER_STAT_RNDV_RKEY_PTR] = "rndv_rkey_ptr", + [UCP_WORKER_STAT_RNDV_MTYPE_FC_THROTTLED] = "rndv_mtype_fc_throttled", + [UCP_WORKER_STAT_RNDV_MTYPE_FC_INCREMENTED] = "rndv_mtype_fc_incremented" } }; #endif diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index 6d330181044..c78a7bdbcb0 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -154,6 +154,8 @@ enum { UCP_WORKER_STAT_RNDV_RTR, UCP_WORKER_STAT_RNDV_RTR_MTYPE, UCP_WORKER_STAT_RNDV_RKEY_PTR, + UCP_WORKER_STAT_RNDV_MTYPE_FC_THROTTLED, + UCP_WORKER_STAT_RNDV_MTYPE_FC_INCREMENTED, UCP_WORKER_STAT_LAST }; diff --git a/src/ucp/rndv/rndv_mtype.inl b/src/ucp/rndv/rndv_mtype.inl index 4dd5f981883..9244e684391 100644 --- a/src/ucp/rndv/rndv_mtype.inl +++ b/src/ucp/rndv/rndv_mtype.inl @@ -201,6 +201,8 @@ ucp_proto_rndv_mtype_fc_check(ucp_request_t *req, size_t max_frags, if (worker->rndv_mtype_fc.active_frags >= max_frags) { ucs_trace_req("mtype_fc: fragments throttle limit reached (%zu/%zu)", worker->rndv_mtype_fc.active_frags, max_frags); + UCS_STATS_UPDATE_COUNTER(worker->stats, + UCP_WORKER_STAT_RNDV_MTYPE_FC_THROTTLED, 1); ucs_queue_push(pending_q, &req->send.rndv.ppln.queue_elem); return UCS_ERR_NO_RESOURCE; } @@ -218,6 +220,8 @@ ucp_proto_rndv_mtype_fc_increment(ucp_request_t *req) if (worker->context->config.ext.rndv_mtype_worker_fc_enable) { worker->rndv_mtype_fc.active_frags++; + UCS_STATS_UPDATE_COUNTER(worker->stats, + UCP_WORKER_STAT_RNDV_MTYPE_FC_INCREMENTED, 1); } } diff --git a/test/gtest/ucp/test_ucp_am.cc b/test/gtest/ucp/test_ucp_am.cc index 
45dc9edd2dc..0ca207ac553 100644 --- a/test/gtest/ucp/test_ucp_am.cc +++ b/test/gtest/ucp/test_ucp_am.cc @@ -2187,4 +2187,146 @@ UCS_TEST_P(test_ucp_am_nbx_rndv_ppln, cuda_managed_buff, UCP_INSTANTIATE_TEST_CASE_GPU_AWARE(test_ucp_am_nbx_rndv_ppln); + +class test_ucp_am_nbx_rndv_mtype_fc : public test_ucp_am_nbx_rndv_ppln { +protected: + void check_stats_ge(entity &e, uint64_t cntr, uint64_t min_value) + { + auto stats_node = e.worker()->stats; + auto value = UCS_STATS_GET_COUNTER(stats_node, cntr); + + EXPECT_GE(value, min_value) << "counter " + << stats_node->cls->counter_names[cntr] + << " expected >= " << min_value + << " but got " << value; + } + + void check_active_frags_zero(entity &e) + { + EXPECT_EQ(0u, e.worker()->rndv_mtype_fc.active_frags) + << "active_frags should be 0 after completion"; + } + + void check_pending_queues_empty(entity &e) + { + ucp_worker_h worker = e.worker(); + EXPECT_TRUE(ucs_queue_is_empty(&worker->rndv_mtype_fc.put_pending_q)) + << "put_pending_q should be empty"; + EXPECT_TRUE(ucs_queue_is_empty(&worker->rndv_mtype_fc.get_pending_q)) + << "get_pending_q should be empty"; + EXPECT_TRUE(ucs_queue_is_empty(&worker->rndv_mtype_fc.rtr_pending_q)) + << "rtr_pending_q should be empty"; + } + + void send_message(size_t num_frags) + { + set_mem_type(UCS_MEMORY_TYPE_CUDA_MANAGED); + test_am_send_recv(get_rndv_frag_size(UCS_MEMORY_TYPE_CUDA) * num_frags); + } + + void verify_clean_fc_state() + { + check_active_frags_zero(sender()); + check_active_frags_zero(receiver()); + check_pending_queues_empty(sender()); + check_pending_queues_empty(receiver()); + } +}; + +UCS_TEST_P(test_ucp_am_nbx_rndv_mtype_fc, fc_enabled_under_cap, + "RNDV_MTYPE_WORKER_FC_ENABLE=y", + "RNDV_MTYPE_WORKER_MAX_FRAGS=128", + "RNDV_FRAG_MEM_TYPE=cuda") +{ + if (!sender().is_rndv_put_ppln_supported()) { + UCS_TEST_SKIP_R("RNDV is not supported"); + } + + send_message(8); + + check_stats_ge(sender(), UCP_WORKER_STAT_RNDV_PUT_MTYPE_ZCOPY, 1); + check_stats_ge(receiver(), 
UCP_WORKER_STAT_RNDV_RTR_MTYPE, 1); + + /* Throttling should NOT have occurred */ + auto sender_throttled = UCS_STATS_GET_COUNTER(sender().worker()->stats, + UCP_WORKER_STAT_RNDV_MTYPE_FC_THROTTLED); + auto receiver_throttled = UCS_STATS_GET_COUNTER(receiver().worker()->stats, + UCP_WORKER_STAT_RNDV_MTYPE_FC_THROTTLED); + EXPECT_EQ(0u, sender_throttled) << "sender should not be throttled"; + EXPECT_EQ(0u, receiver_throttled) << "receiver should not be throttled"; + + /* FC should have been active (incremented) even though no throttling */ + auto sender_incremented = UCS_STATS_GET_COUNTER( + sender().worker()->stats, + UCP_WORKER_STAT_RNDV_MTYPE_FC_INCREMENTED); + auto receiver_incremented = UCS_STATS_GET_COUNTER( + receiver().worker()->stats, + UCP_WORKER_STAT_RNDV_MTYPE_FC_INCREMENTED); + EXPECT_GT(sender_incremented + receiver_incremented, 0u) + << "FC should be active and tracking fragments"; + + verify_clean_fc_state(); +} + +UCS_TEST_P(test_ucp_am_nbx_rndv_mtype_fc, fc_enabled_cap_reached, + "RNDV_MTYPE_WORKER_FC_ENABLE=y", + "RNDV_MTYPE_WORKER_MAX_FRAGS=5", + "RNDV_FRAG_MEM_TYPE=cuda") +{ + if (!sender().is_rndv_put_ppln_supported()) { + UCS_TEST_SKIP_R("RNDV is not supported"); + } + + send_message(20); + + /* Verify mtype protocols were used */ + check_stats_ge(sender(), UCP_WORKER_STAT_RNDV_PUT_MTYPE_ZCOPY, 1); + check_stats_ge(receiver(), UCP_WORKER_STAT_RNDV_RTR_MTYPE, 1); + + /* Throttling SHOULD have occurred (at least once on sender or receiver) */ + auto sender_throttled = UCS_STATS_GET_COUNTER(sender().worker()->stats, + UCP_WORKER_STAT_RNDV_MTYPE_FC_THROTTLED); + auto receiver_throttled = UCS_STATS_GET_COUNTER(receiver().worker()->stats, + UCP_WORKER_STAT_RNDV_MTYPE_FC_THROTTLED); + EXPECT_GT(sender_throttled + receiver_throttled, 0u) + << "throttling should have occurred with MAX_FRAGS=5"; + + verify_clean_fc_state(); +} + +UCS_TEST_P(test_ucp_am_nbx_rndv_mtype_fc, fc_disabled, + "RNDV_MTYPE_WORKER_FC_ENABLE=n", + "RNDV_FRAG_MEM_TYPE=cuda") +{ + 
if (!sender().is_rndv_put_ppln_supported()) { + UCS_TEST_SKIP_R("RNDV is not supported"); + } + + send_message(8); + + check_stats_ge(sender(), UCP_WORKER_STAT_RNDV_PUT_MTYPE_ZCOPY, 1); + check_stats_ge(receiver(), UCP_WORKER_STAT_RNDV_RTR_MTYPE, 1); + + /* No throttling should have occurred (FC is disabled) */ + auto sender_throttled = UCS_STATS_GET_COUNTER(sender().worker()->stats, + UCP_WORKER_STAT_RNDV_MTYPE_FC_THROTTLED); + auto receiver_throttled = UCS_STATS_GET_COUNTER(receiver().worker()->stats, + UCP_WORKER_STAT_RNDV_MTYPE_FC_THROTTLED); + EXPECT_EQ(0u, sender_throttled) << "FC disabled - no throttling expected"; + EXPECT_EQ(0u, receiver_throttled) << "FC disabled - no throttling expected"; + + /* With FC disabled, increment should NOT be called */ + auto sender_incremented = UCS_STATS_GET_COUNTER(sender().worker()->stats, + UCP_WORKER_STAT_RNDV_MTYPE_FC_INCREMENTED); + auto receiver_incremented = UCS_STATS_GET_COUNTER(receiver().worker()->stats, + UCP_WORKER_STAT_RNDV_MTYPE_FC_INCREMENTED); + EXPECT_EQ(0u, sender_incremented) << "FC disabled - no increment expected"; + EXPECT_EQ(0u, receiver_incremented) << "FC disabled - no increment expected"; + + verify_clean_fc_state(); +} + + +UCP_INSTANTIATE_TEST_CASE_GPU_AWARE(test_ucp_am_nbx_rndv_mtype_fc); + #endif From 099a79580abedbb54896c9cab57d83d3088d0104 Mon Sep 17 00:00:00 2001 From: Alma Mastbaum Date: Sun, 4 Jan 2026 14:29:30 +0200 Subject: [PATCH 11/14] UCP/RNDV: pre-compute max frags --- src/ucp/core/ucp_context.c | 9 +++++---- src/ucp/core/ucp_context.h | 4 ++-- src/ucp/rndv/proto_rndv.c | 3 +++ src/ucp/rndv/proto_rndv.h | 3 +++ src/ucp/rndv/rndv_get.c | 7 ++++--- src/ucp/rndv/rndv_mtype.inl | 19 +++++++++++++++++++ src/ucp/rndv/rndv_put.c | 9 ++++----- src/ucp/rndv/rndv_rtr.c | 15 ++++++++++----- 8 files changed, 50 insertions(+), 19 deletions(-) diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index 44dd0272efb..a184527f2e4 100644 --- a/src/ucp/core/ucp_context.c +++ 
b/src/ucp/core/ucp_context.c @@ -389,10 +389,11 @@ static ucs_config_field_t ucp_context_config_table[] = { "across all requests, preventing memory exhaustion", ucs_offsetof(ucp_context_config_t, rndv_mtype_worker_fc_enable), UCS_CONFIG_TYPE_BOOL}, - {"RNDV_MTYPE_WORKER_MAX_FRAGS", "1024", - "Maximum number of concurrent mtype fragments per worker\n" - "(only applies when RNDV_MTYPE_WORKER_FC_ENABLE=y)", - ucs_offsetof(ucp_context_config_t, rndv_mtype_worker_max_frags), UCS_CONFIG_TYPE_ULUNITS}, + {"RNDV_MTYPE_WORKER_MAX_MEM", "4g", + "Maximum memory for concurrent mtype fragments per worker.\n" + "This value is translated to a fragment count based on RNDV_FRAG_SIZE\n" + "for each memory type (only applies when RNDV_MTYPE_WORKER_FC_ENABLE=y)", + ucs_offsetof(ucp_context_config_t, rndv_mtype_worker_max_mem), UCS_CONFIG_TYPE_MEMUNITS}, {"FLUSH_WORKER_EPS", "y", "Enable flushing the worker by flushing its endpoints. Allows completing\n" diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h index 29a129cb3a0..b8274d8407a 100644 --- a/src/ucp/core/ucp_context.h +++ b/src/ucp/core/ucp_context.h @@ -100,8 +100,8 @@ typedef struct ucp_context_config { int rndv_errh_ppln_enable; /** Enable flow control for rndv mtype fragments at worker level */ int rndv_mtype_worker_fc_enable; - /** Maximum number of concurrent rndv mtype fragments per worker */ - size_t rndv_mtype_worker_max_frags; + /** Maximum memory for concurrent rndv mtype fragments per worker (bytes) */ + size_t rndv_mtype_worker_max_mem; /** Threshold for using tag matching offload capabilities. Smaller buffers * will not be posted to the transport. 
*/ size_t tm_thresh; diff --git a/src/ucp/rndv/proto_rndv.c b/src/ucp/rndv/proto_rndv.c index dc516b23c80..c3d1261eea5 100644 --- a/src/ucp/rndv/proto_rndv.c +++ b/src/ucp/rndv/proto_rndv.c @@ -9,6 +9,7 @@ #endif #include "proto_rndv.inl" +#include "rndv_mtype.inl" #include #include @@ -650,6 +651,8 @@ ucp_proto_rndv_bulk_init(const ucp_proto_multi_init_params_t *init_params, rpriv->frag_mem_type = init_params->super.reg_mem_info.type; rpriv->frag_sys_dev = init_params->super.reg_mem_info.sys_dev; + rpriv->fc_max_frags = ucp_proto_rndv_mtype_fc_max_frags( + context, rpriv->frag_mem_type); if (rpriv->super.lane == UCP_NULL_LANE) { /* Add perf without ACK in case of pipeline */ diff --git a/src/ucp/rndv/proto_rndv.h b/src/ucp/rndv/proto_rndv.h index 8299b740491..ca18507b76b 100644 --- a/src/ucp/rndv/proto_rndv.h +++ b/src/ucp/rndv/proto_rndv.h @@ -75,6 +75,9 @@ typedef struct { ucs_memory_type_t frag_mem_type; ucs_sys_device_t frag_sys_dev; + /* max fragments for flow control */ + size_t fc_max_frags; + /* Multi-lane common part. 
Must be the last field, see @ref ucp_proto_multi_priv_t */ ucp_proto_multi_priv_t mpriv; diff --git a/src/ucp/rndv/rndv_get.c b/src/ucp/rndv/rndv_get.c index 2965840e255..b75db7f9490 100644 --- a/src/ucp/rndv/rndv_get.c +++ b/src/ucp/rndv/rndv_get.c @@ -286,8 +286,7 @@ ucp_proto_rndv_get_mtype_fetch_completion(uct_completion_t *uct_comp) static ucs_status_t ucp_proto_rndv_get_mtype_fetch_progress(uct_pending_req_t *uct_req) { - ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct); - ucp_context_h context = req->send.ep->worker->context; + ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct); const ucp_proto_rndv_bulk_priv_t *rpriv; ucs_status_t status; size_t max_frags; @@ -297,9 +296,11 @@ ucp_proto_rndv_get_mtype_fetch_progress(uct_pending_req_t *uct_req) rpriv = req->send.proto_config->priv; if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) { + /* GET priority: 80% of total fragments */ + max_frags = rpriv->fc_max_frags / 5 * 4; + /* Check throttling limit. If no resource at the moment, queue the * request in GET pending queue and return UCS_OK. */ - max_frags = context->config.ext.rndv_mtype_worker_max_frags / 5 * 4; pending_q = &req->send.ep->worker->rndv_mtype_fc.get_pending_q; if (ucp_proto_rndv_mtype_fc_check(req, max_frags, pending_q) == UCS_ERR_NO_RESOURCE) { diff --git a/src/ucp/rndv/rndv_mtype.inl b/src/ucp/rndv/rndv_mtype.inl index 9244e684391..9abbccdf35e 100644 --- a/src/ucp/rndv/rndv_mtype.inl +++ b/src/ucp/rndv/rndv_mtype.inl @@ -177,6 +177,25 @@ static unsigned ucp_proto_rndv_mtype_fc_reschedule_cb(void *arg) return 1; } +/** + * Compute maximum number of fragments allowed based on configured max memory + * and fragment size for the given memory type. + * + * @param context The UCP context. + * @param frag_mem_type Memory type used for fragments. + * + * @return Maximum number of fragments that fit within the configured memory limit. 
+ */ +static UCS_F_ALWAYS_INLINE size_t +ucp_proto_rndv_mtype_fc_max_frags(ucp_context_h context, + ucs_memory_type_t frag_mem_type) +{ + size_t max_mem = context->config.ext.rndv_mtype_worker_max_mem; + size_t frag_size = context->config.ext.rndv_frag_size[frag_mem_type]; + + return (frag_size > 0) ? (max_mem / frag_size) : SIZE_MAX; +} + /** * Check if request should be throttled due to flow control limit. * If throttled, the request is queued to the appropriate priority queue. diff --git a/src/ucp/rndv/rndv_put.c b/src/ucp/rndv/rndv_put.c index d84721bd532..b49b825fc6b 100644 --- a/src/ucp/rndv/rndv_put.c +++ b/src/ucp/rndv/rndv_put.c @@ -517,8 +517,7 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_proto_rndv_put_mtype_send_func( static ucs_status_t ucp_proto_rndv_put_mtype_copy_progress(uct_pending_req_t *uct_req) { - ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct); - ucp_context_h context = req->send.ep->worker->context; + ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct); const ucp_proto_rndv_put_priv_t *rpriv; ucs_status_t status; size_t max_frags; @@ -526,16 +525,16 @@ ucp_proto_rndv_put_mtype_copy_progress(uct_pending_req_t *uct_req) ucs_assert(!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)); + rpriv = req->send.proto_config->priv; + /* Check throttling limit. If no resource at the moment, queue the request * in PUT pending queue and return UCS_OK. 
*/ - max_frags = context->config.ext.rndv_mtype_worker_max_frags; + max_frags = rpriv->bulk.fc_max_frags; pending_q = &req->send.ep->worker->rndv_mtype_fc.put_pending_q; if (ucp_proto_rndv_mtype_fc_check(req, max_frags, pending_q) == UCS_ERR_NO_RESOURCE) { return UCS_OK; } - - rpriv = req->send.proto_config->priv; status = ucp_proto_rndv_mtype_request_init(req, rpriv->bulk.frag_mem_type, rpriv->bulk.frag_sys_dev); if (status != UCS_OK) { diff --git a/src/ucp/rndv/rndv_rtr.c b/src/ucp/rndv/rndv_rtr.c index 38bfc9b0bef..bbc5e1f100f 100644 --- a/src/ucp/rndv/rndv_rtr.c +++ b/src/ucp/rndv/rndv_rtr.c @@ -36,6 +36,8 @@ typedef struct { ucp_proto_rndv_rtr_priv_t super; ucs_memory_type_t frag_mem_type; ucs_sys_device_t frag_sys_dev; + /* max fragments for flow control */ + size_t fc_max_frags; } ucp_proto_rndv_rtr_mtype_priv_t; static UCS_F_ALWAYS_INLINE void @@ -351,24 +353,25 @@ ucp_proto_rndv_rtr_mtype_data_received(ucp_request_t *req, int in_buffer) static ucs_status_t ucp_proto_rndv_rtr_mtype_progress(uct_pending_req_t *self) { - ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); - ucp_context_h context = req->send.ep->worker->context; + ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); const ucp_proto_rndv_rtr_mtype_priv_t *rpriv; size_t max_frags; ucs_queue_head_t *pending_q; ucs_status_t status; if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) { + rpriv = req->send.proto_config->priv; + + /* RTR priority: 60% of total fragments */ + max_frags = rpriv->fc_max_frags / 5 * 3; + /* Check throttling limit. If no resource at the moment, queue the * request in RTR pending queue and return UCS_OK. 
 */
-    max_frags = context->config.ext.rndv_mtype_worker_max_frags / 5 * 3;
         pending_q  = &req->send.ep->worker->rndv_mtype_fc.rtr_pending_q;
         if (ucp_proto_rndv_mtype_fc_check(req, max_frags, pending_q) ==
             UCS_ERR_NO_RESOURCE) {
             return UCS_OK;
         }
-
-        rpriv  = req->send.proto_config->priv;
         status = ucp_proto_rndv_mtype_request_init(req, rpriv->frag_mem_type,
                                                    rpriv->frag_sys_dev);
         if (status != UCS_OK) {
@@ -466,6 +469,8 @@ ucp_proto_rndv_rtr_mtype_probe(const ucp_proto_init_params_t *init_params)
     rpriv.super.data_received = ucp_proto_rndv_rtr_mtype_data_received;
     rpriv.frag_mem_type       = frag_mem_type;
     rpriv.frag_sys_dev        = params.super.reg_mem_info.sys_dev;
+    rpriv.fc_max_frags        = ucp_proto_rndv_mtype_fc_max_frags(
+                                        context, frag_mem_type);
     ucp_proto_rndv_ctrl_probe(&params, &rpriv, sizeof(rpriv));
 out_unpack_perf_destroy:

From 48807d8a8b12b67eb53e8f03f6e915c616a2d4e3 Mon Sep 17 00:00:00 2001
From: Alma Mastbaum
Date: Sun, 4 Jan 2026 16:37:34 +0200
Subject: [PATCH 12/14] UCP/RNDV: align max frags to mpool chunk size

---
 src/ucp/rndv/rndv_mtype.inl | 29 ++++++++++++++++++++++++-----
 1 file changed, 24 insertions(+), 5 deletions(-)

diff --git a/src/ucp/rndv/rndv_mtype.inl b/src/ucp/rndv/rndv_mtype.inl
index 9abbccdf35e..5e23b2f3778 100644
--- a/src/ucp/rndv/rndv_mtype.inl
+++ b/src/ucp/rndv/rndv_mtype.inl
@@ -179,21 +179,40 @@ static unsigned ucp_proto_rndv_mtype_fc_reschedule_cb(void *arg)
 
 /**
  * Compute maximum number of fragments allowed based on configured max memory
- * and fragment size for the given memory type.
+ * and fragment size for the given memory type. The result is rounded down to
+ * the allocation chunk granularity (rndv_num_frags).
  *
  * @param context        The UCP context.
  * @param frag_mem_type  Memory type used for fragments.
  *
- * @return Maximum number of fragments that fit within the configured memory limit.
+ * @return Maximum number of fragments that fit within the configured memory
+ *         limit, aligned to allocation chunk size. 
*/ static UCS_F_ALWAYS_INLINE size_t ucp_proto_rndv_mtype_fc_max_frags(ucp_context_h context, ucs_memory_type_t frag_mem_type) { - size_t max_mem = context->config.ext.rndv_mtype_worker_max_mem; - size_t frag_size = context->config.ext.rndv_frag_size[frag_mem_type]; + size_t max_mem = context->config.ext.rndv_mtype_worker_max_mem; + size_t frag_size = context->config.ext.rndv_frag_size[frag_mem_type]; + size_t frags_in_chunk = context->config.ext.rndv_num_frags[frag_mem_type]; + size_t max_frags; + + /* frag_size must be > 0 for mtype protocols - validated during proto init */ + ucs_assert(frag_size > 0); + + /* Compute max fragments and round down to allocation chunk granularity */ + max_frags = max_mem / frag_size; + max_frags = (max_frags / frags_in_chunk) * frags_in_chunk; + + if (max_frags == 0) { + ucs_warn("RNDV_MTYPE_WORKER_MAX_MEM (%zu) is too low for %s " + "(frag_size=%zu, frags_per_alloc=%zu), using minimum %zu " + "frags", max_mem, ucs_memory_type_names[frag_mem_type], + frag_size, frags_in_chunk, frags_in_chunk); + return frags_in_chunk; + } - return (frag_size > 0) ? 
(max_mem / frag_size) : SIZE_MAX; + return max_frags; } /** From 910e0c8659dea58e970baa853fda32050af0444a Mon Sep 17 00:00:00 2001 From: Alma Mastbaum Date: Mon, 5 Jan 2026 10:54:19 +0200 Subject: [PATCH 13/14] UCP/RNDV: adjust gtest to MAX_MEM --- src/ucp/rndv/rndv_mtype.inl | 1 - test/gtest/ucp/test_ucp_am.cc | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/ucp/rndv/rndv_mtype.inl b/src/ucp/rndv/rndv_mtype.inl index 5e23b2f3778..90c05c74916 100644 --- a/src/ucp/rndv/rndv_mtype.inl +++ b/src/ucp/rndv/rndv_mtype.inl @@ -197,7 +197,6 @@ ucp_proto_rndv_mtype_fc_max_frags(ucp_context_h context, size_t frags_in_chunk = context->config.ext.rndv_num_frags[frag_mem_type]; size_t max_frags; - /* frag_size must be > 0 for mtype protocols - validated during proto init */ ucs_assert(frag_size > 0); /* Compute max fragments and round down to allocation chunk granularity */ diff --git a/test/gtest/ucp/test_ucp_am.cc b/test/gtest/ucp/test_ucp_am.cc index 0ca207ac553..4852cc075f3 100644 --- a/test/gtest/ucp/test_ucp_am.cc +++ b/test/gtest/ucp/test_ucp_am.cc @@ -2235,7 +2235,7 @@ class test_ucp_am_nbx_rndv_mtype_fc : public test_ucp_am_nbx_rndv_ppln { UCS_TEST_P(test_ucp_am_nbx_rndv_mtype_fc, fc_enabled_under_cap, "RNDV_MTYPE_WORKER_FC_ENABLE=y", - "RNDV_MTYPE_WORKER_MAX_FRAGS=128", + "RNDV_MTYPE_WORKER_MAX_MEM=1g", "RNDV_FRAG_MEM_TYPE=cuda") { if (!sender().is_rndv_put_ppln_supported()) { @@ -2270,14 +2270,14 @@ UCS_TEST_P(test_ucp_am_nbx_rndv_mtype_fc, fc_enabled_under_cap, UCS_TEST_P(test_ucp_am_nbx_rndv_mtype_fc, fc_enabled_cap_reached, "RNDV_MTYPE_WORKER_FC_ENABLE=y", - "RNDV_MTYPE_WORKER_MAX_FRAGS=5", + "RNDV_MTYPE_WORKER_MAX_MEM=600mb", "RNDV_FRAG_MEM_TYPE=cuda") { if (!sender().is_rndv_put_ppln_supported()) { UCS_TEST_SKIP_R("RNDV is not supported"); } - send_message(20); + send_message(200); /* Verify mtype protocols were used */ check_stats_ge(sender(), UCP_WORKER_STAT_RNDV_PUT_MTYPE_ZCOPY, 1); @@ -2289,7 +2289,7 @@ 
UCS_TEST_P(test_ucp_am_nbx_rndv_mtype_fc, fc_enabled_cap_reached,
     auto receiver_throttled =
             UCS_STATS_GET_COUNTER(receiver().worker()->stats,
                                   UCP_WORKER_STAT_RNDV_MTYPE_FC_THROTTLED);
     EXPECT_GT(sender_throttled + receiver_throttled, 0u)
-            << "throttling should have occurred with MAX_FRAGS=5";
+            << "throttling should have occurred with MAX_MEM=600mb";
 
     verify_clean_fc_state();
 }

From 264c7331533142e13396ea78e8d28bd8e56886f8 Mon Sep 17 00:00:00 2001
From: Alma Mastbaum
Date: Wed, 7 Jan 2026 15:37:03 +0200
Subject: [PATCH 14/14] UCP/RNDV: pre-define stats-counter-array size

---
 src/ucs/stats/libstats.h | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/ucs/stats/libstats.h b/src/ucs/stats/libstats.h
index 3b72c5fb7eb..63e5df1a533 100644
--- a/src/ucs/stats/libstats.h
+++ b/src/ucs/stats/libstats.h
@@ -56,7 +56,9 @@ struct ucs_stats_class {
     const char  *name;
     unsigned    num_counters;
     unsigned    class_id;
-    const char* counter_names[];
+    const char* counter_names[14]; /* Fixed capacity: this bound must be
                                      increased if any stats class ever
                                      defines more than 14 counters */
 };
 
 /*