This repository was archived by the owner on Sep 30, 2022. It is now read-only.

Commit 2d11571

Author: Ralph Castain
Fix the grpcomm operations at scale. Restore the direct component to be the default, and to execute a rollup collective. This may in fact be faster than the alternatives, and something appears broken at scale when using brks in particular. Turn off the rcd and brks components as they don't work at scale right now - they can be restored at some future point when someone can debug them.
Adjust to Jeff's quibbles (cherry picked from commit open-mpi/ompi@68912d0)
1 parent efeac60 commit 2d11571
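
The rollup collective in this commit works as follows: each daemon counts how many allgather contributions it should receive - one from every routing-tree child that also participates in the collective, plus one for itself if it participates - and stores that count in the new nexpected field. When nreported reaches nexpected, a non-HNP daemon relays its aggregated bucket to ORTE_PROC_MY_PARENT, while the HNP xcasts the release. The sketch below models only the counting step under simplified assumptions (plain vpid arrays instead of ORTE lists; count_expected and the sample data are illustrative stand-ins, not part of the ORTE API); the real logic is in orte_grpcomm_base_get_tracker() in the grpcomm_base_stubs.c diff below.

/* Standalone C99 sketch of the "expected contributions" count used by the
 * rollup: routing-tree children that participate in the collective, plus
 * myself if I participate.  Illustrative only. */
#include <stddef.h>
#include <stdio.h>

typedef unsigned int vpid_t;              /* stand-in for orte_vpid_t */

static size_t count_expected(vpid_t me,
                             const vpid_t *children, size_t nchildren,
                             const vpid_t *participants, size_t nparts)
{
    size_t expected = 0;

    /* each routing-tree child that participates will roll up exactly
     * one aggregated bucket to me */
    for (size_t c = 0; c < nchildren; c++) {
        for (size_t n = 0; n < nparts; n++) {
            if (children[c] == participants[n]) {
                expected++;
                break;
            }
        }
    }
    /* I may sit in the rollup tree without contributing data myself,
     * so only count my own bucket if I am listed as a participant */
    for (size_t n = 0; n < nparts; n++) {
        if (me == participants[n]) {
            expected++;
            break;
        }
    }
    return expected;
}

int main(void)
{
    /* daemon 1 with routing-tree children 3 and 4; daemons 0, 1 and 3
     * participate in the collective */
    vpid_t children[] = { 3, 4 };
    vpid_t participants[] = { 0, 1, 3 };

    printf("daemon 1 expects %zu contributions\n",
           count_expected(1, children, 2, participants, 3));   /* -> 2 */
    return 0;
}

Compiled with any C99 compiler (e.g. cc -std=c99 sketch.c), the example prints 2: one bucket from child daemon 3 plus daemon 1's own contribution; child 4 is not a participant and is not counted.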

File tree: 8 files changed (+116 −59 lines)


orte/mca/grpcomm/base/grpcomm_base_frame.c

Lines changed: 2 additions & 1 deletion
@@ -12,7 +12,7 @@
  * All rights reserved.
  * Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
  *                         reserved.
- * Copyright (c) 2014      Intel, Inc. All rights reserved.
+ * Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
  * Copyright (c) 2015      Research Organization for Information Science
  *                         and Technology (RIST). All rights reserved.
  * $COPYRIGHT$
@@ -122,6 +122,7 @@ static void ccon(orte_grpcomm_coll_t *p)
     OBJ_CONSTRUCT(&p->distance_mask_recv, opal_bitmap_t);
     p->dmns = NULL;
     p->ndmns = 0;
+    p->nexpected = 0;
     p->nreported = 0;
     p->cbfunc = NULL;
     p->cbdata = NULL;

orte/mca/grpcomm/base/grpcomm_base_stubs.c

Lines changed: 47 additions & 3 deletions
@@ -157,7 +157,7 @@ static void allgather_stub(int fd, short args, void *cbdata)
         ret = opal_hash_table_set_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void *)&cd->sig->seq_num);
         if (OPAL_SUCCESS != ret) {
             OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
-                         "%s rpcomm:base:allgather can't not add new signature to hash table",
+                         "%s rpcomm:base:allgather cannot add new signature to hash table",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
             ORTE_ERROR_LOG(ret);
             OBJ_RELEASE(cd);
@@ -208,6 +208,9 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig
 {
     orte_grpcomm_coll_t *coll;
     int rc;
+    orte_namelist_t *nm;
+    opal_list_t children;
+    size_t n;
 
     /* search the existing tracker list to see if this already exists */
     OPAL_LIST_FOREACH(coll, &orte_grpcomm_base.ongoing, orte_grpcomm_coll_t) {
@@ -254,6 +257,30 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig
         ORTE_ERROR_LOG(rc);
         return NULL;
     }
+    /* cycle thru the array of daemons and compare them to our
+     * children in the routing tree, counting the ones that match
+     * so we know how many daemons we should receive contributions from */
+    OBJ_CONSTRUCT(&children, opal_list_t);
+    orte_routed.get_routing_list(&children);
+    while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&children))) {
+        for (n=0; n < coll->ndmns; n++) {
+            if (nm->name.vpid == coll->dmns[n]) {
+                coll->nexpected++;
+                break;
+            }
+        }
+        OBJ_RELEASE(nm);
+    }
+    OPAL_LIST_DESTRUCT(&children);
+    /* see if I am in the array of participants - note that I may
+     * be in the rollup tree even though I'm not participating
+     * in the collective itself */
+    for (n=0; n < coll->ndmns; n++) {
+        if (coll->dmns[n] == ORTE_PROC_MY_NAME->vpid) {
+            coll->nexpected++;
+            break;
+        }
+    }
     return coll;
 }
 
@@ -292,6 +319,9 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
         /* all daemons hosting this jobid are participating */
         if (NULL == (jdata = orte_get_job_data_object(sig->signature[0].jobid))) {
             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
+            ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
+            *ndmns = 0;
+            *dmns = NULL;
             return ORTE_ERR_NOT_FOUND;
         }
         if (NULL == jdata->map) {
@@ -321,7 +351,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
                     /* should never happen */
                     ORTE_ERROR_LOG(ORTE_ERROR);
                     free(dns);
-                    return ORTE_ERROR;
+                    ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
+                    *ndmns = 0;
+                    *dmns = NULL;
+                    return ORTE_ERR_NOT_FOUND;
                 }
                 OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
                                      "%s grpcomm:base:create_dmns adding daemon %s to array",
@@ -338,6 +371,9 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
             if (NULL == (jdata = orte_get_job_data_object(sig->signature[n].jobid))) {
                 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
                 OPAL_LIST_DESTRUCT(&ds);
+                ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
+                *ndmns = 0;
+                *dmns = NULL;
                 return ORTE_ERR_NOT_FOUND;
             }
             opal_output_verbose(5, orte_grpcomm_base_framework.framework_output,
@@ -347,12 +383,17 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
             if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sig->signature[n].vpid))) {
                 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
                 OPAL_LIST_DESTRUCT(&ds);
+                ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
+                *ndmns = 0;
+                *dmns = NULL;
                 return ORTE_ERR_NOT_FOUND;
             }
             if (NULL == proc->node || NULL == proc->node->daemon) {
                 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
                 OPAL_LIST_DESTRUCT(&ds);
                 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
+                *ndmns = 0;
+                *dmns = NULL;
                 return ORTE_ERR_NOT_FOUND;
             }
             vpid = proc->node->daemon->name.vpid;
@@ -372,7 +413,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
     if (0 == opal_list_get_size(&ds)) {
         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
         OPAL_LIST_DESTRUCT(&ds);
-        return ORTE_ERR_BAD_PARAM;
+        ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
+        *ndmns = 0;
+        *dmns = NULL;
+        return ORTE_ERR_NOT_FOUND;
     }
     dns = (orte_vpid_t*)malloc(opal_list_get_size(&ds) * sizeof(orte_vpid_t));
     nds = 0;

orte/mca/grpcomm/brks/.opal_ignore

Whitespace-only changes (empty .opal_ignore marker; its presence excludes the brks component from the build, which is how this commit turns brks off).

orte/mca/grpcomm/direct/grpcomm_direct.c

Lines changed: 57 additions & 51 deletions
@@ -5,7 +5,7 @@
  * Copyright (c) 2011      Cisco Systems, Inc.  All rights reserved.
  * Copyright (c) 2011-2013 Los Alamos National Security, LLC. All
  *                         rights reserved.
- * Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
+ * Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
  * Copyright (c) 2014      Research Organization for Information Science
  *                         and Technology (RIST). All rights reserved.
  * $COPYRIGHT$
@@ -124,7 +124,7 @@ static int xcast(orte_vpid_t *vpids,
 static int allgather(orte_grpcomm_coll_t *coll,
                      opal_buffer_t *buf)
 {
-    int rc, ret;
+    int rc;
     opal_buffer_t *relay;
 
     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
@@ -143,35 +143,16 @@ static int allgather(orte_grpcomm_coll_t *coll,
         return rc;
     }
 
-    /* if we are the HNP and nobody else is participating,
-     * then just execute the xcast */
-    if (ORTE_PROC_IS_HNP && 1 == coll->ndmns) {
-        /* pack the status - success since the allgather completed. This
-         * would be an error if we timeout instead */
-        ret = ORTE_SUCCESS;
-        if (OPAL_SUCCESS != (rc = opal_dss.pack(relay, &ret, 1, OPAL_INT))) {
-            ORTE_ERROR_LOG(rc);
-            OBJ_RELEASE(relay);
-            return rc;
-        }
-        /* pass along the payload */
-        opal_dss.copy_payload(relay, buf);
-        orte_grpcomm.xcast(coll->sig, ORTE_RML_TAG_COLL_RELEASE, relay);
-        OBJ_RELEASE(relay);
-        return ORTE_SUCCESS;
-    }
-
     /* pass along the payload */
     opal_dss.copy_payload(relay, buf);
 
-    /* otherwise, we need to send this to the HNP for
-     * processing */
+    /* send this to ourselves for processing */
     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
-                         "%s grpcomm:direct:allgather sending to HNP",
+                         "%s grpcomm:direct:allgather sending to ourself",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 
-    /* send the info to the HNP for tracking */
-    rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, relay,
+    /* send the info to ourselves for tracking */
+    rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_NAME, relay,
                                  ORTE_RML_TAG_ALLGATHER_DIRECT,
                                  orte_rml_send_callback, NULL);
     return rc;
@@ -212,35 +193,60 @@ static void allgather_recv(int status, orte_process_name_t* sender,
     opal_dss.copy_payload(&coll->bucket, buffer);
 
     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
-                         "%s grpcomm:direct allgather recv ndmns %d nrep %d",
+                         "%s grpcomm:direct allgather recv nexpected %d nrep %d",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
-                         (int)coll->ndmns, (int)coll->nreported));
-
-    /* if all participating daemons have reported */
-    if (coll->ndmns == coll->nreported) {
-        reply = OBJ_NEW(opal_buffer_t);
-        /* pack the signature */
-        if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
-            ORTE_ERROR_LOG(rc);
-            OBJ_RELEASE(reply);
-            OBJ_RELEASE(sig);
-            return;
-        }
-        /* pack the status - success since the allgather completed. This
-         * would be an error if we timeout instead */
-        ret = ORTE_SUCCESS;
-        if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
-            ORTE_ERROR_LOG(rc);
+                         (int)coll->nexpected, (int)coll->nreported));
+
+    /* see if everyone has reported */
+    if (coll->nreported == coll->nexpected) {
+        if (ORTE_PROC_IS_HNP) {
+            OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
+                                 "%s grpcomm:direct allgather HNP reports complete",
+                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+            /* the allgather is complete - send the xcast */
+            reply = OBJ_NEW(opal_buffer_t);
+            /* pack the signature */
+            if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
+                ORTE_ERROR_LOG(rc);
+                OBJ_RELEASE(reply);
+                OBJ_RELEASE(sig);
+                return;
+            }
+            /* pack the status - success since the allgather completed. This
+             * would be an error if we timeout instead */
+            ret = ORTE_SUCCESS;
+            if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
+                ORTE_ERROR_LOG(rc);
+                OBJ_RELEASE(reply);
+                OBJ_RELEASE(sig);
+                return;
+            }
+            /* transfer the collected bucket */
+            opal_dss.copy_payload(reply, &coll->bucket);
+            /* send the release via xcast */
+            (void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply);
             OBJ_RELEASE(reply);
-            OBJ_RELEASE(sig);
-            return;
+        } else {
+            OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
+                                 "%s grpcomm:direct allgather rollup complete - sending to %s",
+                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                                 ORTE_NAME_PRINT(ORTE_PROC_MY_PARENT)));
+            /* relay the bucket upward */
+            reply = OBJ_NEW(opal_buffer_t);
+            /* pack the signature */
+            if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
+                ORTE_ERROR_LOG(rc);
+                OBJ_RELEASE(reply);
+                OBJ_RELEASE(sig);
+                return;
+            }
+            /* transfer the collected bucket */
+            opal_dss.copy_payload(reply, &coll->bucket);
+            /* send the info to our parent */
+            rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_PARENT, reply,
+                                         ORTE_RML_TAG_ALLGATHER_DIRECT,
+                                         orte_rml_send_callback, NULL);
         }
-        /* transfer the collected bucket */
-        opal_dss.copy_payload(reply, &coll->bucket);
-
-        /* send the release via xcast */
-        (void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply);
-        OBJ_RELEASE(reply);
     }
     OBJ_RELEASE(sig);
 }

orte/mca/grpcomm/direct/grpcomm_direct_component.c

Lines changed: 2 additions & 2 deletions
@@ -1,7 +1,7 @@
 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
 /*
  * Copyright (c) 2011      Cisco Systems, Inc.  All rights reserved.
- * Copyright (c) 2011-2015 Los Alamos National Security, LLC. All rights
+ * Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
  *                         reserved.
  * Copyright (c) 2014      Intel, Inc. All rights reserved.
  * $COPYRIGHT$
@@ -55,7 +55,7 @@ static int direct_register(void)
     /* make the priority adjustable so users can select
     * direct for use by apps without affecting daemons
     */
-    my_priority = 1;
+    my_priority = 85;
    (void) mca_base_component_var_register(c, "priority",
                                           "Priority of the grpcomm direct component",
                                           MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,

orte/mca/grpcomm/grpcomm.h

Lines changed: 2 additions & 0 deletions
@@ -77,6 +77,8 @@ typedef struct {
     size_t ndmns;
     /** my index in the dmns array */
     unsigned long my_rank;
+    /* number of buckets expected */
+    size_t nexpected;
     /* number reported in */
     size_t nreported;
     /* distance masks for receive */

orte/mca/grpcomm/rcd/.opal_ignore

Whitespace-only changes (empty .opal_ignore marker; its presence excludes the rcd component from the build, which is how this commit turns rcd off).

orte/mca/odls/base/odls_base_default_fns.c

Lines changed: 6 additions & 2 deletions
@@ -14,7 +14,7 @@
  * Copyright (c) 2011-2015 Los Alamos National Security, LLC.
  *                         All rights reserved.
  * Copyright (c) 2011-2013 Cisco Systems, Inc.  All rights reserved.
- * Copyright (c) 2013-2015 Intel, Inc. All rights reserved.
+ * Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
  * Copyright (c) 2014      Research Organization for Information Science
  *                         and Technology (RIST). All rights reserved.
  * $COPYRIGHT$
@@ -240,6 +240,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
     orte_app_context_t *app;
     bool found;
     orte_node_t *node;
+    bool newmap = false;
 
     OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output,
                          "%s odls:constructing child list",
@@ -389,6 +390,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
     /* ensure the map object is present */
     if (NULL == jdata->map) {
         jdata->map = OBJ_NEW(orte_job_map_t);
+        newmap = true;
     }
 
     /* if we have a file map, then we need to load it */
@@ -446,7 +448,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
         if (!found) {
             OBJ_RETAIN(dmn->node);
             opal_pointer_array_add(jdata->map->nodes, dmn->node);
-            jdata->map->num_nodes++;
+            if (newmap) {
+                jdata->map->num_nodes++;
+            }
         }
 
         /* see if it belongs to us */
