Skip to content

Commit c7dc491

Browse files
UCP/PROTO: Applied changes from PR openucx#10395
1 parent 1c723da commit c7dc491

File tree

16 files changed

+237
-44
lines changed

16 files changed

+237
-44
lines changed

src/ucp/core/ucp_context.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,7 @@ static ucs_config_field_t ucp_context_config_table[] = {
492492
" 'y' : Print information for all protocols\n"
493493
" 'n' : Do not print any protocol information\n"
494494
" 'auto' : Print information when UCX_LOG_LEVEL is 'debug' or higher\n"
495+
" 'used' : Print information for used protocols\n"
495496
" glob_pattern : Print information for operations matching the glob pattern.\n"
496497
" For example: '*tag*gpu*', '*put*fast*host*'",
497498
ucs_offsetof(ucp_context_config_t, proto_info), UCS_CONFIG_TYPE_STRING},
@@ -2398,6 +2399,10 @@ static ucs_status_t ucp_fill_config(ucp_context_h context,
23982399
context->config.progress_wrapper_enabled =
23992400
ucs_log_is_enabled(UCS_LOG_LEVEL_TRACE_REQ) ||
24002401
ucp_context_usage_tracker_enabled(context);
2402+
2403+
context->config.trace_used_proto_selections =
2404+
!strcasecmp(context->config.ext.proto_info, "used");
2405+
24012406
return UCS_OK;
24022407

24032408
err_free_key_list:

src/ucp/core/ucp_context.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,9 @@ typedef struct ucp_context {
460460
/* Progress wrapper enabled */
461461
int progress_wrapper_enabled;
462462

463+
/* Indicate whether tracing for used protocol selections is enabled */
464+
int trace_used_proto_selections;
465+
463466
struct {
464467
unsigned count;
465468
size_t *sizes;

src/ucp/core/ucp_request.c

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -729,19 +729,43 @@ void ucp_request_purge_enqueue_cb(uct_pending_req_t *self, void *arg)
729729
ucs_queue_push(queue, (ucs_queue_elem_t*)&req->send.uct.priv);
730730
}
731731

732+
/*
 * Start-stage progress callback that counts protocol selections.
 *
 * Invokes the protocol's own UCP_PROTO_STAGE_START progress function and
 * returns its status unchanged. Only when that call returns UCS_OK is the
 * 'selections' counter of the request's protocol configuration incremented.
 * When the counter reaches UCP_PROTO_SELECTIONS_COUNT_MAX, the protocol's
 * progress table is copied over the configuration's progress_wrapper table.
 */
ucs_status_t ucp_request_progress_counter(uct_pending_req_t *self)
{
    ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
    /* The request holds a const pointer; the counter update needs write
     * access to the configuration */
    ucp_proto_config_t *proto_conf = ucs_const_cast(ucp_proto_config_t *,
                                                    req->send.proto_config);
    const ucp_proto_t *protocol    = proto_conf->proto;
    ucs_status_t status;

    status = protocol->progress[UCP_PROTO_STAGE_START](self);
    if (status == UCS_OK) {
        proto_conf->selections++;
        if (proto_conf->selections == UCP_PROTO_SELECTIONS_COUNT_MAX) {
            ucp_trace_req(req,
                          "protocol %s was selected %u times, stop tracing",
                          protocol->name, proto_conf->selections);
            /* Replace all wrappers by the original protocol functions */
            memcpy(proto_conf->progress_wrapper, protocol->progress,
                   sizeof(proto_conf->progress_wrapper));
        }
    }

    return status;
}
754+
732755
ucs_status_t ucp_request_progress_wrapper(uct_pending_req_t *self)
733756
{
734757
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
735-
const ucp_proto_t *proto = req->send.proto_config->proto;
758+
ucp_proto_config_t *conf = ucs_const_cast(ucp_proto_config_t *,
759+
req->send.proto_config);
760+
const ucp_proto_t *proto = conf->proto;
736761
uct_pending_callback_t progress_cb;
737762
ucs_status_t status;
738763

739764
progress_cb = proto->progress[req->send.proto_stage];
740765
ucp_trace_req(req,
741766
"progress %s {%s} ep_cfg[%d] rkey_cfg[%d] offset %zu/%zu",
742767
proto->name, ucs_debug_get_symbol_name(progress_cb),
743-
req->send.proto_config->ep_cfg_index,
744-
req->send.proto_config->rkey_cfg_index,
768+
conf->ep_cfg_index, conf->rkey_cfg_index,
745769
req->send.state.dt_iter.offset,
746770
req->send.state.dt_iter.length);
747771

@@ -753,9 +777,39 @@ ucs_status_t ucp_request_progress_wrapper(uct_pending_req_t *self)
753777
ucp_trace_req(req, "progress protocol %s returned: %s lane %d",
754778
proto->name, ucs_status_string(status), req->send.lane);
755779
} else {
780+
if (req->send.proto_stage == UCP_PROTO_STAGE_START) {
781+
++conf->selections;
782+
}
783+
756784
ucp_trace_req(req, "progress protocol %s returned: %s", proto->name,
757785
ucs_status_string(status));
758786
}
759787
ucs_log_indent(-1);
760788
return status;
761789
}
790+
791+
void ucp_request_init_progress_wrapper(ucp_worker_h worker,
792+
ucp_proto_config_t *proto_config,
793+
int internal)
794+
{
795+
uint8_t stage;
796+
797+
if (worker->context->config.progress_wrapper_enabled) {
798+
for (stage = UCP_PROTO_STAGE_START; stage < UCP_PROTO_STAGE_LAST;
799+
++stage) {
800+
proto_config->progress_wrapper[stage] =
801+
ucp_request_progress_wrapper;
802+
}
803+
return;
804+
}
805+
806+
/* Set wrappers pointing to the original protocol functions */
807+
memcpy(proto_config->progress_wrapper, proto_config->proto->progress,
808+
sizeof(proto_config->progress_wrapper));
809+
810+
if (worker->context->config.trace_used_proto_selections) {
811+
/* TODO: should we disable progress wrapper for internal protocols? */
812+
proto_config->progress_wrapper[UCP_PROTO_STAGE_START] =
813+
ucp_request_progress_counter;
814+
}
815+
}

src/ucp/core/ucp_request.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,4 +587,8 @@ void ucp_request_purge_enqueue_cb(uct_pending_req_t *self, void *arg);
587587

588588
ucs_status_t ucp_request_progress_wrapper(uct_pending_req_t *self);
589589

590+
/*
 * Initialize the per-stage progress_wrapper table of proto_config according
 * to the configuration of the worker's context.
 */
void ucp_request_init_progress_wrapper(ucp_worker_h worker,
591+
ucp_proto_config_t *proto_config,
592+
int internal);
593+
590594
#endif

src/ucp/core/ucp_worker.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2267,6 +2267,21 @@ static void ucp_worker_keepalive_reset(ucp_worker_h worker)
22672267
worker->keepalive.round_count = 0;
22682268
}
22692269

