From 04670379e7fcbf1b2ac7b363ce9ff4f6c03ec354 Mon Sep 17 00:00:00 2001
From: Shachar Hasson
Date: Wed, 5 Nov 2025 12:47:54 +0000
Subject: [PATCH 1/4] UCT/CUDA: Add support for flush_ep

---
 src/uct/cuda/base/cuda_iface.c           | 122 ++++++++++++++++++++++-
 src/uct/cuda/base/cuda_iface.h           |  21 ++++
 src/uct/cuda/cuda_copy/cuda_copy_iface.c |   2 +-
 src/uct/cuda/cuda_ipc/cuda_ipc_iface.c   |   2 +-
 4 files changed, 143 insertions(+), 4 deletions(-)

diff --git a/src/uct/cuda/base/cuda_iface.c b/src/uct/cuda/base/cuda_iface.c
index 678c753a830..59f002d2ef4 100644
--- a/src/uct/cuda/base/cuda_iface.c
+++ b/src/uct/cuda/base/cuda_iface.c
@@ -173,6 +173,96 @@ ucs_status_t uct_cuda_base_iface_event_fd_arm(uct_iface_h tl_iface,
     return UCS_OK;
 }
 
+static void uct_cuda_base_stream_flushed_cb(uct_completion_t *self)
+{
+    uct_cuda_flush_stream_desc_t *desc =
+            ucs_container_of(self, uct_cuda_flush_stream_desc_t, comp);
+
+    if (--desc->flush_desc->stream_counter == 0) {
+        uct_invoke_completion(desc->flush_desc->comp, UCS_OK);
+        ucs_free(desc->flush_desc);
+    }
+}
+
+/* Flush is done by enqueueing a flush event on each active stream and
+ * waiting for all of them to finish. On each flush event completion, we
+ * decrement a shared counter; once it hits zero, the flush is complete. */
+ucs_status_t
+uct_cuda_base_ep_flush(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp)
+{
+    uct_base_ep_t *ep       = ucs_derived_of(tl_ep, uct_base_ep_t);
+    uct_cuda_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_cuda_iface_t);
+    uct_cuda_flush_desc_t *flush_desc;
+    uct_cuda_flush_stream_desc_t *flush_stream_desc;
+    uct_cuda_queue_desc_t *q_desc;
+    uct_cuda_event_desc_t *event_desc;
+    ucs_queue_head_t *event_queue;
+    ucs_queue_iter_t iter;
+    unsigned stream_index;
+
+    if (ucs_queue_is_empty(&iface->active_queue)) {
+        UCT_TL_EP_STAT_FLUSH(ep);
+        return UCS_OK;
+    }
+
+    if (comp == NULL) {
+        goto out;
+    }
+
+    /* Allocate base flush descriptor */
+    flush_desc = ucs_malloc(sizeof(*flush_desc), "cuda_flush_desc");
+    if (flush_desc == NULL) {
+        return UCS_ERR_NO_MEMORY;
+    }
+
+    flush_desc->comp           = comp;
+    flush_desc->stream_counter = 0;
+
+    /* For each active stream, init a flush event and enqueue it on the
+     * stream */
+    ucs_queue_for_each(q_desc, &iface->active_queue, queue) {
+        flush_stream_desc = ucs_mpool_get(&iface->flush_mpool);
+        if (flush_stream_desc == NULL) {
+            goto error;
+        }
+
+        flush_stream_desc->flush_desc = flush_desc;
+        flush_stream_desc->comp.func  = uct_cuda_base_stream_flushed_cb;
+        flush_stream_desc->comp.count = 1;
+        flush_stream_desc->super.comp = &flush_stream_desc->comp;
+        ucs_queue_push(&q_desc->event_queue, &flush_stream_desc->super.queue);
+        flush_desc->stream_counter++;
+    }
+
+out:
+    UCT_TL_EP_STAT_FLUSH_WAIT(ep);
+    return UCS_INPROGRESS;
+
+error:
+    /* Roll back the items already enqueued in case of error */
+    for (iter = ucs_queue_iter_begin(&iface->active_queue), stream_index = 0;
+         stream_index < flush_desc->stream_counter;
+         iter = ucs_queue_iter_next(iter), ++stream_index) {
+        event_queue = &ucs_queue_iter_elem(q_desc, iter, queue)->event_queue;
+        event_desc  = ucs_queue_tail_elem_non_empty(event_queue,
+                                                    uct_cuda_event_desc_t,
+                                                    queue);
+
+        ucs_queue_remove(event_queue, &event_desc->queue);
+        ucs_mpool_put((uct_cuda_flush_stream_desc_t*)event_desc);
+    }
+
+    ucs_free(flush_desc);
+    return UCS_ERR_NO_MEMORY;
+}
+
+static UCS_F_ALWAYS_INLINE int
+uct_cuda_base_event_is_flush(uct_cuda_event_desc_t *event)
+{
+    return (event->comp != NULL) &&
+           (event->comp->func == uct_cuda_base_stream_flushed_cb);
+}
+
 static UCS_F_ALWAYS_INLINE
 unsigned uct_cuda_base_progress_event_queue(uct_cuda_iface_t *iface,
                                             ucs_queue_head_t *queue_head,
@@ -183,13 +273,18 @@ uct_cuda_base_progress_event_queue(uct_cuda_iface_t *iface,
     ucs_queue_for_each_extract(cuda_event, queue_head, queue,
                                (count < max_events) &&
-                               (cuEventQuery(cuda_event->event) == CUDA_SUCCESS)) {
+                               (ucs_likely(uct_cuda_base_event_is_flush(
+                                       cuda_event)) ||
+                                (cuEventQuery(cuda_event->event) == CUDA_SUCCESS))) {
         ucs_trace_data("cuda event %p completed", cuda_event);
         if (cuda_event->comp != NULL) {
             uct_invoke_completion(cuda_event->comp, UCS_OK);
         }
 
-        iface->ops->complete_event(&iface->super.super, cuda_event);
+        if (ucs_likely(!uct_cuda_base_event_is_flush(cuda_event))) {
+            iface->ops->complete_event(&iface->super.super, cuda_event);
+        }
+
+        ucs_mpool_put(cuda_event);
         count++;
     }
@@ -352,12 +447,23 @@ static void uct_cuda_base_ctx_rsc_destroy(uct_cuda_iface_t *iface,
     iface->ops->destroy_rsc(&iface->super.super, ctx_rsc);
 }
 
+static ucs_mpool_ops_t uct_cuda_flush_desc_mpool_ops = {
+    .chunk_alloc   = ucs_mpool_chunk_malloc,
+    .chunk_release = ucs_mpool_chunk_free,
+    .obj_init      = NULL,
+    .obj_cleanup   = NULL,
+    .obj_str       = NULL
+};
+
 UCS_CLASS_INIT_FUNC(uct_cuda_iface_t, uct_iface_ops_t *tl_ops,
                     uct_iface_internal_ops_t *ops, uct_md_h md,
                     uct_worker_h worker, const uct_iface_params_t *params,
                     const uct_iface_config_t *tl_config, const char *dev_name)
 {
+    ucs_mpool_params_t mp_params;
+    ucs_status_t status;
+
     UCS_CLASS_CALL_SUPER_INIT(uct_base_iface_t, tl_ops, ops, md, worker,
                               params, tl_config
                               UCS_STATS_ARG(params->stats_root)
                               UCS_STATS_ARG(dev_name));
@@ -365,6 +471,17 @@ UCS_CLASS_INIT_FUNC(uct_cuda_iface_t, uct_iface_ops_t *tl_ops,
     self->eventfd = UCS_ASYNC_EVENTFD_INVALID_FD;
     kh_init_inplace(cuda_ctx_rscs, &self->ctx_rscs);
     ucs_queue_head_init(&self->active_queue);
+
+    ucs_mpool_params_reset(&mp_params);
+    mp_params.elem_size = sizeof(uct_cuda_flush_stream_desc_t);
+    mp_params.ops       = &uct_cuda_flush_desc_mpool_ops;
+    mp_params.name      = "cuda_flush_descriptors";
+
+    status = ucs_mpool_init(&mp_params, &self->flush_mpool);
+    if (status != UCS_OK) {
+        return status;
+    }
+
     return UCS_OK;
 }
 
@@ -381,6 +498,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_cuda_iface_t)
 
     kh_destroy_inplace(cuda_ctx_rscs, &self->ctx_rscs);
     ucs_async_eventfd_destroy(self->eventfd);
+    ucs_mpool_cleanup(&self->flush_mpool, 1);
 }
 
 UCS_CLASS_DEFINE(uct_cuda_iface_t, uct_base_iface_t);
diff --git a/src/uct/cuda/base/cuda_iface.h b/src/uct/cuda/base/cuda_iface.h
index ff0a2efea86..71407482a10 100644
--- a/src/uct/cuda/base/cuda_iface.h
+++ b/src/uct/cuda/base/cuda_iface.h
@@ -95,6 +95,23 @@ typedef struct {
 } uct_cuda_event_desc_t;
 
 
+/* Base flush descriptor */
+typedef struct {
+    /* Number of streams with an outstanding flush event */
+    uint32_t         stream_counter;
+    uct_completion_t *comp;
+} uct_cuda_flush_desc_t;
+
+
+/* Per-stream flush descriptor */
+typedef struct {
+    uct_cuda_event_desc_t super;
+    /* Pointer to base flush descriptor */
+    uct_cuda_flush_desc_t *flush_desc;
+    uct_completion_t comp;
+} uct_cuda_flush_stream_desc_t;
+
+
 typedef struct {
     /* CUDA context handle */
     CUcontext ctx;
@@ -130,6 +147,8 @@ typedef struct {
     /* list of queues which require progress */
     ucs_queue_head_t active_queue;
     uct_cuda_iface_ops_t *ops;
+    /* Pool for per-stream flush descriptors */
+    ucs_mpool_t flush_mpool;
 
     struct {
         unsigned max_events;
@@ -148,6 +167,8 @@ unsigned uct_cuda_base_iface_progress(uct_iface_h tl_iface);
 
 ucs_status_t uct_cuda_base_iface_flush(uct_iface_h tl_iface, unsigned flags,
                                        uct_completion_t *comp);
+ucs_status_t
+uct_cuda_base_ep_flush(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp);
 
 ucs_status_t uct_cuda_base_query_devices_common(
         uct_md_h md, uct_device_type_t dev_type,
diff --git a/src/uct/cuda/cuda_copy/cuda_copy_iface.c b/src/uct/cuda/cuda_copy/cuda_copy_iface.c
index f2c4bcb99d1..e2d8bd51b30 100644
--- a/src/uct/cuda/cuda_copy/cuda_copy_iface.c
+++ b/src/uct/cuda/cuda_copy/cuda_copy_iface.c
@@ -158,7 +158,7 @@ static uct_iface_ops_t uct_cuda_copy_iface_ops = {
     .ep_put_zcopy             = uct_cuda_copy_ep_put_zcopy,
     .ep_pending_add           = (uct_ep_pending_add_func_t)ucs_empty_function_return_busy,
     .ep_pending_purge         = (uct_ep_pending_purge_func_t)ucs_empty_function,
-    .ep_flush                 = uct_base_ep_flush,
+    .ep_flush                 = uct_cuda_base_ep_flush,
     .ep_fence                 = uct_base_ep_fence,
     .ep_create                = UCS_CLASS_NEW_FUNC_NAME(uct_cuda_copy_ep_t),
     .ep_destroy               = UCS_CLASS_DELETE_FUNC_NAME(uct_cuda_copy_ep_t),
diff --git a/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c b/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
index 116f3791f35..75a4e1af786 100644
--- a/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
+++ b/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
@@ -336,7 +336,7 @@ static uct_iface_ops_t uct_cuda_ipc_iface_ops = {
     .ep_put_zcopy             = uct_cuda_ipc_ep_put_zcopy,
     .ep_pending_add           = (uct_ep_pending_add_func_t)ucs_empty_function_return_busy,
     .ep_pending_purge         = (uct_ep_pending_purge_func_t)ucs_empty_function,
-    .ep_flush                 = uct_base_ep_flush,
+    .ep_flush                 = uct_cuda_base_ep_flush,
     .ep_fence                 = uct_base_ep_fence,
     .ep_check                 = (uct_ep_check_func_t)ucs_empty_function_return_unsupported,
     .ep_create                = UCS_CLASS_NEW_FUNC_NAME(uct_cuda_ipc_ep_t),

From bbcc1d8b5630b0570963881faccf5069a142ae9d Mon Sep 17 00:00:00 2001
From: Shachar Hasson
Date: Wed, 5 Nov 2025 13:35:01 +0000
Subject: [PATCH 2/4] UCT/CUDA: Suppress unused-variable warning in ep_flush

---
 src/uct/cuda/base/cuda_iface.c | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/uct/cuda/base/cuda_iface.c b/src/uct/cuda/base/cuda_iface.c
index 59f002d2ef4..7b32e85e605 100644
--- a/src/uct/cuda/base/cuda_iface.c
+++ b/src/uct/cuda/base/cuda_iface.c
@@ -190,8 +190,9 @@ static void uct_cuda_base_stream_flushed_cb(uct_completion_t *self)
 ucs_status_t
 uct_cuda_base_ep_flush(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp)
 {
-    uct_base_ep_t *ep       = ucs_derived_of(tl_ep, uct_base_ep_t);
-    uct_cuda_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_cuda_iface_t);
+    uct_base_ep_t UCS_V_UNUSED *ep = ucs_derived_of(tl_ep, uct_base_ep_t);
+    uct_cuda_iface_t *iface        = ucs_derived_of(tl_ep->iface,
+                                                    uct_cuda_iface_t);
     uct_cuda_flush_desc_t *flush_desc;
     uct_cuda_flush_stream_desc_t *flush_stream_desc;
     uct_cuda_queue_desc_t *q_desc;

From e3d088d4b7b381bb9bab22f811640ce85417622e Mon Sep 17 00:00:00 2001
From: Shachar Hasson
Date: Wed, 5 Nov 2025 17:27:32 +0000
Subject: [PATCH 3/4] UCT/CUDA: Mark flush events as unlikely in progress flow

---
 src/uct/cuda/base/cuda_iface.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/uct/cuda/base/cuda_iface.c b/src/uct/cuda/base/cuda_iface.c
index 7b32e85e605..8199e4c5890 100644
--- a/src/uct/cuda/base/cuda_iface.c
+++ b/src/uct/cuda/base/cuda_iface.c
@@ -274,7 +274,7 @@ uct_cuda_base_progress_event_queue(uct_cuda_iface_t *iface,
 
     ucs_queue_for_each_extract(cuda_event, queue_head, queue,
                                (count < max_events) &&
-                               (ucs_likely(uct_cuda_base_event_is_flush(
+                               (ucs_unlikely(uct_cuda_base_event_is_flush(
                                        cuda_event)) ||
                                 (cuEventQuery(cuda_event->event) == CUDA_SUCCESS))) {
         ucs_trace_data("cuda event %p completed", cuda_event);

From b8728e4be50c9cfa9d8428d4e38fa29a69020f34 Mon Sep 17 00:00:00 2001
From: Shachar Hasson
Date: Wed, 12 Nov 2025 10:49:18 +0000
Subject: [PATCH 4/4] UCT/CUDA_IPC: Enforce host memory support for mem_type EP

---
 src/ucp/core/ucp_ep.c                   | 11 ++++++++++-
 src/uct/cuda/cuda_ipc/cuda_ipc_iface.c  | 10 ----------
 src/uct/cuda/cuda_ipc/cuda_ipc_iface.h  |  1 -
 test/gtest/ucp/test_ucp_device.cc       |  1 -
 test/gtest/ucp/test_ucp_peer_failure.cc |  2 +-
 test/gtest/uct/test_uct_iface.cc        | 23 ++---------------------
 6 files changed, 13 insertions(+), 35 deletions(-)

diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c
index 3562dfd1e6b..a6b50caf86e 100644
--- a/src/ucp/core/ucp_ep.c
+++ b/src/ucp/core/ucp_ep.c
@@ -684,12 +684,18 @@ ucs_status_t ucp_worker_mem_type_eps_create(ucp_worker_h worker)
     ucs_status_t status;
     void *address_buffer;
     size_t address_length;
-    ucp_tl_bitmap_t mem_access_tls;
+    ucp_tl_bitmap_t mem_access_tls, host_mem_access_tls;
     char ep_name[UCP_WORKER_ADDRESS_NAME_MAX];
     unsigned addr_indices[UCP_MAX_LANES];
+    ucp_lane_index_t num_lanes;
 
     ucs_memory_type_for_each(mem_type) {
         ucp_context_memaccess_tl_bitmap(context, mem_type, 0, &mem_access_tls);
+        /* Mem type EP requires host memory support */
+        ucp_context_memaccess_tl_bitmap(context, UCS_MEMORY_TYPE_HOST, 0,
+                                        &host_mem_access_tls);
+        UCS_STATIC_BITMAP_AND_INPLACE(&mem_access_tls, host_mem_access_tls);
+
         if (UCP_MEM_IS_HOST(mem_type) ||
             UCS_STATIC_BITMAP_IS_ZERO(mem_access_tls)) {
             continue;
@@ -725,6 +731,9 @@ ucs_status_t ucp_worker_mem_type_eps_create(ucp_worker_h worker)
             goto err_free_address_list;
         }
 
+        /* Mem type EP cannot have more than one lane */
+        num_lanes = ucp_ep_num_lanes(worker->mem_type_ep[mem_type]);
+        ucs_assertv_always(num_lanes == 1, "num_lanes=%u", num_lanes);
         UCS_ASYNC_UNBLOCK(&worker->async);
 
         ucs_free(local_address.address_list);
diff --git a/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c b/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
index 75a4e1af786..0985e40e7cc 100644
--- a/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
+++ b/src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
@@ -74,10 +74,6 @@ static ucs_config_field_t uct_cuda_ipc_iface_config_table[] = {
      "Estimated CPU overhead for transferring GPU memory",
      ucs_offsetof(uct_cuda_ipc_iface_config_t, params.overhead), UCS_CONFIG_TYPE_TIME},
 
-    {"ENABLE_SAME_PROCESS", "n",
-     "Enable same process same device communication for cuda_ipc",
-     ucs_offsetof(uct_cuda_ipc_iface_config_t, params.enable_same_process), UCS_CONFIG_TYPE_BOOL},
-
     {NULL}
 };
 
@@ -146,12 +142,6 @@ uct_cuda_ipc_iface_is_reachable_v2(const uct_iface_h tl_iface,
     dev_addr  = (const uct_cuda_ipc_device_addr_t *)params->device_addr;
     same_uuid = (ucs_get_system_id() == dev_addr->system_uuid);
 
-    if ((getpid() == *(pid_t*)params->iface_addr) && same_uuid &&
-        !iface->config.enable_same_process) {
-        uct_iface_fill_info_str_buf(params, "same process");
-        return 0;
-    }
-
     if (same_uuid ||
         uct_cuda_ipc_iface_mnnvl_supported(md, dev_addr, dev_addr_len)) {
         return uct_iface_scope_is_reachable(tl_iface, params);
diff --git a/src/uct/cuda/cuda_ipc/cuda_ipc_iface.h b/src/uct/cuda/cuda_ipc/cuda_ipc_iface.h
index c749fcff9e0..663cece255f 100644
--- a/src/uct/cuda/cuda_ipc/cuda_ipc_iface.h
+++ b/src/uct/cuda/cuda_ipc/cuda_ipc_iface.h
@@ -28,7 +28,6 @@ typedef struct {
     double bandwidth;        /* estimated bandwidth */
     double latency;          /* estimated latency */
     double overhead;         /* estimated CPU overhead */
-    int enable_same_process; /* enable cuda_ipc for same pid same device */
 } uct_cuda_ipc_iface_config_params_t;
 
 
diff --git a/test/gtest/ucp/test_ucp_device.cc b/test/gtest/ucp/test_ucp_device.cc
index 6359ff3d3db..31587daab3b 100644
--- a/test/gtest/ucp/test_ucp_device.cc
+++ b/test/gtest/ucp/test_ucp_device.cc
@@ -78,7 +78,6 @@ void test_ucp_device::get_test_variants(std::vector<ucp_test_variant> &variants)
 
 void test_ucp_device::init()
 {
-    m_env.push_back(new ucs::scoped_setenv("UCX_CUDA_IPC_ENABLE_SAME_PROCESS", "y"));
     m_env.push_back(new ucs::scoped_setenv("UCX_IB_GDA_MAX_SYS_LATENCY", "1us"));
     ucp_test::init();
     sender().connect(&receiver(), get_ep_params());
diff --git a/test/gtest/ucp/test_ucp_peer_failure.cc b/test/gtest/ucp/test_ucp_peer_failure.cc
index 80d459568aa..f01d1ddaea7 100644
--- a/test/gtest/ucp/test_ucp_peer_failure.cc
+++ b/test/gtest/ucp/test_ucp_peer_failure.cc
@@ -987,7 +987,7 @@ UCS_TEST_P(test_ucp_peer_failure_rndv_put_ppln_abort, rtr_mtype)
 }
 
 UCS_TEST_P(test_ucp_peer_failure_rndv_put_ppln_abort, pipeline,
-           "RNDV_FRAG_SIZE=host:8K")
+           "RNDV_FRAG_SIZE=host:8K,cuda:8K")
 {
     rndv_progress_failure_test(rndv_mode::put_ppln, true);
 }
diff --git a/test/gtest/uct/test_uct_iface.cc b/test/gtest/uct/test_uct_iface.cc
index 233f0864d88..f6a0578ab9d 100644
--- a/test/gtest/uct/test_uct_iface.cc
+++ b/test/gtest/uct/test_uct_iface.cc
@@ -24,11 +24,6 @@ class test_uct_iface : public uct_test {
     }
 
     void test_is_reachable();
-
-    virtual bool is_self_reachable() const
-    {
-        return true;
-    }
 };
 
 void test_uct_iface::test_is_reachable()
@@ -63,7 +58,7 @@ void test_uct_iface::test_is_reachable()
     ASSERT_UCS_OK(status);
 
     bool is_reachable = uct_iface_is_reachable_v2(iface, &params);
-    EXPECT_EQ(is_self_reachable(), is_reachable);
+    EXPECT_TRUE(is_reachable);
 
     // Allocate corrupted address buffers, make it larger than the correct
     // buffer size in case the corrupted data indicates a larger address length
@@ -98,18 +93,4 @@ UCS_TEST_P(test_uct_iface, is_reachable)
 }
 
 UCT_INSTANTIATE_TEST_CASE(test_uct_iface)
-
-class test_uct_iface_self_unreachable : public test_uct_iface {
-protected:
-    bool is_self_reachable() const override
-    {
-        return false;
-    }
-};
-
-UCS_TEST_P(test_uct_iface_self_unreachable, is_reachable)
-{
-    test_is_reachable();
-}
-
-UCT_INSTANTIATE_CUDA_IPC_TEST_CASE(test_uct_iface_self_unreachable)
+UCT_INSTANTIATE_CUDA_IPC_TEST_CASE(test_uct_iface)
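
The completion scheme in patch 1 hinges on a single shared counter: each
active stream gets one flush event, and the user completion fires only when
the last per-stream event finishes. Below is a minimal standalone sketch of
that pattern; flush_desc_t, stream_flushed_cb and all_flushed are
illustrative names, not the UCX types used above.

/*
 * Sketch of the shared-counter flush completion pattern (illustrative
 * names, not UCX API). Build with: cc -o flush_sketch flush_sketch.c
 */
#include <stdio.h>
#include <stdlib.h>

typedef void (*completion_cb_t)(void *arg);

typedef struct {
    unsigned        stream_counter; /* outstanding per-stream flush events */
    completion_cb_t user_cb;        /* invoked once every stream is flushed */
    void           *user_arg;
} flush_desc_t;

/* Runs when the flush event of one stream completes; the last stream to
 * finish invokes the user completion and releases the descriptor */
static void stream_flushed_cb(flush_desc_t *desc)
{
    if (--desc->stream_counter == 0) {
        desc->user_cb(desc->user_arg);
        free(desc);
    }
}

static void all_flushed(void *arg)
{
    printf("flush completed on %s\n", (const char*)arg);
}

int main(void)
{
    unsigned num_streams = 3;
    flush_desc_t *desc   = malloc(sizeof(*desc));
    unsigned i;

    if (desc == NULL) {
        return 1;
    }

    desc->stream_counter = num_streams;
    desc->user_cb        = all_flushed;
    desc->user_arg       = "ep0";

    /* In the real code each callback fires when the progress loop reaches
     * the flush event enqueued on that stream; here we simulate the three
     * completions arriving one by one */
    for (i = 0; i < num_streams; i++) {
        stream_flushed_cb(desc);
    }

    return 0;
}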
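
On the caller side, the new ep_flush keeps the standard UCT flush contract:
UCS_OK means nothing was in flight, and UCS_INPROGRESS means the completion
callback fires once all enqueued flush events have been reached during
progress. A minimal sketch of driving it to completion, assuming an
already-initialized worker and endpoint (flush_ep_blocking is an
illustrative helper, not part of this series):

#include <uct/api/uct.h>

/* Invoked by uct_invoke_completion() once the shared stream counter of the
 * flush descriptor reaches zero */
static void flush_done_cb(uct_completion_t *self)
{
}

static ucs_status_t flush_ep_blocking(uct_worker_h worker, uct_ep_h ep)
{
    uct_completion_t comp = {
        .func   = flush_done_cb,
        .count  = 1,
        .status = UCS_OK
    };
    ucs_status_t status;

    status = uct_ep_flush(ep, 0, &comp);
    if (status != UCS_INPROGRESS) {
        return status; /* UCS_OK, or an error such as UCS_ERR_NO_MEMORY */
    }

    /* Progress until the completion decrements comp.count to zero */
    while (comp.count > 0) {
        uct_worker_progress(worker);
    }

    return comp.status;
}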