@@ -411,16 +411,6 @@ static void send_msg(int fd, short args, void *cbdata)
     /* get the peer address by doing modex_receive */
     opal_output_verbose(10, orte_rml_base_framework.framework_output,
                         "%s calling OPAL_MODEX_RECV_STRING", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
-    // if dest is same as me then instead of doing lookup just populate the dest_ep_name
-    /*if (!ORTE_PROC_IS_APP && peer->jobid == ORTE_PROC_MY_NAME->jobid && peer->vpid == ORTE_PROC_MY_NAME->vpid) {
-        dest_ep_namelen = orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen;
-        dest_ep_name = (char *)calloc(dest_ep_namelen,sizeof(char));
-        memcpy( dest_ep_name, orte_rml_ofi.ofi_prov[ofi_prov_id].ep_name,dest_ep_namelen);
-        opal_output_verbose(1, orte_rml_base_framework.framework_output,
-                            "%s rml:ofi: send and dest are same so proceeding with cur provider ep_name ",
-                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
-        ret = OPAL_SUCCESS;
-    } else {*/
     if (ORTE_PROC_IS_APP) {
         asprintf(&pmix_key, "%s%d", orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info->fabric_attr->prov_name, ofi_prov_id);
         opal_output_verbose(1, orte_rml_base_framework.framework_output,
@@ -436,75 +426,6 @@ static void send_msg(int fd, short args, void *cbdata)
         opal_output_verbose(1, orte_rml_base_framework.framework_output,
                             "%s calling OPAL_MODEX_RECV_STRING for DAEMON peer %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer));
-        if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) {
-            opal_output_verbose(1, orte_rml_base_framework.framework_output,
-                                "%s rml_ofi_send_to_self at tag %d",
-                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag);
-            /* send to self is a tad tricky - we really don't want
-             * to track the send callback function throughout the recv
-             * process and execute it upon receipt as this would provide
-             * very different timing from a non-self message. Specifically,
-             * if we just retain a pointer to the incoming data
-             * and then execute the send callback prior to the receive,
-             * then the caller will think we are done with the data and
-             * can release it. So we have to copy the data in order to
-             * execute the send callback prior to receiving the message.
-             *
-             * In truth, this really is a better mimic of the non-self
-             * message behavior. If we actually pushed the message out
-             * on the wire and had it loop back, then we would receive
-             * a new block of data anyway.
-             */
-            /* setup the send callback */
-            xfer = OBJ_NEW(orte_self_send_xfer_t);
-            if (NULL != req->send.iov) {
-                xfer->iov = req->send.iov;
-                xfer->count = req->send.count;
-                xfer->cbfunc.iov = req->send.cbfunc.iov;
-            } else {
-                xfer->buffer = req->send.buffer;
-                xfer->cbfunc.buffer = req->send.cbfunc.buffer;
-            }
-            xfer->tag = tag;
-            xfer->cbdata = req->send.cbdata;
-            /* setup the event for the send callback */
-            opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer);
-            opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
-            opal_event_active(&xfer->ev, OPAL_EV_WRITE, 1);
-
-            /* copy the message for the recv */
-            rcv = OBJ_NEW(orte_rml_recv_t);
-            rcv->sender = *peer;
-            rcv->tag = tag;
-            if (NULL != req->send.iov) {
-                /* get the total number of bytes in the iovec array */
-                bytes = 0;
-                for (i = 0; i < req->send.count; ++i) {
-                    bytes += req->send.iov[i].iov_len;
-                }
-                /* get the required memory allocation */
-                if (0 < bytes) {
-                    rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
-                    rcv->iov.iov_len = bytes;
-                    /* transfer the bytes */
-                    ptr = (char*)rcv->iov.iov_base;
-                    for (i = 0; i < req->send.count; ++i) {
-                        memcpy(ptr, req->send.iov[i].iov_base, req->send.iov[i].iov_len);
-                        ptr += req->send.iov[i].iov_len;
-                    }
-                }
-            } else if (0 < req->send.buffer->bytes_used) {
-                rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(req->send.buffer->bytes_used);
-                memcpy(rcv->iov.iov_base, req->send.buffer->base_ptr, req->send.buffer->bytes_used);
-                rcv->iov.iov_len = req->send.buffer->bytes_used;
-            }
-            /* post the message for receipt - since the send callback was posted
-             * first and has the same priority, it will execute first
-             */
-            ORTE_RML_ACTIVATE_MESSAGE(rcv);
-            OBJ_RELEASE(req);
-            return;
-        } else {
             memcpy(&ui64, (char*)peer, sizeof(uint64_t));
             if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_rml_ofi.peers,
                                                                  ui64, (void**)&pr) || NULL == pr) {
@@ -519,7 +440,6 @@ static void send_msg(int fd, short args, void *cbdata)
             dest_ep_name = pr->ofi_ep;
             dest_ep_namelen = pr->ofi_ep_len;
             ret = OPAL_SUCCESS;
-        }
     }
     if ( OPAL_SUCCESS == ret ) {
         //[Debug] printing additional info of IP
@@ -704,6 +624,12 @@ int orte_rml_ofi_send_nb(struct orte_rml_base_module_t* mod,
                          orte_rml_callback_fn_t cbfunc,
                          void* cbdata)
 {
+    orte_rml_recv_t *rcv;
+    orte_rml_send_t *snd;
+    int bytes;
+    orte_self_send_xfer_t *xfer;
+    int i;
+    char *ptr;
     ofi_send_request_t *req;
     orte_rml_ofi_module_t *ofi_mod = (orte_rml_ofi_module_t*)mod;
     int ofi_prov_id = ofi_mod->cur_transport_id;
@@ -731,6 +657,69 @@ int orte_rml_ofi_send_nb(struct orte_rml_base_module_t* mod,
         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
         return ORTE_ERR_BAD_PARAM;
     }
+
+    /* if this is a message to myself, then just post the message
+     * for receipt - no need to dive into the ofi send_msg()
+     */
+    if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) {  /* local delivery */
+        OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
+                             "%s rml_send_iovec_to_self at tag %d",
+                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
+        /* send to self is a tad tricky - we really don't want
+         * to track the send callback function throughout the recv
+         * process and execute it upon receipt as this would provide
+         * very different timing from a non-self message. Specifically,
+         * if we just retain a pointer to the incoming data
+         * and then execute the send callback prior to the receive,
+         * then the caller will think we are done with the data and
+         * can release it. So we have to copy the data in order to
+         * execute the send callback prior to receiving the message.
+         *
+         * In truth, this really is a better mimic of the non-self
+         * message behavior. If we actually pushed the message out
+         * on the wire and had it loop back, then we would receive
+         * a new block of data anyway.
+         */
+
+        /* setup the send callback */
+        xfer = OBJ_NEW(orte_self_send_xfer_t);
+        xfer->iov = iov;
+        xfer->count = count;
+        xfer->cbfunc.iov = cbfunc;
+        xfer->tag = tag;
+        xfer->cbdata = cbdata;
+        /* setup the event for the send callback */
+        opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer);
+        opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
+        opal_event_active(&xfer->ev, OPAL_EV_WRITE, 1);
+
+        /* copy the message for the recv */
+        rcv = OBJ_NEW(orte_rml_recv_t);
+        rcv->sender = *peer;
+        rcv->tag = tag;
+        /* get the total number of bytes in the iovec array */
+        bytes = 0;
+        for (i = 0; i < count; ++i) {
+            bytes += iov[i].iov_len;
+        }
+        /* get the required memory allocation */
+        if (0 < bytes) {
+            rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
+            rcv->iov.iov_len = bytes;
+            /* transfer the bytes */
+            ptr = (char*)rcv->iov.iov_base;
+            for (i = 0; i < count; ++i) {
+                memcpy(ptr, iov[i].iov_base, iov[i].iov_len);
+                ptr += iov[i].iov_len;
+            }
+        }
+        /* post the message for receipt - since the send callback was posted
+         * first and has the same priority, it will execute first
+         */
+        ORTE_RML_ACTIVATE_MESSAGE(rcv);
+        return ORTE_SUCCESS;
+    }
+
     /* get ourselves into an event to protect against
      * race conditions and threads
      */
