Skip to content

Commit c793dc8

Browse files
Merge pull request #3424 from ggouaillardet/topic/compress_hwloc_topo
compress the XML topology sent out-of-band
2 parents 1707022 + 57b4144 commit c793dc8

File tree

3 files changed

+210
-16
lines changed

3 files changed

+210
-16
lines changed

orte/mca/plm/base/plm_base_launch_support.c

Lines changed: 104 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* et Automatique. All rights reserved.
1515
* Copyright (c) 2011-2012 Los Alamos National Security, LLC.
1616
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
17-
* Copyright (c) 2014-2016 Research Organization for Information Science
17+
* Copyright (c) 2014-2017 Research Organization for Information Science
1818
* and Technology (RIST). All rights reserved.
1919
* Copyright (c) 2016 IBM Corporation. All rights reserved.
2020
* $COPYRIGHT$
@@ -817,6 +817,10 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
817817
int i;
818818
uint32_t h;
819819
orte_job_t *jdata;
820+
uint8_t flag;
821+
size_t inlen, cmplen;
822+
uint8_t *packed_data, *cmpdata;
823+
opal_buffer_t datbuf, *data;
820824

821825
OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
822826
"%s plm:base:daemon_topology recvd for daemon %s",
@@ -832,10 +836,55 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
832836
orted_failed_launch = true;
833837
goto CLEANUP;
834838
}
839+
OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
840+
/* unpack the flag to see if this payload is compressed */
841+
idx=1;
842+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT8))) {
843+
ORTE_ERROR_LOG(rc);
844+
orted_failed_launch = true;
845+
goto CLEANUP;
846+
}
847+
if (flag) {
848+
/* unpack the compressed data size */
849+
idx=1;
850+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &inlen, &idx, OPAL_SIZE))) {
851+
ORTE_ERROR_LOG(rc);
852+
orted_failed_launch = true;
853+
goto CLEANUP;
854+
}
855+
/* unpack the uncompressed data size */
856+
idx=1;
857+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cmplen, &idx, OPAL_SIZE))) {
858+
ORTE_ERROR_LOG(rc);
859+
orted_failed_launch = true;
860+
goto CLEANUP;
861+
}
862+
/* allocate the space */
863+
packed_data = (uint8_t*)malloc(inlen);
864+
/* unpack the data blob */
865+
idx = inlen;
866+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, packed_data, &idx, OPAL_UINT8))) {
867+
ORTE_ERROR_LOG(rc);
868+
orted_failed_launch = true;
869+
goto CLEANUP;
870+
}
871+
/* decompress the data */
872+
if (orte_util_uncompress_block(&cmpdata, cmplen,
873+
packed_data, inlen)) {
874+
/* the data has been uncompressed */
875+
opal_dss.load(&datbuf, cmpdata, cmplen);
876+
data = &datbuf;
877+
} else {
878+
data = buffer;
879+
}
880+
free(packed_data);
881+
} else {
882+
data = buffer;
883+
}
835884

