|
| 1 | +/************************************************************\ |
| 2 | + * Copyright 2023 Lawrence Livermore National Security, LLC |
| 3 | + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) |
| 4 | + * |
| 5 | + * This file is part of the Flux resource manager framework. |
| 6 | + * For details, see https://github.com/flux-framework. |
| 7 | + * |
| 8 | + * SPDX-License-Identifier: LGPL-3.0 |
| 9 | +\************************************************************/ |
| 10 | + |
| 11 | +/* boot_flub.c - FLUx Boot protocol |
| 12 | + * |
| 13 | + * Add a broker to an existing Flux instance. |
| 14 | + */ |
| 15 | + |
| 16 | +#if HAVE_CONFIG_H |
| 17 | +#include "config.h" |
| 18 | +#endif |
| 19 | +#include <errno.h> |
| 20 | +#include <jansson.h> |
| 21 | +#include <flux/core.h> |
| 22 | + |
| 23 | +#include "src/common/libidset/idset.h" |
| 24 | +#include "src/common/libutil/errprintf.h" |
| 25 | +#include "src/common/libutil/errno_safe.h" |
| 26 | +#include "src/common/libutil/ipaddr.h" |
| 27 | +#include "ccan/str/str.h" |
| 28 | + |
| 29 | +#include "attr.h" |
| 30 | +#include "overlay.h" |
| 31 | +#include "topology.h" |
| 32 | + |
| 33 | +#include "boot_flub.h" |
| 34 | + |
| 35 | +struct boot_info { |
| 36 | + int size; |
| 37 | + int rank; |
| 38 | + json_t *attrs; |
| 39 | +}; |
| 40 | + |
/* Information about this broker's TBON parent, gathered in boot_flub().
 *
 * rank is the result of topology_get_parent().  pubkey and uri are
 * unpacked from the overlay.flub-kex response with the "s" format; this
 * file only reads them (it never modifies or frees them), so both are
 * declared const.
 */
