Skip to content

Commit 204cd26

Browse files
garlickmergify[bot]
authored andcommitted
broker: use both IPC and TCP within an instance
Problem: when brokers are co-located and bootstrap with PMI, they currently only use ipc if the entire instance is on one node, but instances where some brokers are co-located could benefit from IPC too. Use the taskmap to determine when peers are co-located and use IPC between those, unless it is suppressed by setting the tbon.prefertcp broker attribute to a nonzero value.
1 parent 76efee4 commit 204cd26

File tree

1 file changed

+65
-36
lines changed

1 file changed

+65
-36
lines changed

src/broker/boot_pmi.c

Lines changed: 65 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -120,30 +120,43 @@ static int set_broker_mapping_attr (struct upmi *upmi,
120120
return rc;
121121
}
122122

123-
/* Check if IPC can be used to communicate.
124-
* Currently this only goes so far as to check if the process mapping of
125-
* brokers has all brokers on the same node. We could check if all peers
126-
* are on the same node, but given how the TBON maps to rank assignments,
127-
* it is fairly unlikely.
123+
/* Count the number of TBON children that could be reached by IPC.
128124
*/
129-
static bool use_ipc (attr_t *attrs)
125+
static int count_local_children (attr_t *attrs,
126+
int *child_ranks,
127+
int child_count,
128+
int rank)
130129
{
131-
bool result = false;
132-
struct taskmap *map = NULL;
133130
const char *val;
131+
struct taskmap *map;
132+
int nodeid;
133+
int count = 0;
134134

135-
if (attr_get (attrs, "tbon.prefertcp", &val, NULL) == 0
136-
&& !streq (val, "0"))
137-
goto done;
138-
if (attr_get (attrs, "broker.mapping", &val, NULL) < 0 || !val)
139-
goto done;
140-
if (!(map = taskmap_decode (val, NULL)))
141-
goto done;
142-
if (taskmap_nnodes (map) == 1)
143-
result = true;
144-
done:
135+
if (attr_get (attrs, "broker.mapping", &val, NULL) < 0
136+
|| val == NULL
137+
|| !(map = taskmap_decode (val, NULL)))
138+
return 0;
139+
nodeid = taskmap_nodeid (map, rank); // this broker's nodeid
140+
if (nodeid >= 0) {
141+
for (int i = 0; i < child_count; i++) {
142+
if (taskmap_nodeid (map, child_ranks[i]) == nodeid)
143+
count++;
144+
}
145+
}
145146
taskmap_destroy (map);
146-
return result;
147+
return count;
148+
}
149+
150+
/* Check if TCP should be used, even if IPC could work.
151+
*/
152+
static bool get_prefer_tcp (attr_t *attrs)
153+
{
154+
const char *val;
155+
if (attr_get (attrs, "tbon.prefertcp", &val, NULL) < 0
156+
|| val == NULL
157+
|| streq (val, "0"))
158+
return false;
159+
return true;
147160
}
148161

149162
/* Build a tcp:// URI with a wildcard port, taking into account the value of
@@ -465,29 +478,40 @@ int boot_pmi (const char *hostname, struct overlay *overlay, attr_t *attrs)
465478
goto error;
466479
}
467480

468-
/* If there are to be downstream peers, then bind to socket.
481+
/* If there are to be downstream peers, then bind to a socket.
482+
* Depending on locality of children, use tcp://, ipc://, or both.
469483
*/
470484
if (child_count > 0) {
471-
char uri[1024];
472-
473-
if (use_ipc (attrs)) {
474-
if (format_ipc_uri (uri,
475-
sizeof (uri),
476-
attrs,
477-
info.rank,
478-
&error) < 0) {
479-
log_err ("%s", error.text);
485+
bool prefer_tcp = get_prefer_tcp (attrs);
486+
int nlocal;
487+
char tcp[1024];
488+
char ipc[1024];
489+
490+
nlocal = count_local_children (attrs,
491+
child_ranks,
492+
child_count,
493+
info.rank);
494+
495+
if (format_tcp_uri (tcp, sizeof (tcp), attrs, &error) < 0) {
496+
log_err ("%s", error.text);
497+
goto error;
498+
}
499+
if (format_ipc_uri (ipc, sizeof (ipc), attrs, info.rank, &error) < 0) {
500+
log_err ("%s", error.text);
501+
goto error;
502+
}
503+
if (prefer_tcp || nlocal == 0) {
504+
if (overlay_bind (overlay, tcp, NULL) < 0)
505+
goto error;
506+
}
507+
else if (!prefer_tcp && nlocal == child_count) {
508+
if (overlay_bind (overlay, ipc, NULL) < 0)
480509
goto error;
481-
}
482510
}
483511
else {
484-
if (format_tcp_uri (uri, sizeof (uri), attrs, &error) < 0) {
485-
log_err ("%s", error.text);
512+
if (overlay_bind (overlay, tcp, ipc) < 0)
486513
goto error;
487-
}
488514
}
489-
if (overlay_bind (overlay, uri, NULL) < 0)
490-
goto error;
491515
}
492516

493517
/* Each broker writes a business card consisting of hostname,
@@ -524,12 +548,17 @@ int boot_pmi (const char *hostname, struct overlay *overlay, attr_t *attrs)
524548
*/
525549
if (info.rank > 0) {
526550
int parent_rank = topology_get_parent (topo);
551+
const char *uri = NULL;
527552

528553
if (get_bizcard (upmi, cache, parent_rank, &bc, &error) < 0) {
529554
log_msg ("%s", error.text);
530555
goto error;
531556
}
532-
if (overlay_set_parent_uri (overlay, bizcard_uri_first (bc)) < 0) {
557+
if (!get_prefer_tcp (attrs) && streq (bizcard_hostname (bc), hostname))
558+
uri = bizcard_uri_find (bc, "ipc://");
559+
if (!uri)
560+
uri = bizcard_uri_find (bc, NULL);
561+
if (overlay_set_parent_uri (overlay, uri) < 0) {
533562
log_err ("overlay_set_parent_uri");
534563
goto error;
535564
}

0 commit comments

Comments
 (0)