Skip to content

Commit fdefd86

Browse files
authored
Merge pull request #2477 from artpol84/oob/v2.x/msg_drop
oob/v2.x/msg drop
2 parents 81adac2 + f508887 commit fdefd86

File tree

1 file changed

+126
-51
lines changed

1 file changed

+126
-51
lines changed

orte/mca/oob/tcp/oob_tcp_connection.c

Lines changed: 126 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* Copyright (c) 2013-2015 Intel, Inc. All rights reserved.
1717
* Copyright (c) 2014-2015 Research Organization for Information Science
1818
* and Technology (RIST). All rights reserved.
19+
* Copyright (c) 2016 Mellanox Technologies Ltd. All rights reserved.
1920
* $COPYRIGHT$
2021
*
2122
* Additional copyrights may follow
@@ -78,6 +79,7 @@
7879

7980
static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer);
8081
static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer);
82+
static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name);
8183
static int tcp_peer_send_blocking(int sd, void* data, size_t size);
8284
static bool tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd,
8385
void* data, size_t size);
@@ -373,8 +375,9 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
373375
{
374376
char *msg;
375377
mca_oob_tcp_hdr_t hdr;
378+
uint16_t ack_flag = htons(1);
376379
int rc;
377-
size_t sdsize;
380+
size_t sdsize, offset = 0;
378381
char *cred;
379382
size_t credsize;
380383

@@ -399,21 +402,26 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
399402
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
400403
(unsigned long)credsize);
401404

402-
/* set the number of bytes to be read beyond the header */
403-
hdr.nbytes = strlen(orte_version_string) + 1 + credsize;
405+
/* payload size */
406+
sdsize = sizeof(ack_flag) + strlen(orte_version_string) + 1 + credsize;
407+
hdr.nbytes = sdsize;
404408
MCA_OOB_TCP_HDR_HTON(&hdr);
405409

406410
/* create a space for our message */
407-
sdsize = sizeof(hdr) + strlen(orte_version_string) + 1 + credsize;
411+
sdsize += sizeof(hdr);
408412
if (NULL == (msg = (char*)malloc(sdsize))) {
409413
return ORTE_ERR_OUT_OF_RESOURCE;
410414
}
411415
memset(msg, 0, sdsize);
412416

413417
/* load the message */
414-
memcpy(msg, &hdr, sizeof(hdr));
415-
memcpy(msg+sizeof(hdr), orte_version_string, strlen(orte_version_string));
416-
memcpy(msg+sizeof(hdr)+strlen(orte_version_string)+1, cred, credsize);
418+
memcpy(msg + offset, &hdr, sizeof(hdr));
419+
offset += sizeof(hdr);
420+
memcpy(msg + offset, &ack_flag, sizeof(ack_flag));
421+
offset += sizeof(ack_flag);
422+
memcpy(msg + offset, orte_version_string, strlen(orte_version_string));
423+
offset += strlen(orte_version_string)+1;
424+
memcpy(msg + offset, cred, credsize);
417425
/* clear the memory */
418426
if (NULL != cred) {
419427
free(cred);
@@ -431,6 +439,53 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
431439
return ORTE_SUCCESS;
432440
}
433441

442+
/*
 * Refuse a connection request: send a connect NACK on socket sd.
 *
 * The message is an MCA_OOB_TCP_IDENT header (origin = this process,
 * dst = name, tag = 0) followed by a 16-bit ack flag of 0 in network
 * byte order.
 *
 * sd   - socket to send the NACK on; it is not closed here
 * name - process name stored in the header's dst field
 *
 * Returns ORTE_ERR_OUT_OF_RESOURCE if the message buffer cannot be
 * allocated, otherwise ORTE_SUCCESS - a failed send is deliberately
 * not reported as an error.
 */