@@ -759,6 +748,9 @@ int orte_rml_ofi_send_buffer_nb(struct orte_rml_base_module_t *mod,
                                 orte_rml_buffer_callback_fn_t cbfunc,
                                 void* cbdata)
 {
+    orte_rml_recv_t *rcv;
+    orte_rml_send_t *snd;
+    orte_self_send_xfer_t *xfer;
     ofi_send_request_t *req;
     orte_rml_ofi_module_t *ofi_mod = (orte_rml_ofi_module_t*)mod;
     int ofi_prov_id = ofi_mod->cur_transport_id;
@@ -785,6 +777,54 @@ int orte_rml_ofi_send_buffer_nb(struct orte_rml_base_module_t *mod,
         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
         return ORTE_ERR_BAD_PARAM;
     }
+    /* if this is a message to myself, then just post the message
+     * for receipt - no need to dive into the oob
+     */
+    if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) {  /* local delivery */
+        OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
+                             "%s rml_send_iovec_to_self at tag %d",
+                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
+        /* send to self is a tad tricky - we really don't want
+         * to track the send callback function throughout the recv
+         * process and execute it upon receipt as this would provide
+         * very different timing from a non-self message. Specifically,
+         * if we just retain a pointer to the incoming data
+         * and then execute the send callback prior to the receive,
+         * then the caller will think we are done with the data and
+         * can release it. So we have to copy the data in order to
+         * execute the send callback prior to receiving the message.
+         *
+         * In truth, this really is a better mimic of the non-self
+         * message behavior. If we actually pushed the message out
+         * on the wire and had it loop back, then we would receive
+         * a new block of data anyway.
+         */
+
+        /* setup the send callback */
+        xfer = OBJ_NEW(orte_self_send_xfer_t);
+        xfer->buffer = buffer;
+        xfer->cbfunc.buffer = cbfunc;
+        xfer->tag = tag;
+        xfer->cbdata = cbdata;
+        /* setup the event for the send callback */
+        opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer);
+        opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
+        opal_event_active(&xfer->ev, OPAL_EV_WRITE, 1);
+
+        /* copy the message for the recv */
+        rcv = OBJ_NEW(orte_rml_recv_t);
+        rcv->sender = *peer;
+        rcv->tag = tag;
+        rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(buffer->bytes_used);
+        memcpy(rcv->iov.iov_base, buffer->base_ptr, buffer->bytes_used);
+        rcv->iov.iov_len = buffer->bytes_used;
+        /* post the message for receipt - since the send callback was posted
+         * first and has the same priority, it will execute first
+         */
+        ORTE_RML_ACTIVATE_MESSAGE(rcv);
+        return ORTE_SUCCESS;
+    }
+
     /* get ourselves into an event to protect against
      * race conditions and threads
      */