Skip to content

Commit cd21ba3

Browse files
garlickmergify[bot]
authored andcommitted
broker: use taskmap to determine best iface
Problem: it's hard to write single node CI tests for the ability to use both ipc:// and tcp:// endpoints when using business card hostnames to determine locality. Refactor some internal functions to make the taskmap object available in the main line PMI code, and then use that to determine locality.
1 parent 204cd26 commit cd21ba3

File tree

1 file changed

+97
-49
lines changed

1 file changed

+97
-49
lines changed

src/broker/boot_pmi.c

Lines changed: 97 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -78,75 +78,110 @@ static int set_tbon_interface_hint_attr (struct upmi *upmi,
7878
return rc;
7979
}
8080

81-
static char *pmi_mapping_to_taskmap (const char *s)
81+
static int create_singleton_taskmap (struct taskmap **mp,
82+
flux_error_t *error)
8283
{
83-
char *result;
84-
flux_error_t error;
85-
struct taskmap *map = taskmap_decode (s, &error);
86-
if (!map) {
87-
log_err ("failed to decode PMI_process_mapping: %s",
88-
error.text);
89-
return NULL;
84+
struct taskmap *map;
85+
flux_error_t e;
86+
87+
if (!(map = taskmap_decode ("[[0,1,1,1]]", &e))) {
88+
errprintf (error, "error creating singleton taskmap: %s", e.text);
89+
return -1;
90+
}
91+
*mp = map;
92+
return 0;
93+
}
94+
95+
/* Fetch key from the PMI server and decode it as a taskmap.
96+
* Return -1 with error filled if a parse error occurs.
97+
* Return 0 if map was properly parsed OR if it doesn't exist.
98+
*/
99+
static int fetch_taskmap_one (struct upmi *upmi,
100+
const char *key,
101+
struct taskmap **mp,
102+
flux_error_t *error)
103+
{
104+
struct taskmap *map;
105+
char *val;
106+
flux_error_t e;
107+
108+
if (upmi_get (upmi, key, -1, &val, NULL) < 0) {
109+
*mp = NULL;
110+
return 0;
90111
}
91-
result = taskmap_encode (map, 0);
92-
taskmap_destroy (map);
93-
return result;
112+
if (!(map = taskmap_decode (val, &e))) {
113+
errprintf (error, "%s: error decoding %s", key, e.text);
114+
free (val);
115+
return -1;
116+
}
117+
free (val);
118+
*mp = map;
119+
return 0;
120+
}
121+
122+
static int fetch_taskmap (struct upmi *upmi,
123+
struct taskmap **mp,
124+
flux_error_t *error)
125+
{
126+
struct taskmap *map;
127+
if (fetch_taskmap_one (upmi, "flux.taskmap", &map, error) < 0)
128+
return -1;
129+
if (map == NULL
130+
&& fetch_taskmap_one (upmi, "PMI_process_mapping", &map, error) < 0)
131+
return -1;
132+
*mp = map; // might be NULL - that is OK
133+
return 0;
94134
}
95135

96136
/* Set broker.mapping attribute from enclosing instance taskmap.
137+
* It is not an error if the map is NULL.
97138
*/
98-
static int set_broker_mapping_attr (struct upmi *upmi,
99-
int size,
100-
attr_t *attrs)
139+
static int set_broker_mapping_attr (attr_t *attrs, struct taskmap *map)
101140
{
102141
char *val = NULL;
103-
int rc;
104142

105-
if (size == 1)
106-
val = strdup ("[[0,1,1,1]]");
107-
else {
108-
/* First attempt to get flux.taskmap, falling back to
109-
* PMI_process_mapping if this key is not available.
110-
*/
111-
char *s;
112-
if (upmi_get (upmi, "flux.taskmap", -1, &s, NULL) == 0
113-
|| upmi_get (upmi, "PMI_process_mapping", -1, &s, NULL) == 0) {
114-
val = pmi_mapping_to_taskmap (s);
115-
free (s);
116-
}
143+
if (map && !(val = taskmap_encode (map, 0)))
144+
return -1;
145+
if (attr_add (attrs, "broker.mapping", val, ATTR_IMMUTABLE) < 0) {
146+
ERRNO_SAFE_WRAP (free, val);
147+
return -1;
117148
}
118-
rc = attr_add (attrs, "broker.mapping", val, ATTR_IMMUTABLE);
119149
free (val);
120-
return rc;
150+
return 0;
121151
}
122152

123153
/* Count the number of TBON children that could be reached by IPC.
124154
*/
125-
static int count_local_children (attr_t *attrs,
155+
static int count_local_children (struct taskmap *map,
126156
int *child_ranks,
127157
int child_count,
128158
int rank)
129159
{
130-
const char *val;
131-
struct taskmap *map;
132-
int nodeid;
133160
int count = 0;
134161

135-
if (attr_get (attrs, "broker.mapping", &val, NULL) < 0
136-
|| val == NULL
137-
|| !(map = taskmap_decode (val, NULL)))
138-
return 0;
139-
nodeid = taskmap_nodeid (map, rank); // this broker's nodeid
140-
if (nodeid >= 0) {
141-
for (int i = 0; i < child_count; i++) {
142-
if (taskmap_nodeid (map, child_ranks[i]) == nodeid)
143-
count++;
162+
if (map) {
163+
int nodeid = taskmap_nodeid (map, rank); // this broker's nodeid
164+
if (nodeid >= 0) {
165+
for (int i = 0; i < child_count; i++) {
166+
if (taskmap_nodeid (map, child_ranks[i]) == nodeid)
167+
count++;
168+
}
144169
}
145170
}
146-
taskmap_destroy (map);
147171
return count;
148172
}
149173

174+
static bool ranks_are_peers (struct taskmap *map, int rank1, int rank2)
175+
{
176+
int nid1;
177+
int nid2;
178+
179+
if ((nid1 = taskmap_nodeid (map, rank1)) < 0
180+
|| (nid2 = taskmap_nodeid (map, rank2)) < 0)
181+
return false;
182+
return (nid1 == nid2 ? true : false);
183+
}
184+
150185
/* Check if TCP should be used, even if IPC could work.
151186
*/
152187
static bool get_prefer_tcp (attr_t *attrs)
@@ -404,6 +439,7 @@ int boot_pmi (const char *hostname, struct overlay *overlay, attr_t *attrs)
404439
bool under_flux;
405440
const struct bizcard *bc;
406441
struct bizcache *cache = NULL;
442+
struct taskmap *taskmap = NULL;
407443

408444
// N.B. overlay_create() sets the tbon.topo attribute
409445
if (attr_get (attrs, "tbon.topo", &topo_uri, NULL) < 0) {
@@ -441,10 +477,6 @@ int boot_pmi (const char *hostname, struct overlay *overlay, attr_t *attrs)
441477
log_err ("error setting tbon.interface-hint attribute");
442478
goto error;
443479
}
444-
if (set_broker_mapping_attr (upmi, info.size, attrs) < 0) {
445-
log_err ("error setting broker.mapping attribute");
446-
goto error;
447-
}
448480
if (!(topo = topology_create (topo_uri, info.size, &error))) {
449481
log_msg ("error creating '%s' topology: %s", topo_uri, error.text);
450482
goto error;
@@ -457,6 +489,19 @@ int boot_pmi (const char *hostname, struct overlay *overlay, attr_t *attrs)
457489
goto error;
458490
}
459491

492+
if (info.size == 1) {
493+
if (create_singleton_taskmap (&taskmap, &error) < 0)
494+
goto error;
495+
}
496+
else {
497+
if (fetch_taskmap (upmi, &taskmap, &error) < 0)
498+
goto error;
499+
}
500+
if (set_broker_mapping_attr (attrs, taskmap) < 0) {
501+
log_err ("error setting broker.mapping attribute");
502+
goto error;
503+
}
504+
460505
/* A size=1 instance has no peers, so skip the PMI exchange.
461506
*/
462507
if (info.size == 1) {
@@ -487,7 +532,7 @@ int boot_pmi (const char *hostname, struct overlay *overlay, attr_t *attrs)
487532
char tcp[1024];
488533
char ipc[1024];
489534

490-
nlocal = count_local_children (attrs,
535+
nlocal = count_local_children (taskmap,
491536
child_ranks,
492537
child_count,
493538
info.rank);
@@ -554,7 +599,8 @@ int boot_pmi (const char *hostname, struct overlay *overlay, attr_t *attrs)
554599
log_msg ("%s", error.text);
555600
goto error;
556601
}
557-
if (!get_prefer_tcp (attrs) && streq (bizcard_hostname (bc), hostname))
602+
if (!get_prefer_tcp (attrs)
603+
&& ranks_are_peers (taskmap, info.rank, parent_rank))
558604
uri = bizcard_uri_find (bc, "ipc://");
559605
if (!uri)
560606
uri = bizcard_uri_find (bc, NULL);
@@ -621,6 +667,7 @@ int boot_pmi (const char *hostname, struct overlay *overlay, attr_t *attrs)
621667
upmi_destroy (upmi);
622668
hostlist_destroy (hl);
623669
free (child_ranks);
670+
taskmap_destroy (taskmap);
624671
topology_decref (topo);
625672
bizcache_destroy (cache);
626673
return 0;
@@ -635,6 +682,7 @@ int boot_pmi (const char *hostname, struct overlay *overlay, attr_t *attrs)
635682
upmi_destroy (upmi);
636683
hostlist_destroy (hl);
637684
free (child_ranks);
685+
taskmap_destroy (taskmap);
638686
topology_decref (topo);
639687
bizcache_destroy (cache);
640688
return -1;

0 commit comments

Comments
 (0)