443+
static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name)
444+
{
445+
char *msg;
446+
mca_oob_tcp_hdr_t hdr;
447+
uint16_t ack_flag = htons(0);
448+
int rc = ORTE_SUCCESS;
449+
size_t sdsize, offset = 0;
450+
451+
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
452+
"%s SEND CONNECT NACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
453+
454+
/* load the header */
455+
hdr.origin = *ORTE_PROC_MY_NAME;
456+
hdr.dst = name;
457+
hdr.type = MCA_OOB_TCP_IDENT;
458+
hdr.tag = 0;
459+
460+
/* payload size: hdr.nbytes counts only the ack flag, not the header */
461+
sdsize = sizeof(ack_flag);
462+
hdr.nbytes = sdsize;
463+
MCA_OOB_TCP_HDR_HTON(&hdr);
464+
465+
/* create a space for our message: header plus payload */
466+
sdsize += sizeof(hdr);
467+
if (NULL == (msg = (char*)malloc(sdsize))) {
468+
return ORTE_ERR_OUT_OF_RESOURCE;
469+
}
470+
memset(msg, 0, sdsize);
471+
472+
/* load the message: header first, then the ack flag */
473+
memcpy(msg + offset, &hdr, sizeof(hdr));
474+
offset += sizeof(hdr);
475+
memcpy(msg + offset, &ack_flag, sizeof(ack_flag));
476+
offset += sizeof(ack_flag);
477+
478+
/* send it */
479+
if (ORTE_SUCCESS != tcp_peer_send_blocking(sd, msg, sdsize)) {
480+
/* it's ok if it fails - the remote side may have already
481+
* identified the collision and closed the connection
482+
*/
483+
rc = ORTE_SUCCESS;
484+
}
485+
free(msg);
486+
return rc;
487+
}
488+
434489
/*
435490
* Initialize events to be used by the peer instance for TCP select/poll callbacks.
436491
*/
@@ -634,6 +689,7 @@ static bool retry(mca_oob_tcp_peer_t* peer, int sd, bool fatal)
634689
return false;
635690
} else {
636691
/* The connection will be retried */
692+
tcp_peer_send_connect_nack(sd, peer->name);
637693
CLOSE_THE_SOCKET(sd);
638694
return true;
639695
}
@@ -647,10 +703,12 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
647703
char *version;
648704
int rc;
649705
char *cred;
650-
size_t credsize;
706+
size_t credsize, offset = 0;
651707
mca_oob_tcp_hdr_t hdr;
652708
mca_oob_tcp_peer_t *peer;
653709
uint64_t *ui64;
710+
uint16_t ack_flag;
711+
bool is_new = (NULL == pr);
654712

655713
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
656714
"%s RECV CONNECT ACK FROM %s ON SOCKET %d",
@@ -679,19 +737,6 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
679737
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
680738
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
681739
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd);
682-
/* check for a race condition - if I was in the process of
683-
* creating a connection to the peer, or have already established
684-
* such a connection, then we need to reject this connection. We will
685-
* let the higher ranked process retry - if I'm the lower ranked
686-
* process, I'll simply defer until I receive the request
687-
*/
688-
if (NULL != peer &&
689-
(MCA_OOB_TCP_CONNECTED == peer->state ||
690-
MCA_OOB_TCP_CONNECTING == peer->state ||
691-
MCA_OOB_TCP_CONNECT_ACK == peer->state ||
692-
MCA_OOB_TCP_CLOSED == peer->state)) {
693-
retry(peer, sd, false);
694-
}
695740
return ORTE_ERR_UNREACH;
696741
}
697742

