Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/broker/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ libbroker_la_SOURCES = \
boot_config.c \
boot_pmi.h \
boot_pmi.c \
boot_flub.h \
boot_flub.c \
publisher.h \
publisher.c \
groups.h \
Expand Down
223 changes: 223 additions & 0 deletions src/broker/boot_flub.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/************************************************************\
* Copyright 2023 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

/* boot_flub.c - FLUx Boot protocol
*
* Add a broker to an existing Flux instance.
*/

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <errno.h>
#include <jansson.h>
#include <flux/core.h>

#include "src/common/libidset/idset.h"
#include "src/common/libutil/errprintf.h"
#include "src/common/libutil/errno_safe.h"
#include "src/common/libutil/ipaddr.h"
#include "ccan/str/str.h"

#include "attr.h"
#include "overlay.h"
#include "topology.h"

#include "boot_flub.h"

struct boot_info {
int size;
int rank;
json_t *attrs;
};

struct boot_parent {
char *pubkey;
int rank;
const char *uri;
};

static int wait_for_group_membership (flux_t *h,
const char *name,
int rank,
flux_error_t *error)
{
flux_future_t *f;
bool online = false;

if (!(f = flux_rpc_pack (h,
"groups.get",
0,
FLUX_RPC_STREAMING,
"{s:s}",
"name", "broker.online"))) {
errprintf (error, "broker.online: %s", strerror (errno));
return -1;
}
do {
const char *members;
struct idset *ids;

if (flux_rpc_get_unpack (f, "{s:s}", "members", &members) < 0) {
errprintf (error, "broker.online: %s", future_strerror (f, errno));
break;
}
if ((ids = idset_decode (members))
&& idset_test (ids, rank))
online = true;
idset_destroy (ids);
flux_future_reset (f);
} while (!online);
flux_future_destroy (f);

return online ? 0 : -1;
}

static int set_attrs (attr_t *attrs, json_t *dict)
{
const char *key;
json_t *val;

json_object_foreach (dict, key, val) {
const char *s = json_string_value (val);
if (!s) {
errno = EPROTO;
return -1;
}
if (attr_add (attrs, key, s, ATTR_IMMUTABLE) < 0)
return -1;
}
return 0;
}