836885
/* unpack the topology signature for this node */
837886
idx=1;
838-
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &idx, OPAL_STRING))) {
887+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &sig, &idx, OPAL_STRING))) {
839888
ORTE_ERROR_LOG(rc);
840889
orted_failed_launch = true;
841890
goto CLEANUP;
@@ -861,7 +910,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
861910

862911
/* unpack the topology */
863912
idx=1;
864-
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &topo, &idx, OPAL_HWLOC_TOPO))) {
913+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &topo, &idx, OPAL_HWLOC_TOPO))) {
865914
ORTE_ERROR_LOG(rc);
866915
orted_failed_launch = true;
867916
goto CLEANUP;
@@ -873,7 +922,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
873922

874923
/* unpack any coprocessors */
875924
idx=1;
876-
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &coprocessors, &idx, OPAL_STRING))) {
925+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &coprocessors, &idx, OPAL_STRING))) {
877926
ORTE_ERROR_LOG(rc);
878927
orted_failed_launch = true;
879928
goto CLEANUP;
@@ -900,7 +949,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
900949
}
901950
/* see if this daemon is on a coprocessor */
902951
idx=1;
903-
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &coprocessors, &idx, OPAL_STRING))) {
952+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &coprocessors, &idx, OPAL_STRING))) {
904953
ORTE_ERROR_LOG(rc);
905954
orted_failed_launch = true;
906955
goto CLEANUP;
@@ -1088,8 +1137,57 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender,
10881137
/* rank=1 always sends its topology back */
10891138
topo = NULL;
10901139
if (1 == dname.vpid) {
1140+
uint8_t flag;
1141+
size_t inlen, cmplen;
1142+
uint8_t *packed_data, *cmpdata;
1143+
opal_buffer_t datbuf, *data;
1144+
OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
1145+
/* unpack the flag to see if this payload is compressed */
1146+
idx=1;
1147+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT8))) {
1148+
ORTE_ERROR_LOG(rc);
1149+
orted_failed_launch = true;
1150+
goto CLEANUP;
1151+
}
1152+
if (flag) {
1153+
/* unpack the compressed data size */
1154+
idx=1;
1155+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &inlen, &idx, OPAL_SIZE))) {
1156+
ORTE_ERROR_LOG(rc);
1157+
orted_failed_launch = true;
1158+
goto CLEANUP;
1159+
}
1160+
/* unpack the uncompressed data size */
1161+
idx=1;
1162+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cmplen, &idx, OPAL_SIZE))) {
1163+
ORTE_ERROR_LOG(rc);
1164+
orted_failed_launch = true;
1165+
goto CLEANUP;
1166+
}
1167+
/* allocate the space */
1168+
packed_data = (uint8_t*)malloc(inlen);
1169+
/* unpack the data blob */
1170+
idx = inlen;
1171+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, packed_data, &idx, OPAL_UINT8))) {
1172+
ORTE_ERROR_LOG(rc);
1173+
orted_failed_launch = true;
1174+
goto CLEANUP;
1175+
}
1176+
/* decompress the data */
1177+
if (orte_util_uncompress_block(&cmpdata, cmplen,
1178+
packed_data, inlen)) {
1179+
/* the data has been uncompressed */
1180+
opal_dss.load(&datbuf, cmpdata, cmplen);
1181+
data = &datbuf;
1182+
} else {
1183+
data = buffer;
1184+
}
1185+
free(packed_data);
1186+
} else {
1187+
data = buffer;
1188+
}
10911189
idx=1;
1092-
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &topo, &idx, OPAL_HWLOC_TOPO))) {
1190+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &topo, &idx, OPAL_HWLOC_TOPO))) {
10931191
ORTE_ERROR_LOG(rc);
10941192
orted_failed_launch = true;
10951193
goto CLEANUP;

orte/orted/orted_comm.c

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
1616
* Copyright (c) 2010-2011 Oak Ridge National Labs. All rights reserved.
1717
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
18-
* Copyright (c) 2016 Research Organization for Information Science
18+
* Copyright (c) 2016-2017 Research Organization for Information Science
1919
* and Technology (RIST). All rights reserved.
2020
* $COPYRIGHT$
2121
*
@@ -59,6 +59,7 @@
5959
#include "orte/util/session_dir.h"
6060
#include "orte/util/name_fns.h"
6161
#include "orte/util/nidmap.h"
62+
#include "orte/util/compress.h"
6263

6364
#include "orte/mca/errmgr/errmgr.h"
6465
#include "orte/mca/grpcomm/base/base.h"
@@ -101,7 +102,7 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
101102
int32_t signal;
102103
orte_jobid_t job;
103104
char *contact_info;
104-
opal_buffer_t *answer;
105+
opal_buffer_t data, *answer;
105106
orte_job_t *jdata;
106107
orte_process_name_t proc, proc2;
107108
orte_process_name_t *return_addr;
@@ -124,6 +125,9 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
124125
char *rtmod;
125126
char *coprocessors;
126127
orte_job_map_t *map;
128+
int8_t flag;
129+
uint8_t *cmpdata;
130+
size_t cmplen;
127131

128132
/* unpack the command */
129133
n = 1;
@@ -620,36 +624,78 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
620624

621625
/**** REPORT TOPOLOGY COMMAND ****/
622626
case ORTE_DAEMON_REPORT_TOPOLOGY_CMD:
623-
answer = OBJ_NEW(opal_buffer_t);
627+
OBJ_CONSTRUCT(&data, opal_buffer_t);
624628
/* pack the topology signature */
625-
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &orte_topo_signature, 1, OPAL_STRING))) {
629+
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &orte_topo_signature, 1, OPAL_STRING))) {
626630
ORTE_ERROR_LOG(ret);
627-
OBJ_RELEASE(answer);
631+
OBJ_DESTRUCT(&data);
628632
goto CLEANUP;
629633
}
630634
/* pack the topology */
631-
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
635+
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
632636
ORTE_ERROR_LOG(ret);
633-
OBJ_RELEASE(answer);
637+
OBJ_DESTRUCT(&data);
634638
goto CLEANUP;
635639
}
636640

