diff --git a/man/fi_cxi.7.md b/man/fi_cxi.7.md
index 28723c2013f..3f424e132f4 100644
--- a/man/fi_cxi.7.md
+++ b/man/fi_cxi.7.md
@@ -1314,6 +1314,40 @@ The CXI provider checks for the following environment variables:
 : Enable enforcement of triggered operation limit. Doing this can prevent
   fi_control(FI_QUEUE_WORK) deadlocking at the cost of performance.
 
+*FI_CXI_ENABLE_WRITEDATA*
+: Enables provider support for the fi_writedata() and fi_inject_writedata()
+  RMA operations. When enabled and the domain attribute cq_data_size is
+  non-zero, the CXI provider generates solicited RMA completions that carry
+  immediate data: target-side completions include FI_REMOTE_CQ_DATA, and
+  source information is reported when FI_SOURCE is enabled (FI_SOURCE_ERR
+  semantics apply on address resolution failures).
+
+  The FI_RX_CQ_DATA mode bit is not required, and writedata RMA operations
+  do not consume posted receive buffers at the target. The feature is gated
+  by domain and endpoint support (for example, a non-zero
+  domain_attr->cq_data_size); internally, the combination of domain and
+  endpoint cq_data_size sets rma_cq_data_size. Only provider MR keys
+  (FI_MR_PROV_KEY) are supported.
+
+  This option is disabled by default; enable it only when applications
+  require immediate-data delivery on RMA completions, or for controlled
+  testing and debugging.
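+
+  For example (a minimal sketch; ep, dest_addr, rkey, buf, and len are
+  assumed to be set up by the application, and error handling is omitted):
+
+  ```c
+  /* Requires FI_CXI_ENABLE_WRITEDATA=1 and a domain opened with a
+   * non-zero cq_data_size and FI_MR_PROV_KEY. */
+  uint64_t imm = 0xDEADBEEF;
+  ssize_t ret;
+
+  ret = fi_writedata(ep, buf, len, NULL, imm, dest_addr, 0, rkey, NULL);
+
+  /* The target's CQ entry carries FI_RMA | FI_REMOTE_WRITE |
+   * FI_REMOTE_CQ_DATA, with entry.data == imm. */
+  ```
+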
 *FI_CXI_MR_CACHE_EVENTS_DISABLE_POLL_NSECS*
 : Max amount of time to poll when disabling an MR configured with MR match
   events.
 
diff --git a/prov/cxi/Makefile.include b/prov/cxi/Makefile.include
index bcde13bab03..a86322084a5 100644
--- a/prov/cxi/Makefile.include
+++ b/prov/cxi/Makefile.include
@@ -130,7 +130,8 @@ nodist_prov_cxi_test_cxitest_SOURCES = \
 	prov/cxi/test/fork.c \
 	prov/cxi/test/mem_reg.c \
 	prov/cxi/test/nic.c \
-	prov/cxi/test/mr_cache.c
+	prov/cxi/test/mr_cache.c \
+	prov/cxi/test/writedata.c
 
 prov_cxi_test_cxitest_CPPFLAGS = $(AM_CPPFLAGS) $(cxi_CPPFLAGS) \
 	$(cxitest_CPPFLAGS) $(PTHREAD_CFLAGS)
diff --git a/prov/cxi/include/cxip.h b/prov/cxi/include/cxip.h
index abacb5289a5..3157c3360b1 100644
--- a/prov/cxi/include/cxip.h
+++ b/prov/cxi/include/cxip.h
@@ -178,7 +178,7 @@
 #define CXIP_DEFAULT_RX_SIZE 1024U
 
 #define CXIP_MAJOR_VERSION 0
-#define CXIP_MINOR_VERSION 1
+#define CXIP_MINOR_VERSION 2
 #define CXIP_PROV_VERSION FI_VERSION(CXIP_MAJOR_VERSION, \
 				     CXIP_MINOR_VERSION)
 #define CXIP_FI_VERSION FI_VERSION(2, 4)
@@ -358,6 +358,7 @@ struct cxip_environment {
 	int force_dev_reg_copy;
 	enum cxip_mr_target_ordering mr_target_ordering;
 	int disable_cuda_sync_memops;
+	int enable_writedata;
 };
 
 extern struct cxip_environment cxip_env;
@@ -521,7 +522,8 @@ struct cxip_mr_key {
		 * it repeated.
		 */
		uint64_t id	: 16; /* Unique - 64K MR */
-		uint64_t seqnum : 44; /* Sequence with random seed */
+		uint64_t seqnum : 43; /* Sequence with random seed */
+		uint64_t sol_event : 1; /* For FI_WRITEDATA dual entry */
		uint64_t events : 1;  /* Requires event generation */
		uint64_t unused3: 2;
		uint64_t is_prov: 1;
@@ -699,7 +701,26 @@ union cxip_match_bits {
	uint64_t raw;
 };
 
 #define CXIP_IS_PROV_MR_KEY_BIT (1ULL << 63)
-#define CXIP_KEY_MATCH_BITS(key) ((key) & ~CXIP_IS_PROV_MR_KEY_BIT)
+#define CXIP_SOL_NUM_MR_KEY_BIT (1ULL << 59) /* 'sol_event' field - writedata LE select */
+#define CXIP_EVENTS_MR_KEY_BIT (1ULL << 60) /* 'events' field - request comm event generation */
+#define CXIP_KEY_MATCH_BITS(key) ((key) & ~(CXIP_IS_PROV_MR_KEY_BIT | CXIP_SOL_NUM_MR_KEY_BIT))
+
+static inline uint64_t cxip_key_set_writedata(uint64_t key)
+{
+	struct cxip_mr_key cxip_key = { .raw = key };
+
+	/* Provider keys only: non-cached provider keys support writedata.
+	 * Set sol_event (bit 59) for writedata LE match and events (bit 60)
+	 * for target comm event generation.
+	 */
+	if (cxip_key.is_prov && !cxip_key.cached) {
+		/* The writedata DMA path masks out only is_prov (bit 63),
+		 * leaving bits 60:59 intact to select the writedata LE.
+		 */
+		return key | CXIP_SOL_NUM_MR_KEY_BIT | CXIP_EVENTS_MR_KEY_BIT;
+	}
+	return key;
+}
 
 /* libcxi Wrapper Structures */
@@ -908,6 +927,20 @@ struct cxip_domain {
 
	uint32_t tclass;
 
+	/* CQ data sizes for remote CQ data support:
+	 * - msg_cq_data_size: messaging operations (FI_REMOTE_CQ_DATA in msg ops)
+	 * - rma_cq_data_size: RMA writedata operations (fi_writedata/fi_inject_writedata)
+	 * These are set separately so messaging can use remote CQ data without
+	 * forcing RMA to enable writedata support.
+	 */
+	size_t msg_cq_data_size;
+	size_t rma_cq_data_size;
+
+	/* Legacy cq_data_size field, derived from msg_cq_data_size and
+	 * rma_cq_data_size; non-zero if either supports remote CQ data.
+	 */
+	size_t cq_data_size;
+
	struct cxip_eq *eq; //unused
	struct cxip_eq *mr_eq; //unused
@@ -2698,9 +2731,10 @@ struct cxip_mr {
	struct fi_mr_attr attr;		// attributes
	struct cxip_cntr *cntr;		// if bound to cntr
 
-	/* Indicates if FI_RMA_EVENT was specified at creation and
-	 * will be used to enable fi_writedata() and fi_inject_writedata()
-	 * support for this MR (TODO).
+	/* Indicates if FI_RMA_EVENT was specified at creation;
+	 * this enables remote counter events for this MR.
+	 * Note: fi_writedata() support is controlled by
+	 * domain->rma_cq_data_size, not by FI_RMA_EVENT or this flag.
	 */
	bool rma_events;
@@ -2720,9 +2754,12 @@ struct cxip_mr {
	struct cxip_mr_util_ops *mr_util;
	bool enabled;
	struct cxip_pte *pte;
+	struct cxip_pte *writedata_pte;	// Second PTE for FI_WRITEDATA dual entry
	enum cxip_mr_state mr_state;
+	enum cxip_mr_state writedata_mr_state;	// State for writedata LE
	int64_t mr_id;			// Non-cached provider key uniqueness
	struct cxip_ctrl_req req;
+	struct cxip_ctrl_req writedata_req;	// Control req for writedata LE
	bool optimized;
 
	void *buf;			// memory buffer VA
diff --git a/prov/cxi/src/cxip_dom.c b/prov/cxi/src/cxip_dom.c
index a73ecacd427..e3e0ea1348e 100644
--- a/prov/cxi/src/cxip_dom.c
+++ b/prov/cxi/src/cxip_dom.c
@@ -389,8 +389,11 @@ int cxip_domain_prov_mr_id_alloc(struct cxip_domain *dom,
	 */
	key.events = mr->count_events || mr->rma_events || mr->cntr;
 
-	key.opt = dom->optimized_mrs &&
-		  key.id < CXIP_PTL_IDX_PROV_MR_OPT_CNT;
+	/* Force unoptimized keys for RMA events (fi_writedata support).
+	 * Optimized MRs do not support header_data delivery in target events.
+	 */
+	key.opt = (mr->rma_events || mr->domain->rma_cq_data_size) ?
false : + (dom->optimized_mrs && key.id < CXIP_PTL_IDX_PROV_MR_OPT_CNT); mr->key = key.raw; ofi_spin_unlock(&dom->ctrl_id_lock); @@ -2005,6 +2008,32 @@ int cxip_domain(struct fid_fabric *fabric, struct fi_info *info, cxi_domain->tclass = FI_TC_BEST_EFFORT; } + /* Initialize CQ data sizes for messaging and RMA separately. + * Both default to info->domain_attr->cq_data_size initially, but can be + * controlled independently. This allows messaging to use remote CQ data + * without forcing RMA writedata support. + * + * msg_cq_data_size: for FI_REMOTE_CQ_DATA in messaging operations + * rma_cq_data_size: for fi_writedata/fi_inject_writedata operations + */ + cxi_domain->msg_cq_data_size = info->domain_attr->cq_data_size; + + if (cxip_env.enable_writedata && info->domain_attr->cq_data_size) { + if (cxi_domain->util_domain.mr_mode & FI_MR_PROV_KEY) { + cxi_domain->rma_cq_data_size = info->domain_attr->cq_data_size; + } else { + CXIP_WARN("FI_MR_PROV_KEY required for RMA CQ data\n"); + cxi_domain->rma_cq_data_size = 0; + } + } else { + cxi_domain->rma_cq_data_size = 0; + } + + /* Legacy cq_data_size: non-zero if either msg or RMA supports CQ data */ + cxi_domain->cq_data_size = cxi_domain->msg_cq_data_size || + cxi_domain->rma_cq_data_size ? + info->domain_attr->cq_data_size : 0; + cxi_domain->av_user_id = !!(cxi_domain->util_domain.info_domain_caps & FI_AV_USER_ID); cxi_domain->auth_key_entry_max = info->domain_attr->max_ep_auth_key; @@ -2069,6 +2098,27 @@ int cxip_domain(struct fid_fabric *fabric, struct fi_info *info, cxi_domain->rx_match_mode = cxip_env.rx_match_mode; cxi_domain->msg_offload = cxip_env.msg_offload; cxi_domain->req_buf_size = cxip_env.req_buf_size; + + /* Disable provider key caching and optimized MRs for writedata support. + * Provider keys lack space for sol_event bit in cached encoding. + * Optimized MRs don't support header_data in target events. + * Writedata is only supported with provider keys. + */ + if (cxi_domain->rma_cq_data_size) { + bool disable_cache = cxi_domain->is_prov_key && cxi_domain->prov_key_cache; + bool disable_opt = cxi_domain->optimized_mrs; + + if (disable_cache || disable_opt) { + CXIP_DBG("Disabling %s%s%s due to writedata support (rma_cq_data_size=%zu)\n", + disable_cache ? "provider key cache" : "", + (disable_cache && disable_opt) ? " and " : "", + disable_opt ? "optimized MRs" : "", + cxi_domain->rma_cq_data_size); + } + + cxi_domain->prov_key_cache = false; + cxi_domain->optimized_mrs = false; + } *dom = &cxi_domain->util_domain.domain_fid; return 0; diff --git a/prov/cxi/src/cxip_ep.c b/prov/cxi/src/cxip_ep.c index ffdcc8dd63a..60e61cd623c 100644 --- a/prov/cxi/src/cxip_ep.c +++ b/prov/cxi/src/cxip_ep.c @@ -19,19 +19,25 @@ #define CXIP_DBG(...) _CXIP_DBG(FI_LOG_EP_CTRL, __VA_ARGS__) #define CXIP_WARN(...) _CXIP_WARN(FI_LOG_EP_CTRL, __VA_ARGS__) +#define CXIP_INFO(...) 
_CXIP_INFO(FI_LOG_EP_CTRL, __VA_ARGS__) extern struct fi_ops_rma cxip_ep_rma_ops; extern struct fi_ops_rma cxip_ep_rma_no_ops; +extern struct fi_ops_rma cxip_ep_rma_writedata_ops; extern struct fi_ops_msg cxip_ep_msg_ops; extern struct fi_ops_msg cxip_ep_msg_no_ops; extern struct fi_ops_msg cxip_ep_msg_no_tx_ops; extern struct fi_ops_msg cxip_ep_msg_no_rx_ops; +extern struct fi_ops_msg cxip_ep_msg_ops_no_writedata; +extern struct fi_ops_msg cxip_ep_msg_no_rx_ops_no_writedata; extern struct fi_ops_tagged cxip_ep_tagged_ops; extern struct fi_ops_tagged cxip_ep_tagged_no_ops; extern struct fi_ops_tagged cxip_ep_tagged_no_tx_ops; extern struct fi_ops_tagged cxip_ep_tagged_no_rx_ops; +extern struct fi_ops_tagged cxip_ep_tagged_ops_no_writedata; +extern struct fi_ops_tagged cxip_ep_tagged_no_rx_ops_no_writedata; extern struct fi_ops_atomic cxip_ep_atomic_ops; extern struct fi_ops_atomic cxip_ep_atomic_no_ops; @@ -717,29 +723,62 @@ static int cxip_ep_enable(struct fid_ep *fid_ep) /* Enable only appropriate API functions based on primary/secondary * capabilities. Send/Receive requires FI_MSG or FI_TAGGED. + * + * For FI_TAGGED operations, check if writedata is supported. + * If tx_attr.caps includes FI_TAGGED and domain->msg_cq_data_size is non-zero, + * use ops table with fi_tagged_senddata/fi_tagged_injectdata implementations. + * Otherwise, use ops table with fi_no_tagged_senddata/fi_no_tagged_injectdata. */ if (ofi_send_allowed(ep->tx_attr.caps & ~FI_MSG) && - ofi_recv_allowed(ep->rx_attr.caps & ~FI_MSG)) - ep->ep.tagged = &cxip_ep_tagged_ops; - else if (ofi_send_allowed(ep->tx_attr.caps & ~FI_MSG)) - ep->ep.tagged = &cxip_ep_tagged_no_rx_ops; - else if (ofi_recv_allowed(ep->rx_attr.caps & ~FI_MSG)) + ofi_recv_allowed(ep->rx_attr.caps & ~FI_MSG)) { + if ((ep->tx_attr.caps & FI_TAGGED) && ep_obj->domain->msg_cq_data_size) + ep->ep.tagged = &cxip_ep_tagged_ops; + else + ep->ep.tagged = &cxip_ep_tagged_ops_no_writedata; + } else if (ofi_send_allowed(ep->tx_attr.caps & ~FI_MSG)) { + if ((ep->tx_attr.caps & FI_TAGGED) && ep_obj->domain->msg_cq_data_size) + ep->ep.tagged = &cxip_ep_tagged_no_rx_ops; + else + ep->ep.tagged = &cxip_ep_tagged_no_rx_ops_no_writedata; + } else if (ofi_recv_allowed(ep->rx_attr.caps & ~FI_MSG)) { ep->ep.tagged = &cxip_ep_tagged_no_tx_ops; + } + /* For FI_MSG operations, check if writedata is supported. + * If tx_attr.caps includes FI_MSG and domain->msg_cq_data_size is non-zero, + * use ops table with fi_senddata/fi_injectdata implementations. + * Otherwise, use ops table with fi_no_msg_senddata/fi_no_msg_injectdata. 
+	 */
	if (ofi_send_allowed(ep->tx_attr.caps & ~FI_TAGGED) &&
-	    ofi_recv_allowed(ep->rx_attr.caps & ~FI_TAGGED))
-		ep->ep.msg = &cxip_ep_msg_ops;
-	else if (ofi_send_allowed(ep->tx_attr.caps & ~FI_TAGGED))
-		ep->ep.msg = &cxip_ep_msg_no_rx_ops;
-	else if (ofi_recv_allowed(ep->rx_attr.caps & ~FI_TAGGED))
+	    ofi_recv_allowed(ep->rx_attr.caps & ~FI_TAGGED)) {
+		if ((ep->tx_attr.caps & FI_MSG) && ep_obj->domain->msg_cq_data_size)
+			ep->ep.msg = &cxip_ep_msg_ops;
+		else
+			ep->ep.msg = &cxip_ep_msg_ops_no_writedata;
+	} else if (ofi_send_allowed(ep->tx_attr.caps & ~FI_TAGGED)) {
+		if ((ep->tx_attr.caps & FI_MSG) && ep_obj->domain->msg_cq_data_size)
+			ep->ep.msg = &cxip_ep_msg_no_rx_ops;
+		else
+			ep->ep.msg = &cxip_ep_msg_no_rx_ops_no_writedata;
+	} else if (ofi_recv_allowed(ep->rx_attr.caps & ~FI_TAGGED)) {
		ep->ep.msg = &cxip_ep_msg_no_tx_ops;
+	}
 
	/* Initiate requires FI_RMA or FI_ATOMIC */
	if (ofi_rma_initiate_allowed(ep->tx_attr.caps & ~FI_RMA))
		ep->ep.atomic = &cxip_ep_atomic_ops;
 
-	if (ofi_rma_initiate_allowed(ep->tx_attr.caps & ~FI_ATOMIC))
-		ep->ep.rma = &cxip_ep_rma_ops;
+	if (ofi_rma_initiate_allowed(ep->tx_attr.caps & ~FI_ATOMIC)) {
+		/* Select the RMA ops variant. Enable writedata/injectdata only
+		 * if FI_RMA is present in tx_attr.caps and the domain has a
+		 * non-zero rma_cq_data_size (remote CQ data supported).
+		 */
+		if ((ep->tx_attr.caps & FI_RMA) && ep_obj->domain->rma_cq_data_size)
+			ep->ep.rma = &cxip_ep_rma_writedata_ops;
+		else
+			ep->ep.rma = &cxip_ep_rma_ops;
+	}
 
	ep_obj->enabled = true;
	ofi_genlock_unlock(&ep_obj->lock);
diff --git a/prov/cxi/src/cxip_info.c b/prov/cxi/src/cxip_info.c
index 0a22eccda81..c632c44e338 100644
--- a/prov/cxi/src/cxip_info.c
+++ b/prov/cxi/src/cxip_info.c
@@ -683,6 +683,7 @@ struct cxip_environment cxip_env = {
	.force_dev_reg_copy = false,
	.mr_target_ordering = MR_ORDER_DEFAULT,
	.disable_cuda_sync_memops = false,
+	.enable_writedata = false,
 };
 
 static void cxip_env_init(void)
@@ -957,6 +958,12 @@ static void cxip_env_init(void)
	fi_param_get_bool(&cxip_prov, "mr_match_events",
			  &cxip_env.mr_match_events);
 
+	fi_param_define(&cxip_prov, "enable_writedata", FI_PARAM_BOOL,
+			"Enable dual MR entries for FI_WRITEDATA support (default %d).",
+			cxip_env.enable_writedata);
+	fi_param_get_bool(&cxip_prov, "enable_writedata",
+			  &cxip_env.enable_writedata);
+
	fi_param_define(&cxip_prov, "prov_key_cache", FI_PARAM_BOOL,
			"Disable caching of FI_MR_PROV_KEY (default %lu).",
			&cxip_env.prov_key_cache);
diff --git a/prov/cxi/src/cxip_mr.c b/prov/cxi/src/cxip_mr.c
index 8e03e099f9f..2609bebbaef 100644
--- a/prov/cxi/src/cxip_mr.c
+++ b/prov/cxi/src/cxip_mr.c
@@ -65,6 +65,190 @@ static void cxip_ep_mr_remove(struct cxip_mr *mr)
	dlist_remove(&mr->ep_entry);
 }
 
+/*
+ * cxip_mr_handle_remote_write() - Handle a remote write with immediate data.
+ *
+ * Processes PUT events containing immediate data (from fi_writedata) and
+ * generates completion entries to the target endpoint's receive CQ.
+ *
+ * The completion is written with FI_RMA | FI_REMOTE_WRITE | FI_REMOTE_CQ_DATA
+ * flags to indicate a remote RMA write with immediate data.
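+ *
+ * @mr: target MR whose writedata LE matched the PUT event
+ * @event: hardware PUT event (tgt_long format)
+ * @remote_cq_data: immediate data taken from the event's header_data field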
+ */ +static int cxip_mr_handle_remote_write(struct cxip_mr *mr, + const union c_event *event, + uint64_t remote_cq_data) +{ + struct cxip_ep *ep; + struct cxip_rxc *rxc; + fi_addr_t src_addr = FI_ADDR_NOTAVAIL; + uint64_t flags; + size_t len; + void *buf; + int ret; + uint32_t initiator; + uint16_t vni; + struct cxip_addr addr; + + if (!mr->ep) { + CXIP_DBG("MR not bound to endpoint, skipping completion\n"); + return FI_SUCCESS; + } + + ep = mr->ep; + rxc = ep->ep_obj->rxc; + + if (!rxc || !rxc->recv_cq) { + CXIP_DBG("No receive CQ bound, skipping completion\n"); + return FI_SUCCESS; + } + + /* Build completion entry with immediate data */ + flags = FI_RMA | FI_REMOTE_WRITE | FI_REMOTE_CQ_DATA; + len = event->tgt_long.mlength; + buf = (void *)((uintptr_t)mr->buf + event->tgt_long.start); + + /* Extract initiator and VNI from event for source address resolution */ + initiator = event->tgt_long.initiator.initiator.process; + vni = event->tgt_long.vni; + + /* Resolve source address if FI_SOURCE capability is enabled */ + if (rxc->attr.caps & FI_SOURCE) { + src_addr = cxip_recv_req_src_addr(rxc, initiator, vni, false); + + /* Generate normal completion if address resolved OR FI_SOURCE_ERR not set. + * Only generate error if BOTH address failed AND FI_SOURCE_ERR is set. + * This matches the semantic in recv_req_event_success(). + */ + if (src_addr != FI_ADDR_NOTAVAIL || + !(rxc->attr.caps & FI_SOURCE_ERR)) { + ret = ofi_peer_cq_write(&rxc->recv_cq->util_cq, + (void *)(uintptr_t)mr->mr_fid.fid.context, + flags, len, buf, remote_cq_data, 0, src_addr); + if (ret != FI_SUCCESS) + CXIP_WARN("Failed to submit remote write completion: %d\n", ret); + return ret; + } + + addr.nic = CXI_MATCH_ID_EP(rxc->pid_bits, initiator); + addr.pid = CXI_MATCH_ID_PID(rxc->pid_bits, initiator); + + src_addr = cxip_av_lookup_auth_key_fi_addr(rxc->ep_obj->av, vni); + + struct fi_cq_err_entry err_entry = {}; + err_entry.err = FI_EADDRNOTAVAIL; + err_entry.err_data = &addr; + err_entry.err_data_size = sizeof(addr); + err_entry.op_context = (void *)(uintptr_t)mr->mr_fid.fid.context; + err_entry.src_addr = src_addr; + err_entry.flags = flags; + err_entry.len = len; + err_entry.buf = buf; + + ret = ofi_peer_cq_write_error(&rxc->recv_cq->util_cq, &err_entry); + + return ret; + } + + /* FI_SOURCE not enabled - use FI_ADDR_NOTAVAIL */ + ret = ofi_peer_cq_write(&rxc->recv_cq->util_cq, + (void *)(uintptr_t)mr->mr_fid.fid.context, + flags, len, buf, remote_cq_data, 0, src_addr); + if (ret != FI_SUCCESS) { + CXIP_WARN("Failed to submit remote write completion: %d\n", ret); + return ret; + } + + CXIP_DBG("Generated remote write completion: data=0x%" PRIx64 " len=%zu\n", + remote_cq_data, len); + + return FI_SUCCESS; +} + +/* + * cxip_mr_sol_event_cb() - Callback for solicited event (writedata) LE. + * + * For standard MRs with writedata support enabled, two separate request IDs + * and callbacks are used: + * - mr->req with cxip_mr_cb() handles regular RMA operations + * - mr->writedata_req with cxip_mr_sol_event_cb() handles writedata operations + * + * This callback is invoked for the second LE that handles writedata operations + * (with sol_event match bit set). When a PUT event is received with status OK + * (C_RC_OK), the solicited event completion is processed by generating a + * completion entry with immediate data to the target endpoint's receive CQ. 
+ */ +static int cxip_mr_sol_event_cb(struct cxip_ctrl_req *req, const union c_event *event) +{ + struct cxip_mr *mr = req->mr.mr; + int evt_rc = cxi_event_rc(event); + int ret; + + switch (event->hdr.event_type) { + case C_EVENT_ACK: + /* Ignore command completion ACKs */ + break; + + case C_EVENT_LINK: + assert(mr->writedata_mr_state == CXIP_MR_DISABLED); + + if (evt_rc == C_RC_OK) { + mr->writedata_mr_state = CXIP_MR_LINKED; + CXIP_DBG("MR writedata PTE linked: %p\n", mr); + break; + } + + mr->writedata_mr_state = CXIP_MR_LINK_ERR; + CXIP_WARN("MR writedata PTE link: %p failed %d\n", mr, evt_rc); + break; + + case C_EVENT_UNLINK: + assert(evt_rc == C_RC_OK); + assert(mr->writedata_mr_state == CXIP_MR_LINKED); + mr->writedata_mr_state = CXIP_MR_UNLINKED; + CXIP_DBG("MR writedata PTE unlinked: %p\n", mr); + break; + + case C_EVENT_PUT: + /* Count access events if MR event counting is enabled */ + if (mr->count_events) + ofi_atomic_inc32(&mr->access_events); + + /* Solicited event: process writedata completion only if status is OK */ + if (evt_rc == C_RC_OK) { + /* Extract immediate data from event header_data field */ + uint64_t imm = event->tgt_long.header_data; + + ret = cxip_mr_handle_remote_write(mr, event, imm); + if (ret != FI_SUCCESS) + CXIP_WARN("Failed to handle solicited event: %d\n", ret); + } else { + CXIP_WARN("Solicited event PUT failed: %s\n", cxi_rc_to_str(evt_rc)); + } + break; + + case C_EVENT_MATCH: + /* Count match events if MR event counting is enabled */ + if (mr->count_events) + ofi_atomic_inc32(&mr->match_events); + + /* Match events can occur on writedata LE, just track them */ + if (evt_rc != C_RC_OK) + CXIP_WARN(CXIP_UNEXPECTED_EVENT, + cxi_event_to_str(event), cxi_rc_to_str(evt_rc)); + break; + + default: + CXIP_WARN(CXIP_UNEXPECTED_EVENT, + cxi_event_to_str(event), cxi_rc_to_str(evt_rc)); + } + + return FI_SUCCESS; +} + /* * cxip_mr_cb() - Process MR LE events. */ @@ -84,6 +265,10 @@ int cxip_mr_cb(struct cxip_ctrl_req *req, const union c_event *event) mr = req->mr.mr; + /* This callback handles the regular (non-writedata) LE. + * Writedata events are handled by cxip_mr_sol_event_cb. + */ + switch (event->hdr.event_type) { case C_EVENT_LINK: if (mr->optimized) @@ -109,7 +294,8 @@ int cxip_mr_cb(struct cxip_ctrl_req *req, const union c_event *event) CXIP_DBG("MR PTE unlinked: %p\n", mr); break; case C_EVENT_MATCH: - ofi_atomic_inc32(&mr->match_events); + if (mr->count_events) + ofi_atomic_inc32(&mr->match_events); if (evt_rc != C_RC_OK) goto log_err; @@ -123,8 +309,6 @@ int cxip_mr_cb(struct cxip_ctrl_req *req, const union c_event *event) if (evt_rc != C_RC_OK) goto log_err; - - /* TODO handle fi_writedata/fi_inject_writedata */ break; default: log_err: @@ -171,7 +355,10 @@ static int cxip_mr_enable_std(struct cxip_mr *mr) }; uint32_t le_flags; + mr->req.ep_obj = ep_obj; mr->req.cb = cxip_mr_cb; + mr->req.mr.mr = mr; + CXIP_DBG("Standard MR callback registered: mr=%p rma_events=%d\n", mr, mr->rma_events); le_flags = C_LE_UNRESTRICTED_BODY_RO; if (mr->attr.access & FI_REMOTE_WRITE) @@ -181,26 +368,102 @@ static int cxip_mr_enable_std(struct cxip_mr *mr) if (mr->cntr) le_flags |= C_LE_EVENT_CT_COMM; - /* TODO: to support fi_writedata(), we will want to leave - * success events enabled for mr->rma_events true too. + /* For dual entry mode, create two LEs with separate request IDs: + * 1. mr->req with cxip_mr_cb handles regular RMA operations + * 2. 
mr->writedata_req with cxip_mr_sol_event_cb handles writedata operations
+	 * Each LE has its own request ID and callback for independent event processing.
	 */
-	if (!mr->count_events)
-		le_flags |= C_LE_EVENT_SUCCESS_DISABLE;
+	if (mr->domain->rma_cq_data_size) {
+		uint32_t regular_le_flags;
+		uint32_t writedata_le_flags;
+		uint64_t writedata_key_raw;
+		struct cxip_mr_key writedata_key;
 
-	ret = cxip_pte_append(ep_obj->ctrl.pte,
-			      mr->len ? CXI_VA_TO_IOVA(mr->md->md, mr->buf) : 0,
-			      mr->len, mr->len ? mr->md->md->lac : 0,
-			      C_PTL_LIST_PRIORITY, mr->req.req_id,
-			      key.key, 0, CXI_MATCH_ID_ANY,
-			      0, le_flags, mr->cntr, ep_obj->ctrl.tgq, true);
-	if (ret != FI_SUCCESS) {
-		CXIP_WARN("Failed to write Append command: %d\n", ret);
-		return ret;
-	}
+		/* Initialize writedata state */
+		mr->writedata_mr_state = CXIP_MR_DISABLED;
 
-	ret = cxip_mr_wait_append(ep_obj, mr);
-	if (ret)
-		return ret;
+		/* First LE: regular operations; disable success and comm
+		 * events unless MR event counting is enabled.
+		 */
+		regular_le_flags = le_flags;
+		if (!mr->count_events)
+			regular_le_flags |= C_LE_EVENT_SUCCESS_DISABLE | C_LE_EVENT_COMM_DISABLE;
+
+		ret = cxip_pte_append(ep_obj->ctrl.pte,
+				      mr->len ? CXI_VA_TO_IOVA(mr->md->md, mr->buf) : 0,
+				      mr->len, mr->len ? mr->md->md->lac : 0,
+				      C_PTL_LIST_PRIORITY, mr->req.req_id,
+				      key.key, 0, CXI_MATCH_ID_ANY,
+				      0, regular_le_flags, mr->cntr, ep_obj->ctrl.tgq, true);
+		if (ret != FI_SUCCESS) {
+			CXIP_WARN("Failed to write Append command for regular LE: %d\n", ret);
+			return ret;
+		}
+
+		ret = cxip_mr_wait_append(ep_obj, mr);
+		if (ret)
+			return ret;
+
+		/* Second LE: writedata operations with a dedicated callback.
+		 * This uses a separate request ID (writedata_req) so each LE has
+		 * its own callback for independent event handling.
+		 */
+		writedata_le_flags = le_flags;
+		/* Leave success/comm events enabled so writedata PUTs generate target events */
+		CXIP_DBG("MR enabling writedata events: mr=%p\n", mr);
+
+		/* Use writedata_req for the second LE with dedicated callback */
+		mr->writedata_req.cb = cxip_mr_sol_event_cb;
+		mr->writedata_req.mr.mr = mr;
+		writedata_key_raw = cxip_key_set_writedata(key.raw);
+		writedata_key.raw = writedata_key_raw;
+
+		ret = cxip_pte_append(ep_obj->ctrl.pte,
+				      mr->len ? CXI_VA_TO_IOVA(mr->md->md, mr->buf) : 0,
+				      mr->len, mr->len ? mr->md->md->lac : 0,
+				      C_PTL_LIST_PRIORITY, mr->writedata_req.req_id,
+				      writedata_key.key, 0, CXI_MATCH_ID_ANY,
+				      0, writedata_le_flags, mr->cntr, ep_obj->ctrl.tgq, true);
+		if (ret != FI_SUCCESS) {
+			CXIP_WARN("Failed to write Append command for writedata LE: %d\n", ret);
+			return ret;
+		}
+
+		/* Wait for the writedata LE to link */
+		do {
+			sched_yield();
+			cxip_ep_tgt_ctrl_progress_locked(ep_obj, true);
+		} while (mr->writedata_mr_state != CXIP_MR_LINKED &&
+			 mr->writedata_mr_state != CXIP_MR_LINK_ERR);
+
+		if (mr->writedata_mr_state == CXIP_MR_LINK_ERR)
+			return -FI_ENOSPC;
+	} else {
+		/* Original single LE logic */
+		if (mr->rma_events) {
+			CXIP_DBG("MR enabling RMA events: mr=%p le_flags=0x%x\n", mr, le_flags);
+		}
+
+		/* Disable success events unless MR event counting is enabled */
+		if (!mr->count_events)
+			le_flags |= C_LE_EVENT_SUCCESS_DISABLE;
+
+		ret = cxip_pte_append(ep_obj->ctrl.pte,
+				      mr->len ? CXI_VA_TO_IOVA(mr->md->md, mr->buf) : 0,
+				      mr->len, mr->len ?
mr->md->md->lac : 0,
+				      C_PTL_LIST_PRIORITY, mr->req.req_id,
+				      key.key, 0, CXI_MATCH_ID_ANY,
+				      0, le_flags, mr->cntr, ep_obj->ctrl.tgq, true);
+		if (ret != FI_SUCCESS) {
+			CXIP_WARN("Failed to write Append command: %d\n", ret);
+			return ret;
+		}
+
+		ret = cxip_mr_wait_append(ep_obj, mr);
+		if (ret)
+			return ret;
+	}
 
	mr->enabled = true;
 
@@ -254,6 +517,20 @@ static int cxip_mr_disable_std(struct cxip_mr *mr)
		cxip_ep_tgt_ctrl_progress_locked(ep_obj, true);
	} while (mr->mr_state != CXIP_MR_UNLINKED);
 
+	/* For dual entry, also unlink the writedata LE */
+	if (mr->domain->rma_cq_data_size) {
+		ret = cxip_pte_unlink(ep_obj->ctrl.pte, C_PTL_LIST_PRIORITY,
+				      mr->writedata_req.req_id, ep_obj->ctrl.tgq);
+		if (ret != FI_SUCCESS)
+			CXIP_FATAL("Unable to queue writedata unlink command: %d\n", ret);
+
+		/* Wait for the writedata LE to be unlinked */
+		do {
+			sched_yield();
+			cxip_ep_tgt_ctrl_progress_locked(ep_obj, true);
+		} while (mr->writedata_mr_state != CXIP_MR_UNLINKED);
+	}
+
	if (mr->count_events) {
		count_events_disabled = cxip_mr_disable_check_count_events(mr,
					cxip_env.mr_cache_events_disable_poll_nsecs);
		if (count_events_disabled)
@@ -323,6 +600,8 @@ static int cxip_mr_enable_opt(struct cxip_mr *mr)
	target_relaxed_order = cxip_ep_obj_mr_relaxed_order(ep_obj);
 
	mr->req.cb = cxip_mr_cb;
+	CXIP_DBG("Optimized MR callback registered: mr=%p rma_events=%d\n",
+		 mr, mr->rma_events);
 
	ret = cxip_pte_alloc_nomap(ep_obj->ptable, ep_obj->ctrl.tgt_evtq,
				   &opts, cxip_mr_opt_pte_cb, mr, &mr->pte);
@@ -857,8 +1136,8 @@ static int cxip_prov_cache_init_mr_key(struct cxip_mr *mr,
	key.lac_off = mr->len ? CXI_VA_TO_IOVA(md, mr->buf) : 0;
	mr->key = key.raw;
 
-	CXIP_DBG("Init cached MR key 0x%016lX, lac: %d, off:0x%016lX\n",
-		 key.raw, key.lac, (uint64_t)key.lac_off);
+	CXIP_DBG("Init cached MR key 0x%016lX, lac: %d, off:0x%016lX, rma_events: %d, opt: %d\n",
+		 key.raw, key.lac, (uint64_t)key.lac_off, mr->rma_events, key.opt);
 
	return FI_SUCCESS;
 }
@@ -1137,8 +1416,9 @@ int cxip_mr_enable(struct cxip_mr *mr)
	if (!mr->domain->is_prov_key)
		mr->mr_util = &cxip_client_key_mr_util_ops;
	else if (mr->md && mr->md->cached && mr->domain->prov_key_cache &&
-		 !mr->cntr && !mr->count_events && !mr->rma_events)
-		mr->mr_util = &cxip_prov_key_cache_mr_util_ops;
+		 !mr->cntr && !mr->count_events && !mr->rma_events &&
+		 !mr->domain->rma_cq_data_size)
+		mr->mr_util = &cxip_prov_key_cache_mr_util_ops;
	else
		mr->mr_util = &cxip_prov_key_mr_util_ops;
 
@@ -1355,6 +1635,8 @@ static struct fi_ops cxip_mr_fi_ops = {
 static void cxip_mr_fini(struct cxip_mr *mr)
 {
	cxip_domain_ctrl_id_free(mr->domain, &mr->req);
+	if (mr->domain->rma_cq_data_size)
+		cxip_domain_ctrl_id_free(mr->domain, &mr->writedata_req);
	cxip_domain_prov_mr_id_free(mr->domain, mr);
 }
 
@@ -1396,8 +1678,21 @@ static int cxip_mr_init(struct cxip_mr *mr, struct cxip_domain *dom,
			ofi_spin_destroy(&mr->lock);
			return -FI_ENOSPC;
		}
+
+		/* Allocate a second buffer ID for the writedata dual entry */
+		if (dom->rma_cq_data_size) {
+			ret = cxip_domain_ctrl_id_alloc(dom, &mr->writedata_req);
+			if (ret) {
+				CXIP_WARN("Failed to allocate writedata MR buffer ID: %d\n", ret);
+				cxip_domain_ctrl_id_free(dom, &mr->req);
+				ofi_spin_destroy(&mr->lock);
+				return -FI_ENOSPC;
+			}
+			mr->writedata_req.mr.mr = mr;
+		}
	} else {
		mr->req.req_id = -1;
+		mr->writedata_req.req_id = -1;
	}
 
	mr->mr_id = -1;
diff --git a/prov/cxi/src/cxip_msg.c b/prov/cxi/src/cxip_msg.c
index 86077f91e4c..90b9df9de8d 100644
--- a/prov/cxi/src/cxip_msg.c
+++ b/prov/cxi/src/cxip_msg.c
@@ -1078,6 +1078,33 @@ struct fi_ops_tagged
cxip_ep_tagged_no_rx_ops = {
	.injectdata = cxip_tinjectdata,
 };
 
+/* Tagged ops with writedata disabled (msg_cq_data_size == 0) */
+struct fi_ops_tagged cxip_ep_tagged_ops_no_writedata = {
+	.size = sizeof(struct fi_ops_tagged),
+	.recv = cxip_trecv,
+	.recvv = cxip_trecvv,
+	.recvmsg = cxip_trecvmsg,
+	.send = cxip_tsend,
+	.sendv = cxip_tsendv,
+	.sendmsg = cxip_tsendmsg,
+	.inject = cxip_tinject,
+	.senddata = fi_no_tagged_senddata,
+	.injectdata = fi_no_tagged_injectdata,
+};
+
+struct fi_ops_tagged cxip_ep_tagged_no_rx_ops_no_writedata = {
+	.size = sizeof(struct fi_ops_tagged),
+	.recv = fi_no_tagged_recv,
+	.recvv = fi_no_tagged_recvv,
+	.recvmsg = fi_no_tagged_recvmsg,
+	.send = cxip_tsend,
+	.sendv = cxip_tsendv,
+	.sendmsg = cxip_tsendmsg,
+	.inject = cxip_tinject,
+	.senddata = fi_no_tagged_senddata,
+	.injectdata = fi_no_tagged_injectdata,
+};
+
 static ssize_t cxip_recv(struct fid_ep *fid_ep, void *buf, size_t len,
			 void *desc, fi_addr_t src_addr, void *context)
 {
@@ -1326,3 +1353,30 @@ struct fi_ops_msg cxip_ep_msg_no_rx_ops = {
	.senddata = cxip_senddata,
	.injectdata = cxip_injectdata,
 };
+
+/* Message ops with writedata disabled (msg_cq_data_size == 0) */
+struct fi_ops_msg cxip_ep_msg_ops_no_writedata = {
+	.size = sizeof(struct fi_ops_msg),
+	.recv = cxip_recv,
+	.recvv = cxip_recvv,
+	.recvmsg = cxip_recvmsg,
+	.send = cxip_send,
+	.sendv = cxip_sendv,
+	.sendmsg = cxip_sendmsg,
+	.inject = cxip_inject,
+	.senddata = fi_no_msg_senddata,
+	.injectdata = fi_no_msg_injectdata,
+};
+
+struct fi_ops_msg cxip_ep_msg_no_rx_ops_no_writedata = {
+	.size = sizeof(struct fi_ops_msg),
+	.recv = fi_no_msg_recv,
+	.recvv = fi_no_msg_recvv,
+	.recvmsg = fi_no_msg_recvmsg,
+	.send = cxip_send,
+	.sendv = cxip_sendv,
+	.sendmsg = cxip_sendmsg,
+	.inject = cxip_inject,
+	.senddata = fi_no_msg_senddata,
+	.injectdata = fi_no_msg_injectdata,
+};
diff --git a/prov/cxi/src/cxip_rma.c b/prov/cxi/src/cxip_rma.c
index 49b59869928..06a7ec34f07 100644
--- a/prov/cxi/src/cxip_rma.c
+++ b/prov/cxi/src/cxip_rma.c
@@ -22,6 +22,7 @@
 #include "cxip.h"
 
 #define CXIP_WARN(...) _CXIP_WARN(FI_LOG_EP_CTRL, __VA_ARGS__)
+#define CXIP_DBG(...) _CXIP_DBG(FI_LOG_EP_DATA, __VA_ARGS__)
 
 /*
  * cxip_rma_selective_completion_cb() - RMA selective completion callback.
@@ -213,6 +214,7 @@ static int cxip_rma_emit_dma(struct cxip_txc *txc, const void *buf, size_t len,
	struct cxip_cntr *cntr;
	void *inject_req;
	uint64_t access = write ? CXI_MAP_READ : CXI_MAP_WRITE;
+	uint64_t match_key = key;
 
	/* MR desc cannot be value unless hybrid MR desc is enabled. */
	if (!dom->hybrid_mr_desc)
@@ -290,6 +292,7 @@ static int cxip_rma_emit_dma(struct cxip_txc *txc, const void *buf, size_t len,
	dma_cmd.index_ext = *idx_ext;
	dma_cmd.event_send_disable = 1;
	dma_cmd.dfa = *dfa;
+	dma_cmd.initiator = cxip_msg_match_id(txc);
	ret = cxip_adjust_remote_offset(&addr, key);
	if (ret) {
		TXC_WARN(txc, "Remote offset overflow\n");
		return ret;
	}
	dma_cmd.remote_offset = addr;
	dma_cmd.eq = cxip_evtq_eqn(&txc->tx_evtq);
-	dma_cmd.match_bits = CXIP_KEY_MATCH_BITS(key);
+
+	/* For writedata operations with dual entry enabled, use the writedata
+	 * key and preserve the sol_event and events bits (60:59); mask off
+	 * only the provider bit 63 so the dual LE distinction remains.
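+	 * (cxip_mr_enable_std() appends the target-side writedata LE with the
+	 * same bits set, so the initiator's match bits select that LE.)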
+	 */
+	if (write && (flags & FI_REMOTE_CQ_DATA) && dom->rma_cq_data_size) {
+		match_key = cxip_key_set_writedata(key);
+		/* Preserve bits 60:59 */
+		dma_cmd.match_bits = match_key & ~CXIP_IS_PROV_MR_KEY_BIT;
+	} else {
+		dma_cmd.match_bits = CXIP_KEY_MATCH_BITS(match_key);
+	}
+
+	/* Set header data for fi_writedata operations */
+	if (write && (flags & FI_REMOTE_CQ_DATA))
+		dma_cmd.header_data = data;
 
	if (req) {
		dma_cmd.user_ptr = (uint64_t)req;
@@ -522,8 +543,15 @@ static int cxip_rma_emit_idc(struct cxip_txc *txc, const void *buf, size_t len,
 }
 
 static bool cxip_rma_is_unrestricted(struct cxip_txc *txc, uint64_t key,
-				     uint64_t msg_order, bool write)
+				     uint64_t msg_order, bool write,
+				     uint64_t flags)
 {
+	/* Operations with FI_REMOTE_CQ_DATA need unrestricted mode to generate
+	 * target-side PUT events for completion notifications.
+	 */
+	if (flags & FI_REMOTE_CQ_DATA)
+		return true;
+
	/* Unoptimized keys are implemented with match bits and must always be
	 * unrestricted.
	 */
@@ -571,6 +599,12 @@ static bool cxip_rma_is_idc(struct cxip_txc *txc, uint64_t key, size_t len,
	if (triggered)
		return false;
 
+	/* IDC commands do not support header_data for fi_writedata; DMA
+	 * commands must be used for operations with FI_REMOTE_CQ_DATA.
+	 */
+	if (flags & FI_REMOTE_CQ_DATA)
+		return false;
+
	/* Don't issue non-inject operation as IDC if disabled by env */
	if (!(flags & FI_INJECT) && cxip_env.disable_non_inject_rma_idc)
		return false;
@@ -634,7 +669,21 @@ ssize_t cxip_rma_common(enum fi_op_type op, struct cxip_txc *txc,
		return -FI_EKEYREJECTED;
	}
 
-	unr = cxip_rma_is_unrestricted(txc, key, msg_order, write);
+	/* Writedata operations require provider keys and unoptimized
+	 * (standard) MRs to support target-side event generation.
+	 */
+	if (flags & FI_REMOTE_CQ_DATA) {
+		if (!(key & CXIP_IS_PROV_MR_KEY_BIT)) {
+			TXC_WARN(txc, "FI_REMOTE_CQ_DATA requires provider key: 0x%lx\n", key);
+			return -FI_EINVAL;
+		}
+		if (cxip_generic_is_mr_key_opt(key)) {
+			TXC_WARN(txc, "FI_REMOTE_CQ_DATA requires unoptimized MR key: 0x%lx\n", key);
+			return -FI_EINVAL;
+		}
+	}
+
+	unr = cxip_rma_is_unrestricted(txc, key, msg_order, write, flags);
	idc = cxip_rma_is_idc(txc, key, len, write, triggered, unr, flags);
 
	/* Build target network address.
	 */
@@ -803,6 +852,35 @@ ssize_t cxip_rma_inject(struct fid_ep *fid_ep, const void *buf, size_t len,
			       false, 0, NULL, NULL);
 }
 
+/* Inject write with immediate (remote CQ) data - used when writedata enabled */
+static ssize_t cxip_rma_injectdata(struct fid_ep *fid_ep, const void *buf,
+				   size_t len, uint64_t data, fi_addr_t dest_addr,
+				   uint64_t addr, uint64_t key)
+{
+	struct cxip_ep *ep = container_of(fid_ep, struct cxip_ep, ep);
+
+	/* FI_REMOTE_CQ_DATA implies header_data presence in the DMA path */
+	return cxip_rma_common(FI_OP_WRITE, ep->ep_obj->txc, buf, len, NULL,
+			       dest_addr, addr, key, data,
+			       FI_INJECT | FI_REMOTE_CQ_DATA,
+			       ep->tx_attr.tclass, ep->tx_attr.msg_order, NULL,
+			       false, 0, NULL, NULL);
+}
+
+static ssize_t cxip_rma_writedata(struct fid_ep *fid_ep, const void *buf,
+				  size_t len, void *desc, uint64_t data,
+				  fi_addr_t dest_addr, uint64_t addr,
+				  uint64_t key, void *context)
+{
+	struct cxip_ep *ep = container_of(fid_ep, struct cxip_ep, ep);
+
+	return cxip_rma_common(FI_OP_WRITE, ep->ep_obj->txc, buf, len, desc,
+			       dest_addr, addr, key, data,
+			       ep->tx_attr.op_flags | FI_REMOTE_CQ_DATA,
+			       ep->tx_attr.tclass, ep->tx_attr.msg_order,
+			       context, false, 0, NULL, NULL);
+}
+
 static ssize_t cxip_rma_read(struct fid_ep *fid_ep, void *buf, size_t len,
			     void *desc, fi_addr_t src_addr, uint64_t addr,
			     uint64_t key, void *context)
@@ -907,6 +982,24 @@ struct fi_ops_rma cxip_ep_rma_ops = {
	.writedata = fi_no_rma_writedata,
 };
 
+/* RMA ops when writedata and remote CQ data injection are enabled.
+ * Domain must have rma_cq_data_size != 0.
+ * fi_inject_writedata() is provided via cxip_rma_injectdata() and
+ * fi_writedata() via cxip_rma_writedata().
+ */
+struct fi_ops_rma cxip_ep_rma_writedata_ops = {
+	.size = sizeof(struct fi_ops_rma),
+	.read = cxip_rma_read,
+	.readv = cxip_rma_readv,
+	.readmsg = cxip_rma_readmsg,
+	.write = cxip_rma_write,
+	.writev = cxip_rma_writev,
+	.writemsg = cxip_rma_writemsg,
+	.inject = cxip_rma_inject,
+	.injectdata = cxip_rma_injectdata,
+	.writedata = cxip_rma_writedata,
+};
+
 struct fi_ops_rma cxip_ep_rma_no_ops = {
	.size = sizeof(struct fi_ops_rma),
	.read = fi_no_rma_read,
diff --git a/prov/cxi/test/test.sh b/prov/cxi/test/test.sh
index ac73b97649f..76540991376 100755
--- a/prov/cxi/test/test.sh
+++ b/prov/cxi/test/test.sh
@@ -153,6 +153,10 @@ unlimited_triggered_ops_test=(
 
 mr_cache_test=("./cxitest --verbose --tap=cxitest-mr_cache_test.tap --filter=\"mr_cache/*\" -j 1")
 
+writedata_rma_test=(
+    "FI_CXI_ENABLE_WRITEDATA=1 ./cxitest --verbose --filter=\"@(rma*|mr*)/*\" -j 1 --tap=cxitest-writedata-rma.tap"
+)
+
 long_test_suite=(
	"basic_test"
	"swget_test"
@@ -179,6 +183,7 @@ long_test_suite=(
	"fork_safe_kdreg2_test"
	"unlimited_triggered_ops_test"
	"mr_cache_test"
+	"writedata_rma_test"
 )
 
 # ################################################################
diff --git a/prov/cxi/test/writedata.c b/prov/cxi/test/writedata.c
new file mode 100644
index 00000000000..ba392ee5527
--- /dev/null
+++ b/prov/cxi/test/writedata.c
@@ -0,0 +1,242 @@
+/*
+ * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only
+ *
+ * Copyright (c) 2025 Hewlett Packard Enterprise Development LP
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <criterion/criterion.h>
+
+#include "cxip.h"
+#include "cxip_test_common.h"
+
+void cxit_setup_rma_writedata(void)
+{
+	cxip_env.enable_writedata = 1;
+	cxit_setup_getinfo();
+	cxit_fi_hints->caps |= FI_RMA | FI_RMA_EVENT | FI_MSG | FI_SOURCE;
+	cxit_fi_hints->domain_attr->mr_mode |= FI_MR_PROV_KEY | FI_MR_ENDPOINT;
+	cxit_setup_rma();
+}
+
+TestSuite(rma_writedata, .init
= cxit_setup_rma_writedata, .fini = cxit_teardown_rma, + .timeout = CXIT_DEFAULT_TIMEOUT); + +Test(rma_writedata, simple) +{ + int ret; + struct mem_region mem_window; + uint64_t key_val = 0x1234; + struct fi_cq_tagged_entry cqe; + uint64_t immediate_data = 0xDEADBEEF; + size_t len = 1024; + uint8_t *send_buf; + struct cxip_ep *ep = container_of(cxit_ep, struct cxip_ep, ep); + send_buf = calloc(1, len); + cr_assert_not_null(send_buf, "send_buf alloc failed"); + memset(send_buf, 0xAA, len); + + /* Manual MR creation to include FI_RMA_EVENT flag */ + mem_window.mem = calloc(1, len); + cr_assert_not_null(mem_window.mem, "mem_window alloc failed"); + + ret = fi_mr_reg(cxit_domain, mem_window.mem, len, + FI_REMOTE_WRITE | FI_REMOTE_READ | FI_SEND | FI_RECV, + 0, key_val, FI_RMA_EVENT, &mem_window.mr, NULL); + cr_assert_eq(ret, FI_SUCCESS, "fi_mr_reg failed %d", ret); + + ret = fi_mr_bind(mem_window.mr, &cxit_ep->fid, 0); + cr_assert_eq(ret, FI_SUCCESS, "fi_mr_bind failed %d", ret); + + if (cxit_rem_cntr) { + ret = fi_mr_bind(mem_window.mr, &cxit_rem_cntr->fid, FI_REMOTE_WRITE); + cr_assert_eq(ret, FI_SUCCESS, "fi_mr_bind(cntr) failed %d", ret); + } + + ret = fi_mr_enable(mem_window.mr); + cr_assert_eq(ret, FI_SUCCESS, "fi_mr_enable failed %d", ret); + + key_val = fi_mr_key(mem_window.mr); + + /* Perform writedata */ + ret = fi_writedata(cxit_ep, send_buf, len, NULL, immediate_data, + cxit_ep_fi_addr, 0, key_val, NULL); + if (ep->ep_obj->domain->rma_cq_data_size) { + cr_assert_eq(ret, FI_SUCCESS, "fi_writedata failed %d", ret); + } else { + cr_assert_eq(ret, -FI_ENOSYS, "fi_writedata bad return %d", ret); + goto done; + } + + /* Wait for local completion (send) */ + ret = cxit_await_completion(cxit_tx_cq, &cqe); + cr_assert_eq(ret, 1, "fi_cq_read (tx) failed %d", ret); + validate_tx_event(&cqe, FI_RMA | FI_WRITE, NULL); + + /* Wait for remote completion (recv) */ + ret = cxit_await_completion(cxit_rx_cq, &cqe); + cr_assert_eq(ret, 1, "fi_cq_read (rx) failed %d", ret); + + /* Validate remote completion */ + cr_assert(cqe.flags & FI_REMOTE_WRITE, "Missing FI_REMOTE_WRITE flag"); + cr_assert_eq(cqe.data, immediate_data, "Data mismatch: 0x%lx != 0x%lx", + cqe.data, immediate_data); + + /* Verify data */ + for (size_t i = 0; i < len; i++) { + cr_assert_eq(mem_window.mem[i], send_buf[i], + "Memory mismatch at index %zu: 0x%02x != 0x%02x", + i, mem_window.mem[i], send_buf[i]); + } +done: + free(send_buf); + mr_destroy(&mem_window); +} + +Test(rma_writedata, with_source) +{ + int ret; + struct mem_region mem_window; + uint64_t key_val = 0x1234; + struct fi_cq_tagged_entry cqe; + uint64_t immediate_data = 0xDEADBEEF; + size_t len = 1024; + uint8_t *send_buf; + struct cxip_ep *ep = container_of(cxit_ep, struct cxip_ep, ep); + fi_addr_t src_addr; + int poll_count = 0; + + send_buf = calloc(1, len); + cr_assert_not_null(send_buf, "send_buf alloc failed"); + memset(send_buf, 0xAA, len); + + /* Manual MR creation to include FI_RMA_EVENT flag */ + mem_window.mem = calloc(1, len); + cr_assert_not_null(mem_window.mem, "mem_window alloc failed"); + + ret = fi_mr_reg(cxit_domain, mem_window.mem, len, + FI_REMOTE_WRITE | FI_REMOTE_READ | FI_SEND | FI_RECV, + 0, key_val, FI_RMA_EVENT, &mem_window.mr, NULL); + cr_assert_eq(ret, FI_SUCCESS, "fi_mr_reg failed %d", ret); + + ret = fi_mr_bind(mem_window.mr, &cxit_ep->fid, 0); + cr_assert_eq(ret, FI_SUCCESS, "fi_mr_bind failed %d", ret); + + if (cxit_rem_cntr) { + ret = fi_mr_bind(mem_window.mr, &cxit_rem_cntr->fid, FI_REMOTE_WRITE); + cr_assert_eq(ret, FI_SUCCESS, 
"fi_mr_bind(cntr) failed %d", ret); + } + + ret = fi_mr_enable(mem_window.mr); + cr_assert_eq(ret, FI_SUCCESS, "fi_mr_enable failed %d", ret); + + key_val = fi_mr_key(mem_window.mr); + + /* Perform writedata */ + ret = fi_writedata(cxit_ep, send_buf, len, NULL, immediate_data, + cxit_ep_fi_addr, 0, key_val, NULL); + if (ep->ep_obj->domain->rma_cq_data_size) { + cr_assert_eq(ret, FI_SUCCESS, "fi_writedata failed %d", ret); + } else { + cr_assert_eq(ret, -FI_ENOSYS, "fi_writedata bad return %d", ret); + goto done; + } + + /* Wait for local completion (send) */ + ret = cxit_await_completion(cxit_tx_cq, &cqe); + cr_assert_eq(ret, 1, "fi_cq_read (tx) failed %d", ret); + validate_tx_event(&cqe, FI_RMA | FI_WRITE, NULL); + + /* Wait for remote completion (recv) with source address */ + while (poll_count < CXIT_DEFAULT_TIMEOUT * 1000) { + ret = fi_cq_readfrom(cxit_rx_cq, &cqe, 1, &src_addr); + if (ret == 1) + break; + if (ret != -FI_EAGAIN) + break; + poll_count++; + usleep(1000); + } + cr_assert_eq(ret, 1, "fi_cq_readfrom (rx) failed %d", ret); + + /* Validate remote completion */ + cr_assert(cqe.flags & FI_REMOTE_WRITE, "Missing FI_REMOTE_WRITE flag"); + cr_assert_eq(cqe.data, immediate_data, "Data mismatch: 0x%lx != 0x%lx", + cqe.data, immediate_data); + + /* Validate source address */ + cr_assert_eq(src_addr, cxit_ep_fi_addr, "Source address mismatch"); + + /* Verify data */ + for (size_t i = 0; i < len; i++) { + cr_assert_eq(mem_window.mem[i], send_buf[i], + "Memory mismatch at index %zu: 0x%02x != 0x%02x", + i, mem_window.mem[i], send_buf[i]); + } +done: + free(send_buf); + mr_destroy(&mem_window); +}