int boot_flub (struct broker *ctx, flux_error_t *error)
{
const char *uri = NULL;
flux_t *h;
flux_future_t *f = NULL;
flux_future_t *f2 = NULL;
struct topology *topo = NULL;
const char *topo_uri;
struct boot_info info;
struct boot_parent parent;
const char *bind_uri = NULL;
int rc = -1;

/* Ask a Flux instance to allocate an available rank.
* N.B. the broker unsets FLUX_URI so either it is set on the
* command line via broker.boot-server, or the compiled-in path
* is used (system instance).
*/
(void)attr_get (ctx->attrs, "broker.boot-server", &uri, NULL);
if (!(h = flux_open_ex (uri, 0, error)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to call uri_resolve(3) here? Then a JOBID or other resolvable URI could be provided directly to the broker.boot-server attribute.

Although, if the intent is to add a higher level command like flux expand OPTIONS... JOBID, perhaps that's unnecessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this might be helpful. I'm not sure what the next steps are for tooling so may as well add that for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A complication is that uri_resolve(3) requires FLUX_URI to be set, and the broker unsets that. I could setenv FLUX_URI to the value of the parent-uri attribute around the call but since the environment is global...eww? Maybe we should table this one for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, yeah, that's a bummer. It might be useful in the long term in case you target the URI of a job that hasn't started yet, but doesn't seem like a big deal for now.

Maybe we need a version of uri_resolve(3) that takes a uri argument and (somehow) passes that down to flux uri if that command is invoked. Meh, probably too soon to worry about it.

return -1;
if (!(f = flux_rpc_pack (h,
"overlay.flub-getinfo",
0,
0,
"{}"))
|| flux_rpc_get_unpack (f,
"{s:i s:i s:o}",
"rank", &info.rank,
"size", &info.size,
"attrs", &info.attrs) < 0) {
errprintf (error, "%s", future_strerror (f, errno));
goto out;
}
/* Set instance attributes obtained from boot server.
*/
if (set_attrs (ctx->attrs, info.attrs) < 0) {
errprintf (error, "error setting attributes: %s", strerror (errno));
goto out;
}
/* Create topology. All ranks are assumed to have the same topology.
* The tbon.topo attribute is set in overlay_create() if not provided
* on the command line.
*/
if (attr_get (ctx->attrs, "tbon.topo", &topo_uri, NULL) < 0) {
errprintf (error, "error fetching tbon.topo attribute");
goto out;
}
if (!(topo = topology_create (topo_uri, info.size, NULL))
|| topology_set_rank (topo, info.rank) < 0
|| overlay_set_topology (ctx->overlay, topo) < 0) {
errprintf (error, "error creating topology: %s", strerror (errno));
goto out;
}
if ((parent.rank = topology_get_parent (topo)) < 0) {
errprintf (error,
"rank %d has no parent in %s topology",
info.rank,
topo_uri);
goto out;
}
/* Exchange public keys with TBON parent and obtain its URI.
*/
if (wait_for_group_membership (h, "broker.online", parent.rank, error) < 0)
goto out;
if (!(f2 = flux_rpc_pack (h,
"overlay.flub-kex",
parent.rank,
0,
"{s:s s:s}",
"name", overlay_cert_name (ctx->overlay),
"pubkey", overlay_cert_pubkey (ctx->overlay)))
|| flux_rpc_get_unpack (f2,
"{s:s s:s}",
"pubkey", &parent.pubkey,
"uri", &parent.uri) < 0) {
errprintf (error, "%s", future_strerror (f, errno));
goto out;
}
/* Inform overlay subsystem of parent info.
*/
if (overlay_set_parent_uri (ctx->overlay, parent.uri) < 0
|| overlay_set_parent_pubkey (ctx->overlay, parent.pubkey) < 0) {
errprintf (error,
"error setting up overlay parameters: %s",
strerror (errno));
goto out;
}
/* If there are children, bind to zmq socket and update tbon.endpoint.
* Since we don't know if OUR children are co-located on the same node,
* always use the tcp transport.
*/
if (topology_get_child_ranks (topo, NULL, 0) > 0) {
char ipaddr[HOST_NAME_MAX + 1];
char *wild = NULL;

overlay_set_ipv6 (ctx->overlay, 1);
if (ipaddr_getprimary (ipaddr,
sizeof (ipaddr),
error->text,
sizeof (error->text)) < 0)
goto out;
if (asprintf (&wild, "tcp://%s:*", ipaddr) < 0
|| overlay_bind (ctx->overlay, wild) < 0) {
errprintf (error, "error binding to tcp://%s:*", ipaddr);
ERRNO_SAFE_WRAP (free, wild);
goto out;
}
bind_uri = overlay_get_bind_uri (ctx->overlay);
}
if (attr_add (ctx->attrs, "tbon.endpoint", bind_uri, ATTR_IMMUTABLE) < 0) {
errprintf (error, "setattr tbon.endpoint");
goto out;
}
rc = 0;
out:
topology_decref (topo);
flux_future_destroy (f);
flux_future_destroy (f2);
flux_close (h);
return rc;
}

// vi:ts=4 sw=4 expandtab
22 changes: 22 additions & 0 deletions src/broker/boot_flub.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/************************************************************\
* Copyright 2023 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#ifndef BROKER_BOOT_FLUB_H
#define BROKER_BOOT_FLUB_H

#include <flux/core.h>

#include "broker.h"

int boot_flub (struct broker *ctx, flux_error_t *error);

#endif /* BROKER_BOOT_FLUB_H */

// vi:ts=4 sw=4 expandtab
Loading