@@ -1000,6 +1000,76 @@ ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t *mtl,
10001000 return ofi_req .status .MPI_ERROR ;
10011001}
10021002
1003+ /*
1004+ * This routine is invoked in the case where a Recv finds the
1005+ * MTL_OFI_IS_SYNC_SEND flag was set, indicating the sender issued an SSend and
1006+ * is blocking while it waits on an ACK message.
1007+ *
1008+ * Issue a fire-and-forget send back to the src with a matching tag so that
1009+ * the sender may continue progress.
1010+ * Requires ofi_req->remote_addr and ofi_req->comm to be set.
1011+ */
1012+ static int
1013+ ompi_mtl_ofi_gen_ssend_ack (struct fi_cq_tagged_entry * wc ,
1014+ ompi_mtl_ofi_request_t * ofi_req )
1015+ {
1016+ /**
1017+ * If this recv is part of an MPI_Ssend operation, then we send an
1018+ * acknowledgment back to the sender.
1019+ * The ack message is sent without generating a completion event in
1020+ * the completion queue by not setting FI_COMPLETION in the flags to
1021+ * fi_tsendmsg(FI_SELECTIVE_COMPLETION).
1022+ * This is done since the 0 byte message requires no
1023+ * notification on the send side for a successful completion.
1024+ * If a failure occurs the provider will notify the error
1025+ * in the cq_readerr during OFI progress. Once the message has been
1026+ * successfully processed the request is marked as completed.
1027+ */
1028+ int ctxt_id = 0 ;
1029+ ssize_t ret ;
1030+ ompi_proc_t * ompi_proc = NULL ;
1031+ mca_mtl_ofi_endpoint_t * endpoint = NULL ;
1032+ int src = mtl_ofi_get_source (wc );
1033+ struct fi_msg_tagged tagged_msg ;
1034+
1035+ if (ompi_mtl_ofi .total_ctxts_used > 0 ) {
1036+ ctxt_id = ofi_req -> comm -> c_contextid .cid_sub .u64 % ompi_mtl_ofi .total_ctxts_used ;
1037+ } else {
1038+ ctxt_id = 0 ;
1039+ }
1040+
1041+ ret = MPI_SUCCESS ;
1042+
1043+ /**
1044+ * If the recv request was posted for any source,
1045+ * we need to extract the source's actual address.
1046+ */
1047+ ompi_proc = ompi_comm_peer_lookup (ofi_req -> comm , src );
1048+ endpoint = ompi_mtl_ofi_get_endpoint (ofi_req -> mtl , ompi_proc );
1049+ ofi_req -> remote_addr = fi_rx_addr (endpoint -> peer_fiaddr , ctxt_id , ompi_mtl_ofi .rx_ctx_bits );
1050+
1051+ tagged_msg .msg_iov = NULL ;
1052+ tagged_msg .desc = NULL ;
1053+ tagged_msg .iov_count = 0 ;
1054+ tagged_msg .addr = ofi_req -> remote_addr ;
1055+ /**
1056+ * We must continue to use the user's original tag but remove the
1057+ * sync_send protocol tag bit and instead apply the sync_send_ack
1058+ * tag bit to complete the initiator's sync send receive.
1059+ */
1060+ tagged_msg .tag = (wc -> tag | ompi_mtl_ofi .sync_send_ack ) & ~ompi_mtl_ofi .sync_send ;
1061+ tagged_msg .context = NULL ;
1062+ tagged_msg .data = 0 ;
1063+
1064+ MTL_OFI_RETRY_UNTIL_DONE (fi_tsendmsg (ompi_mtl_ofi .ofi_ctxt [ctxt_id ].tx_ep ,
1065+ & tagged_msg , 0 ), ret );
1066+ if (OPAL_UNLIKELY (0 > ret )) {
1067+ MTL_OFI_LOG_FI_ERR (ret , "fi_tsendmsg failed during ompi_mtl_ofi_gen_ssend_ack" );
1068+ ret = OMPI_ERROR ;
1069+ }
1070+ return ret ;
1071+ }
1072+
10031073__opal_attribute_always_inline__ static inline int
10041074ompi_mtl_ofi_isend_generic (struct mca_mtl_base_module_t * mtl ,
10051075 struct ompi_communicator_t * comm ,
@@ -1136,19 +1206,9 @@ __opal_attribute_always_inline__ static inline int
11361206ompi_mtl_ofi_recv_callback (struct fi_cq_tagged_entry * wc ,
11371207 ompi_mtl_ofi_request_t * ofi_req )
11381208{
1139- int ompi_ret , ctxt_id = 0 ;
1140- ssize_t ret ;
1141- ompi_proc_t * ompi_proc = NULL ;
1142- mca_mtl_ofi_endpoint_t * endpoint = NULL ;
1209+ int ompi_ret ;
11431210 int src = mtl_ofi_get_source (wc );
11441211 ompi_status_public_t * status = NULL ;
1145- struct fi_msg_tagged tagged_msg ;
1146-
1147- if (ompi_mtl_ofi .total_ctxts_used > 0 ) {
1148- ctxt_id = ofi_req -> comm -> c_contextid .cid_sub .u64 % ompi_mtl_ofi .total_ctxts_used ;
1149- } else {
1150- ctxt_id = 0 ;
1151- }
11521212
11531213 assert (ofi_req -> super .ompi_req );
11541214 status = & ofi_req -> super .ompi_req -> req_status ;
@@ -1159,6 +1219,7 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
11591219 */
11601220 ofi_req -> req_started = true;
11611221
1222+ status -> MPI_ERROR = MPI_SUCCESS ;
11621223 status -> MPI_SOURCE = src ;
11631224 status -> MPI_TAG = MTL_OFI_GET_TAG (wc -> tag );
11641225 status -> _ucount = wc -> len ;
@@ -1194,53 +1255,20 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
11941255 */
11951256 assert (!MTL_OFI_IS_SYNC_SEND_ACK (wc -> tag ));
11961257
1197- /**
1198- * If this recv is part of an MPI_Ssend operation, then we send an
1199- * acknowledgment back to the sender.
1200- * The ack message is sent without generating a completion event in
1201- * the completion queue by not setting FI_COMPLETION in the flags to
1202- * fi_tsendmsg(FI_SELECTIVE_COMPLETION).
1203- * This is done since the 0 byte message requires no
1204- * notification on the send side for a successful completion.
1205- * If a failure occurs the provider will notify the error
1206- * in the cq_readerr during OFI progress. Once the message has been
1207- * successfully processed the request is marked as completed.
1208- */
12091258 if (OPAL_UNLIKELY (MTL_OFI_IS_SYNC_SEND (wc -> tag ))) {
1210- /**
1211- * If the recv request was posted for any source,
1212- * we need to extract the source's actual address.
1213- */
1214- if (ompi_mtl_ofi .any_addr == ofi_req -> remote_addr ) {
1215- ompi_proc = ompi_comm_peer_lookup (ofi_req -> comm , src );
1216- endpoint = ompi_mtl_ofi_get_endpoint (ofi_req -> mtl , ompi_proc );
1217- ofi_req -> remote_addr = fi_rx_addr (endpoint -> peer_fiaddr , ctxt_id , ompi_mtl_ofi .rx_ctx_bits );
1218- }
1259+ ompi_ret = ompi_mtl_ofi_gen_ssend_ack (wc , ofi_req );
12191260
1220- tagged_msg .msg_iov = NULL ;
1221- tagged_msg .desc = NULL ;
1222- tagged_msg .iov_count = 0 ;
1223- tagged_msg .addr = ofi_req -> remote_addr ;
1224- /**
1225- * We must continue to use the user's original tag but remove the
1226- * sync_send protocol tag bit and instead apply the sync_send_ack
1227- * tag bit to complete the initiator's sync send receive.
1228- */
1229- tagged_msg .tag = (wc -> tag | ompi_mtl_ofi .sync_send_ack ) & ~ompi_mtl_ofi .sync_send ;
1230- tagged_msg .context = NULL ;
1231- tagged_msg .data = 0 ;
1232-
1233- MTL_OFI_RETRY_UNTIL_DONE (fi_tsendmsg (ompi_mtl_ofi .ofi_ctxt [ctxt_id ].tx_ep ,
1234- & tagged_msg , 0 ), ret );
1235- if (OPAL_UNLIKELY (0 > ret )) {
1236- MTL_OFI_LOG_FI_ERR (ret , "fi_tsendmsg failed" );
1237- status -> MPI_ERROR = OMPI_ERROR ;
1261+ if (OPAL_UNLIKELY (OMPI_SUCCESS != ompi_ret )) {
1262+ opal_output_verbose (1 , opal_common_ofi .output ,
1263+ "%s:%d: ompi_mtl_ofi_gen_ssend_ack failed: %d" ,
1264+ __FILE__ , __LINE__ , ompi_ret );
1265+ status -> MPI_ERROR = ompi_ret ;
12381266 }
12391267 }
12401268
12411269 ofi_req -> super .completion_callback (& ofi_req -> super );
12421270
1243- return OMPI_SUCCESS ;
1271+ return status -> MPI_ERROR ;
12441272}
12451273
12461274/**
@@ -1386,14 +1414,26 @@ ompi_mtl_ofi_mrecv_callback(struct fi_cq_tagged_entry *wc,
13861414 status -> MPI_TAG = MTL_OFI_GET_TAG (wc -> tag );
13871415 status -> MPI_ERROR = MPI_SUCCESS ;
13881416 status -> _ucount = wc -> len ;
1417+ int ompi_ret ;
13891418
13901419 ompi_mtl_ofi_deregister_and_free_buffer (ofi_req );
13911420
1421+ if (OPAL_UNLIKELY (MTL_OFI_IS_SYNC_SEND (wc -> tag ))) {
1422+ ompi_ret = ompi_mtl_ofi_gen_ssend_ack (wc , ofi_req );
1423+
1424+ if (OPAL_UNLIKELY (OMPI_SUCCESS != ompi_ret )) {
1425+ opal_output_verbose (1 , opal_common_ofi .output ,
1426+ "%s:%d: ompi_mtl_ofi_gen_ssend_ack failed: %d" ,
1427+ __FILE__ , __LINE__ , ompi_ret );
1428+ status -> MPI_ERROR = ompi_ret ;
1429+ }
1430+ }
1431+
13921432 free (ofi_req );
13931433
13941434 mrecv_req -> completion_callback (mrecv_req );
13951435
1396- return OMPI_SUCCESS ;
1436+ return status -> MPI_ERROR ;
13971437}
13981438
13991439/**
@@ -1472,6 +1512,8 @@ ompi_mtl_ofi_imrecv(struct mca_mtl_base_module_t *mtl,
14721512 ofi_req -> convertor = convertor ;
14731513 ofi_req -> status .MPI_ERROR = OMPI_SUCCESS ;
14741514 ofi_req -> mrecv_req = mtl_request ;
1515+ ofi_req -> comm = comm ;
1516+
14751517
14761518 ompi_ret = ompi_mtl_ofi_register_buffer (convertor , ofi_req , start );
14771519 if (OPAL_UNLIKELY (OMPI_SUCCESS != ompi_ret )) {
0 commit comments