@@ -999,6 +999,76 @@ ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t *mtl,
999999 return ofi_req .status .MPI_ERROR ;
10001000}
10011001
1002+ /*
1003+ * This routine is invoked in the case where a Recv finds the
1004+ * MTL_OFI_IS_SYNC_SEND flag was set, indicating the sender issued an SSend and
1005+ * is blocking while it waits on an ACK message.
1006+ *
1007+ * Issue a fire-and-forget send back to the src with a matching tag so that
1008+ * the sender may continue progress.
1009+ * Requires ofi_req->comm and ofi_req->mtl to be set; ofi_req->remote_addr is overwritten with the resolved source address.
1010+ */
1011+ static int
1012+ ompi_mtl_ofi_gen_ssend_ack (struct fi_cq_tagged_entry * wc ,
1013+ ompi_mtl_ofi_request_t * ofi_req )
1014+ {
1015+ /**
1016+ * The completed recv matched a sync-send (MPI_Ssend) message, so send a
1017+ * zero-byte acknowledgment back to the sender.
1018+ * The ack message is sent without generating a completion event in
1019+ * the completion queue: fi_tsendmsg() below is called with flags 0,
1020+ * i.e. without FI_COMPLETION (presumably under FI_SELECTIVE_COMPLETION).
1021+ * This is done since the 0 byte message requires no
1022+ * notification on the send side for a successful completion.
1023+ * If a failure occurs the provider will notify the error
1024+ * in the cq_readerr during OFI progress. A negative return from
1025+ * fi_tsendmsg() is logged here and reported to the caller as OMPI_ERROR.
1026+ */
1027+ int ctxt_id = 0 ;
1028+ ssize_t ret ;
1029+ ompi_proc_t * ompi_proc = NULL ;
1030+ mca_mtl_ofi_endpoint_t * endpoint = NULL ;
1031+ int src = mtl_ofi_get_source (wc );
1032+ struct fi_msg_tagged tagged_msg ;
1033+
1034+ if (ompi_mtl_ofi .total_ctxts_used > 0 ) {
1035+ ctxt_id = ofi_req -> comm -> c_contextid .cid_sub .u64 % ompi_mtl_ofi .total_ctxts_used ; /* map the communicator's context id onto one of the in-use OFI contexts */
1036+ } else {
1037+ ctxt_id = 0 ;
1038+ }
1039+
1040+ ret = MPI_SUCCESS ; /* NOTE(review): presumably overwritten by MTL_OFI_RETRY_UNTIL_DONE below — confirm */
1041+
1042+ /**
1043+ * Always resolve the sender's address from the source reported by
1044+ * mtl_ofi_get_source(wc), regardless of how the recv was posted;
1045+ * any previous value of ofi_req->remote_addr is overwritten.
1046+ */
1047+ ompi_proc = ompi_comm_peer_lookup (ofi_req -> comm , src );
1048+ endpoint = ompi_mtl_ofi_get_endpoint (ofi_req -> mtl , ompi_proc );
1049+ ofi_req -> remote_addr = fi_rx_addr (endpoint -> peer_fiaddr , ctxt_id , ompi_mtl_ofi .rx_ctx_bits );
1050+
1051+ tagged_msg .msg_iov = NULL ;
1052+ tagged_msg .desc = NULL ;
1053+ tagged_msg .iov_count = 0 ; /* zero-length payload: no iov and no descriptor */
1054+ tagged_msg .addr = ofi_req -> remote_addr ;
1055+ /**
1056+ * We must continue to use the user's original tag but remove the
1057+ * sync_send protocol tag bit and instead apply the sync_send_ack
1058+ * tag bit to complete the initiator's sync send receive.
1059+ */
1060+ tagged_msg .tag = (wc -> tag | ompi_mtl_ofi .sync_send_ack ) & ~ompi_mtl_ofi .sync_send ;
1061+ tagged_msg .context = NULL ; /* no request context is attached to the ack */
1062+ tagged_msg .data = 0 ;
1063+
1064+ MTL_OFI_RETRY_UNTIL_DONE (fi_tsendmsg (ompi_mtl_ofi .ofi_ctxt [ctxt_id ].tx_ep ,
1065+ & tagged_msg , 0 ), ret );
1066+ if (OPAL_UNLIKELY (0 > ret )) {
1067+ MTL_OFI_LOG_FI_ERR (ret , "fi_tsendmsg failed during ompi_mtl_ofi_gen_ssend_ack" );
1068+ ret = OMPI_ERROR ; /* the libfabric error code is only logged; callers see OMPI_ERROR */
1069+ }
1070+ return ret ;
1071+ }
1071+
10021072__opal_attribute_always_inline__ static inline int
10031073ompi_mtl_ofi_isend_generic (struct mca_mtl_base_module_t * mtl ,
10041074 struct ompi_communicator_t * comm ,
@@ -1134,19 +1204,9 @@ __opal_attribute_always_inline__ static inline int
11341204ompi_mtl_ofi_recv_callback (struct fi_cq_tagged_entry * wc ,
11351205 ompi_mtl_ofi_request_t * ofi_req )
11361206{
1137- int ompi_ret , ctxt_id = 0 ;
1138- ssize_t ret ;
1139- ompi_proc_t * ompi_proc = NULL ;
1140- mca_mtl_ofi_endpoint_t * endpoint = NULL ;
1207+ int ompi_ret ;
11411208 int src = mtl_ofi_get_source (wc );
11421209 ompi_status_public_t * status = NULL ;
1143- struct fi_msg_tagged tagged_msg ;
1144-
1145- if (ompi_mtl_ofi .total_ctxts_used > 0 ) {
1146- ctxt_id = ofi_req -> comm -> c_contextid .cid_sub .u64 % ompi_mtl_ofi .total_ctxts_used ;
1147- } else {
1148- ctxt_id = 0 ;
1149- }
11501210
11511211 assert (ofi_req -> super .ompi_req );
11521212 status = & ofi_req -> super .ompi_req -> req_status ;
@@ -1157,6 +1217,7 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
11571217 */
11581218 ofi_req -> req_started = true;
11591219
1220+ status -> MPI_ERROR = MPI_SUCCESS ;
11601221 status -> MPI_SOURCE = src ;
11611222 status -> MPI_TAG = MTL_OFI_GET_TAG (wc -> tag );
11621223 status -> _ucount = wc -> len ;
@@ -1192,53 +1253,20 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
11921253 */
11931254 assert (!MTL_OFI_IS_SYNC_SEND_ACK (wc -> tag ));
11941255
1195- /**
1196- * If this recv is part of an MPI_Ssend operation, then we send an
1197- * acknowledgment back to the sender.
1198- * The ack message is sent without generating a completion event in
1199- * the completion queue by not setting FI_COMPLETION in the flags to
1200- * fi_tsendmsg(FI_SELECTIVE_COMPLETION).
1201- * This is done since the 0 byte message requires no
1202- * notification on the send side for a successful completion.
1203- * If a failure occurs the provider will notify the error
1204- * in the cq_readerr during OFI progress. Once the message has been
1205- * successfully processed the request is marked as completed.
1206- */
12071256 if (OPAL_UNLIKELY (MTL_OFI_IS_SYNC_SEND (wc -> tag ))) {
1208- /**
1209- * If the recv request was posted for any source,
1210- * we need to extract the source's actual address.
1211- */
1212- if (ompi_mtl_ofi .any_addr == ofi_req -> remote_addr ) {
1213- ompi_proc = ompi_comm_peer_lookup (ofi_req -> comm , src );
1214- endpoint = ompi_mtl_ofi_get_endpoint (ofi_req -> mtl , ompi_proc );
1215- ofi_req -> remote_addr = fi_rx_addr (endpoint -> peer_fiaddr , ctxt_id , ompi_mtl_ofi .rx_ctx_bits );
1216- }
1257+ ompi_ret = ompi_mtl_ofi_gen_ssend_ack (wc , ofi_req );
12171258
1218- tagged_msg .msg_iov = NULL ;
1219- tagged_msg .desc = NULL ;
1220- tagged_msg .iov_count = 0 ;
1221- tagged_msg .addr = ofi_req -> remote_addr ;
1222- /**
1223- * We must continue to use the user's original tag but remove the
1224- * sync_send protocol tag bit and instead apply the sync_send_ack
1225- * tag bit to complete the initiator's sync send receive.
1226- */
1227- tagged_msg .tag = (wc -> tag | ompi_mtl_ofi .sync_send_ack ) & ~ompi_mtl_ofi .sync_send ;
1228- tagged_msg .context = NULL ;
1229- tagged_msg .data = 0 ;
1230-
1231- MTL_OFI_RETRY_UNTIL_DONE (fi_tsendmsg (ompi_mtl_ofi .ofi_ctxt [ctxt_id ].tx_ep ,
1232- & tagged_msg , 0 ), ret );
1233- if (OPAL_UNLIKELY (0 > ret )) {
1234- MTL_OFI_LOG_FI_ERR (ret , "fi_tsendmsg failed" );
1235- status -> MPI_ERROR = OMPI_ERROR ;
1259+ if (OPAL_UNLIKELY (OMPI_SUCCESS != ompi_ret )) {
1260+ opal_output_verbose (1 , opal_common_ofi .output ,
1261+ "%s:%d: ompi_mtl_ofi_gen_ssend_ack failed: %d" ,
1262+ __FILE__ , __LINE__ , ompi_ret );
1263+ status -> MPI_ERROR = ompi_ret ;
12361264 }
12371265 }
12381266
12391267 ofi_req -> super .completion_callback (& ofi_req -> super );
12401268
1241- return OMPI_SUCCESS ;
1269+ return status -> MPI_ERROR ;
12421270}
12431271
12441272/**
@@ -1384,14 +1412,26 @@ ompi_mtl_ofi_mrecv_callback(struct fi_cq_tagged_entry *wc,
13841412 status -> MPI_TAG = MTL_OFI_GET_TAG (wc -> tag );
13851413 status -> MPI_ERROR = MPI_SUCCESS ;
13861414 status -> _ucount = wc -> len ;
1415+ int ompi_ret ;
13871416
13881417 ompi_mtl_ofi_deregister_and_free_buffer (ofi_req );
13891418
1419+ if (OPAL_UNLIKELY (MTL_OFI_IS_SYNC_SEND (wc -> tag ))) {
1420+ ompi_ret = ompi_mtl_ofi_gen_ssend_ack (wc , ofi_req );
1421+
1422+ if (OPAL_UNLIKELY (OMPI_SUCCESS != ompi_ret )) {
1423+ opal_output_verbose (1 , opal_common_ofi .output ,
1424+ "%s:%d: ompi_mtl_ofi_gen_ssend_ack failed: %d" ,
1425+ __FILE__ , __LINE__ , ompi_ret );
1426+ status -> MPI_ERROR = ompi_ret ;
1427+ }
1428+ }
1429+
13901430 free (ofi_req );
13911431
13921432 mrecv_req -> completion_callback (mrecv_req );
13931433
1394- return OMPI_SUCCESS ;
1434+ return status -> MPI_ERROR ;
13951435}
13961436
13971437/**
@@ -1470,6 +1510,8 @@ ompi_mtl_ofi_imrecv(struct mca_mtl_base_module_t *mtl,
14701510 ofi_req -> convertor = convertor ;
14711511 ofi_req -> status .MPI_ERROR = OMPI_SUCCESS ;
14721512 ofi_req -> mrecv_req = mtl_request ;
1513+ ofi_req -> comm = comm ;
1514+
14731515
14741516 ompi_ret = ompi_mtl_ofi_register_buffer (convertor , ofi_req , start );
14751517 if (OPAL_UNLIKELY (OMPI_SUCCESS != ompi_ret )) {
0 commit comments