Skip to content

Commit 57b4144

Browse files
committed
orte: use compression for ORTE_DAEMON_REPORT_TOPOLOGY_CMD answer
Refs #3414 Signed-off-by: Gilles Gouaillardet <[email protected]>
1 parent 49cd40b commit 57b4144

File tree

2 files changed

+108
-13
lines changed

2 files changed

+108
-13
lines changed

orte/mca/plm/base/plm_base_launch_support.c

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,10 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
817817
int i;
818818
uint32_t h;
819819
orte_job_t *jdata;
820+
uint8_t flag;
821+
size_t inlen, cmplen;
822+
uint8_t *packed_data, *cmpdata;
823+
opal_buffer_t datbuf, *data;
820824

821825
OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
822826
"%s plm:base:daemon_topology recvd for daemon %s",
@@ -832,10 +836,55 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
832836
orted_failed_launch = true;
833837
goto CLEANUP;
834838
}
839+
OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
840+
/* unpack the flag to see if this payload is compressed */
841+
idx=1;
842+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT8))) {
843+
ORTE_ERROR_LOG(rc);
844+
orted_failed_launch = true;
845+
goto CLEANUP;
846+
}
847+
if (flag) {
848+
/* unpack the data size */
849+
idx=1;
850+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &inlen, &idx, OPAL_SIZE))) {
851+
ORTE_ERROR_LOG(rc);
852+
orted_failed_launch = true;
853+
goto CLEANUP;
854+
}
855+
/* unpack the unpacked data size */
856+
idx=1;
857+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cmplen, &idx, OPAL_SIZE))) {
858+
ORTE_ERROR_LOG(rc);
859+
orted_failed_launch = true;
860+
goto CLEANUP;
861+
}
862+
/* allocate the space */
863+
packed_data = (uint8_t*)malloc(inlen);
864+
/* unpack the data blob */
865+
idx = inlen;
866+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, packed_data, &idx, OPAL_UINT8))) {
867+
ORTE_ERROR_LOG(rc);
868+
orted_failed_launch = true;
869+
goto CLEANUP;
870+
}
871+
/* decompress the data */
872+
if (orte_util_uncompress_block(&cmpdata, cmplen,
873+
packed_data, inlen)) {
874+
/* the data has been uncompressed */
875+
opal_dss.load(&datbuf, cmpdata, cmplen);
876+
data = &datbuf;
877+
} else {
878+
data = buffer;
879+
}
880+
free(packed_data);
881+
} else {
882+
data = buffer;
883+
}
835884

836885
/* unpack the topology signature for this node */
837886
idx=1;
838-
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &idx, OPAL_STRING))) {
887+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &sig, &idx, OPAL_STRING))) {
839888
ORTE_ERROR_LOG(rc);
840889
orted_failed_launch = true;
841890
goto CLEANUP;
@@ -861,7 +910,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
861910

862911
/* unpack the topology */
863912
idx=1;
864-
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &topo, &idx, OPAL_HWLOC_TOPO))) {
913+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &topo, &idx, OPAL_HWLOC_TOPO))) {
865914
ORTE_ERROR_LOG(rc);
866915
orted_failed_launch = true;
867916
goto CLEANUP;
@@ -873,7 +922,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
873922

874923
/* unpack any coprocessors */
875924
idx=1;
876-
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &coprocessors, &idx, OPAL_STRING))) {
925+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &coprocessors, &idx, OPAL_STRING))) {
877926
ORTE_ERROR_LOG(rc);
878927
orted_failed_launch = true;
879928
goto CLEANUP;
@@ -900,7 +949,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
900949
}
901950
/* see if this daemon is on a coprocessor */
902951
idx=1;
903-
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &coprocessors, &idx, OPAL_STRING))) {
952+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &coprocessors, &idx, OPAL_STRING))) {
904953
ORTE_ERROR_LOG(rc);
905954
orted_failed_launch = true;
906955
goto CLEANUP;

orte/orted/orted_comm.c

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
1616
* Copyright (c) 2010-2011 Oak Ridge National Labs. All rights reserved.
1717
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
18-
* Copyright (c) 2016 Research Organization for Information Science
18+
* Copyright (c) 2016-2017 Research Organization for Information Science
1919
* and Technology (RIST). All rights reserved.
2020
* $COPYRIGHT$
2121
*
@@ -59,6 +59,7 @@
5959
#include "orte/util/session_dir.h"
6060
#include "orte/util/name_fns.h"
6161
#include "orte/util/nidmap.h"
62+
#include "orte/util/compress.h"
6263

6364
#include "orte/mca/errmgr/errmgr.h"
6465
#include "orte/mca/grpcomm/base/base.h"
@@ -101,7 +102,7 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
101102
int32_t signal;
102103
orte_jobid_t job;
103104
char *contact_info;
104-
opal_buffer_t *answer;
105+
opal_buffer_t data, *answer;
105106
orte_job_t *jdata;
106107
orte_process_name_t proc, proc2;
107108
orte_process_name_t *return_addr;
@@ -124,6 +125,9 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
124125
char *rtmod;
125126
char *coprocessors;
126127
orte_job_map_t *map;
128+
int8_t flag;
129+
uint8_t *cmpdata;
130+
size_t cmplen;
127131

128132
/* unpack the command */
129133
n = 1;
@@ -620,36 +624,78 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
620624

621625
/**** REPORT TOPOLOGY COMMAND ****/
622626
case ORTE_DAEMON_REPORT_TOPOLOGY_CMD:
623-
answer = OBJ_NEW(opal_buffer_t);
627+
OBJ_CONSTRUCT(&data, opal_buffer_t);
624628
/* pack the topology signature */
625-
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &orte_topo_signature, 1, OPAL_STRING))) {
629+
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &orte_topo_signature, 1, OPAL_STRING))) {
626630
ORTE_ERROR_LOG(ret);
627-
OBJ_RELEASE(answer);
631+
OBJ_DESTRUCT(&data);
628632
goto CLEANUP;
629633
}
630634
/* pack the topology */
631-
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
635+
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
632636
ORTE_ERROR_LOG(ret);
633-
OBJ_RELEASE(answer);
637+
OBJ_DESTRUCT(&data);
634638
goto CLEANUP;
635639
}
636640