struct boot_parent {
    const char *pubkey; // parent's public key, given to overlay_set_parent_pubkey()
    int rank;           // parent's rank, target of the overlay.flub-kex RPC
    const char *uri;    // parent's URI, given to overlay_set_parent_uri()
};
| 46 | + |
| 47 | +static int wait_for_group_membership (flux_t *h, |
| 48 | + const char *name, |
| 49 | + int rank, |
| 50 | + flux_error_t *error) |
| 51 | +{ |
| 52 | + flux_future_t *f; |
| 53 | + bool online = false; |
| 54 | + |
| 55 | + if (!(f = flux_rpc_pack (h, |
| 56 | + "groups.get", |
| 57 | + 0, |
| 58 | + FLUX_RPC_STREAMING, |
| 59 | + "{s:s}", |
| 60 | + "name", "broker.online"))) { |
| 61 | + errprintf (error, "broker.online: %s", strerror (errno)); |
| 62 | + return -1; |
| 63 | + } |
| 64 | + do { |
| 65 | + const char *members; |
| 66 | + struct idset *ids; |
| 67 | + |
| 68 | + if (flux_rpc_get_unpack (f, "{s:s}", "members", &members) < 0) { |
| 69 | + errprintf (error, "broker.online: %s", future_strerror (f, errno)); |
| 70 | + break; |
| 71 | + } |
| 72 | + if ((ids = idset_decode (members)) |
| 73 | + && idset_test (ids, rank)) |
| 74 | + online = true; |
| 75 | + idset_destroy (ids); |
| 76 | + flux_future_reset (f); |
| 77 | + } while (!online); |
| 78 | + flux_future_destroy (f); |
| 79 | + |
| 80 | + // TODO: cancel groups.get RPC (add groups.get-cancel to groups.c) |
| 81 | + |
| 82 | + return online ? 0 : -1; |
| 83 | +} |
| 84 | + |
| 85 | +static int set_attrs (attr_t *attrs, json_t *dict) |
| 86 | +{ |
| 87 | + const char *key; |
| 88 | + json_t *val; |
| 89 | + |
| 90 | + json_object_foreach (dict, key, val) { |
| 91 | + const char *s = json_string_value (val); |
| 92 | + if (!s) { |
| 93 | + errno = EPROTO; |
| 94 | + return -1; |
| 95 | + } |
| 96 | + if (attr_add (attrs, key, s, ATTR_IMMUTABLE) < 0) |
| 97 | + return -1; |
| 98 | + } |
| 99 | + return 0; |
| 100 | +} |
| 101 | + |
| 102 | +int boot_flub (struct broker *ctx, flux_error_t *error) |
| 103 | +{ |
| 104 | + const char *uri = NULL; |
| 105 | + flux_t *h; |
| 106 | + flux_future_t *f = NULL; |
| 107 | + flux_future_t *f2 = NULL; |
| 108 | + struct topology *topo = NULL; |
| 109 | + const char *topo_uri; |
| 110 | + struct boot_info info; |
| 111 | + struct boot_parent parent; |
| 112 | + const char *bind_uri = NULL; |
| 113 | + int rc = -1; |
| 114 | + |
| 115 | + /* Ask a Flux instance to allocate an available rank. |
| 116 | + * N.B. the broker unsets FLUX_URI so either it is set on the |
| 117 | + * command line via broker.boot-server, or the compiled-in path |
| 118 | + * is used (system instance). |
| 119 | + */ |
| 120 | + (void)attr_get (ctx->attrs, "broker.boot-server", &uri, NULL); |
| 121 | + if (!(h = flux_open_ex (uri, 0, error))) |
| 122 | + return -1; |
| 123 | + if (!(f = flux_rpc_pack (h, |
| 124 | + "overlay.flub-getinfo", |
| 125 | + 0, |
| 126 | + 0, |
| 127 | + "{}")) |
| 128 | + || flux_rpc_get_unpack (f, |
| 129 | + "{s:i s:i s:o}", |
| 130 | + "rank", &info.rank, |
| 131 | + "size", &info.size, |
| 132 | + "attrs", &info.attrs) < 0) { |
| 133 | + errprintf (error, "%s", future_strerror (f, errno)); |
| 134 | + goto out; |
| 135 | + } |
| 136 | + /* Set instance attributes obtained from boot server. |
| 137 | + */ |
| 138 | + if (set_attrs (ctx->attrs, info.attrs) < 0) { |
| 139 | + errprintf (error, "error setting attributes: %s", strerror (errno)); |
| 140 | + goto out; |
| 141 | + } |
| 142 | + /* Create topology. All ranks are assumed to have the same topology. |
| 143 | + * The tbon.topo attribute is set in overlay_create() if not provided |
| 144 | + * on the command line. |
| 145 | + */ |
| 146 | + if (attr_get (ctx->attrs, "tbon.topo", &topo_uri, NULL) < 0) { |
| 147 | + errprintf (error, "error fetching tbon.topo attribute"); |
| 148 | + goto out; |
| 149 | + } |
| 150 | + if (!(topo = topology_create (topo_uri, info.size, NULL)) |
| 151 | + || topology_set_rank (topo, info.rank) < 0 |
| 152 | + || overlay_set_topology (ctx->overlay, topo) < 0) { |
| 153 | + errprintf (error, "error creating topology: %s", strerror (errno)); |
| 154 | + goto out; |
| 155 | + } |
| 156 | + if ((parent.rank = topology_get_parent (topo)) < 0) { |
| 157 | + errprintf (error, |
| 158 | + "rank %d has no parent in %s topology", |
| 159 | + info.rank, |
| 160 | + topo_uri); |
| 161 | + goto out; |
| 162 | + } |
| 163 | + /* Exchange public keys with TBON parent and obtain its URI. |
| 164 | + */ |
| 165 | + if (wait_for_group_membership (h, "broker.online", parent.rank, error) < 0) |
| 166 | + goto out; |
| 167 | + if (!(f2 = flux_rpc_pack (h, |
| 168 | + "overlay.flub-kex", |
| 169 | + parent.rank, |
| 170 | + 0, |
| 171 | + "{s:s s:s}", |
| 172 | + "name", overlay_cert_name (ctx->overlay), |
| 173 | + "pubkey", overlay_cert_pubkey (ctx->overlay))) |
| 174 | + || flux_rpc_get_unpack (f2, |
| 175 | + "{s:s s:s}", |
| 176 | + "pubkey", &parent.pubkey, |
| 177 | + "uri", &parent.uri) < 0) { |
| 178 | + errprintf (error, "%s", future_strerror (f, errno)); |
| 179 | + goto out; |
| 180 | + } |
| 181 | + /* Inform overlay subsystem of parent info. |
| 182 | + */ |
| 183 | + if (overlay_set_parent_uri (ctx->overlay, parent.uri) < 0 |
| 184 | + || overlay_set_parent_pubkey (ctx->overlay, parent.pubkey) < 0) { |
| 185 | + errprintf (error, |
| 186 | + "error setting up overlay parameters: %s", |
| 187 | + strerror (errno)); |
| 188 | + goto out; |
| 189 | + } |
| 190 | + /* If there are children, bind to zmq socket and update tbon.endpoint. |
| 191 | + * Since we don't know if OUR children are co-located on the same node, |
| 192 | + * always use the tcp transport. |
| 193 | + */ |
| 194 | + if (topology_get_child_ranks (topo, NULL, 0) > 0) { |
| 195 | + char ipaddr[HOST_NAME_MAX + 1]; |
| 196 | + char *wild = NULL; |
| 197 | + |
| 198 | + overlay_set_ipv6 (ctx->overlay, 1); |
| 199 | + if (ipaddr_getprimary (ipaddr, |
| 200 | + sizeof (ipaddr), |
| 201 | + error->text, |
| 202 | + sizeof (error->text)) < 0) |
| 203 | + goto out; |
| 204 | + if (asprintf (&wild, "tcp://%s:*", ipaddr) < 0 |
| 205 | + || overlay_bind (ctx->overlay, wild) < 0) { |
| 206 | + errprintf (error, "error binding to tcp://%s:*", ipaddr); |
| 207 | + ERRNO_SAFE_WRAP (free, wild); |
| 208 | + goto out; |
| 209 | + } |
| 210 | + bind_uri = overlay_get_bind_uri (ctx->overlay); |
| 211 | + } |
| 212 | + if (attr_add (ctx->attrs, "tbon.endpoint", bind_uri, ATTR_IMMUTABLE) < 0) { |
| 213 | + errprintf (error, "setattr tbon.endpoint"); |
| 214 | + goto out; |
| 215 | + } |
| 216 | + rc = 0; |
| 217 | +out: |
| 218 | + topology_decref (topo); |
| 219 | + flux_future_destroy (f); |
| 220 | + flux_future_destroy (f2); |
| 221 | + flux_close (h); |
| 222 | + return rc; |
| 223 | +} |
| 224 | + |
| 225 | +// vi:ts=4 sw=4 expandtab |
0 commit comments