Skip to content

Commit 80b7d51

Browse files
UCP/PROTO: Applied changes from PR openucx#10395
1 parent 328841f commit 80b7d51

File tree

16 files changed

+235
-43
lines changed

16 files changed

+235
-43
lines changed

src/ucp/core/ucp_context.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,7 @@ static ucs_config_field_t ucp_context_config_table[] = {
491491
"Enable printing protocols information. The value is interpreted as follows:\n"
492492
" 'y' : Print information for all protocols\n"
493493
" 'n' : Do not print any protocol information\n"
494+
" used : Print information for used protocols\n"
494495
" glob_pattern : Print information for operations matching the glob pattern.\n"
495496
" For example: '*tag*gpu*', '*put*fast*host*'",
496497
ucs_offsetof(ucp_context_config_t, proto_info), UCS_CONFIG_TYPE_STRING},
@@ -2397,6 +2398,10 @@ static ucs_status_t ucp_fill_config(ucp_context_h context,
23972398
context->config.progress_wrapper_enabled =
23982399
ucs_log_is_enabled(UCS_LOG_LEVEL_TRACE_REQ) ||
23992400
ucp_context_usage_tracker_enabled(context);
2401+
2402+
context->config.trace_used_proto_selections =
2403+
!strcasecmp(context->config.ext.proto_info, "used");
2404+
24002405
return UCS_OK;
24012406

24022407
err_free_key_list:

src/ucp/core/ucp_context.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,9 @@ typedef struct ucp_context {
460460
/* Progress wrapper enabled */
461461
int progress_wrapper_enabled;
462462

463+
/* Indicates whether tracing for used protocol selections is enabled */
464+
int trace_used_proto_selections;
465+
463466
struct {
464467
unsigned count;
465468
size_t *sizes;

src/ucp/core/ucp_request.c

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -729,19 +729,43 @@ void ucp_request_purge_enqueue_cb(uct_pending_req_t *self, void *arg)
729729
ucs_queue_push(queue, (ucs_queue_elem_t*)&req->send.uct.priv);
730730
}
731731

732+
/*
 * Progress callback that counts protocol selections.
 *
 * Runs the protocol's START-stage progress function for the request that
 * embeds 'self'. If that function returns anything other than UCS_OK, the
 * status is returned unchanged and nothing is counted. Otherwise the
 * 'selections' counter of the request's protocol configuration is incremented,
 * and once it reaches UCP_PROTO_SELECTIONS_COUNT_MAX the configuration's
 * progress_wrapper table is overwritten with the protocol's own progress
 * functions, which removes this counting callback from the table.
 *
 * Returns the status of the START-stage progress function.
 */
ucs_status_t ucp_request_progress_counter(uct_pending_req_t *self)
733+
{
734+
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
735+
/* The request's proto_config is cast to a non-const pointer because the
 * counter and the progress_wrapper table are modified below */
ucp_proto_config_t *conf = ucs_const_cast(ucp_proto_config_t *,
736+
req->send.proto_config);
737+
const ucp_proto_t *proto = conf->proto;
738+
ucs_status_t status;
739+
740+
/* Always invoke the START stage: this callback is only installed in the
 * UCP_PROTO_STAGE_START slot by ucp_request_init_progress_wrapper() */
status = proto->progress[UCP_PROTO_STAGE_START](self);
741+
if (status != UCS_OK) {
742+
return status;
743+
}
744+
745+
/* Count the selection; on reaching the cap, stop counting by restoring the
 * protocol's original progress functions for all stages */
if (++conf->selections == UCP_PROTO_SELECTIONS_COUNT_MAX) {
746+
ucp_trace_req(req, "protocol %s was selected %u times, stop tracing",
747+
proto->name, conf->selections);
748+
memcpy(conf->progress_wrapper, proto->progress,
749+
sizeof(conf->progress_wrapper));
750+
}
751+
752+
return UCS_OK;
753+
}
754+
732755
ucs_status_t ucp_request_progress_wrapper(uct_pending_req_t *self)
733756
{
734757
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
735-
const ucp_proto_t *proto = req->send.proto_config->proto;
758+
ucp_proto_config_t *conf = ucs_const_cast(ucp_proto_config_t *,
759+
req->send.proto_config);
760+
const ucp_proto_t *proto = conf->proto;
736761
uct_pending_callback_t progress_cb;
737762
ucs_status_t status;
738763

739764
progress_cb = proto->progress[req->send.proto_stage];
740765
ucp_trace_req(req,
741766
"progress %s {%s} ep_cfg[%d] rkey_cfg[%d] offset %zu/%zu",
742767
proto->name, ucs_debug_get_symbol_name(progress_cb),
743-
req->send.proto_config->ep_cfg_index,
744-
req->send.proto_config->rkey_cfg_index,
768+
conf->ep_cfg_index, conf->rkey_cfg_index,
745769
req->send.state.dt_iter.offset,
746770
req->send.state.dt_iter.length);
747771

@@ -753,9 +777,39 @@ ucs_status_t ucp_request_progress_wrapper(uct_pending_req_t *self)
753777
ucp_trace_req(req, "progress protocol %s returned: %s lane %d",
754778
proto->name, ucs_status_string(status), req->send.lane);
755779
} else {
780+
if (req->send.proto_stage == UCP_PROTO_STAGE_START) {
781+
++conf->selections;
782+
}
783+
756784
ucp_trace_req(req, "progress protocol %s returned: %s", proto->name,
757785
ucs_status_string(status));
758786
}
759787
ucs_log_indent(-1);
760788
return status;
761789
}
790+
791+
/*
 * Fill the per-stage progress_wrapper table of a protocol configuration.
 *
 * - If the context has progress_wrapper_enabled set, every stage from
 *   UCP_PROTO_STAGE_START up to (not including) UCP_PROTO_STAGE_LAST is set to
 *   ucp_request_progress_wrapper.
 * - Otherwise the table is a copy of the protocol's own progress functions,
 *   and, if trace_used_proto_selections is set, the START stage entry is
 *   replaced by ucp_request_progress_counter.
 *
 * worker       - worker whose context configuration is consulted.
 * proto_config - protocol configuration whose progress_wrapper is written.
 * internal     - not read by this function at present (see the TODO below).
 */
void ucp_request_init_progress_wrapper(ucp_worker_h worker,
792+
ucp_proto_config_t *proto_config,
793+
int internal)
794+
{
795+
uint8_t stage;
796+
797+
/* The full wrapper takes precedence over the selection counter */
if (worker->context->config.progress_wrapper_enabled) {
798+
for (stage = UCP_PROTO_STAGE_START; stage < UCP_PROTO_STAGE_LAST;
799+
++stage) {
800+
proto_config->progress_wrapper[stage] =
801+
ucp_request_progress_wrapper;
802+
}
803+
return;
804+
}
805+
806+
/* Set wrappers pointing to the original protocol functions */
807+
memcpy(proto_config->progress_wrapper, proto_config->proto->progress,
808+
sizeof(proto_config->progress_wrapper));
809+
810+
/* Only the START stage is intercepted when counting selections */
if (worker->context->config.trace_used_proto_selections) {
811+
/* TODO: should we disable progress wrapper for internal protocols? */
812+
proto_config->progress_wrapper[UCP_PROTO_STAGE_START] =
813+
ucp_request_progress_counter;
814+
}
815+
}

src/ucp/core/ucp_request.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,4 +587,8 @@ void ucp_request_purge_enqueue_cb(uct_pending_req_t *self, void *arg);
587587

588588
ucs_status_t ucp_request_progress_wrapper(uct_pending_req_t *self);
589589

590+
void ucp_request_init_progress_wrapper(ucp_worker_h worker,
591+
ucp_proto_config_t *proto_config,
592+
int internal);
593+
590594
#endif

src/ucp/core/ucp_worker.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2267,6 +2267,21 @@ static void ucp_worker_keepalive_reset(ucp_worker_h worker)
22672267
worker->keepalive.round_count = 0;
22682268
}
22692269