@@ -746,23 +791,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
746791
CLOSE_THE_SOCKET(sd);
747792
return ORTE_ERR_OUT_OF_RESOURCE;
748793
}
749-
} else {
750-
/* check for a race condition - if I was in the process of
751-
* creating a connection to the peer, or have already established
752-
* such a connection, then we need to reject this connection. We will
753-
* let the higher ranked process retry - if I'm the lower ranked
754-
* process, I'll simply defer until I receive the request
755-
*/
756-
if (MCA_OOB_TCP_CONNECTED == peer->state ||
757-
MCA_OOB_TCP_CONNECTING == peer->state ||
758-
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
759-
if (retry(peer, sd, false)) {
760-
return ORTE_ERR_UNREACH;
761-
}
762-
}
763794
}
764795
} else {
765-
766796
/* compare the peers name to the expected value */
767797
if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr.origin)) {
768798
opal_output(0, "%s tcp_peer_recv_connect_ack: "
@@ -793,23 +823,68 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
793823
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
794824
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
795825
ORTE_NAME_PRINT(&peer->name), peer->sd);
796-
/* check for a race condition - if I was in the process of
797-
* creating a connection to the peer, or have already established
798-
* such a connection, then we need to reject this connection. We will
799-
* let the higher ranked process retry - if I'm the lower ranked
800-
* process, I'll simply defer until I receive the request
801-
*/
802-
if (MCA_OOB_TCP_CONNECTED == peer->state ||
803-
MCA_OOB_TCP_CONNECTING == peer->state ||
804-
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
805-
retry(peer, sd, true);
826+
free(msg);
827+
return ORTE_ERR_UNREACH;
828+
}
829+
830+
/* Check the type of acknowledgement */
831+
memcpy(&ack_flag, msg + offset, sizeof(ack_flag));
832+
offset += sizeof(ack_flag);
833+
834+
ack_flag = ntohs(ack_flag);
835+
if( !ack_flag ){
836+
if (MCA_OOB_TCP_CONNECT_ACK == peer->state) {
837+
/* We got a NACK from the remote side, which means that
838+
* it will be the initiator of the connection.
839+
*/
840+
841+
/* release the socket */
842+
CLOSE_THE_SOCKET(peer->sd);
843+
peer->sd = -1;
844+
845+
/* unregister active events */
846+
if (peer->recv_ev_active) {
847+
opal_event_del(&peer->recv_event);
848+
peer->recv_ev_active = false;
849+
}
850+
if (peer->send_ev_active) {
851+
opal_event_del(&peer->send_event);
852+
peer->send_ev_active = false;
853+
}
854+
855+
/* change the state so we'll accept the remote
856+
* connection when it'll appear
857+
*/
858+
peer->state = MCA_OOB_TCP_UNCONNECTED;
859+
} else {
860+
/* FIXME: this shouldn't happen. We need to force next address
861+
* to be tried.
862+
*/
863+
mca_oob_tcp_peer_close(peer);
806864
}
807865
free(msg);
808866
return ORTE_ERR_UNREACH;
809867
}
810868

869+
/* check for a race condition - if I was in the process of
870+
* creating a connection to the peer, or have already established
871+
* such a connection, then we need to reject this connection. We will
872+
* let the higher ranked process retry - if I'm the lower ranked
873+
* process, I'll simply defer until I receive the request
874+
*/
875+
if (is_new &&
876+
( MCA_OOB_TCP_CONNECTED == peer->state ||
877+
MCA_OOB_TCP_CONNECTING == peer->state ||
878+
MCA_OOB_TCP_CONNECT_ACK == peer->state ) ) {
879+
if (retry(peer, sd, false)) {
880+
free(msg);
881+
return ORTE_ERR_UNREACH;
882+
}
883+
}
884+
811885
/* check that this is from a matching version */
812-
version = (char*)(msg);
886+
version = (char*)((char*)msg + offset);
887+
offset += strlen(version) + 1;
813888
if (0 != strcmp(version, orte_version_string)) {
814889
opal_output(0, "%s tcp_peer_recv_connect_ack: "
815890
"received different version from %s: %s instead of %s\n",
@@ -828,8 +903,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
828903
ORTE_NAME_PRINT(&peer->name));
829904

830905
/* check security token */
831-
cred = (char*)(msg + strlen(version) + 1);
832-
credsize = hdr.nbytes - strlen(version) - 1;
906+
cred = (char*)((char*)msg + offset);
907+
credsize = hdr.nbytes - offset;
833908
if (OPAL_SUCCESS != (rc = opal_sec.authenticate(cred, credsize, &peer->auth_method))) {
834909
char *hostname;
835910
hostname = orte_get_proc_hostname(&peer->name);
@@ -909,8 +984,6 @@ static void tcp_peer_connected(mca_oob_tcp_peer_t* peer)
909984
*/
910985
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer)
911986
{
912-
mca_oob_tcp_send_t *snd;
913-
914987
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
915988
"%s tcp_peer_close for %s sd %d state %s",
916989
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@@ -962,10 +1035,12 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer)
9621035
* handle these recycled messages. This prevents us from unintentionally
9631036
* attempting to send the message again across the now-failed interface
9641037
*/
1038+
/*
9651039
if (NULL != peer->send_msg) {
9661040
}
9671041
while (NULL != (snd = (mca_oob_tcp_send_t*)opal_list_remove_first(&peer->send_queue))) {
9681042
}
1043+
*/
9691044
}
9701045

9711046
/*

0 commit comments

Comments
 (0)