Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,17 @@ static ucs_config_field_t ucp_context_config_table[] = {
"even if invalidation workflow isn't supported",
ucs_offsetof(ucp_context_config_t, rndv_errh_ppln_enable), UCS_CONFIG_TYPE_BOOL},

{"RNDV_MTYPE_WORKER_FC_ENABLE", "n",
"Enable worker-level flow control to limit total concurrent mtype fragments\n"
"across all requests, preventing memory exhaustion",
ucs_offsetof(ucp_context_config_t, rndv_mtype_worker_fc_enable), UCS_CONFIG_TYPE_BOOL},

{"RNDV_MTYPE_WORKER_MAX_MEM", "4g",
"Maximum memory for concurrent mtype fragments per worker.\n"
"This value is translated to a fragment count based on RNDV_FRAG_SIZE\n"
"for each memory type (only applies when RNDV_MTYPE_WORKER_FC_ENABLE=y)",
ucs_offsetof(ucp_context_config_t, rndv_mtype_worker_max_mem), UCS_CONFIG_TYPE_MEMUNITS},

{"FLUSH_WORKER_EPS", "y",
"Enable flushing the worker by flushing its endpoints. Allows completing\n"
"the flush operation in a bounded time even if there are new requests on\n"
Expand Down
4 changes: 4 additions & 0 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ typedef struct ucp_context_config {
int rndv_shm_ppln_enable;
/** Enable error handling for rndv pipeline protocol */
int rndv_errh_ppln_enable;
/** Enable flow control for rndv mtype fragments at worker level */
int rndv_mtype_worker_fc_enable;
/** Maximum memory for concurrent rndv mtype fragments per worker (bytes) */
size_t rndv_mtype_worker_max_mem;
/** Threshold for using tag matching offload capabilities. Smaller buffers
* will not be posted to the transport. */
size_t tm_thresh;
Expand Down
5 changes: 4 additions & 1 deletion src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,10 @@ struct ucp_request {
/* Used by rndv/send/ppln and rndv/recv/ppln */
struct {
/* Size to send in ack message */
ssize_t ack_data_size;
ssize_t ack_data_size;
/* Element in worker-level pending queue
* for throttled ppln requests */
ucs_queue_elem_t queue_elem;
} ppln;

/* Used by rndv/rkey_ptr */
Expand Down
32 changes: 20 additions & 12 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,20 @@ static ucs_stats_class_t ucp_worker_stats_class = {
.num_counters = UCP_WORKER_STAT_LAST,
.class_id = UCS_STATS_CLASS_ID_INVALID,
.counter_names = {
[UCP_WORKER_STAT_TAG_RX_EAGER_MSG] = "tag_rx_eager_msg",
[UCP_WORKER_STAT_TAG_RX_EAGER_SYNC_MSG] = "tag_rx_sync_msg",
[UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_EXP] = "tag_rx_eager_chunk_exp",
[UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_UNEXP] = "tag_rx_eager_chunk_unexp",
[UCP_WORKER_STAT_RNDV_RX_EXP] = "rndv_rx_exp",
[UCP_WORKER_STAT_RNDV_RX_UNEXP] = "rndv_rx_unexp",
[UCP_WORKER_STAT_RNDV_PUT_ZCOPY] = "rndv_put_zcopy",
[UCP_WORKER_STAT_RNDV_PUT_MTYPE_ZCOPY] = "rndv_put_mtype_zcopy",
[UCP_WORKER_STAT_RNDV_GET_ZCOPY] = "rndv_get_zcopy",
[UCP_WORKER_STAT_RNDV_RTR] = "rndv_rtr",
[UCP_WORKER_STAT_RNDV_RTR_MTYPE] = "rndv_rtr_mtype",
[UCP_WORKER_STAT_RNDV_RKEY_PTR] = "rndv_rkey_ptr"
[UCP_WORKER_STAT_TAG_RX_EAGER_MSG] = "tag_rx_eager_msg",
[UCP_WORKER_STAT_TAG_RX_EAGER_SYNC_MSG] = "tag_rx_sync_msg",
[UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_EXP] = "tag_rx_eager_chunk_exp",
[UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_UNEXP] = "tag_rx_eager_chunk_unexp",
[UCP_WORKER_STAT_RNDV_RX_EXP] = "rndv_rx_exp",
[UCP_WORKER_STAT_RNDV_RX_UNEXP] = "rndv_rx_unexp",
[UCP_WORKER_STAT_RNDV_PUT_ZCOPY] = "rndv_put_zcopy",
[UCP_WORKER_STAT_RNDV_PUT_MTYPE_ZCOPY] = "rndv_put_mtype_zcopy",
[UCP_WORKER_STAT_RNDV_GET_ZCOPY] = "rndv_get_zcopy",
[UCP_WORKER_STAT_RNDV_RTR] = "rndv_rtr",
[UCP_WORKER_STAT_RNDV_RTR_MTYPE] = "rndv_rtr_mtype",
[UCP_WORKER_STAT_RNDV_RKEY_PTR] = "rndv_rkey_ptr",
[UCP_WORKER_STAT_RNDV_MTYPE_FC_THROTTLED] = "rndv_mtype_fc_throttled",
[UCP_WORKER_STAT_RNDV_MTYPE_FC_INCREMENTED] = "rndv_mtype_fc_incremented"
}
};
#endif
Expand Down Expand Up @@ -2524,6 +2526,12 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
worker->counters.ep_closures = 0;
worker->counters.ep_failures = 0;

/* Initialize RNDV mtype flow control */
worker->rndv_mtype_fc.active_frags = 0;
ucs_queue_head_init(&worker->rndv_mtype_fc.put_pending_q);
ucs_queue_head_init(&worker->rndv_mtype_fc.get_pending_q);
ucs_queue_head_init(&worker->rndv_mtype_fc.rtr_pending_q);

/* Copy user flags, and mask-out unsupported flags for compatibility */
worker->flags = UCP_PARAM_VALUE(WORKER, params, flags, FLAGS, 0) &
UCS_MASK(UCP_WORKER_INTERNAL_FLAGS_SHIFT);
Expand Down
35 changes: 23 additions & 12 deletions src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,25 +135,27 @@ enum {
*/
enum {
/* Total number of received eager messages */
UCP_WORKER_STAT_TAG_RX_EAGER_MSG,
UCP_WORKER_STAT_TAG_RX_EAGER_SYNC_MSG,
UCP_WORKER_STAT_TAG_RX_EAGER_MSG = 0,
UCP_WORKER_STAT_TAG_RX_EAGER_SYNC_MSG = 1,

/* Total number of received eager chunks (every message
* can be split into a bunch of chunks). It is possible that
* some chunks of the message arrived unexpectedly and then
* receive had been posted and the rest arrived expectedly */
UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_EXP,
UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_UNEXP,
UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_EXP = 2,
UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_UNEXP = 3,

UCP_WORKER_STAT_RNDV_RX_EXP,
UCP_WORKER_STAT_RNDV_RX_UNEXP,
UCP_WORKER_STAT_RNDV_RX_EXP = 4,
UCP_WORKER_STAT_RNDV_RX_UNEXP = 5,

UCP_WORKER_STAT_RNDV_PUT_ZCOPY,
UCP_WORKER_STAT_RNDV_PUT_MTYPE_ZCOPY,
UCP_WORKER_STAT_RNDV_GET_ZCOPY,
UCP_WORKER_STAT_RNDV_RTR,
UCP_WORKER_STAT_RNDV_RTR_MTYPE,
UCP_WORKER_STAT_RNDV_RKEY_PTR,
UCP_WORKER_STAT_RNDV_PUT_ZCOPY = 6,
UCP_WORKER_STAT_RNDV_PUT_MTYPE_ZCOPY = 7,
UCP_WORKER_STAT_RNDV_GET_ZCOPY = 8,
UCP_WORKER_STAT_RNDV_RTR = 9,
UCP_WORKER_STAT_RNDV_RTR_MTYPE = 10,
UCP_WORKER_STAT_RNDV_RKEY_PTR = 11,
UCP_WORKER_STAT_RNDV_MTYPE_FC_THROTTLED = 12,
UCP_WORKER_STAT_RNDV_MTYPE_FC_INCREMENTED = 13,

UCP_WORKER_STAT_LAST
};
Expand Down Expand Up @@ -393,6 +395,15 @@ typedef struct ucp_worker {
uint64_t ep_failures;
} counters;

struct {
/* Worker-level mtype fragment flow control */
size_t active_frags; /* Current active fragments */
/* Separate pending queues by priority (PUT > GET > RTR) */
ucs_queue_head_t put_pending_q; /* Throttled PUT (RNDV_SEND) requests */
ucs_queue_head_t get_pending_q; /* Throttled GET requests */
ucs_queue_head_t rtr_pending_q; /* Throttled RTR requests */
} rndv_mtype_fc;

struct {
/* Usage tracker handle */
ucs_usage_tracker_h handle;
Expand Down
3 changes: 3 additions & 0 deletions src/ucp/rndv/proto_rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#endif

#include "proto_rndv.inl"
#include "rndv_mtype.inl"

#include <ucp/proto/proto_init.h>
#include <ucp/proto/proto_debug.h>
Expand Down Expand Up @@ -650,6 +651,8 @@ ucp_proto_rndv_bulk_init(const ucp_proto_multi_init_params_t *init_params,

rpriv->frag_mem_type = init_params->super.reg_mem_info.type;
rpriv->frag_sys_dev = init_params->super.reg_mem_info.sys_dev;
rpriv->fc_max_frags = ucp_proto_rndv_mtype_fc_max_frags(
context, rpriv->frag_mem_type);

if (rpriv->super.lane == UCP_NULL_LANE) {
/* Add perf without ACK in case of pipeline */
Expand Down
3 changes: 3 additions & 0 deletions src/ucp/rndv/proto_rndv.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ typedef struct {
ucs_memory_type_t frag_mem_type;
ucs_sys_device_t frag_sys_dev;

/* max fragments for flow control */
size_t fc_max_frags;

/* Multi-lane common part. Must be the last field, see
@ref ucp_proto_multi_priv_t */
ucp_proto_multi_priv_t mpriv;
Expand Down
18 changes: 18 additions & 0 deletions src/ucp/rndv/rndv_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ ucp_proto_rndv_get_mtype_unpack_completion(uct_completion_t *uct_comp)
send.state.uct_comp);

ucs_mpool_put_inline(req->send.rndv.mdesc);
ucp_proto_rndv_mtype_fc_decrement(req);

if (ucp_proto_rndv_request_is_ppln_frag(req)) {
ucp_proto_rndv_ppln_recv_frag_complete(req, 1, 0);
} else {
Expand All @@ -287,18 +289,32 @@ ucp_proto_rndv_get_mtype_fetch_progress(uct_pending_req_t *uct_req)
ucp_request_t *req = ucs_container_of(uct_req, ucp_request_t, send.uct);
const ucp_proto_rndv_bulk_priv_t *rpriv;
ucs_status_t status;
size_t max_frags;
ucs_queue_head_t *pending_q;

/* coverity[tainted_data_downcast] */
rpriv = req->send.proto_config->priv;

if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) {
/* GET priority: 80% of total fragments */
max_frags = rpriv->fc_max_frags / 5 * 4;

/* Check throttling limit. If no resource at the moment, queue the
* request in GET pending queue and return UCS_OK. */
pending_q = &req->send.ep->worker->rndv_mtype_fc.get_pending_q;
if (ucp_proto_rndv_mtype_fc_check(req, max_frags, pending_q) ==
UCS_ERR_NO_RESOURCE) {
return UCS_OK;
}

status = ucp_proto_rndv_mtype_request_init(req, rpriv->frag_mem_type,
rpriv->frag_sys_dev);
if (status != UCS_OK) {
ucp_proto_request_abort(req, status);
return UCS_OK;
}

ucp_proto_rndv_mtype_fc_increment(req);
ucp_proto_rndv_get_common_request_init(req);
ucp_proto_completion_init(&req->send.state.uct_comp,
ucp_proto_rndv_get_mtype_fetch_completion);
Expand Down Expand Up @@ -364,6 +380,8 @@ static ucs_status_t ucp_proto_rndv_get_mtype_reset(ucp_request_t *req)
req->send.rndv.mdesc = NULL;
req->flags &= ~UCP_REQUEST_FLAG_PROTO_INITIALIZED;

ucp_proto_rndv_mtype_fc_decrement(req);

if ((req->send.proto_stage != UCP_PROTO_RNDV_GET_STAGE_FETCH) &&
(req->send.proto_stage != UCP_PROTO_RNDV_GET_STAGE_ATS)) {
ucp_proto_fatal_invalid_stage(req, "reset");
Expand Down
138 changes: 138 additions & 0 deletions src/ucp/rndv/rndv_mtype.inl
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,144 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_proto_rndv_mtype_copy(
return status;
}

/* Reschedule callback for throttled mtype requests */
static unsigned ucp_proto_rndv_mtype_fc_reschedule_cb(void *arg)
{
ucp_request_t *req = arg;
ucp_request_send(req);
return 1;
}

/**
 * Translate the configured worker memory budget into a fragment-count limit
 * for the given fragment memory type. The limit is rounded down to a whole
 * number of allocation chunks (rndv_num_frags); if the budget is smaller than
 * one chunk, a single chunk is allowed and a warning is emitted.
 *
 * @param context       The UCP context.
 * @param frag_mem_type Memory type used for fragments.
 *
 * @return Maximum number of concurrent fragments, chunk-aligned.
 */
static UCS_F_ALWAYS_INLINE size_t
ucp_proto_rndv_mtype_fc_max_frags(ucp_context_h context,
                                  ucs_memory_type_t frag_mem_type)
{
    size_t mem_limit   = context->config.ext.rndv_mtype_worker_max_mem;
    size_t frag_size   = context->config.ext.rndv_frag_size[frag_mem_type];
    size_t chunk_frags = context->config.ext.rndv_num_frags[frag_mem_type];
    size_t limit;

    ucs_assert(frag_size > 0);

    /* Fragments that fit in the budget, rounded down to chunk granularity */
    limit = ((mem_limit / frag_size) / chunk_frags) * chunk_frags;
    if (limit != 0) {
        return limit;
    }

    /* Budget below one allocation chunk: fall back to a single chunk */
    ucs_warn("RNDV_MTYPE_WORKER_MAX_MEM (%zu) is too low for %s "
             "(frag_size=%zu, frags_per_alloc=%zu), using minimum %zu "
             "frags", mem_limit, ucs_memory_type_names[frag_mem_type],
             frag_size, chunk_frags, chunk_frags);
    return chunk_frags;
}

/**
 * Flow-control admission check for an mtype request. When the worker's
 * active-fragment count has reached @a max_frags, the request is parked on
 * @a pending_q (to be rescheduled when capacity frees up) and the caller
 * must not proceed with fragment allocation.
 *
 * @param req       The request to check.
 * @param max_frags The maximum number of fragments allowed.
 * @param pending_q The queue to add the request to if it is throttled.
 *
 * @return UCS_OK if not throttled, UCS_ERR_NO_RESOURCE if throttled and queued.
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_proto_rndv_mtype_fc_check(ucp_request_t *req, size_t max_frags,
                              ucs_queue_head_t *pending_q)
{
    ucp_worker_h worker = req->send.ep->worker;

    /* Pass through when FC is disabled or capacity remains */
    if (!worker->context->config.ext.rndv_mtype_worker_fc_enable ||
        (worker->rndv_mtype_fc.active_frags < max_frags)) {
        return UCS_OK;
    }

    ucs_trace_req("mtype_fc: fragments throttle limit reached (%zu/%zu)",
                  worker->rndv_mtype_fc.active_frags, max_frags);
    UCS_STATS_UPDATE_COUNTER(worker->stats,
                             UCP_WORKER_STAT_RNDV_MTYPE_FC_THROTTLED, 1);
    ucs_queue_push(pending_q, &req->send.rndv.ppln.queue_elem);
    return UCS_ERR_NO_RESOURCE;
}

/**
 * Account for one newly-allocated mtype fragment set: bump the worker's
 * active-fragment counter (no-op when flow control is disabled).
 */
static UCS_F_ALWAYS_INLINE void
ucp_proto_rndv_mtype_fc_increment(ucp_request_t *req)
{
    ucp_worker_h worker = req->send.ep->worker;

    if (!worker->context->config.ext.rndv_mtype_worker_fc_enable) {
        return;
    }

    worker->rndv_mtype_fc.active_frags++;
    UCS_STATS_UPDATE_COUNTER(worker->stats,
                             UCP_WORKER_STAT_RNDV_MTYPE_FC_INCREMENTED, 1);
}

/**
 * Release one active fragment and, if any request is waiting on the limit,
 * schedule the highest-priority one for resend via a progress-queue oneshot.
 *
 * Dequeue priority: PUT > GET > RTR.
 *
 * Priority rationale:
 * PUT - Remote is blocked waiting for our data. Scheduling PUT unblocks remote
 *       as well.
 * GET - Self-contained fetch operation. Completes without causing remote
 *       allocations, but scheduling it doesn't unblock another buffer.
 * RTR - Scheduling RTR triggers a remote PUT allocation, increasing total
 *       memory pressure.
 */
static UCS_F_ALWAYS_INLINE void
ucp_proto_rndv_mtype_fc_decrement(ucp_request_t *req)
{
    ucp_worker_h worker = req->send.ep->worker;
    ucs_queue_head_t *queues[3];
    ucs_queue_elem_t *elem;
    ucp_request_t *next_req;
    unsigned i;

    if (!worker->context->config.ext.rndv_mtype_worker_fc_enable) {
        return;
    }

    ucs_assert(worker->rndv_mtype_fc.active_frags > 0);
    worker->rndv_mtype_fc.active_frags--;

    /* Scan pending queues in priority order and wake the first waiter */
    queues[0] = &worker->rndv_mtype_fc.put_pending_q;
    queues[1] = &worker->rndv_mtype_fc.get_pending_q;
    queues[2] = &worker->rndv_mtype_fc.rtr_pending_q;

    for (i = 0; i < 3; ++i) {
        if (ucs_queue_is_empty(queues[i])) {
            continue;
        }

        elem     = ucs_queue_pull(queues[i]);
        next_req = ucs_container_of(elem, ucp_request_t,
                                    send.rndv.ppln.queue_elem);
        ucs_callbackq_add_oneshot(&worker->uct->progress_q, next_req,
                                  ucp_proto_rndv_mtype_fc_reschedule_cb,
                                  next_req);
        return;
    }
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_proto_rndv_mdesc_mtype_copy(ucp_request_t *req,
uct_ep_put_zcopy_func_t copy_func,
Expand Down
1 change: 1 addition & 0 deletions src/ucp/rndv/rndv_ppln.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <ucp/proto/proto_debug.h>
#include <ucp/proto/proto_multi.inl>
#include <ucp/proto/proto_init.h>
#include <ucs/datastruct/callbackq.h>


enum {
Expand Down
Loading
Loading