2270+
/*
 * Call ucp_proto_select_trace() on the protocol selection object of every
 * endpoint configuration and every remote key configuration of the worker.
 */
static void ucp_worker_trace_configs(ucp_worker_h worker)
2271+
{
2272+
ucp_ep_config_t *ep_config;
2273+
ucp_rkey_config_t *rkey_config;
2274+
2275+
/* Endpoint configurations are iterated as a ucs_array */
ucs_array_for_each(ep_config, &worker->ep_config) {
2276+
ucp_proto_select_trace(worker, &ep_config->proto_select);
2277+
}
2278+
2279+
/* Remote key configurations are iterated over the first
 * rkey_config_count entries of worker->rkey_config */
ucs_carray_for_each(rkey_config, worker->rkey_config,
2280+
worker->rkey_config_count) {
2281+
ucp_proto_select_trace(worker, &rkey_config->proto_select);
2282+
}
2283+
}
2284+
22702285
static void ucp_worker_destroy_configs(ucp_worker_h worker)
22712286
{
22722287
ucp_ep_config_t *ep_config;
@@ -2921,6 +2936,9 @@ static void ucp_worker_destroy_eps(ucp_worker_h worker,
29212936
void ucp_worker_destroy(ucp_worker_h worker)
29222937
{
29232938
ucs_debug("destroy worker %p", worker);
2939+
if (worker->context->config.trace_used_proto_selections) {
2940+
ucp_worker_trace_configs(worker);
2941+
}
29242942

29252943
UCS_ASYNC_BLOCK(&worker->async);
29262944
uct_worker_progress_unregister_safe(worker->uct, &worker->keepalive.cb_id);

src/ucp/proto/proto.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,9 @@ typedef struct {
141141

142142
/* Map of used lanes */
143143
ucp_lane_map_t lane_map;
144+
145+
/* Selections count */
146+
unsigned selections;
144147
} ucp_proto_query_attr_t;
145148

146149

src/ucp/proto/proto_common.inl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,7 @@ ucp_proto_request_set_stage(ucp_request_t *req, uint8_t proto_stage)
175175
req->send.proto_stage = proto_stage;
176176

177177
/* Set pointer to progress function */
178-
if (req->send.ep->worker->context->config.progress_wrapper_enabled) {
179-
req->send.uct.func = ucp_request_progress_wrapper;
180-
} else {
181-
req->send.uct.func = proto->progress[proto_stage];
182-
}
178+
req->send.uct.func = req->send.proto_config->progress_wrapper[proto_stage];
183179
}
184180

185181
/* Select protocol for the request and initialize protocol-related fields */

src/ucp/proto/proto_debug.c

Lines changed: 69 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ struct ucp_proto_perf_node {
6464

6565
/* Protocol information table */
6666
typedef struct {
67+
char counter_str[32];
6768
char range_str[32];
6869
char desc[UCP_PROTO_DESC_STR_MAX];
6970
char config[UCP_PROTO_CONFIG_STR_MAX];
@@ -156,41 +157,84 @@ static void ucp_proto_table_row_separator(ucs_string_buffer_t *strb,
156157
}
157158

158159
/*
 * Decide whether protocol information should be printed.
 *
 * When show_used is non-zero, the result is the context's
 * trace_used_proto_selections flag. Otherwise the proto_info configuration
 * string is first parsed as a boolean; if it parses, that value is returned.
 * If it does not parse, it is treated as a glob pattern and matched against
 * select_param_str, ignoring case.
 *
 * Returns non-zero if the information should be printed, 0 otherwise.
 */
static int ucp_proto_debug_is_info_enabled(ucp_context_h context,
159-
const char *select_param_str)
160+
const char *select_param_str,
161+
int show_used)
160162
{
161163
const char *proto_info_config = context->config.ext.proto_info;
162164
int bool_value;
163165

166+
/* The "used" report depends only on the context flag, not on the pattern */
if (show_used) {
167+
return context->config.trace_used_proto_selections;
168+
}
169+
164170
if (ucs_config_sscanf_bool(proto_info_config, &bool_value, NULL)) {
165171
return bool_value;
166172
}
167173

168174
/* Not a boolean: interpret the configuration value as a glob pattern */
return fnmatch(proto_info_config, select_param_str, FNM_CASEFOLD) == 0;
169175
}
170176

171-
void ucp_proto_select_elem_info(ucp_worker_h worker,
172-
ucp_worker_cfg_index_t ep_cfg_index,
173-
ucp_worker_cfg_index_t rkey_cfg_index,
174-
const ucp_proto_select_param_t *select_param,
175-
const ucp_proto_select_elem_t *select_elem,
176-
int show_all, ucs_string_buffer_t *strb)
177+
/*
 * Check whether any threshold element of a selection element was counted.
 *
 * Walks the array starting at select_elem->thresholds and returns 1 as soon as
 * an element whose proto_config.selections is greater than zero is found. The
 * walk ends after examining the element whose max_msg_length is not below
 * SIZE_MAX; in that case 0 is returned.
 */
static inline int
178+
ucp_proto_select_elem_has_selections(const ucp_proto_select_elem_t *select_elem)
179+
{
180+
const ucp_proto_threshold_elem_t *thresh_elem = select_elem->thresholds;
181+
182+
do {
183+
if (thresh_elem->proto_config.selections > 0) {
184+
return 1;
185+
}
186+
/* The condition tests the current element's max_msg_length and then
 * advances the pointer, so the last element is checked before exit */
} while ((thresh_elem++)->max_msg_length < SIZE_MAX);
187+
188+
return 0;
189+
}
190+
191+
/*
 * Format the selection counter prefix of a protocol information row.
 *
 * When show_used is zero, counter_str is set to an empty string. Otherwise it
 * receives the value of proto_attr->selections followed by a space, with a "+"
 * inserted before the space when the count is greater than or equal to the
 * maximum used for this protocol.
 *
 * proto_attr  - queried protocol attributes (desc and selections are read).
 * counter_str - output buffer.
 * size        - size of the output buffer, passed to ucs_snprintf_safe().
 * show_used   - whether the counter should be produced at all.
 */
static void
192+
ucp_proto_selections_dump(const ucp_proto_query_attr_t *proto_attr,
193+
char *counter_str, size_t size, int show_used)
194+
{
195+
unsigned max_selections;
196+
197+
if (!show_used) {
198+
*counter_str = '\0';
199+
return;
200+
}
201+
202+
/* Protocols whose description contains "short" use a maximum of 1 instead
 * of UCP_PROTO_SELECTIONS_COUNT_MAX.
 * NOTE(review): presumably such protocols are counted at most once - the
 * reason is not visible here, confirm against the send path */
max_selections = (strstr(proto_attr->desc, "short") != NULL) ?
203+
1 : UCP_PROTO_SELECTIONS_COUNT_MAX;
204+
205+
ucs_snprintf_safe(counter_str, size, "%u%s ", proto_attr->selections,
206+
(proto_attr->selections >= max_selections ? "+" : ""));
207+
}
208+
209+
void
210+
ucp_proto_select_elem_info(ucp_worker_h worker,
211+
ucp_worker_cfg_index_t ep_cfg_index,
212+
ucp_worker_cfg_index_t rkey_cfg_index,
213+
const ucp_proto_select_param_t *select_param,
214+
const ucp_proto_select_elem_t *select_elem,
215+
int show_all, int show_used, ucs_string_buffer_t *strb)
177216
{
178217
UCS_STRING_BUFFER_ONSTACK(ep_cfg_strb, UCP_PROTO_CONFIG_STR_MAX);
179218
UCS_STRING_BUFFER_ONSTACK(sel_param_strb, UCP_PROTO_CONFIG_STR_MAX);
180-
static const char *info_row_fmt = "| %*s | %-*s | %-*s |\n";
219+
static const char *info_row_fmt = "| %s%*s | %-*s | %-*s |\n";
181220
ucp_proto_info_table_t table;
182221
int hdr_col_width[2], col_width[3];
183222
ucp_proto_query_attr_t proto_attr;
184223
ucp_proto_info_row_t *row_elem;
185224
size_t range_start, range_end;
186225
int proto_valid;
187226

227+
if (show_used && !ucp_proto_select_elem_has_selections(select_elem)) {
228+
return;
229+
}
230+
188231
ucp_proto_select_param_dump(worker, ep_cfg_index, rkey_cfg_index,
189232
select_param, ucp_operation_descs, &ep_cfg_strb,
190233
&sel_param_strb);
191234
if (!show_all &&
192235
!ucp_proto_debug_is_info_enabled(
193-
worker->context, ucs_string_buffer_cstr(&sel_param_strb))) {
236+
worker->context, ucs_string_buffer_cstr(&sel_param_strb),
237+
show_used)) {
194238
return;
195239
}
196240

@@ -221,7 +265,11 @@ void ucp_proto_select_elem_info(ucp_worker_h worker,
221265
ucs_memunits_range_str(range_start, range_end, row_elem->range_str,
222266
sizeof(row_elem->range_str));
223267

224-
col_width[0] = ucs_max(col_width[0], strlen(row_elem->range_str));
268+
ucp_proto_selections_dump(&proto_attr, row_elem->counter_str,
269+
sizeof(row_elem->counter_str), show_used);
270+
271+
col_width[0] = ucs_max(col_width[0], strlen(row_elem->counter_str) +
272+
strlen(row_elem->range_str));
225273
col_width[1] = ucs_max(col_width[1], strlen(row_elem->desc));
226274
col_width[2] = ucs_max(col_width[2], strlen(row_elem->config));
227275
} while (range_end != SIZE_MAX);
@@ -243,7 +291,8 @@ void ucp_proto_select_elem_info(ucp_worker_h worker,
243291
/* Print contents */
244292
ucp_proto_table_row_separator(strb, col_width, 3);
245293
ucs_array_for_each(row_elem, &table) {
246-
ucs_string_buffer_appendf(strb, info_row_fmt, col_width[0],
294+
ucs_string_buffer_appendf(strb, info_row_fmt, row_elem->counter_str,
295+
col_width[0] - strlen(row_elem->counter_str),
247296
row_elem->range_str, col_width[1],
248297
row_elem->desc, col_width[2],
249298
row_elem->config);
@@ -264,7 +313,7 @@ void ucp_proto_select_info(ucp_worker_h worker,
264313

265314
kh_foreach(proto_select->hash, key.u64, select_elem,
266315
ucp_proto_select_elem_info(worker, ep_cfg_index, rkey_cfg_index,
267-
&key.param, &select_elem, show_all,
316+
&key.param, &select_elem, show_all, 0,
268317
strb);
269318
ucs_string_buffer_appendf(strb, "\n"))
270319
}
@@ -966,7 +1015,7 @@ ucp_proto_select_write_info(ucp_worker_h worker,
9661015
ucp_operation_names, &ep_cfg_strb,
9671016
&sel_param_strb);
9681017
if (!ucp_proto_debug_is_info_enabled(
969-
worker->context, ucs_string_buffer_cstr(&sel_param_strb))) {
1018+
worker->context, ucs_string_buffer_cstr(&sel_param_strb), 0)) {
9701019
goto out;
9711020
}
9721021

@@ -1031,17 +1080,19 @@ ucp_proto_select_write_info(ucp_worker_h worker,
10311080
}
10321081

10331082
void ucp_proto_select_elem_trace(ucp_worker_h worker,
1034-
ucp_worker_cfg_index_t ep_cfg_index,
1035-
ucp_worker_cfg_index_t rkey_cfg_index,
10361083
const ucp_proto_select_param_t *select_param,
1037-
ucp_proto_select_elem_t *select_elem)
1084+
const ucp_proto_select_elem_t *select_elem,
1085+
int show_used)
10381086
{
1039-
ucs_string_buffer_t strb = UCS_STRING_BUFFER_INITIALIZER;
1087+
const ucp_proto_config_t *proto_config = &select_elem->thresholds[0].proto_config;
1088+
ucp_worker_cfg_index_t ep_cfg_index = proto_config->ep_cfg_index;
1089+
ucp_worker_cfg_index_t rkey_cfg_index = proto_config->rkey_cfg_index;
1090+
ucs_string_buffer_t strb = UCS_STRING_BUFFER_INITIALIZER;
10401091
char *line;
10411092

10421093
/* Print human-readable protocol selection table to the log */
10431094
ucp_proto_select_elem_info(worker, ep_cfg_index, rkey_cfg_index,
1044-
select_param, select_elem, 0, &strb);
1095+
select_param, select_elem, 0, show_used, &strb);
10451096
ucs_string_buffer_for_each_token(line, &strb, "\n") {
10461097
ucs_log_print_compact(line);
10471098
}

src/ucp/proto/proto_debug.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,14 @@ void ucp_proto_select_elem_info(ucp_worker_h worker,
157157
ucp_worker_cfg_index_t rkey_cfg_index,
158158
const ucp_proto_select_param_t *select_param,
159159
const ucp_proto_select_elem_t *select_elem,
160-
int show_all, ucs_string_buffer_t *strb);
160+
int show_all, int show_used,
161+
ucs_string_buffer_t *strb);
161162

162163

163164
void ucp_proto_select_elem_trace(ucp_worker_h worker,
164-
ucp_worker_cfg_index_t ep_cfg_index,
165-
ucp_worker_cfg_index_t rkey_cfg_index,
166165
const ucp_proto_select_param_t *select_param,
167-
ucp_proto_select_elem_t *select_elem);
166+
const ucp_proto_select_elem_t *select_elem,
167+
int show_used);
168168

169169

170170
void ucp_proto_select_write_info(

src/ucp/proto/proto_reconfig.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ static ucs_status_t ucp_proto_reconfig_select_progress(uct_pending_req_t *self)
3030
return UCS_ERR_NO_RESOURCE;
3131
}
3232

33+
/* coverity[address_free] */
3334
return req->send.uct.func(&req->send.uct);
3435
}
3536

0 commit comments

Comments
 (0)