Skip to content

Commit 6ddb487

Browse files
anandhis and Ralph Castain
authored and committed
Cleaned up the send_msg(); moved checking for send-to-self into send_nb()
and send_buffer_nb(). Modified: orte/mca/rml/ofi/rml_ofi_send.c. Signed-off-by: Anandhi Jayakumar <[email protected]>
1 parent 5e9be76 commit 6ddb487

File tree

1 file changed

+120
-80
lines changed

1 file changed

+120
-80
lines changed

orte/mca/rml/ofi/rml_ofi_send.c

Lines changed: 120 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -411,16 +411,6 @@ static void send_msg(int fd, short args, void *cbdata)
411411
/* get the peer address by doing modex_receive */
412412
opal_output_verbose(10, orte_rml_base_framework.framework_output,
413413
"%s calling OPAL_MODEX_RECV_STRING ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) );
414-
// if dest is same as me then instead of doing lookup just populate the dest_ep_name
415-
/*if (!ORTE_PROC_IS_APP && peer->jobid == ORTE_PROC_MY_NAME->jobid && peer->vpid == ORTE_PROC_MY_NAME->vpid) {
416-
dest_ep_namelen = orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen;
417-
dest_ep_name = (char *)calloc(dest_ep_namelen,sizeof(char));
418-
memcpy( dest_ep_name, orte_rml_ofi.ofi_prov[ofi_prov_id].ep_name,dest_ep_namelen);
419-
opal_output_verbose(1, orte_rml_base_framework.framework_output,
420-
"%s rml:ofi: send and dest are same so proceeding with cur provider ep_name ",
421-
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
422-
ret = OPAL_SUCCESS;
423-
} else {*/
424414
if (ORTE_PROC_IS_APP ) {
425415
asprintf(&pmix_key,"%s%d",orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info->fabric_attr->prov_name,ofi_prov_id);
426416
opal_output_verbose(1, orte_rml_base_framework.framework_output,
@@ -436,75 +426,6 @@ static void send_msg(int fd, short args, void *cbdata)
436426
opal_output_verbose(1, orte_rml_base_framework.framework_output,
437427
"%s calling OPAL_MODEX_RECV_STRING for DAEMON peer %s",
438428
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer));
439-
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) {
440-
opal_output_verbose(1, orte_rml_base_framework.framework_output,
441-
"%s rml_ofi_send_to_self at tag %d",
442-
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag);
443-
/* send to self is a tad tricky - we really don't want
444-
* to track the send callback function throughout the recv
445-
* process and execute it upon receipt as this would provide
446-
* very different timing from a non-self message. Specifically,
447-
* if we just retain a pointer to the incoming data
448-
* and then execute the send callback prior to the receive,
449-
* then the caller will think we are done with the data and
450-
* can release it. So we have to copy the data in order to
451-
* execute the send callback prior to receiving the message.
452-
*
453-
* In truth, this really is a better mimic of the non-self
454-
* message behavior. If we actually pushed the message out
455-
* on the wire and had it loop back, then we would receive
456-
* a new block of data anyway.
457-
*/
458-
/* setup the send callback */
459-
xfer = OBJ_NEW(orte_self_send_xfer_t);
460-
if (NULL != req->send.iov) {
461-
xfer->iov = req->send.iov;
462-
xfer->count = req->send.count;
463-
xfer->cbfunc.iov = req->send.cbfunc.iov;
464-
} else {
465-
xfer->buffer = req->send.buffer;
466-
xfer->cbfunc.buffer = req->send.cbfunc.buffer;
467-
}
468-
xfer->tag = tag;
469-
xfer->cbdata = req->send.cbdata;
470-
/* setup the event for the send callback */
471-
opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer);
472-
opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
473-
opal_event_active(&xfer->ev, OPAL_EV_WRITE, 1);
474-
475-
/* copy the message for the recv */
476-
rcv = OBJ_NEW(orte_rml_recv_t);
477-
rcv->sender = *peer;
478-
rcv->tag = tag;
479-
if (NULL != req->send.iov) {
480-
/* get the total number of bytes in the iovec array */
481-
bytes = 0;
482-
for (i = 0 ; i < req->send.count ; ++i) {
483-
bytes += req->send.iov[i].iov_len;
484-
}
485-
/* get the required memory allocation */
486-
if (0 < bytes) {
487-
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
488-
rcv->iov.iov_len = bytes;
489-
/* transfer the bytes */
490-
ptr = (char*)rcv->iov.iov_base;
491-
for (i = 0 ; i < req->send.count ; ++i) {
492-
memcpy(ptr, req->send.iov[i].iov_base, req->send.iov[i].iov_len);
493-
ptr += req->send.iov[i].iov_len;
494-
}
495-
}
496-
} else if (0 < req->send.buffer->bytes_used) {
497-
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(req->send.buffer->bytes_used);
498-
memcpy(rcv->iov.iov_base, req->send.buffer->base_ptr, req->send.buffer->bytes_used);
499-
rcv->iov.iov_len = req->send.buffer->bytes_used;
500-
}
501-
/* post the message for receipt - since the send callback was posted
502-
* first and has the same priority, it will execute first
503-
*/
504-
ORTE_RML_ACTIVATE_MESSAGE(rcv);
505-
OBJ_RELEASE(req);
506-
return;
507-
} else {
508429
memcpy(&ui64, (char*)peer, sizeof(uint64_t));
509430
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_rml_ofi.peers,
510431
ui64, (void**)&pr) || NULL == pr) {
@@ -519,7 +440,6 @@ static void send_msg(int fd, short args, void *cbdata)
519440
dest_ep_name = pr->ofi_ep;
520441
dest_ep_namelen = pr->ofi_ep_len;
521442
ret = OPAL_SUCCESS;
522-
}
523443
}
524444
if ( OPAL_SUCCESS == ret) {
525445
//[Debug] printing additional info of IP
@@ -704,6 +624,12 @@ int orte_rml_ofi_send_nb(struct orte_rml_base_module_t* mod,
704624
orte_rml_callback_fn_t cbfunc,
705625
void* cbdata)
706626
{
627+
orte_rml_recv_t *rcv;
628+
orte_rml_send_t *snd;
629+
int bytes;
630+
orte_self_send_xfer_t *xfer;
631+
int i;
632+
char* ptr;
707633
ofi_send_request_t *req;
708634
orte_rml_ofi_module_t *ofi_mod = (orte_rml_ofi_module_t*)mod;
709635
int ofi_prov_id = ofi_mod->cur_transport_id;
@@ -731,6 +657,69 @@ int orte_rml_ofi_send_nb(struct orte_rml_base_module_t* mod,
731657
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
732658
return ORTE_ERR_BAD_PARAM;
733659
}
660+
661+
/* if this is a message to myself, then just post the message
662+
* for receipt - no need to dive into the ofi send_msg()
663+
*/
664+
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) { /* local delivery */
665+
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
666+
"%s rml_send_iovec_to_self at tag %d",
667+
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
668+
/* send to self is a tad tricky - we really don't want
669+
* to track the send callback function throughout the recv
670+
* process and execute it upon receipt as this would provide
671+
* very different timing from a non-self message. Specifically,
672+
* if we just retain a pointer to the incoming data
673+
* and then execute the send callback prior to the receive,
674+
* then the caller will think we are done with the data and
675+
* can release it. So we have to copy the data in order to
676+
* execute the send callback prior to receiving the message.
677+
*
678+
* In truth, this really is a better mimic of the non-self
679+
* message behavior. If we actually pushed the message out
680+
* on the wire and had it loop back, then we would receive
681+
* a new block of data anyway.
682+
*/
683+
684+
/* setup the send callback */
685+
xfer = OBJ_NEW(orte_self_send_xfer_t);
686+
xfer->iov = iov;
687+
xfer->count = count;
688+
xfer->cbfunc.iov = cbfunc;
689+
xfer->tag = tag;
690+
xfer->cbdata = cbdata;
691+
/* setup the event for the send callback */
692+
opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer);
693+
opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
694+
opal_event_active(&xfer->ev, OPAL_EV_WRITE, 1);
695+
696+
/* copy the message for the recv */
697+
rcv = OBJ_NEW(orte_rml_recv_t);
698+
rcv->sender = *peer;
699+
rcv->tag = tag;
700+
/* get the total number of bytes in the iovec array */
701+
bytes = 0;
702+
for (i = 0 ; i < count ; ++i) {
703+
bytes += iov[i].iov_len;
704+
}
705+
/* get the required memory allocation */
706+
if (0 < bytes) {
707+
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
708+
rcv->iov.iov_len = bytes;
709+
/* transfer the bytes */
710+
ptr = (char*)rcv->iov.iov_base;
711+
for (i = 0 ; i < count ; ++i) {
712+
memcpy(ptr, iov[i].iov_base, iov[i].iov_len);
713+
ptr += iov[i].iov_len;
714+
}
715+
}
716+
/* post the message for receipt - since the send callback was posted
717+
* first and has the same priority, it will execute first
718+
*/
719+
ORTE_RML_ACTIVATE_MESSAGE(rcv);
720+
return ORTE_SUCCESS;
721+
}
722+
734723
/* get ourselves into an event to protect against
735724
* race conditions and threads
736725
*/
@@ -759,6 +748,9 @@ int orte_rml_ofi_send_buffer_nb(struct orte_rml_base_module_t *mod,
759748
orte_rml_buffer_callback_fn_t cbfunc,
760749
void* cbdata)
761750
{
751+
orte_rml_recv_t *rcv;
752+
orte_rml_send_t *snd;
753+
orte_self_send_xfer_t *xfer;
762754
ofi_send_request_t *req;
763755
orte_rml_ofi_module_t *ofi_mod = (orte_rml_ofi_module_t*)mod;
764756
int ofi_prov_id = ofi_mod->cur_transport_id;
@@ -785,6 +777,54 @@ int orte_rml_ofi_send_buffer_nb(struct orte_rml_base_module_t *mod,
785777
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
786778
return ORTE_ERR_BAD_PARAM;
787779
}
780+
/* if this is a message to myself, then just post the message
781+
* for receipt - no need to dive into the oob
782+
*/
783+
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) { /* local delivery */
784+
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
785+
"%s rml_send_iovec_to_self at tag %d",
786+
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
787+
/* send to self is a tad tricky - we really don't want
788+
* to track the send callback function throughout the recv
789+
* process and execute it upon receipt as this would provide
790+
* very different timing from a non-self message. Specifically,
791+
* if we just retain a pointer to the incoming data
792+
* and then execute the send callback prior to the receive,
793+
* then the caller will think we are done with the data and
794+
* can release it. So we have to copy the data in order to
795+
* execute the send callback prior to receiving the message.
796+
*
797+
* In truth, this really is a better mimic of the non-self
798+
* message behavior. If we actually pushed the message out
799+
* on the wire and had it loop back, then we would receive
800+
* a new block of data anyway.
801+
*/
802+
803+
/* setup the send callback */
804+
xfer = OBJ_NEW(orte_self_send_xfer_t);
805+
xfer->buffer = buffer;
806+
xfer->cbfunc.buffer = cbfunc;
807+
xfer->tag = tag;
808+
xfer->cbdata = cbdata;
809+
/* setup the event for the send callback */
810+
opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer);
811+
opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
812+
opal_event_active(&xfer->ev, OPAL_EV_WRITE, 1);
813+
814+
/* copy the message for the recv */
815+
rcv = OBJ_NEW(orte_rml_recv_t);
816+
rcv->sender = *peer;
817+
rcv->tag = tag;
818+
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(buffer->bytes_used);
819+
memcpy(rcv->iov.iov_base, buffer->base_ptr, buffer->bytes_used);
820+
rcv->iov.iov_len = buffer->bytes_used;
821+
/* post the message for receipt - since the send callback was posted
822+
* first and has the same priority, it will execute first
823+
*/
824+
ORTE_RML_ACTIVATE_MESSAGE(rcv);
825+
return ORTE_SUCCESS;
826+
}
827+
788828
/* get ourselves into an event to protect against
789829
* race conditions and threads
790830
*/

0 commit comments

Comments
 (0)