Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ static ucs_config_field_t ucp_context_config_table[] = {
" 'y' : Print information for all protocols\n"
" 'n' : Do not print any protocol information\n"
" 'auto' : Print information when UCX_LOG_LEVEL is 'debug' or higher\n"
" 'used' : Print information for used protocols\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we need another var now? Otherwise, how one can ask for used protocols to be printed with/without debug log?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When set to used the used protocols table will be printed regardless of the log level.
But currently with auto it will print all protocols when the log level is debug or higher.
We could change the auto mode to print the used protocols when debug logs are enabled (instead of all protocols) if this is going to be the common use case.
What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems to be a bit confusing to me. Imo we have to manage when we want to print it and the format of the output separately. I was thinking about another env var which would define the format of the output (with values full and used).
@iyastreb wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly I don't see a problem here. Use cases I see:

  1. No configs specified, mode is auto, prints nothing
  2. LOG_LEVEL=debug, mode is auto = print ALL PS tables
  3. LOG_LEVEL=debug, PROTO_INFO=used (explicit setting), prints only used PS

So LOG_LEVEL can only influence mode if it's not set explicitly (auto).

Copy link
Contributor

@brminich brminich Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how would I print just protocol tables with option used (or without it)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like today: UCX_PROTO_INFO=y to print all PS tables
UCX_PROTO_INFO=used to print actually used tables

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, lets use just one var, but I suggest to define it like this:
UCX_PROTO_INFO values:
y - prints tables in "used" format regardless of LOG_LEVEL
n - does not print tables
auto - print tables in "used" format when LOG_LEVEL is debug
all - print tables in the current format (i. e. print all)

@guy-ealey-morag, @iyastreb wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tricky part is that used PS tables are printed only at the end of the UCP worker lifetime (if it's terminated gracefully!). So essentially it changes behavior UCX users get used to. For example, in sglang I use this feature but I have to print PS tables on some event, because sglang app does not terminate gracefully. So I don't know what is better.. I would keep it as is, and improve in the future

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

" glob_pattern : Print information for operations matching the glob pattern.\n"
" For example: '*tag*gpu*', '*put*fast*host*'",
ucs_offsetof(ucp_context_config_t, proto_info), UCS_CONFIG_TYPE_STRING},
Expand Down Expand Up @@ -2398,6 +2399,10 @@ static ucs_status_t ucp_fill_config(ucp_context_h context,
context->config.progress_wrapper_enabled =
ucs_log_is_enabled(UCS_LOG_LEVEL_TRACE_REQ) ||
ucp_context_usage_tracker_enabled(context);

context->config.trace_used_proto_selections =
!strcasecmp(context->config.ext.proto_info, "used");

return UCS_OK;

err_free_key_list:
Expand Down
3 changes: 3 additions & 0 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,9 @@ typedef struct ucp_context {
/* Progress wrapper enabled */
int progress_wrapper_enabled;

/* Indicate whether tracing for used protocol selections is enabled */
int trace_used_proto_selections;

struct {
unsigned count;
size_t *sizes;
Expand Down
62 changes: 58 additions & 4 deletions src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -729,33 +729,87 @@ void ucp_request_purge_enqueue_cb(uct_pending_req_t *self, void *arg)
ucs_queue_push(queue, (ucs_queue_elem_t*)&req->send.uct.priv);
}

ucs_status_t ucp_request_progress_counter(uct_pending_req_t *self)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t,
send.uct);
ucp_proto_config_t *proto_config = ucs_const_cast(ucp_proto_config_t*,
req->send.proto_config);
const ucp_proto_t *proto = proto_config->proto;
ucs_status_t status;

status = proto->progress[UCP_PROTO_STAGE_START](self);
if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
/* NOTE: This function is only called when `progress_wrapper_enabled`
* is `false`, which means that it won't be called when the log level
* is TRACE_REQ or higher. Because of this, `ucs_trace` is used here
* instead of `ucp_trace_req` */
ucs_trace("progress protocol %s returned: %s lane %d", proto->name,
ucs_status_string(status), req->send.lane);
return status;
}

++proto_config->selections;

return UCS_OK;
}

ucs_status_t ucp_request_progress_wrapper(uct_pending_req_t *self)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
const ucp_proto_t *proto = req->send.proto_config->proto;
ucp_proto_config_t *conf = ucs_const_cast(ucp_proto_config_t *,
req->send.proto_config);
const ucp_proto_t *proto = conf->proto;
uct_pending_callback_t progress_cb;
ucs_status_t status;

