Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 43 additions & 32 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ static int ucp_ep_shall_use_indirect_id(ucp_context_h context,
return !(ep_init_flags & UCP_EP_INIT_FLAG_INTERNAL) &&
((context->config.ext.proto_indirect_id == UCS_CONFIG_ON) ||
((context->config.ext.proto_indirect_id == UCS_CONFIG_AUTO) &&
(ep_init_flags & UCP_EP_INIT_ERR_MODE_PEER_FAILURE)));
(ep_init_flags & UCP_EP_INIT_ERR_MODE_FAILOVER_MASK)));
}

void ucp_ep_peer_mem_destroy(ucp_context_h context,
Expand Down Expand Up @@ -573,8 +573,13 @@ void ucp_ep_release_id(ucp_ep_h ep)
void ucp_ep_config_key_set_err_mode(ucp_ep_config_key_t *key,
unsigned ep_init_flags)
{
key->err_mode = (ep_init_flags & UCP_EP_INIT_ERR_MODE_PEER_FAILURE) ?
UCP_ERR_HANDLING_MODE_PEER : UCP_ERR_HANDLING_MODE_NONE;
if (ep_init_flags & UCP_EP_INIT_ERR_MODE_FAILOVER) {
key->err_mode = UCP_ERR_HANDLING_MODE_FAILOVER;
} else if (ep_init_flags & UCP_EP_INIT_ERR_MODE_PEER_FAILURE) {
key->err_mode = UCP_ERR_HANDLING_MODE_PEER;
} else {
key->err_mode = UCP_ERR_HANDLING_MODE_NONE;
}
}

void ucp_ep_config_key_init_flags(ucp_ep_config_key_t *key,
Expand Down Expand Up @@ -790,7 +795,7 @@ static ucs_status_t ucp_ep_init_create_wireup(ucp_ep_h ep,
if (ucp_ep_init_flags_has_cm(ep_init_flags)) {
key.cm_lane = 0;
/* Send keepalive on wireup_ep (which will send on aux_ep) */
if (ep_init_flags & UCP_EP_INIT_ERR_MODE_PEER_FAILURE) {
if (ep_init_flags & UCP_EP_INIT_ERR_MODE_FAILOVER_MASK) {
key.keepalive_lane = 0;
}
} else {
Expand Down Expand Up @@ -931,8 +936,7 @@ ucp_sa_data_v1_unpack(const ucp_wireup_sockaddr_data_base_t *sa_data,
return UCS_ERR_UNSUPPORTED;
}

*ep_init_flags_p = (sa_data->header == UCP_ERR_HANDLING_MODE_PEER) ?
UCP_EP_INIT_ERR_MODE_PEER_FAILURE : 0;
*ep_init_flags_p = ucp_ep_err_mode_init_flags(sa_data->header);
*worker_addr_p = sa_data_v1 + 1;
return UCS_OK;
}
Expand All @@ -942,8 +946,15 @@ ucp_sa_data_v2_unpack(const ucp_wireup_sockaddr_data_base_t *sa_data,
unsigned *ep_init_flags_p,
const void** worker_addr_p)
{
*ep_init_flags_p = (sa_data->header & UCP_SA_DATA_FLAG_ERR_MODE_PEER) ?
UCP_EP_INIT_ERR_MODE_PEER_FAILURE : 0;
if (sa_data->header & UCP_SA_DATA_FLAG_ERR_MODE_FAILOVER) {
ucs_assert(sa_data->header & UCP_SA_DATA_FLAG_ERR_MODE_PEER);
*ep_init_flags_p = UCP_EP_INIT_ERR_MODE_FAILOVER_MASK;
} else if (sa_data->header & UCP_SA_DATA_FLAG_ERR_MODE_PEER) {
*ep_init_flags_p = UCP_EP_INIT_ERR_MODE_PEER_FAILURE;
} else {
*ep_init_flags_p = 0;
}

*worker_addr_p = sa_data + 1;
return UCS_OK;
}
Expand Down Expand Up @@ -1142,7 +1153,7 @@ ucp_ep_create_api_to_worker_addr(ucp_worker_h worker,
out_resolve_remote_id:
if ((context->config.ext.resolve_remote_ep_id == UCS_CONFIG_ON) ||
((context->config.ext.resolve_remote_ep_id == UCS_CONFIG_AUTO) &&
(ep_init_flags & UCP_EP_INIT_ERR_MODE_PEER_FAILURE) &&
(ep_init_flags & UCP_EP_INIT_ERR_MODE_FAILOVER_MASK) &&
ucp_worker_keepalive_is_enabled(worker))) {
/* If resolving remote ID forced by configuration or PEER_FAILURE
* and keepalive were requested, resolve remote endpoint ID prior to
Expand All @@ -1169,11 +1180,7 @@ ucp_ep_create_api_to_worker_addr(ucp_worker_h worker,
static void ucp_ep_params_check_err_handling(ucp_ep_h ep,
const ucp_ep_params_t *params)
{
ucp_err_handling_mode_t err_mode =
UCP_PARAM_VALUE(EP, params, err_mode, ERR_HANDLING_MODE,
UCP_ERR_HANDLING_MODE_NONE);

if (err_mode == UCP_ERR_HANDLING_MODE_NONE) {
if (ucp_ep_params_err_handling_mode(params) == UCP_ERR_HANDLING_MODE_NONE) {
return;
}

Expand All @@ -1193,13 +1200,6 @@ ucs_status_t ucp_ep_create(ucp_worker_h worker, const ucp_ep_params_t *params,
unsigned flags = UCP_PARAM_VALUE(EP, params, flags, FLAGS, 0);
ucs_status_t status;

/* TODO: Implement failover error handling mode */
if (UCP_PARAM_VALUE(EP, params, err_mode, ERR_HANDLING_MODE,
UCP_ERR_HANDLING_MODE_NONE) == UCP_ERR_HANDLING_MODE_FAILOVER) {
ucs_error("failover error handling mode is not implemented");
return UCS_ERR_NOT_IMPLEMENTED;
}

UCS_ASYNC_BLOCK(&worker->async);

if (flags & UCP_EP_PARAMS_FLAGS_CLIENT_SERVER) {
Expand Down Expand Up @@ -1413,10 +1413,9 @@ static void ucp_ep_failed_destroy(uct_ep_h ep)

static void ucp_ep_discard_lanes(ucp_ep_h ep, ucs_status_t discard_status)
{
unsigned ep_flush_flags = (ucp_ep_config(ep)->key.err_mode ==
UCP_ERR_HANDLING_MODE_NONE) ?
UCT_FLUSH_FLAG_LOCAL :
UCT_FLUSH_FLAG_CANCEL;
unsigned ep_flush_flags = ucp_ep_config_err_handling_enabled(ep) ?
UCT_FLUSH_FLAG_CANCEL :
UCT_FLUSH_FLAG_LOCAL;
uct_ep_h uct_eps[UCP_MAX_LANES] = { NULL };
ucp_ep_discard_lanes_arg_t *discard_arg;
ucs_status_t status;
Expand Down Expand Up @@ -1475,7 +1474,6 @@ ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane, ucs_status_t status)
{
UCS_STRING_BUFFER_ONSTACK(lane_info_strb, 64);
ucp_ep_ext_t *ep_ext = ucp_ep->ext;
ucp_err_handling_mode_t err_mode;
ucs_log_level_t log_level;
ucp_request_t *close_req;

Expand Down Expand Up @@ -1516,10 +1514,8 @@ ucp_ep_set_failed(ucp_ep_h ucp_ep, ucp_lane_index_t lane, ucs_status_t status)
} else if (ep_ext->err_cb == NULL) {
/* Print error if user requested error handling support but did not
install a valid error handling callback */
err_mode = ucp_ep_config(ucp_ep)->key.err_mode;
log_level = (err_mode == UCP_ERR_HANDLING_MODE_NONE) ?
UCS_LOG_LEVEL_DIAG :
UCS_LOG_LEVEL_ERROR;
log_level = ucp_ep_config_err_handling_enabled(ucp_ep) ?
UCS_LOG_LEVEL_ERROR : UCS_LOG_LEVEL_DIAG;

ucp_ep_get_lane_info_str(ucp_ep, lane, &lane_info_strb);
ucs_log(log_level,
Expand Down Expand Up @@ -1755,7 +1751,7 @@ ucs_status_ptr_t ucp_ep_close_nbx(ucp_ep_h ep, const ucp_request_param_t *param)
ucp_request_t *close_req;

if ((ucp_request_param_flags(param) & UCP_EP_CLOSE_FLAG_FORCE) &&
(ucp_ep_config(ep)->key.err_mode != UCP_ERR_HANDLING_MODE_PEER)) {
!ucp_ep_config_err_handling_enabled(ep)) {
return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM);
}

Expand Down Expand Up @@ -2683,7 +2679,8 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,
config->md_index[lane] = context->tl_rscs[rsc_index].md_index;
if (ucp_ep_config_connect_p2p(worker, &config->key, rsc_index)) {
config->p2p_lanes |= UCS_BIT(lane);
} else if (config->key.err_mode == UCP_ERR_HANDLING_MODE_PEER) {
} else if ((config->key.err_mode == UCP_ERR_HANDLING_MODE_PEER) ||
(config->key.err_mode == UCP_ERR_HANDLING_MODE_FAILOVER)) {
config->uct_rkey_pack_flags |= UCT_MD_MKEY_PACK_FLAG_INVALIDATE_RMA;
}

Expand Down Expand Up @@ -3969,3 +3966,17 @@ void ucp_ep_set_cfg_index(ucp_ep_h ep, ucp_worker_cfg_index_t cfg_index)
ucp_ep_config_activate_worker_ifaces(ep->worker, cfg_index);
ucp_ep_config_proto_init(ep->worker, cfg_index);
}

/*
 * Translate an error handling mode into the endpoint initialization flags
 * that represent it.
 *
 * err_mode - error handling mode to convert.
 *
 * Returns 0 for UCP_ERR_HANDLING_MODE_NONE,
 * UCP_EP_INIT_ERR_MODE_PEER_FAILURE for UCP_ERR_HANDLING_MODE_PEER and
 * UCP_EP_INIT_ERR_MODE_FAILOVER_MASK for UCP_ERR_HANDLING_MODE_FAILOVER.
 */
unsigned ucp_ep_err_mode_init_flags(ucp_err_handling_mode_t err_mode)
{
switch (err_mode) {
case UCP_ERR_HANDLING_MODE_NONE:
return 0;
case UCP_ERR_HANDLING_MODE_PEER:
return UCP_EP_INIT_ERR_MODE_PEER_FAILURE;
case UCP_ERR_HANDLING_MODE_FAILOVER:
/* The mask (not the single FAILOVER bit) is returned, so the
 * PEER_FAILURE bit is set for failover mode as well */
return UCP_EP_INIT_ERR_MODE_FAILOVER_MASK;
default:
/* Any value outside the three known modes is reported via ucs_fatal();
 * NOTE(review): no return follows, so ucs_fatal() is presumably
 * non-returning - confirm */
ucs_fatal("invalid error handling mode: %d", err_mode);
}
}
25 changes: 22 additions & 3 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,16 @@ enum {
UCP_EP_INIT_CREATE_AM_LANE_ONLY = UCS_BIT(8), /**< Endpoint requires an AM lane only */
UCP_EP_INIT_KA_FROM_EXIST_LANES = UCS_BIT(9), /**< Use only existing lanes to create
keepalive lane */
UCP_EP_INIT_ALLOW_AM_AUX_TL = UCS_BIT(10) /**< Endpoint allows selecting of auxiliary
UCP_EP_INIT_ALLOW_AM_AUX_TL = UCS_BIT(10), /**< Endpoint allows selecting of auxiliary
transports for AM lane */
UCP_EP_INIT_ERR_MODE_FAILOVER = UCS_BIT(11), /**< Endpoint requires an
@ref UCP_ERR_HANDLING_MODE_FAILOVER */

/**
* Peer failure and failover flags combined, for consistency with
* @ref UCP_SA_DATA_MASK_ERR_MODE_FAILOVER
*/
UCP_EP_INIT_ERR_MODE_FAILOVER_MASK = UCP_EP_INIT_ERR_MODE_PEER_FAILURE |
UCP_EP_INIT_ERR_MODE_FAILOVER
};


Expand Down Expand Up @@ -617,8 +625,19 @@ enum {
* ucp_wireup_sockaddr_data_base_t structure.
*/
enum {
/* Indicates support of UCP_ERR_HANDLING_MODE_PEER error mode. */
UCP_SA_DATA_FLAG_ERR_MODE_PEER = UCS_BIT(0)
/* Indicates support of @ref UCP_ERR_HANDLING_MODE_PEER error mode. */
UCP_SA_DATA_FLAG_ERR_MODE_PEER = UCS_BIT(0),
/* Indicates support of @ref UCP_ERR_HANDLING_MODE_FAILOVER error mode.
* NOTE: use @ref UCP_SA_DATA_MASK_ERR_MODE_FAILOVER instead of this flag
* alone, for backward compatibility: the mask also sets the peer flag, so
* the error handling mode can fall back to
* @ref UCP_ERR_HANDLING_MODE_PEER */
UCP_SA_DATA_FLAG_ERR_MODE_FAILOVER = UCS_BIT(1),

/**
* Backward compatibility mask: the failover flag together with the peer flag
*/
UCP_SA_DATA_MASK_ERR_MODE_FAILOVER = UCP_SA_DATA_FLAG_ERR_MODE_PEER |
UCP_SA_DATA_FLAG_ERR_MODE_FAILOVER
};


Expand Down
13 changes: 13 additions & 0 deletions src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,17 @@ ucp_ep_config_err_mode_eq(ucp_ep_h ep, ucp_err_handling_mode_t err_mode)
return ucp_ep_config(ep)->key.err_mode == err_mode;
}

/*
 * Get the error handling mode carried by endpoint creation parameters.
 *
 * params - endpoint parameters to read the err_mode field from.
 *
 * Returns the value produced by UCP_PARAM_VALUE for the err_mode field, with
 * UCP_ERR_HANDLING_MODE_NONE passed as the fallback value.
 * NOTE(review): the fallback is presumably used when the ERR_HANDLING_MODE
 * bit is not set in the params field mask - confirm against UCP_PARAM_VALUE.
 */
static UCS_F_ALWAYS_INLINE ucp_err_handling_mode_t
ucp_ep_params_err_handling_mode(const ucp_ep_params_t *params)
{
return UCP_PARAM_VALUE(EP, params, err_mode, ERR_HANDLING_MODE,
UCP_ERR_HANDLING_MODE_NONE);
}

/*
 * Check whether the endpoint is configured with an error handling mode.
 *
 * ep - endpoint whose configuration is inspected.
 *
 * Returns non-zero if the configured error mode is either
 * UCP_ERR_HANDLING_MODE_PEER or UCP_ERR_HANDLING_MODE_FAILOVER, zero
 * otherwise.
 */
static UCS_F_ALWAYS_INLINE int ucp_ep_config_err_handling_enabled(ucp_ep_h ep)
{
return ucp_ep_config_err_mode_eq(ep, UCP_ERR_HANDLING_MODE_PEER) ||
ucp_ep_config_err_mode_eq(ep, UCP_ERR_HANDLING_MODE_FAILOVER);
}

#endif
5 changes: 3 additions & 2 deletions src/ucp/core/ucp_ep_vfs.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ static const ucp_ep_vfs_attr_t ucp_ep_vfs_attrs[] = {
};

static const char *ucp_err_handling_mode_names[] = {
[UCP_ERR_HANDLING_MODE_NONE] = "none",
[UCP_ERR_HANDLING_MODE_PEER] = "peer"
[UCP_ERR_HANDLING_MODE_NONE] = "none",
[UCP_ERR_HANDLING_MODE_PEER] = "peer",
[UCP_ERR_HANDLING_MODE_FAILOVER] = "failover"
};

static void ucp_ep_vfs_read_peer_name(void *obj, ucs_string_buffer_t *strb,
Expand Down
9 changes: 4 additions & 5 deletions src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -397,14 +397,13 @@ static ucp_md_map_t ucp_request_get_invalidation_map(ucp_ep_h ep)

int ucp_request_memh_invalidate(ucp_request_t *req, ucs_status_t status)
{
ucp_ep_h ep = req->send.ep;
ucp_err_handling_mode_t err_mode = ucp_ep_config(ep)->key.err_mode;
ucp_worker_h worker = ep->worker;
ucp_context_h context = worker->context;
ucp_ep_h ep = req->send.ep;
ucp_worker_h worker = ep->worker;
ucp_context_h context = worker->context;
ucp_mem_h *memh_p;
ucp_md_map_t invalidate_map;

if ((err_mode != UCP_ERR_HANDLING_MODE_PEER) ||
if (ucp_ep_config_err_mode_eq(ep, UCP_ERR_HANDLING_MODE_NONE) ||
!(req->flags & UCP_REQUEST_FLAG_RKEY_INUSE)) {
return 0;
}
Expand Down
3 changes: 1 addition & 2 deletions src/ucp/proto/proto_common.inl
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,7 @@ ucp_proto_request_pack_rkey(ucp_request_t *req, ucp_md_map_t md_map,
/* Since global VA registration doesn't support invalidation yet, and error
* handling is enabled on this EP, we replace GVA registrations with
* regular ones */
if (ucp_ep_config_err_mode_eq(req->send.ep,
UCP_ERR_HANDLING_MODE_PEER) &&
if (ucp_ep_config_err_handling_enabled(req->send.ep) &&
ucs_unlikely(memh->flags & UCP_MEMH_FLAG_HAS_AUTO_GVA)) {
ucp_memh_disable_gva(memh, md_map);
}
Expand Down
5 changes: 2 additions & 3 deletions src/ucp/rma/flush.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ ucp_ep_flush_request_update_uct_comp(ucp_request_t *req, int diff,
static void ucp_ep_flush_error(ucp_request_t *req, ucp_lane_index_t lane,
ucs_status_t status)
{
ucs_log_level_t level = (ucp_ep_config(req->send.ep)->key.err_mode ==
UCP_ERR_HANDLING_MODE_PEER) ?
UCS_LOG_LEVEL_TRACE_REQ : UCS_LOG_LEVEL_ERROR;
ucs_log_level_t level = ucp_ep_config_err_handling_enabled(req->send.ep) ?
UCS_LOG_LEVEL_TRACE_REQ : UCS_LOG_LEVEL_ERROR;

ucs_assertv(lane != UCP_NULL_LANE, "req=%p ep=%p lane=%d status=%s",
req, req->send.ep, lane, ucs_status_string(status));
Expand Down
21 changes: 2 additions & 19 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -579,13 +579,6 @@ void ucp_wireup_remote_connected(ucp_ep_h ep)
ucs_assert(ep->flags & UCP_EP_FLAG_REMOTE_ID);
}

/*
 * Convert an error handling mode to endpoint initialization flags.
 *
 * Only UCP_ERR_HANDLING_MODE_PEER is recognized here: it yields
 * UCP_EP_INIT_ERR_MODE_PEER_FAILURE, and every other mode value yields 0.
 */
static UCS_F_ALWAYS_INLINE unsigned
ucp_ep_err_mode_init_flags(ucp_err_handling_mode_t err_mode)
{
return (err_mode == UCP_ERR_HANDLING_MODE_PEER) ?
UCP_EP_INIT_ERR_MODE_PEER_FAILURE : 0;
}

static UCS_F_NOINLINE void
ucp_wireup_process_pre_request(ucp_worker_h worker, ucp_ep_h ep,
const ucp_wireup_msg_t *msg,
Expand Down Expand Up @@ -2228,13 +2221,6 @@ static void ucp_wireup_msg_dump(ucp_worker_h worker, uct_am_trace_type_t type,
ucs_free(unpacked_address.address_list);
}

/*
 * Get the error handling mode from endpoint parameters.
 *
 * Returns params->err_mode when UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE is set
 * in params->field_mask, and UCP_ERR_HANDLING_MODE_NONE otherwise.
 */
static ucp_err_handling_mode_t
ucp_ep_params_err_handling_mode(const ucp_ep_params_t *params)
{
return (params->field_mask & UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE) ?
params->err_mode : UCP_ERR_HANDLING_MODE_NONE;
}

static unsigned
ucp_cm_ep_init_flags(const ucp_ep_params_t *params)
{
Expand All @@ -2259,11 +2245,8 @@ unsigned ucp_ep_init_flags(const ucp_worker_h worker,
flags |= UCP_EP_INIT_CREATE_AM_LANE;
}

if (ucp_ep_params_err_handling_mode(params) == UCP_ERR_HANDLING_MODE_PEER) {
flags |= UCP_EP_INIT_ERR_MODE_PEER_FAILURE;
}

return flags;
return flags |
ucp_ep_err_mode_init_flags(ucp_ep_params_err_handling_mode(params));
}

double ucp_wireup_iface_lat_distance_v1(const ucp_worker_iface_t *wiface)
Expand Down
11 changes: 11 additions & 0 deletions src/ucp/wireup/wireup.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,17 @@ void ucp_wireup_remote_connected(ucp_ep_h ep);
unsigned ucp_ep_init_flags(const ucp_worker_h worker,
const ucp_ep_params_t *params);


/**
* @brief Convert error handling mode to endpoint initialization flags.
*
* @param [in] err_mode Error handling mode. Must be one of
*                      @ref UCP_ERR_HANDLING_MODE_NONE,
*                      @ref UCP_ERR_HANDLING_MODE_PEER or
*                      @ref UCP_ERR_HANDLING_MODE_FAILOVER; any other value
*                      is reported as a fatal error.
*
* @return Endpoint initialization flags: 0 for no error handling,
*         UCP_EP_INIT_ERR_MODE_PEER_FAILURE for peer mode, or
*         UCP_EP_INIT_ERR_MODE_FAILOVER_MASK for failover mode.
*/
unsigned ucp_ep_err_mode_init_flags(ucp_err_handling_mode_t err_mode);


int ucp_wireup_connect_p2p(ucp_worker_h worker, ucp_rsc_index_t rsc_index,
int has_cm_lane);

Expand Down
4 changes: 3 additions & 1 deletion src/ucp/wireup/wireup_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,10 @@ ucp_cm_ep_sa_data_pack(ucp_ep_h ep, ucp_wireup_sockaddr_data_base_t *sa_data,
"sa_data version: %u", sa_data_version);
sa_data->header = UCP_OBJECT_VERSION_V2 <<
UCP_SA_DATA_HEADER_VERSION_SHIFT;
if (ucp_ep_config(ep)->key.err_mode == UCP_ERR_HANDLING_MODE_PEER) {
if (ucp_ep_config_err_mode_eq(ep, UCP_ERR_HANDLING_MODE_PEER)) {
sa_data->header |= UCP_SA_DATA_FLAG_ERR_MODE_PEER;
} else if (ucp_ep_config_err_mode_eq(ep, UCP_ERR_HANDLING_MODE_FAILOVER)) {
sa_data->header |= UCP_SA_DATA_MASK_ERR_MODE_FAILOVER;
}

return sa_data + 1;
Expand Down
Loading