637641
/* detect and add any coprocessors */
638642
coprocessors = opal_hwloc_base_find_coprocessors(opal_hwloc_topology);
639-
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &coprocessors, 1, OPAL_STRING))) {
643+
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &coprocessors, 1, OPAL_STRING))) {
640644
ORTE_ERROR_LOG(ret);
641645
}
642646
if (NULL != coprocessors) {
643647
free(coprocessors);
644648
}
645649
/* see if I am on a coprocessor */
646650
coprocessors = opal_hwloc_base_check_on_coprocessor();
647-
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &coprocessors, 1, OPAL_STRING))) {
651+
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &coprocessors, 1, OPAL_STRING))) {
648652
ORTE_ERROR_LOG(ret);
649653
}
650654
if (NULL!= coprocessors) {
651655
free(coprocessors);
652656
}
657+
answer = OBJ_NEW(opal_buffer_t);
658+
if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used,
659+
&cmpdata, &cmplen)) {
660+
/* the data was compressed - mark that we compressed it */
661+
flag = 1;
662+
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &flag, 1, OPAL_INT8))) {
663+
ORTE_ERROR_LOG(ret);
664+
free(cmpdata);
665+
OBJ_DESTRUCT(&data);
666+
}
667+
/* pack the compressed length */
668+
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &cmplen, 1, OPAL_SIZE))) {
669+
ORTE_ERROR_LOG(ret);
670+
free(cmpdata);
671+
OBJ_DESTRUCT(&data);
672+
}
673+
/* pack the uncompressed length */
674+
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &data.bytes_used, 1, OPAL_SIZE))) {
675+
ORTE_ERROR_LOG(ret);
676+
free(cmpdata);
677+
OBJ_DESTRUCT(&data);
678+
}
679+
/* pack the compressed info */
680+
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, cmpdata, cmplen, OPAL_UINT8))) {
681+
ORTE_ERROR_LOG(ret);
682+
free(cmpdata);
683+
OBJ_DESTRUCT(&data);
684+
}
685+
OBJ_DESTRUCT(&data);
686+
free(cmpdata);
687+
} else {
688+
/* mark that it was not compressed */
689+
flag = 0;
690+
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &flag, 1, OPAL_INT8))) {
691+
ORTE_ERROR_LOG(ret);
692+
OBJ_DESTRUCT(&data);
693+
free(cmpdata);
694+
}
695+
/* transfer the payload across */
696+
opal_dss.copy_payload(answer, &data);
697+
OBJ_DESTRUCT(&data);
698+
}
653699
/* send the data */
654700
if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit,
655701
sender, answer, ORTE_RML_TAG_TOPOLOGY_REPORT,

0 commit comments

Comments
 (0)