progress_cb = proto->progress[req->send.proto_stage];
ucp_trace_req(req,
"progress %s {%s} ep_cfg[%d] rkey_cfg[%d] offset %zu/%zu",
proto->name, ucs_debug_get_symbol_name(progress_cb),
req->send.proto_config->ep_cfg_index,
req->send.proto_config->rkey_cfg_index,
conf->ep_cfg_index, conf->rkey_cfg_index,
req->send.state.dt_iter.offset,
req->send.state.dt_iter.length);

ucp_worker_track_ep_usage(req);

ucs_log_indent(1);
status = progress_cb(self);
if (UCS_STATUS_IS_ERR(status)) {
if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
ucp_trace_req(req, "progress protocol %s returned: %s lane %d",
proto->name, ucs_status_string(status), req->send.lane);
} else {
if (req->send.proto_stage == UCP_PROTO_STAGE_START) {
++conf->selections;
}

ucp_trace_req(req, "progress protocol %s returned: %s", proto->name,
ucs_status_string(status));
}
ucs_log_indent(-1);
return status;
}

void ucp_request_progress_wrapper_init(ucp_worker_h worker,
ucp_proto_config_t *proto_config)
{
uint8_t stage;

if (worker->context->config.progress_wrapper_enabled) {
for (stage = UCP_PROTO_STAGE_START; stage < UCP_PROTO_STAGE_LAST;
++stage) {
proto_config->progress_wrapper[stage] =
ucp_request_progress_wrapper;
}
return;
}

/* Set wrappers pointing to the original protocol functions */
memcpy(proto_config->progress_wrapper, proto_config->proto->progress,
sizeof(proto_config->progress_wrapper));

if (worker->context->config.trace_used_proto_selections) {
proto_config->progress_wrapper[UCP_PROTO_STAGE_START] =
ucp_request_progress_counter;
}
}
3 changes: 3 additions & 0 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -587,4 +587,7 @@ void ucp_request_purge_enqueue_cb(uct_pending_req_t *self, void *arg);

ucs_status_t ucp_request_progress_wrapper(uct_pending_req_t *self);

void ucp_request_progress_wrapper_init(ucp_worker_h worker,
ucp_proto_config_t *proto_config);

#endif
18 changes: 18 additions & 0 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -2272,6 +2272,21 @@ static void ucp_worker_keepalive_reset(ucp_worker_h worker)
worker->keepalive.round_count = 0;
}

static void ucp_worker_trace_configs(ucp_worker_h worker)
{
ucp_ep_config_t *ep_config;
ucp_rkey_config_t *rkey_config;

ucs_array_for_each(ep_config, &worker->ep_config) {
ucp_proto_select_trace(worker, &ep_config->proto_select);
}

ucs_carray_for_each(rkey_config, worker->rkey_config,
worker->rkey_config_count) {
ucp_proto_select_trace(worker, &rkey_config->proto_select);
}
}

