10 changes: 10 additions & 0 deletions src/ucp/core/ucp_context.c
@@ -383,6 +383,16 @@ static ucs_config_field_t ucp_context_config_table[] = {
"even if invalidation workflow isn't supported",
ucs_offsetof(ucp_context_config_t, rndv_errh_ppln_enable), UCS_CONFIG_TYPE_BOOL},

{"RNDV_PIPELINE_WORKER_FC_ENABLE", "n",
"Enable worker-level flow control to limit total concurrent pipeline fragments\n"
"across all requests, preventing memory exhaustion",
ucs_offsetof(ucp_context_config_t, rndv_ppln_worker_fc_enable), UCS_CONFIG_TYPE_BOOL},

{"RNDV_PIPELINE_WORKER_MAX_FRAGS", "5000",
"Maximum number of concurrent pipeline fragments per worker\n"
"(only applies when RNDV_PIPELINE_WORKER_FC_ENABLE=y)",
ucs_offsetof(ucp_context_config_t, rndv_ppln_worker_max_frags), UCS_CONFIG_TYPE_ULUNITS},

{"FLUSH_WORKER_EPS", "y",
"Enable flushing the worker by flushing its endpoints. Allows completing\n"
"the flush operation in a bounded time even if there are new requests on\n"
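As with the other entries in ucp_context_config_table, these options are normally surfaced to users with the UCX_ prefix, e.g. setting UCX_RNDV_PIPELINE_WORKER_FC_ENABLE=y and UCX_RNDV_PIPELINE_WORKER_MAX_FRAGS=5000 in the environment. As a rough sizing example (assuming a 512 KiB pipeline fragment, an illustrative value only, since the actual fragment size depends on memory type and configuration), a 5000-fragment cap bounds the worker's staging memory at about 2.4 GiB.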
4 changes: 4 additions & 0 deletions src/ucp/core/ucp_context.h
@@ -98,6 +98,10 @@ typedef struct ucp_context_config {
int rndv_shm_ppln_enable;
/** Enable error handling for rndv pipeline protocol */
int rndv_errh_ppln_enable;
/** Enable flow control for rndv pipeline fragments at worker level */
int rndv_ppln_worker_fc_enable;
/** Maximum number of concurrent pipeline fragments per worker */
size_t rndv_ppln_worker_max_frags;
/** Threshold for using tag matching offload capabilities. Smaller buffers
* will not be posted to the transport. */
size_t tm_thresh;
5 changes: 4 additions & 1 deletion src/ucp/core/ucp_request.h
@@ -318,7 +318,10 @@ struct ucp_request {
/* Used by rndv/send/ppln and rndv/recv/ppln */
struct {
/* Size to send in ack message */
ssize_t ack_data_size;
ssize_t ack_data_size;
/* Element in worker-level pending queue
* for throttled ppln requests */
ucs_queue_elem_t queue_elem;
} ppln;

/* Used by rndv/rkey_ptr */
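The new queue_elem member follows the usual intrusive-container pattern: the worker's pending queue links ucs_queue_elem_t fields that live inside each request, and the owning request is later recovered with ucs_container_of. A minimal standalone sketch of that mechanism (plain C with hypothetical stand-in names, not UCX code):

#include <stddef.h>
#include <stdio.h>

/* Stand-ins for ucs_queue_elem_t / ucs_container_of */
typedef struct queue_elem {
    struct queue_elem *next;
} queue_elem_t;

#define container_of(ptr, type, member) \
    ((type*)((char*)(ptr) - offsetof(type, member)))

typedef struct request {
    int          id;
    queue_elem_t queue_elem;   /* embedded link, no extra allocation needed */
} request_t;

int main(void)
{
    request_t     req   = {.id = 42};
    queue_elem_t *elem  = &req.queue_elem;                       /* what the queue stores */
    request_t    *owner = container_of(elem, request_t, queue_elem);

    printf("recovered request id=%d\n", owner->id);              /* prints 42 */
    return 0;
}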
4 changes: 4 additions & 0 deletions src/ucp/core/ucp_worker.c
@@ -2519,6 +2519,10 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
worker->counters.ep_closures = 0;
worker->counters.ep_failures = 0;

/* Initialize RNDV pipeline flow control */
worker->rndv_ppln_fc.active_frags = 0;
ucs_queue_head_init(&worker->rndv_ppln_fc.pending_q);

/* Copy user flags, and mask-out unsupported flags for compatibility */
worker->flags = UCP_PARAM_VALUE(WORKER, params, flags, FLAGS, 0) &
UCS_MASK(UCP_WORKER_INTERNAL_FLAGS_SHIFT);
6 changes: 6 additions & 0 deletions src/ucp/core/ucp_worker.h
@@ -393,6 +393,12 @@ typedef struct ucp_worker {
uint64_t ep_failures;
} counters;

struct {
/* Worker-level ppln fragment flow control */
size_t active_frags; /* Current active fragments */
ucs_queue_head_t pending_q; /* Queue of throttled ppln requests */
} rndv_ppln_fc;

struct {
/* Usage tracker handle */
ucs_usage_tracker_h handle;
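Together with the context options above, this struct implements a simple admission-control scheme: a shared counter of in-flight fragments plus a FIFO of throttled requests. The sketch below is a self-contained toy model of that behaviour (simplified types and names, illustrative only, not the UCX implementation): a request either takes a slot or is queued, and releasing a slot hands it to the oldest waiter.

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

#define MAX_FRAGS 4                 /* stands in for rndv_ppln_worker_max_frags */

typedef struct frag_req {
    int              id;
    struct frag_req *next;          /* link for the pending FIFO */
} frag_req_t;

typedef struct worker_fc {
    size_t      active_frags;       /* fragments currently holding a buffer */
    frag_req_t *pend_head;          /* throttled requests, FIFO order */
    frag_req_t *pend_tail;
} worker_fc_t;

/* Admit the request if under the limit, otherwise queue it (throttle). */
static bool fc_try_acquire(worker_fc_t *fc, frag_req_t *req)
{
    if (fc->active_frags >= MAX_FRAGS) {
        req->next = NULL;
        if (fc->pend_tail != NULL) {
            fc->pend_tail->next = req;
        } else {
            fc->pend_head = req;
        }
        fc->pend_tail = req;
        return false;               /* caller retries when rescheduled */
    }
    fc->active_frags++;
    return true;
}

/* Release one fragment (caller guarantees active_frags > 0) and return the
 * oldest waiter, if any, so it can be rescheduled to retry acquisition. */
static frag_req_t *fc_release(worker_fc_t *fc)
{
    frag_req_t *next = fc->pend_head;

    fc->active_frags--;
    if (next != NULL) {
        fc->pend_head = next->next;
        if (fc->pend_head == NULL) {
            fc->pend_tail = NULL;
        }
    }
    return next;
}

int main(void)
{
    worker_fc_t fc      = {0};
    frag_req_t  reqs[6] = {{.id = 0}, {.id = 1}, {.id = 2},
                           {.id = 3}, {.id = 4}, {.id = 5}};

    for (int i = 0; i < 6; i++) {
        printf("req %d %s\n", reqs[i].id,
               fc_try_acquire(&fc, &reqs[i]) ? "admitted" : "throttled");
    }

    frag_req_t *resumed = fc_release(&fc);     /* one fragment completes */
    if (resumed != NULL) {
        printf("rescheduling req %d\n", resumed->id);
    }
    return 0;
}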
11 changes: 11 additions & 0 deletions src/ucp/rndv/rndv_get.c
@@ -262,6 +262,8 @@ ucp_proto_rndv_get_mtype_unpack_completion(uct_completion_t *uct_comp)
send.state.uct_comp);

ucs_mpool_put_inline(req->send.rndv.mdesc);
ucp_proto_rndv_mtype_fc_decrement(req);

if (ucp_proto_rndv_request_is_ppln_frag(req)) {
ucp_proto_rndv_ppln_recv_frag_complete(req, 1, 0);
} else {
@@ -292,13 +294,20 @@ ucp_proto_rndv_get_mtype_fetch_progress(uct_pending_req_t *uct_req)
rpriv = req->send.proto_config->priv;

if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) {
        /* Check throttling. If no resources are available at the moment, the
         * request is queued on the throttle pending queue and we return UCS_OK. */
if (ucp_proto_rndv_mtype_fc_check(req) == UCS_ERR_NO_RESOURCE) {
return UCS_OK;
}

status = ucp_proto_rndv_mtype_request_init(req, rpriv->frag_mem_type,
rpriv->frag_sys_dev);
if (status != UCS_OK) {
ucp_proto_request_abort(req, status);
return UCS_OK;
}

ucp_proto_rndv_mtype_fc_increment(req);
ucp_proto_rndv_get_common_request_init(req);
ucp_proto_completion_init(&req->send.state.uct_comp,
ucp_proto_rndv_get_mtype_fetch_completion);
@@ -364,6 +373,8 @@ static ucs_status_t ucp_proto_rndv_get_mtype_reset(ucp_request_t *req)
req->send.rndv.mdesc = NULL;
req->flags &= ~UCP_REQUEST_FLAG_PROTO_INITIALIZED;

ucp_proto_rndv_mtype_fc_decrement(req);

if ((req->send.proto_stage != UCP_PROTO_RNDV_GET_STAGE_FETCH) &&
(req->send.proto_stage != UCP_PROTO_RNDV_GET_STAGE_ATS)) {
ucp_proto_fatal_invalid_stage(req, "reset");
80 changes: 80 additions & 0 deletions src/ucp/rndv/rndv_mtype.inl
@@ -169,6 +169,86 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_proto_rndv_mtype_copy(
return status;
}

/* Reschedule callback for throttled mtype requests */
static unsigned ucp_proto_rndv_mtype_fc_reschedule_cb(void *arg)
{
ucp_request_t *req = arg;
ucp_request_send(req);
return 1;
}

/**
 * Check whether the request should be throttled due to the flow-control limit.
* If throttled, the request is queued to pending_q.
*
* @return UCS_OK if not throttled (caller should continue),
* UCS_ERR_NO_RESOURCE if throttled and queued (caller should return UCS_OK).
*/
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_proto_rndv_mtype_fc_check(ucp_request_t *req)
{
ucp_worker_h worker = req->send.ep->worker;
ucp_context_h context = worker->context;

if (!context->config.ext.rndv_ppln_worker_fc_enable) {
return UCS_OK;
}

if (worker->rndv_ppln_fc.active_frags >=
context->config.ext.rndv_ppln_worker_max_frags) {
ucs_trace_req("mtype_fc: throttle limit reached active_frags=%zu max=%zu",
worker->rndv_ppln_fc.active_frags,
context->config.ext.rndv_ppln_worker_max_frags);
ucs_queue_push(&worker->rndv_ppln_fc.pending_q,
&req->send.rndv.ppln.queue_elem);
return UCS_ERR_NO_RESOURCE;
}

return UCS_OK;
}

/**
* Increment active_frags counter after successful mtype allocation.
*/
static UCS_F_ALWAYS_INLINE void
ucp_proto_rndv_mtype_fc_increment(ucp_request_t *req)
{
ucp_worker_h worker = req->send.ep->worker;

if (worker->context->config.ext.rndv_ppln_worker_fc_enable) {
worker->rndv_ppln_fc.active_frags++;
}
}

/**
* Decrement active_frags counter and reschedule pending request if any.
*/
static UCS_F_ALWAYS_INLINE void
ucp_proto_rndv_mtype_fc_decrement(ucp_request_t *req)
{
ucp_worker_h worker = req->send.ep->worker;
ucp_context_h context = worker->context;

if (!context->config.ext.rndv_ppln_worker_fc_enable) {
return;
}

ucs_assert(worker->rndv_ppln_fc.active_frags > 0);
worker->rndv_ppln_fc.active_frags--;

if (!ucs_queue_is_empty(&worker->rndv_ppln_fc.pending_q)) {
ucp_request_t *pending_req;
ucs_queue_elem_t *elem;

elem = ucs_queue_pull(&worker->rndv_ppln_fc.pending_q);
pending_req = ucs_container_of(elem, ucp_request_t,
send.rndv.ppln.queue_elem);
ucs_callbackq_add_oneshot(&worker->uct->progress_q, pending_req,
ucp_proto_rndv_mtype_fc_reschedule_cb,
pending_req);
}
}
Comment on lines 254 to 308 (Contributor Author):
maybe it can be named better, or I should divide it differently


static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_proto_rndv_mdesc_mtype_copy(ucp_request_t *req,
uct_ep_put_zcopy_func_t copy_func,
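Note that the decrement path does not call the throttled request's progress function directly; it defers the retry through a one-shot progress callback (ucs_callbackq_add_oneshot), so the throttled request is re-run on the next worker progress iteration rather than from inside a completion path. A rough standalone model of that defer-and-drain pattern (hypothetical names, one callback slot instead of a list, not the UCX callbackq API):

#include <stdio.h>

/* One-shot callback slot: runs exactly once on the next progress call */
typedef unsigned (*oneshot_cb_t)(void *arg);

typedef struct progress_queue {
    oneshot_cb_t cb;                /* pending one-shot, NULL if none */
    void        *arg;
} progress_queue_t;

static void add_oneshot(progress_queue_t *pq, oneshot_cb_t cb, void *arg)
{
    pq->cb  = cb;
    pq->arg = arg;
}

/* Called from the worker progress loop; drains and clears the one-shot. */
static unsigned progress(progress_queue_t *pq)
{
    if (pq->cb == NULL) {
        return 0;
    }

    oneshot_cb_t cb = pq->cb;
    pq->cb          = NULL;
    return cb(pq->arg);
}

/* Stands in for ucp_proto_rndv_mtype_fc_reschedule_cb: retry the request */
static unsigned reschedule_cb(void *arg)
{
    printf("re-running progress for throttled request %s\n", (const char*)arg);
    return 1;
}

int main(void)
{
    progress_queue_t pq = {0};

    /* completion path: a fragment finished, wake one throttled request */
    add_oneshot(&pq, reschedule_cb, "req-A");

    /* the next worker progress iteration actually retries the request */
    printf("progress dispatched %u callback(s)\n", progress(&pq));
    return 0;
}

In the patch itself, each successful admission (fc_check passing, followed by ucp_proto_rndv_mtype_request_init and fc_increment) is balanced by exactly one fc_decrement on the corresponding completion or reset path, which is what keeps active_frags and the pending queue consistent.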
3 changes: 2 additions & 1 deletion src/ucp/rndv/rndv_ppln.c
@@ -13,6 +13,7 @@
#include <ucp/proto/proto_debug.h>
#include <ucp/proto/proto_multi.inl>
#include <ucp/proto/proto_init.h>
#include <ucs/datastruct/callbackq.h>


enum {
@@ -252,7 +253,7 @@ void ucp_proto_rndv_ppln_recv_frag_complete(ucp_request_t *freq, int send_ack,

static ucs_status_t ucp_proto_rndv_ppln_progress(uct_pending_req_t *uct_req)
{
ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct);
ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct);
ucp_worker_h worker = req->send.ep->worker;
const ucp_proto_rndv_ppln_priv_t *rpriv;
ucp_datatype_iter_t next_iter;
13 changes: 10 additions & 3 deletions src/ucp/rndv/rndv_put.c
@@ -517,21 +517,26 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_proto_rndv_put_mtype_send_func(
static ucs_status_t
ucp_proto_rndv_put_mtype_copy_progress(uct_pending_req_t *uct_req)
{
ucp_request_t *req = ucs_container_of(uct_req,
ucp_request_t,
send.uct);
ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct);
const ucp_proto_rndv_put_priv_t *rpriv = req->send.proto_config->priv;
ucs_status_t status;

ucs_assert(!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED));

    /* Check throttling. If no resources are available at the moment, the
     * request is queued on the throttle pending queue and we return UCS_OK. */
if (ucp_proto_rndv_mtype_fc_check(req) == UCS_ERR_NO_RESOURCE) {
return UCS_OK;
}

status = ucp_proto_rndv_mtype_request_init(req, rpriv->bulk.frag_mem_type,
rpriv->bulk.frag_sys_dev);
if (status != UCS_OK) {
ucp_proto_request_abort(req, status);
return UCS_OK;
}

ucp_proto_rndv_mtype_fc_increment(req);
ucp_proto_rndv_put_common_request_init(req);
req->flags |= UCP_REQUEST_FLAG_PROTO_INITIALIZED;
ucp_proto_rndv_mdesc_mtype_copy(req, uct_ep_get_zcopy,
@@ -563,6 +568,7 @@ static void ucp_proto_rndv_put_mtype_completion(uct_completion_t *uct_comp)

ucp_trace_req(req, "rndv_put_mtype_completion");
ucs_mpool_put(req->send.rndv.mdesc);
ucp_proto_rndv_mtype_fc_decrement(req);
ucp_proto_rndv_put_common_complete(req);
}

@@ -573,6 +579,7 @@ static void ucp_proto_rndv_put_mtype_frag_completion(uct_completion_t *uct_comp)

ucp_trace_req(req, "rndv_put_mtype_frag_completion");
ucs_mpool_put(req->send.rndv.mdesc);
ucp_proto_rndv_mtype_fc_decrement(req);
ucp_proto_rndv_ppln_send_frag_complete(req, 1);
}

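Unlike the GET and RTR progress functions, the PUT mtype copy progress path asserts that the protocol is not yet initialized, so the throttling check runs unconditionally at the top of the function; a throttled request simply re-enters this same progress callback once it is rescheduled from the decrement path.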
11 changes: 11 additions & 0 deletions src/ucp/rndv/rndv_rtr.c
@@ -286,6 +286,9 @@ ucp_proto_rndv_rtr_mtype_complete(ucp_request_t *req, int abort)
if (!abort || (req->send.rndv.mdesc != NULL)) {
ucs_mpool_put_inline(req->send.rndv.mdesc);
}

ucp_proto_rndv_mtype_fc_decrement(req);

if (ucp_proto_rndv_request_is_ppln_frag(req)) {
ucp_proto_rndv_ppln_recv_frag_complete(req, 0, abort);
} else {
@@ -316,6 +319,7 @@ static ucs_status_t ucp_proto_rndv_rtr_mtype_reset(ucp_request_t *req)
if (req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED) {
ucs_mpool_put_inline(req->send.rndv.mdesc);
req->send.rndv.mdesc = NULL;
ucp_proto_rndv_mtype_fc_decrement(req);
}

return ucp_proto_request_zcopy_id_reset(req);
@@ -352,13 +356,20 @@ static ucs_status_t ucp_proto_rndv_rtr_mtype_progress(uct_pending_req_t *self)
ucs_status_t status;

if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) {
        /* Check throttling. If no resources are available at the moment, the
         * request is queued on the throttle pending queue and we return UCS_OK. */
if (ucp_proto_rndv_mtype_fc_check(req) == UCS_ERR_NO_RESOURCE) {
return UCS_OK;
}

status = ucp_proto_rndv_mtype_request_init(req, rpriv->frag_mem_type,
rpriv->frag_sys_dev);
if (status != UCS_OK) {
ucp_proto_request_abort(req, status);
return UCS_OK;
}

ucp_proto_rndv_mtype_fc_increment(req);
ucp_proto_rtr_common_request_init(req);
req->flags |= UCP_REQUEST_FLAG_PROTO_INITIALIZED;
}