11 changes: 10 additions & 1 deletion src/ucp/core/ucp_ep.c
@@ -684,12 +684,18 @@ ucs_status_t ucp_worker_mem_type_eps_create(ucp_worker_h worker)
ucs_status_t status;
void *address_buffer;
size_t address_length;
ucp_tl_bitmap_t mem_access_tls;
ucp_tl_bitmap_t mem_access_tls, host_mem_access_tls;
char ep_name[UCP_WORKER_ADDRESS_NAME_MAX];
unsigned addr_indices[UCP_MAX_LANES];
ucp_lane_index_t num_lanes;

ucs_memory_type_for_each(mem_type) {
ucp_context_memaccess_tl_bitmap(context, mem_type, 0, &mem_access_tls);
/* Mem type EP requires host memory support */
ucp_context_memaccess_tl_bitmap(context, UCS_MEMORY_TYPE_HOST, 0,
&host_mem_access_tls);
UCS_STATIC_BITMAP_AND_INPLACE(&mem_access_tls, host_mem_access_tls);

if (UCP_MEM_IS_HOST(mem_type) ||
UCS_STATIC_BITMAP_IS_ZERO(mem_access_tls)) {
continue;
@@ -725,6 +731,9 @@ ucs_status_t ucp_worker_mem_type_eps_create(ucp_worker_h worker)
goto err_free_address_list;
}

/* Mem type EP cannot have more than one lane */
num_lanes = ucp_ep_num_lanes(worker->mem_type_ep[mem_type]);
ucs_assertv_always(num_lanes == 1, "num_lanes=%u", num_lanes);
UCS_ASYNC_UNBLOCK(&worker->async);

ucs_free(local_address.address_list);
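The bitmap intersection added above restricts each mem-type EP to transports that can access both the given memory type and host memory; if the intersection is empty, no mem-type EP is created. Below is a minimal sketch of the same intersect-then-test pattern, using a plain uint64_t mask in place of ucp_tl_bitmap_t (illustrative only; the real code uses the multi-word UCS_STATIC_BITMAP_* macros, and the mask values here are made up):

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

int main(void)
{
    uint64_t mem_access_tls      = 0x16; /* TLs that can access, e.g., CUDA memory */
    uint64_t host_mem_access_tls = 0x12; /* TLs that can access host memory */

    /* Equivalent of UCS_STATIC_BITMAP_AND_INPLACE(&mem_access_tls, host_mem_access_tls) */
    mem_access_tls &= host_mem_access_tls;

    /* Equivalent of UCS_STATIC_BITMAP_IS_ZERO(mem_access_tls) */
    if (mem_access_tls == 0) {
        printf("no eligible transport, skip mem-type EP\n");
    } else {
        printf("eligible TLs: 0x%" PRIx64 "\n", mem_access_tls);
    }
    return 0;
}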
123 changes: 121 additions & 2 deletions src/uct/cuda/base/cuda_iface.c
@@ -173,6 +173,97 @@ ucs_status_t uct_cuda_base_iface_event_fd_arm(uct_iface_h tl_iface,
return UCS_OK;
}

static void uct_cuda_base_stream_flushed_cb(uct_completion_t *self)
{
uct_cuda_flush_stream_desc_t *desc =
ucs_container_of(self, uct_cuda_flush_stream_desc_t, comp);

if (--desc->flush_desc->stream_counter == 0) {
uct_invoke_completion(desc->flush_desc->comp, UCS_OK);
ucs_free(desc->flush_desc);
}
}

/* Flush is done by enqueueing a flush event on each active stream and waiting
* for them to finish. On each flush event completion, we decrement a shared
* counter; once it reaches zero, the flush is complete. */
ucs_status_t
uct_cuda_base_ep_flush(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp)
{
uct_base_ep_t UCS_V_UNUSED *ep = ucs_derived_of(tl_ep, uct_base_ep_t);
uct_cuda_iface_t *iface = ucs_derived_of(tl_ep->iface,
uct_cuda_iface_t);
uct_cuda_flush_desc_t *flush_desc;
uct_cuda_flush_stream_desc_t *flush_stream_desc;
uct_cuda_queue_desc_t *q_desc;
uct_cuda_event_desc_t *event_desc;
ucs_queue_head_t *event_queue;
ucs_queue_iter_t iter;
unsigned stream_index;

if (ucs_queue_is_empty(&iface->active_queue)) {
UCT_TL_EP_STAT_FLUSH(ep);
return UCS_OK;
}

if (comp == NULL) {
goto out;
}

/* Allocate base flush descriptor */
flush_desc = ucs_malloc(sizeof(*flush_desc), "cuda_flush_desc");
if (flush_desc == NULL) {
return UCS_ERR_NO_MEMORY;
}

flush_desc->comp = comp;
flush_desc->stream_counter = 0;

/* For each active stream, initialize a flush descriptor and enqueue it on
* the stream's event queue */
ucs_queue_for_each(q_desc, &iface->active_queue, queue) {
flush_stream_desc = ucs_mpool_get(&iface->flush_mpool);
if (flush_stream_desc == NULL) {
goto error;
}

flush_stream_desc->flush_desc = flush_desc;
flush_stream_desc->comp.func = uct_cuda_base_stream_flushed_cb;
flush_stream_desc->comp.count = 1;
flush_stream_desc->super.comp = &flush_stream_desc->comp;
ucs_queue_push(&q_desc->event_queue, &flush_stream_desc->super.queue);
flush_desc->stream_counter++;
}

out:
UCT_TL_EP_STAT_FLUSH_WAIT(ep);
return UCS_INPROGRESS;

error:
/* Rollback enqueued items in case of error */
for (iter = ucs_queue_iter_begin(&iface->active_queue), stream_index = 0;
stream_index < flush_desc->stream_counter;
iter = ucs_queue_iter_next(iter), ++stream_index) {
event_queue = &ucs_queue_iter_elem(q_desc, iter, queue)->event_queue;
event_desc = ucs_queue_tail_elem_non_empty(event_queue,
uct_cuda_event_desc_t,
queue);

ucs_queue_remove(event_queue, &event_desc->queue);
ucs_mpool_put((uct_cuda_flush_stream_desc_t*)event_desc);
}

ucs_free(flush_desc);
return UCS_ERR_NO_MEMORY;
}

static UCS_F_ALWAYS_INLINE int
uct_cuda_base_event_is_flush(uct_cuda_event_desc_t *event)
{
return (event->comp != NULL) &&
(event->comp->func == uct_cuda_base_stream_flushed_cb);
}

static UCS_F_ALWAYS_INLINE unsigned
uct_cuda_base_progress_event_queue(uct_cuda_iface_t *iface,
ucs_queue_head_t *queue_head,
@@ -183,13 +274,18 @@ uct_cuda_base_progress_event_queue(uct_cuda_iface_t *iface,

ucs_queue_for_each_extract(cuda_event, queue_head, queue,
(count < max_events) &&
(cuEventQuery(cuda_event->event) == CUDA_SUCCESS)) {
(ucs_unlikely(uct_cuda_base_event_is_flush(
cuda_event)) ||
(cuEventQuery(cuda_event->event) == CUDA_SUCCESS))) {
ucs_trace_data("cuda event %p completed", cuda_event);
if (cuda_event->comp != NULL) {
uct_invoke_completion(cuda_event->comp, UCS_OK);
}

iface->ops->complete_event(&iface->super.super, cuda_event);
if (ucs_likely(!uct_cuda_base_event_is_flush(cuda_event))) {
iface->ops->complete_event(&iface->super.super, cuda_event);
}

ucs_mpool_put(cuda_event);
count++;
}
@@ -352,19 +448,41 @@ static void uct_cuda_base_ctx_rsc_destroy(uct_cuda_iface_t *iface,
iface->ops->destroy_rsc(&iface->super.super, ctx_rsc);
}

static ucs_mpool_ops_t uct_cuda_flush_desc_mpool_ops = {
.chunk_alloc = ucs_mpool_chunk_malloc,
.chunk_release = ucs_mpool_chunk_free,
.obj_init = NULL,
.obj_cleanup = NULL,
.obj_str = NULL
};

UCS_CLASS_INIT_FUNC(uct_cuda_iface_t, uct_iface_ops_t *tl_ops,
uct_iface_internal_ops_t *ops, uct_md_h md,
uct_worker_h worker, const uct_iface_params_t *params,
const uct_iface_config_t *tl_config,
const char *dev_name)
{
ucs_mpool_params_t mp_params;
ucs_status_t status;

UCS_CLASS_CALL_SUPER_INIT(uct_base_iface_t, tl_ops, ops, md, worker, params,
tl_config UCS_STATS_ARG(params->stats_root)
UCS_STATS_ARG(dev_name));

self->eventfd = UCS_ASYNC_EVENTFD_INVALID_FD;
kh_init_inplace(cuda_ctx_rscs, &self->ctx_rscs);
ucs_queue_head_init(&self->active_queue);

ucs_mpool_params_reset(&mp_params);
mp_params.elem_size = sizeof(uct_cuda_flush_stream_desc_t);
mp_params.ops = &uct_cuda_flush_desc_mpool_ops;
mp_params.name = "cuda_flush_descriptors";

status = ucs_mpool_init(&mp_params, &self->flush_mpool);
if (status != UCS_OK) {
return status;
}

return UCS_OK;
}

@@ -381,6 +499,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_cuda_iface_t)

kh_destroy_inplace(cuda_ctx_rscs, &self->ctx_rscs);
ucs_async_eventfd_destroy(self->eventfd);
ucs_mpool_cleanup(&self->flush_mpool, 1);
}

UCS_CLASS_DEFINE(uct_cuda_iface_t, uct_base_iface_t);
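The flush path above fans out one per-stream completion and fans them back in through the shared stream_counter; the caller's completion fires only when the last stream drains. A standalone sketch of that fan-in pattern, with hypothetical names (flush_desc_t, stream_done, flush_complete) and single-threaded execution as under the UCT progress lock:

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct {
    uint32_t stream_counter;  /* outstanding per-stream flushes */
    void   (*user_cb)(void);  /* fires once, after the last stream drains */
} flush_desc_t;

/* Mirrors uct_cuda_base_stream_flushed_cb: the last decrement completes
 * the whole flush and releases the shared descriptor */
static void stream_done(flush_desc_t *fd)
{
    if (--fd->stream_counter == 0) {
        fd->user_cb();
        free(fd);
    }
}

static void flush_complete(void)
{
    printf("flush completed\n");
}

int main(void)
{
    unsigned num_streams = 3;
    flush_desc_t *fd     = malloc(sizeof(*fd));

    if (fd == NULL) {
        return 1;
    }
    fd->stream_counter = num_streams;
    fd->user_cb        = flush_complete;

    /* In the driver, each of these calls happens when that stream's flush
     * entry reaches the head of its FIFO event queue */
    for (unsigned i = 0; i < num_streams; i++) {
        stream_done(fd);
    }
    return 0;
}

In the real code the counter starts at zero and is incremented once per enqueued stream descriptor rather than set up front. Note also that flush descriptors carry no CUDA event: each per-stream event queue is FIFO, so a flush entry reaching the head of its queue implies all earlier work on that stream has completed, which is why uct_cuda_base_progress_event_queue lets flush entries bypass cuEventQuery().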
21 changes: 21 additions & 0 deletions src/uct/cuda/base/cuda_iface.h
@@ -95,6 +95,23 @@ typedef struct {
} uct_cuda_event_desc_t;


/* Base flush descriptor */
typedef struct {
/* Number of streams with a flush still outstanding */
uint32_t stream_counter;
uct_completion_t *comp;
} uct_cuda_flush_desc_t;


/* Per-stream flush descriptor */
typedef struct {
uct_cuda_event_desc_t super;
/* Pointer to base flush descriptor */
uct_cuda_flush_desc_t *flush_desc;
uct_completion_t comp;
} uct_cuda_flush_stream_desc_t;


typedef struct {
/* CUDA context handle */
CUcontext ctx;
@@ -130,6 +147,8 @@ typedef struct {
/* list of queues which require progress */
ucs_queue_head_t active_queue;
uct_cuda_iface_ops_t *ops;
/* Pool for per-stream flush descriptors */
ucs_mpool_t flush_mpool;

struct {
unsigned max_events;
@@ -148,6 +167,8 @@ unsigned uct_cuda_base_iface_progress(uct_iface_h tl_iface);
ucs_status_t uct_cuda_base_iface_flush(uct_iface_h tl_iface, unsigned flags,
uct_completion_t *comp);

ucs_status_t
uct_cuda_base_ep_flush(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp);
ucs_status_t
uct_cuda_base_query_devices_common(
uct_md_h md, uct_device_type_t dev_type,
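uct_cuda_flush_stream_desc_t embeds uct_cuda_event_desc_t as its first member, so it can ride the same per-stream event queue as ordinary event descriptors, while the per-stream callback recovers the derived type from the embedded completion via ucs_container_of. A minimal sketch of that layering with simplified stand-in types (completion_t, event_desc_t, flush_stream_desc_t are illustrative, not the UCT definitions):

#include <stddef.h>
#include <stdio.h>

#define container_of(ptr, type, member) \
    ((type*)((char*)(ptr) - offsetof(type, member)))

typedef struct completion {
    void (*func)(struct completion *self);
} completion_t;

typedef struct {
    completion_t *comp;      /* as in uct_cuda_event_desc_t */
} event_desc_t;

typedef struct {
    event_desc_t super;      /* first member: queueable as a plain event */
    int          stream_id;
    completion_t comp;       /* per-stream completion, embedded */
} flush_stream_desc_t;

static void stream_flushed_cb(completion_t *self)
{
    /* Same recovery step as uct_cuda_base_stream_flushed_cb */
    flush_stream_desc_t *desc = container_of(self, flush_stream_desc_t, comp);
    printf("stream %d flushed\n", desc->stream_id);
}

int main(void)
{
    flush_stream_desc_t d = {.stream_id = 7};

    d.comp.func  = stream_flushed_cb;
    d.super.comp = &d.comp;

    /* The progress loop sees only event_desc_t and invokes its completion */
    event_desc_t *ev = &d.super;
    ev->comp->func(ev->comp);
    return 0;
}

The embedded callback pointer doubles as a type tag: uct_cuda_base_event_is_flush identifies flush entries by comparing comp->func against uct_cuda_base_stream_flushed_cb.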
2 changes: 1 addition & 1 deletion src/uct/cuda/cuda_copy/cuda_copy_iface.c
@@ -158,7 +158,7 @@ static uct_iface_ops_t uct_cuda_copy_iface_ops = {
.ep_put_zcopy = uct_cuda_copy_ep_put_zcopy,
.ep_pending_add = (uct_ep_pending_add_func_t)ucs_empty_function_return_busy,
.ep_pending_purge = (uct_ep_pending_purge_func_t)ucs_empty_function,
.ep_flush = uct_base_ep_flush,
.ep_flush = uct_cuda_base_ep_flush,
.ep_fence = uct_base_ep_fence,
.ep_create = UCS_CLASS_NEW_FUNC_NAME(uct_cuda_copy_ep_t),
.ep_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_cuda_copy_ep_t),
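With ep_flush now pointing at uct_cuda_base_ep_flush, callers get the standard UCT flush contract: UCS_OK when nothing is in flight, or UCS_INPROGRESS plus a completion that fires once all enqueued work drains. A hedged usage sketch of that contract (flush_and_wait and flush_cb are hypothetical helpers; assumes an already-created ep and worker, and the uct_completion_t layout with func/count/status):

#include <uct/api/uct.h>

static volatile int flush_done;

static void flush_cb(uct_completion_t *self)
{
    flush_done = 1;
}

/* Flush `ep` and progress the worker until the completion fires */
static ucs_status_t flush_and_wait(uct_ep_h ep, uct_worker_h worker)
{
    uct_completion_t comp = {.func = flush_cb, .count = 1, .status = UCS_OK};
    ucs_status_t status;

    flush_done = 0;
    status     = uct_ep_flush(ep, 0, &comp);
    if (status == UCS_OK) {
        return UCS_OK;                /* no active streams: already flushed */
    }
    if (status != UCS_INPROGRESS) {
        return status;                /* e.g. UCS_ERR_NO_MEMORY */
    }

    while (!flush_done) {
        uct_worker_progress(worker);  /* drains the per-stream event queues */
    }
    return UCS_OK;
}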
12 changes: 1 addition & 11 deletions src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
@@ -74,10 +74,6 @@ static ucs_config_field_t uct_cuda_ipc_iface_config_table[] = {
"Estimated CPU overhead for transferring GPU memory",
ucs_offsetof(uct_cuda_ipc_iface_config_t, params.overhead), UCS_CONFIG_TYPE_TIME},

{"ENABLE_SAME_PROCESS", "n",
"Enable same process same device communication for cuda_ipc",
ucs_offsetof(uct_cuda_ipc_iface_config_t, params.enable_same_process), UCS_CONFIG_TYPE_BOOL},

{NULL}
};

@@ -146,12 +142,6 @@ uct_cuda_ipc_iface_is_reachable_v2(const uct_iface_h tl_iface,
dev_addr = (const uct_cuda_ipc_device_addr_t *)params->device_addr;
same_uuid = (ucs_get_system_id() == dev_addr->system_uuid);

if ((getpid() == *(pid_t*)params->iface_addr) && same_uuid &&
!iface->config.enable_same_process) {
uct_iface_fill_info_str_buf(params, "same process");
return 0;
}

if (same_uuid ||
uct_cuda_ipc_iface_mnnvl_supported(md, dev_addr, dev_addr_len)) {
return uct_iface_scope_is_reachable(tl_iface, params);
@@ -336,7 +326,7 @@ static uct_iface_ops_t uct_cuda_ipc_iface_ops = {
.ep_put_zcopy = uct_cuda_ipc_ep_put_zcopy,
.ep_pending_add = (uct_ep_pending_add_func_t)ucs_empty_function_return_busy,
.ep_pending_purge = (uct_ep_pending_purge_func_t)ucs_empty_function,
.ep_flush = uct_base_ep_flush,
.ep_flush = uct_cuda_base_ep_flush,
.ep_fence = uct_base_ep_fence,
.ep_check = (uct_ep_check_func_t)ucs_empty_function_return_unsupported,
.ep_create = UCS_CLASS_NEW_FUNC_NAME(uct_cuda_ipc_ep_t),
1 change: 0 additions & 1 deletion src/uct/cuda/cuda_ipc/cuda_ipc_iface.h
@@ -28,7 +28,6 @@ typedef struct {
double bandwidth; /* estimated bandwidth */
double latency; /* estimated latency */
double overhead; /* estimated CPU overhead */
int enable_same_process; /* enable cuda_ipc for same pid same device */
} uct_cuda_ipc_iface_config_params_t;


1 change: 0 additions & 1 deletion test/gtest/ucp/test_ucp_device.cc
@@ -78,7 +78,6 @@ void test_ucp_device::get_test_variants(std::vector<ucp_test_variant> &variants)

void test_ucp_device::init()
{
m_env.push_back(new ucs::scoped_setenv("UCX_CUDA_IPC_ENABLE_SAME_PROCESS", "y"));
m_env.push_back(new ucs::scoped_setenv("UCX_IB_GDA_MAX_SYS_LATENCY", "1us"));
ucp_test::init();
sender().connect(&receiver(), get_ep_params());
2 changes: 1 addition & 1 deletion test/gtest/ucp/test_ucp_peer_failure.cc
@@ -987,7 +987,7 @@ UCS_TEST_P(test_ucp_peer_failure_rndv_put_ppln_abort, rtr_mtype)
}

UCS_TEST_P(test_ucp_peer_failure_rndv_put_ppln_abort, pipeline,
"RNDV_FRAG_SIZE=host:8K")
"RNDV_FRAG_SIZE=host:8K,cuda:8K")
{
rndv_progress_failure_test(rndv_mode::put_ppln, true);
}
23 changes: 2 additions & 21 deletions test/gtest/uct/test_uct_iface.cc
@@ -24,11 +24,6 @@ class test_uct_iface : public uct_test {
}

void test_is_reachable();

virtual bool is_self_reachable() const
{
return true;
}
};

void test_uct_iface::test_is_reachable()
@@ -63,7 +58,7 @@ void test_uct_iface::test_is_reachable()
ASSERT_UCS_OK(status);

bool is_reachable = uct_iface_is_reachable_v2(iface, &params);
EXPECT_EQ(is_self_reachable(), is_reachable);
EXPECT_TRUE(is_reachable);

// Allocate corrupted address buffers, make it larger than the correct
// buffer size in case the corrupted data indicates a larger address length
@@ -98,18 +93,4 @@ UCS_TEST_P(test_uct_iface, is_reachable)
}

UCT_INSTANTIATE_TEST_CASE(test_uct_iface)

class test_uct_iface_self_unreachable : public test_uct_iface {
protected:
bool is_self_reachable() const override
{
return false;
}
};

UCS_TEST_P(test_uct_iface_self_unreachable, is_reachable)
{
test_is_reachable();
}

UCT_INSTANTIATE_CUDA_IPC_TEST_CASE(test_uct_iface_self_unreachable)
UCT_INSTANTIATE_CUDA_IPC_TEST_CASE(test_uct_iface)