static void ucp_worker_destroy_configs(ucp_worker_h worker)
{
ucp_ep_config_t *ep_config;
Expand Down Expand Up @@ -2926,6 +2941,9 @@ static void ucp_worker_destroy_eps(ucp_worker_h worker,
void ucp_worker_destroy(ucp_worker_h worker)
{
ucs_debug("destroy worker %p", worker);
if (worker->context->config.trace_used_proto_selections) {
ucp_worker_trace_configs(worker);
}

UCS_ASYNC_BLOCK(&worker->async);
uct_worker_progress_unregister_safe(worker->uct, &worker->keepalive.cb_id);
Expand Down
3 changes: 3 additions & 0 deletions src/ucp/proto/proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ typedef struct {

/* Map of used lanes */
ucp_lane_map_t lane_map;

/* Selections count */
unsigned selections;
} ucp_proto_query_attr_t;


Expand Down
6 changes: 1 addition & 5 deletions src/ucp/proto/proto_common.inl
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,7 @@ ucp_proto_request_set_stage(ucp_request_t *req, uint8_t proto_stage)
req->send.proto_stage = proto_stage;

/* Set pointer to progress function */
if (req->send.ep->worker->context->config.progress_wrapper_enabled) {
req->send.uct.func = ucp_request_progress_wrapper;
} else {
req->send.uct.func = proto->progress[proto_stage];
}
req->send.uct.func = req->send.proto_config->progress_wrapper[proto_stage];
}

/* Select protocol for the request and initialize protocol-related fields */
Expand Down
95 changes: 77 additions & 18 deletions src/ucp/proto/proto_debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct ucp_proto_perf_node {

/* Protocol information table */
typedef struct {
char counter_str[32];
char range_str[32];
char desc[UCP_PROTO_DESC_STR_MAX];
char config[UCP_PROTO_CONFIG_STR_MAX];
Expand Down Expand Up @@ -156,11 +157,16 @@ static void ucp_proto_table_row_separator(ucs_string_buffer_t *strb,
}

static int ucp_proto_debug_is_info_enabled(ucp_context_h context,
const char *select_param_str)
const char *select_param_str,
int show_used)
{
const char *proto_info_config = context->config.ext.proto_info;
int bool_value;

if (show_used) {
return context->config.trace_used_proto_selections;
}

/* Handle "auto" - enable when log level is DEBUG or higher */
if (!strcasecmp(proto_info_config, "auto")) {
return ucs_log_is_enabled(UCS_LOG_LEVEL_DEBUG);
Expand All @@ -175,29 +181,74 @@ static int ucp_proto_debug_is_info_enabled(ucp_context_h context,
return fnmatch(proto_info_config, select_param_str, FNM_CASEFOLD) == 0;
}

void ucp_proto_select_elem_info(ucp_worker_h worker,
ucp_worker_cfg_index_t ep_cfg_index,
ucp_worker_cfg_index_t rkey_cfg_index,
const ucp_proto_select_param_t *select_param,
const ucp_proto_select_elem_t *select_elem,
int show_all, ucs_string_buffer_t *strb)
static inline int
ucp_proto_select_elem_has_selections(const ucp_proto_select_elem_t *select_elem)
{
const ucp_proto_threshold_elem_t *thresh_elem = select_elem->thresholds;

do {
if (thresh_elem->proto_config.selections > 0) {
return 1;
}
} while ((thresh_elem++)->max_msg_length < SIZE_MAX);

return 0;
}

static void
ucp_proto_selections_dump(ucp_worker_h worker,
const ucp_proto_query_attr_t *proto_attr,
const ucp_proto_select_param_t *select_param,
char *counter_str, size_t size, int show_used)
{
ucp_operation_id_t op_id;

if (!show_used) {
*counter_str = '\0';
return;
}

/* Short active messages protocol selections are not counted */
op_id = ucp_proto_select_op_id(select_param);
if ((op_id == UCP_OP_ID_AM_SEND || op_id == UCP_OP_ID_AM_SEND_REPLY) &&
strstr(proto_attr->desc, "short") != NULL) {
ucs_assert(proto_attr->selections == 0);
*counter_str = '\0';
return;
}

ucs_snprintf_safe(counter_str, size, "%u ", proto_attr->selections);
}

void
ucp_proto_select_elem_info(ucp_worker_h worker,
ucp_worker_cfg_index_t ep_cfg_index,
ucp_worker_cfg_index_t rkey_cfg_index,
const ucp_proto_select_param_t *select_param,
const ucp_proto_select_elem_t *select_elem,
int show_all, int show_used, ucs_string_buffer_t *strb)
{
UCS_STRING_BUFFER_ONSTACK(ep_cfg_strb, UCP_PROTO_CONFIG_STR_MAX);
UCS_STRING_BUFFER_ONSTACK(sel_param_strb, UCP_PROTO_CONFIG_STR_MAX);
static const char *info_row_fmt = "| %*s | %-*s | %-*s |\n";
static const char *info_row_fmt = "| %s%*s | %-*s | %-*s |\n";
ucp_proto_info_table_t table;
int hdr_col_width[2], col_width[3];
ucp_proto_query_attr_t proto_attr;
ucp_proto_info_row_t *row_elem;
size_t range_start, range_end;
int proto_valid;

if (show_used && !ucp_proto_select_elem_has_selections(select_elem)) {
return;
}

ucp_proto_select_param_dump(worker, ep_cfg_index, rkey_cfg_index,
select_param, ucp_operation_descs, &ep_cfg_strb,
&sel_param_strb);
if (!show_all &&
!ucp_proto_debug_is_info_enabled(
worker->context, ucs_string_buffer_cstr(&sel_param_strb))) {
worker->context, ucs_string_buffer_cstr(&sel_param_strb),
show_used)) {
return;
}

Expand Down Expand Up @@ -228,7 +279,12 @@ void ucp_proto_select_elem_info(ucp_worker_h worker,
ucs_memunits_range_str(range_start, range_end, row_elem->range_str,
sizeof(row_elem->range_str));

col_width[0] = ucs_max(col_width[0], strlen(row_elem->range_str));
ucp_proto_selections_dump(worker, &proto_attr, select_param,
row_elem->counter_str,
sizeof(row_elem->counter_str), show_used);

col_width[0] = ucs_max(col_width[0], strlen(row_elem->counter_str) +
strlen(row_elem->range_str));
col_width[1] = ucs_max(col_width[1], strlen(row_elem->desc));
col_width[2] = ucs_max(col_width[2], strlen(row_elem->config));
} while (range_end != SIZE_MAX);
Expand All @@ -250,7 +306,8 @@ void ucp_proto_select_elem_info(ucp_worker_h worker,
/* Print contents */
ucp_proto_table_row_separator(strb, col_width, 3);
ucs_array_for_each(row_elem, &table) {
ucs_string_buffer_appendf(strb, info_row_fmt, col_width[0],
ucs_string_buffer_appendf(strb, info_row_fmt, row_elem->counter_str,
col_width[0] - strlen(row_elem->counter_str),
row_elem->range_str, col_width[1],
row_elem->desc, col_width[2],
row_elem->config);
Expand All @@ -271,7 +328,7 @@ void ucp_proto_select_info(ucp_worker_h worker,

kh_foreach(proto_select->hash, key.u64, select_elem,
ucp_proto_select_elem_info(worker, ep_cfg_index, rkey_cfg_index,
&key.param, &select_elem, show_all,
&key.param, &select_elem, show_all, 0,
strb);
ucs_string_buffer_appendf(strb, "\n"))
}
Expand Down Expand Up @@ -973,7 +1030,7 @@ ucp_proto_select_write_info(ucp_worker_h worker,
ucp_operation_names, &ep_cfg_strb,
&sel_param_strb);
if (!ucp_proto_debug_is_info_enabled(
worker->context, ucs_string_buffer_cstr(&sel_param_strb))) {
worker->context, ucs_string_buffer_cstr(&sel_param_strb), 0)) {
goto out;
}

Expand Down Expand Up @@ -1038,17 +1095,19 @@ ucp_proto_select_write_info(ucp_worker_h worker,
}

void ucp_proto_select_elem_trace(ucp_worker_h worker,
ucp_worker_cfg_index_t ep_cfg_index,
ucp_worker_cfg_index_t rkey_cfg_index,
const ucp_proto_select_param_t *select_param,
ucp_proto_select_elem_t *select_elem)
const ucp_proto_select_elem_t *select_elem,
int show_used)
{
ucs_string_buffer_t strb = UCS_STRING_BUFFER_INITIALIZER;
const ucp_proto_config_t *proto_config = &select_elem->thresholds[0].proto_config;
ucp_worker_cfg_index_t ep_cfg_index = proto_config->ep_cfg_index;
ucp_worker_cfg_index_t rkey_cfg_index = proto_config->rkey_cfg_index;
ucs_string_buffer_t strb = UCS_STRING_BUFFER_INITIALIZER;
char *line;

/* Print human-readable protocol selection table to the log */
ucp_proto_select_elem_info(worker, ep_cfg_index, rkey_cfg_index,
select_param, select_elem, 0, &strb);
select_param, select_elem, 0, show_used, &strb);
ucs_string_buffer_for_each_token(line, &strb, "\n") {
ucs_log_print_compact(line);
}
Expand Down
8 changes: 4 additions & 4 deletions src/ucp/proto/proto_debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,14 @@ void ucp_proto_select_elem_info(ucp_worker_h worker,
ucp_worker_cfg_index_t rkey_cfg_index,
const ucp_proto_select_param_t *select_param,
const ucp_proto_select_elem_t *select_elem,
int show_all, ucs_string_buffer_t *strb);
int show_all, int show_used,
ucs_string_buffer_t *strb);


void ucp_proto_select_elem_trace(ucp_worker_h worker,
ucp_worker_cfg_index_t ep_cfg_index,
ucp_worker_cfg_index_t rkey_cfg_index,
const ucp_proto_select_param_t *select_param,
ucp_proto_select_elem_t *select_elem);
const ucp_proto_select_elem_t *select_elem,
int show_used);


void ucp_proto_select_write_info(
Expand Down
1 change: 1 addition & 0 deletions src/ucp/proto/proto_reconfig.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ static ucs_status_t ucp_proto_reconfig_select_progress(uct_pending_req_t *self)
return UCS_ERR_NO_RESOURCE;
}

/* coverity[address_free] */
return req->send.uct.func(&req->send.uct);
}

Expand Down
Loading
Loading