637641
/* detect and add any coprocessors */
638642
coprocessors = opal_hwloc_base_find_coprocessors(opal_hwloc_topology);
639-
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &coprocessors, 1, OPAL_STRING))) {
643+
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &coprocessors, 1, OPAL_STRING))) {
640644
ORTE_ERROR_LOG(ret);
641645
}
642646
if (NULL != coprocessors) {
643647
free(coprocessors);
644648
}
645649
/* see if I am on a coprocessor */
646650
coprocessors = opal_hwloc_base_check_on_coprocessor();
647-
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &coprocessors, 1, OPAL_STRING))) {
651+
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &coprocessors, 1, OPAL_STRING))) {
648652
ORTE_ERROR_LOG(ret);
649653
}
650654
if (NULL!= coprocessors) {
651655
free(coprocessors);
652656
}
657+
answer = OBJ_NEW(opal_buffer_t);
658+
if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used,
659+
&cmpdata, &cmplen)) {
660+
/* the data was compressed - mark that we compressed it */
661+
flag = 1;
662+
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &flag, 1, OPAL_INT8))) {
663+
ORTE_ERROR_LOG(ret);
664+
free(cmpdata);
665+
OBJ_DESTRUCT(&data);
666+
}
667+
/* pack the compressed length */
668+
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &cmplen, 1, OPAL_SIZE))) {
669+
ORTE_ERROR_LOG(ret);
670+
free(cmpdata);
671+
OBJ_DESTRUCT(&data);
672+
}
673+
/* pack the uncompressed length */
674+
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &data.bytes_used, 1, OPAL_SIZE))) {
675+
ORTE_ERROR_LOG(ret);
676+
free(cmpdata);
677+
OBJ_DESTRUCT(&data);
678+
}
679+
/* pack the compressed info */
680+
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, cmpdata, cmplen, OPAL_UINT8))) {
681+
ORTE_ERROR_LOG(ret);
682+
free(cmpdata);
683+
OBJ_DESTRUCT(&data);
684+
}
685+
OBJ_DESTRUCT(&data);
686+
free(cmpdata);
687+
} else {
688+
/* mark that it was not compressed */
689+
flag = 0;
690+
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &flag, 1, OPAL_INT8))) {
691+
ORTE_ERROR_LOG(ret);
692+
OBJ_DESTRUCT(&data);
693+
free(cmpdata);
694+
}
695+
/* transfer the payload across */
696+
opal_dss.copy_payload(answer, &data);
697+
OBJ_DESTRUCT(&data);
698+
}
653699
/* send the data */
654700
if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit,
655701
sender, answer, ORTE_RML_TAG_TOPOLOGY_REPORT,

orte/orted/orted_main.c

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
#include "orte/util/parse_options.h"
7777
#include "orte/mca/rml/base/rml_contact.h"
7878
#include "orte/util/pre_condition_transports.h"
79+
#include "orte/util/compress.h"
7980

8081
#include "orte/mca/errmgr/errmgr.h"
8182
#include "orte/mca/ess/ess.h"
@@ -793,9 +794,58 @@ int orte_daemon(int argc, char *argv[])
793794
/* if we are rank=1, then send our topology back - otherwise, mpirun
794795
* will request it if necessary */
795796
if (1 == ORTE_PROC_MY_NAME->vpid) {
796-
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
797+
opal_buffer_t data;
798+
int8_t flag;
799+
uint8_t *cmpdata;
800+
size_t cmplen;
801+
802+
/* setup an intermediate buffer */
803+
OBJ_CONSTRUCT(&data, opal_buffer_t);
804+
805+
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
797806
ORTE_ERROR_LOG(ret);
798807
}
808+
if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used,
809+
&cmpdata, &cmplen)) {
810+
/* the data was compressed - mark that we compressed it */
811+
flag = 1;
812+
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
813+
ORTE_ERROR_LOG(ret);
814+
free(cmpdata);
815+
OBJ_DESTRUCT(&data);
816+
}
817+
/* pack the compressed length */
818+
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &cmplen, 1, OPAL_SIZE))) {
819+
ORTE_ERROR_LOG(ret);
820+
free(cmpdata);
821+
OBJ_DESTRUCT(&data);
822+
}
823+
/* pack the uncompressed length */
824+
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &data.bytes_used, 1, OPAL_SIZE))) {
825+
ORTE_ERROR_LOG(ret);
826+
free(cmpdata);
827+
OBJ_DESTRUCT(&data);
828+
}
829+
/* pack the compressed info */
830+
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, cmpdata, cmplen, OPAL_UINT8))) {
831+
ORTE_ERROR_LOG(ret);
832+
free(cmpdata);
833+
OBJ_DESTRUCT(&data);
834+
}
835+
OBJ_DESTRUCT(&data);
836+
free(cmpdata);
837+
} else {
838+
/* mark that it was not compressed */
839+
flag = 0;
840+
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
841+
ORTE_ERROR_LOG(ret);
842+
OBJ_DESTRUCT(&data);
843+
free(cmpdata);
844+
}
845+
/* transfer the payload across */
846+
opal_dss.copy_payload(buffer, &data);
847+
OBJ_DESTRUCT(&data);
848+
}
799849
}
800850

801851
/* send it to the designated target */

0 commit comments

Comments
 (0)