2270+
/*
 * Call ucp_proto_select_trace() on the protocol selection object of every
 * endpoint configuration of the worker, and then on that of every rkey
 * configuration.
 */
static void ucp_worker_trace_configs(ucp_worker_h worker)
{
    ucp_rkey_config_t *rkey_cfg;
    ucp_ep_config_t *ep_cfg;

    /* Endpoint configurations first */
    ucs_array_for_each(ep_cfg, &worker->ep_config) {
        ucp_proto_select_trace(worker, &ep_cfg->proto_select);
    }

    /* Then the first rkey_config_count rkey configurations */
    ucs_carray_for_each(rkey_cfg, worker->rkey_config,
                        worker->rkey_config_count) {
        ucp_proto_select_trace(worker, &rkey_cfg->proto_select);
    }
}
2284+
22702285
static void ucp_worker_destroy_configs(ucp_worker_h worker)
22712286
{
22722287
ucp_ep_config_t *ep_config;
@@ -2921,6 +2936,9 @@ static void ucp_worker_destroy_eps(ucp_worker_h worker,
29212936
void ucp_worker_destroy(ucp_worker_h worker)
29222937
{
29232938
ucs_debug("destroy worker %p", worker);
2939+
if (worker->context->config.trace_used_proto_selections) {
2940+
ucp_worker_trace_configs(worker);
2941+
}
29242942

29252943
UCS_ASYNC_BLOCK(&worker->async);
29262944
uct_worker_progress_unregister_safe(worker->uct, &worker->keepalive.cb_id);

src/ucp/proto/proto.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,9 @@ typedef struct {
141141

142142
/* Map of used lanes */
143143
ucp_lane_map_t lane_map;
144+
145+
/* Selections count */
146+
unsigned selections;
144147
} ucp_proto_query_attr_t;
145148

146149

src/ucp/proto/proto_common.inl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,7 @@ ucp_proto_request_set_stage(ucp_request_t *req, uint8_t proto_stage)
175175
req->send.proto_stage = proto_stage;
176176

177177
/* Set pointer to progress function */
178-
if (req->send.ep->worker->context->config.progress_wrapper_enabled) {
179-
req->send.uct.func = ucp_request_progress_wrapper;
180-
} else {
181-
req->send.uct.func = proto->progress[proto_stage];
182-
}
178+
req->send.uct.func = req->send.proto_config->progress_wrapper[proto_stage];
183179
}
184180

185181
/* Select protocol for the request and initialize protocol-related fields */

src/ucp/proto/proto_debug.c

Lines changed: 71 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ struct ucp_proto_perf_node {
6464

6565
/* Protocol information table */
6666
typedef struct {
67+
char counter_str[32];
6768
char range_str[32];
6869
char desc[UCP_PROTO_DESC_STR_MAX];
6970
char config[UCP_PROTO_CONFIG_STR_MAX];
@@ -156,16 +157,22 @@ static void ucp_proto_table_row_separator(ucs_string_buffer_t *strb,
156157
}
157158

158159
static int ucp_proto_debug_is_info_enabled(ucp_context_h context,
159-
const char *select_param_str)
160+
const char *select_param_str,
161+
int show_used)
160162
{
161163
const char *proto_info_config = context->config.ext.proto_info;
162164
int bool_value;
163165

166+
/* Handle "used" */
167+
if (show_used) {
168+
return context->config.trace_used_proto_selections;
169+
}
170+
164171
/* Handle "auto" - enable when log level is DEBUG or higher */
165172
if (!strcasecmp(proto_info_config, "auto")) {
166173
return ucs_log_is_enabled(UCS_LOG_LEVEL_DEBUG);
167174
}
168-
175+
169176
/* Handle boolean */
170177
if (ucs_config_sscanf_bool(proto_info_config, &bool_value, NULL)) {
171178
return bool_value;
@@ -175,29 +182,67 @@ static int ucp_proto_debug_is_info_enabled(ucp_context_h context,
175182
return fnmatch(proto_info_config, select_param_str, FNM_CASEFOLD) == 0;
176183
}
177184

178-
void ucp_proto_select_elem_info(ucp_worker_h worker,
179-
ucp_worker_cfg_index_t ep_cfg_index,
180-
ucp_worker_cfg_index_t rkey_cfg_index,
181-
const ucp_proto_select_param_t *select_param,
182-
const ucp_proto_select_elem_t *select_elem,
183-
int show_all, ucs_string_buffer_t *strb)
185+
static inline int
186+
ucp_proto_select_elem_has_selections(const ucp_proto_select_elem_t *select_elem)
187+
{
188+
const ucp_proto_threshold_elem_t *thresh_elem = select_elem->thresholds;
189+
190+
do {
191+
if (thresh_elem->proto_config.selections > 0) {
192+
return 1;
193+
}
194+
} while ((thresh_elem++)->max_msg_length < SIZE_MAX);
195+
196+
return 0;
197+
}
198+
199+
static void
200+
ucp_proto_selections_dump(const ucp_proto_query_attr_t *proto_attr,
201+
char *counter_str, size_t size, int show_used)
202+
{
203+
unsigned max_selections;
204+
205+
if (!show_used) {
206+
*counter_str = '\0';
207+
return;
208+
}
209+
210+
max_selections = (strstr(proto_attr->desc, "short") != NULL) ?
211+
1 : UCP_PROTO_SELECTIONS_COUNT_MAX;
212+
213+
ucs_snprintf_safe(counter_str, size, "%u%s ", proto_attr->selections,
214+
(proto_attr->selections >= max_selections ? "+" : ""));
215+
}
216+
217+
void
218+
ucp_proto_select_elem_info(ucp_worker_h worker,
219+
ucp_worker_cfg_index_t ep_cfg_index,
220+
ucp_worker_cfg_index_t rkey_cfg_index,
221+
const ucp_proto_select_param_t *select_param,
222+
const ucp_proto_select_elem_t *select_elem,
223+
int show_all, int show_used, ucs_string_buffer_t *strb)
184224
{
185225
UCS_STRING_BUFFER_ONSTACK(ep_cfg_strb, UCP_PROTO_CONFIG_STR_MAX);
186226
UCS_STRING_BUFFER_ONSTACK(sel_param_strb, UCP_PROTO_CONFIG_STR_MAX);
187-
static const char *info_row_fmt = "| %*s | %-*s | %-*s |\n";
227+
static const char *info_row_fmt = "| %s%*s | %-*s | %-*s |\n";
188228
ucp_proto_info_table_t table;
189229
int hdr_col_width[2], col_width[3];
190230
ucp_proto_query_attr_t proto_attr;
191231
ucp_proto_info_row_t *row_elem;
192232
size_t range_start, range_end;
193233
int proto_valid;
194234

235+
if (show_used && !ucp_proto_select_elem_has_selections(select_elem)) {
236+
return;
237+
}
238+
195239
ucp_proto_select_param_dump(worker, ep_cfg_index, rkey_cfg_index,
196240
select_param, ucp_operation_descs, &ep_cfg_strb,
197241
&sel_param_strb);
198242
if (!show_all &&
199243
!ucp_proto_debug_is_info_enabled(
200-
worker->context, ucs_string_buffer_cstr(&sel_param_strb))) {
244+
worker->context, ucs_string_buffer_cstr(&sel_param_strb),
245+
show_used)) {
201246
return;
202247
}
203248

@@ -228,7 +273,11 @@ void ucp_proto_select_elem_info(ucp_worker_h worker,
228273
ucs_memunits_range_str(range_start, range_end, row_elem->range_str,
229274
sizeof(row_elem->range_str));
230275

231-
col_width[0] = ucs_max(col_width[0], strlen(row_elem->range_str));
276+
ucp_proto_selections_dump(&proto_attr, row_elem->counter_str,
277+
sizeof(row_elem->counter_str), show_used);
278+
279+
col_width[0] = ucs_max(col_width[0], strlen(row_elem->counter_str) +
280+
strlen(row_elem->range_str));
232281
col_width[1] = ucs_max(col_width[1], strlen(row_elem->desc));
233282
col_width[2] = ucs_max(col_width[2], strlen(row_elem->config));
234283
} while (range_end != SIZE_MAX);
@@ -250,7 +299,8 @@ void ucp_proto_select_elem_info(ucp_worker_h worker,
250299
/* Print contents */
251300
ucp_proto_table_row_separator(strb, col_width, 3);
252301
ucs_array_for_each(row_elem, &table) {
253-
ucs_string_buffer_appendf(strb, info_row_fmt, col_width[0],
302+
ucs_string_buffer_appendf(strb, info_row_fmt, row_elem->counter_str,
303+
col_width[0] - strlen(row_elem->counter_str),
254304
row_elem->range_str, col_width[1],
255305
row_elem->desc, col_width[2],
256306
row_elem->config);
@@ -271,7 +321,7 @@ void ucp_proto_select_info(ucp_worker_h worker,
271321

272322
kh_foreach(proto_select->hash, key.u64, select_elem,
273323
ucp_proto_select_elem_info(worker, ep_cfg_index, rkey_cfg_index,
274-
&key.param, &select_elem, show_all,
324+
&key.param, &select_elem, show_all, 0,
275325
strb);
276326
ucs_string_buffer_appendf(strb, "\n"))
277327
}
@@ -973,7 +1023,7 @@ ucp_proto_select_write_info(ucp_worker_h worker,
9731023
ucp_operation_names, &ep_cfg_strb,
9741024
&sel_param_strb);
9751025
if (!ucp_proto_debug_is_info_enabled(
976-
worker->context, ucs_string_buffer_cstr(&sel_param_strb))) {
1026+
worker->context, ucs_string_buffer_cstr(&sel_param_strb), 0)) {
9771027
goto out;
9781028
}
9791029

@@ -1038,17 +1088,19 @@ ucp_proto_select_write_info(ucp_worker_h worker,
10381088
}
10391089

10401090
void ucp_proto_select_elem_trace(ucp_worker_h worker,
1041-
ucp_worker_cfg_index_t ep_cfg_index,
1042-
ucp_worker_cfg_index_t rkey_cfg_index,
10431091
const ucp_proto_select_param_t *select_param,
1044-
ucp_proto_select_elem_t *select_elem)
1092+
const ucp_proto_select_elem_t *select_elem,
1093+
int show_used)
10451094
{
1046-
ucs_string_buffer_t strb = UCS_STRING_BUFFER_INITIALIZER;
1095+
const ucp_proto_config_t *proto_config = &select_elem->thresholds[0].proto_config;
1096+
ucp_worker_cfg_index_t ep_cfg_index = proto_config->ep_cfg_index;
1097+
ucp_worker_cfg_index_t rkey_cfg_index = proto_config->rkey_cfg_index;
1098+
ucs_string_buffer_t strb = UCS_STRING_BUFFER_INITIALIZER;
10471099
char *line;
10481100

10491101
/* Print human-readable protocol selection table to the log */
10501102
ucp_proto_select_elem_info(worker, ep_cfg_index, rkey_cfg_index,
1051-
select_param, select_elem, 0, &strb);
1103+
select_param, select_elem, 0, show_used, &strb);
10521104
ucs_string_buffer_for_each_token(line, &strb, "\n") {
10531105
ucs_log_print_compact(line);
10541106
}

src/ucp/proto/proto_debug.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,14 @@ void ucp_proto_select_elem_info(ucp_worker_h worker,
157157
ucp_worker_cfg_index_t rkey_cfg_index,
158158
const ucp_proto_select_param_t *select_param,
159159
const ucp_proto_select_elem_t *select_elem,
160-
int show_all, ucs_string_buffer_t *strb);
160+
int show_all, int show_used,
161+
ucs_string_buffer_t *strb);
161162

162163

163164
void ucp_proto_select_elem_trace(ucp_worker_h worker,
164-
ucp_worker_cfg_index_t ep_cfg_index,
165-
ucp_worker_cfg_index_t rkey_cfg_index,
166165
const ucp_proto_select_param_t *select_param,
167-
ucp_proto_select_elem_t *select_elem);
166+
const ucp_proto_select_elem_t *select_elem,
167+
int show_used);
168168

169169

170170
void ucp_proto_select_write_info(

src/ucp/proto/proto_reconfig.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ static ucs_status_t ucp_proto_reconfig_select_progress(uct_pending_req_t *self)
3030
return UCS_ERR_NO_RESOURCE;
3131
}
3232

33+
/* coverity[address_free] */
3334
return req->send.uct.func(&req->send.uct);
3435
}
3536

0 commit comments

Comments
 (0)