diff --git a/bitcoin/short_channel_id.h b/bitcoin/short_channel_id.h index 3c7cce9036bc..0a49a73fc7d7 100644 --- a/bitcoin/short_channel_id.h +++ b/bitcoin/short_channel_id.h @@ -17,7 +17,7 @@ static inline bool short_channel_id_eq(struct short_channel_id a, return a.u64 == b.u64; } -static inline size_t short_channel_id_hash(struct short_channel_id scid) +static inline size_t hash_scid(struct short_channel_id scid) { /* scids cost money to generate, so simple hash works here */ return (scid.u64 >> 32) ^ (scid.u64 >> 16) ^ scid.u64; @@ -46,6 +46,12 @@ static inline bool short_channel_id_dir_eq(const struct short_channel_id_dir *a, return short_channel_id_eq(a->scid, b->scid) && a->dir == b->dir; } +static inline size_t hash_scidd(const struct short_channel_id_dir *scidd) +{ + /* Bottom bit is common, so use bit 4 for direction */ + return hash_scid(scidd->scid) | (scidd->dir << 4); +} + static inline u32 short_channel_id_blocknum(struct short_channel_id scid) { return scid.u64 >> 40; diff --git a/common/gossmap.c b/common/gossmap.c index 948ed0b9c4c1..cb979f61e74a 100644 --- a/common/gossmap.c +++ b/common/gossmap.c @@ -29,11 +29,7 @@ static bool chanidx_eq_id(const ptrint_t *pidx, struct short_channel_id pidxid = chanidx_id(pidx); return short_channel_id_eq(pidxid, scid); } -static size_t scid_hash(const struct short_channel_id scid) -{ - return siphash24(siphash_seed(), &scid, sizeof(scid)); -} -HTABLE_DEFINE_NODUPS_TYPE(ptrint_t, chanidx_id, scid_hash, chanidx_eq_id, +HTABLE_DEFINE_NODUPS_TYPE(ptrint_t, chanidx_id, hash_scid, chanidx_eq_id, chanidx_htable); static struct node_id nodeidx_id(const ptrint_t *pidx); diff --git a/connectd/connectd.h b/connectd/connectd.h index 1e78834ba650..8876427c28d6 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -260,7 +260,7 @@ static bool scid_to_node_id_eq_scid(const struct scid_to_node_id *scid_to_node_i * we use this to forward onion messages which specify the next hop by scid/dir. */ HTABLE_DEFINE_NODUPS_TYPE(struct scid_to_node_id, scid_to_node_id_keyof, - short_channel_id_hash, + hash_scid, scid_to_node_id_eq_scid, scid_htable); diff --git a/doc/lightningd-config.5.md b/doc/lightningd-config.5.md index e63d5b321b41..4a0740ae8c14 100644 --- a/doc/lightningd-config.5.md +++ b/doc/lightningd-config.5.md @@ -561,6 +561,10 @@ command, so they invoices can also be paid onchain. This option makes the `getroutes` call fail if it takes more than this many seconds. Setting it to zero is a fun way to ensure your node never makes payments. +* **askrene-max-threads**=*NUMBER* [plugin `askrene`, *dynamic*] + + This option controls how many routes askrene will calculate at once: this is only useful on nodes which make multiple payments at once, and setting the number higher than your number of cores/CPUS will not help. The default is 4. + ### Networking options Note that for simple setups, the implicit *autolisten* option does the diff --git a/lightningd/channel.h b/lightningd/channel.h index bb27b6872503..9661e4b7d2dd 100644 --- a/lightningd/channel.h +++ b/lightningd/channel.h @@ -891,7 +891,7 @@ static inline bool scid_to_channel_eq_scid(const struct scid_to_channel *scidcha /* Define channel_scid_map */ HTABLE_DEFINE_NODUPS_TYPE(struct scid_to_channel, scid_to_channel_key, - short_channel_id_hash, + hash_scid, scid_to_channel_eq_scid, channel_scid_map); diff --git a/plugins/askrene/Makefile b/plugins/askrene/Makefile index bd09f1b98847..292059112a67 100644 --- a/plugins/askrene/Makefile +++ b/plugins/askrene/Makefile @@ -1,30 +1,24 @@ -PLUGIN_ASKRENE_SRC := \ +PLUGIN_ASKRENE_PARENT_SRC := \ plugins/askrene/askrene.c \ plugins/askrene/datastore_wire.c \ plugins/askrene/layer.c \ - plugins/askrene/reserve.c \ - plugins/askrene/mcf.c \ - plugins/askrene/dijkstra.c \ - plugins/askrene/flow.c \ - plugins/askrene/refine.c \ - plugins/askrene/explain_failure.c \ - plugins/askrene/graph.c \ - plugins/askrene/priorityqueue.c \ - plugins/askrene/algorithm.c + plugins/askrene/reserve.c -PLUGIN_ASKRENE_HEADER := \ - plugins/askrene/askrene.h \ - plugins/askrene/datastore_wire.h \ - plugins/askrene/layer.h \ - plugins/askrene/reserve.h \ - plugins/askrene/mcf.h \ - plugins/askrene/dijkstra.h \ - plugins/askrene/flow.h \ - plugins/askrene/refine.h \ - plugins/askrene/explain_failure.h \ - plugins/askrene/graph.h \ - plugins/askrene/priorityqueue.h \ - plugins/askrene/algorithm.h +PLUGIN_ASKRENE_CHILD_SRC := \ + plugins/askrene/child/child.c \ + plugins/askrene/child/mcf.c \ + plugins/askrene/child/dijkstra.c \ + plugins/askrene/child/flow.c \ + plugins/askrene/child/refine.c \ + plugins/askrene/child/explain_failure.c \ + plugins/askrene/child/graph.c \ + plugins/askrene/child/priorityqueue.c \ + plugins/askrene/child/algorithm.c \ + plugins/askrene/child/child_log.c \ + plugins/askrene/child/route_query.c \ + +PLUGIN_ASKRENE_SRC := $(PLUGIN_ASKRENE_PARENT_SRC) $(PLUGIN_ASKRENE_CHILD_SRC) +PLUGIN_ASKRENE_HEADER := $(PLUGIN_ASKRENE_SRC:.c=.h) plugins/askrene/child/additional_costs.h PLUGIN_ASKRENE_OBJS := $(PLUGIN_ASKRENE_SRC:.c=.o) diff --git a/plugins/askrene/askrene.c b/plugins/askrene/askrene.c index 23e541d75911..ad5829143eb3 100644 --- a/plugins/askrene/askrene.c +++ b/plugins/askrene/askrene.c @@ -8,6 +8,8 @@ */ #include "config.h" #include +#include +#include #include #include #include @@ -15,40 +17,41 @@ #include #include #include +#include #include +#include #include #include #include #include -#include +#include +#include +#include #include -#include #include +#include +#include +#include -/* "spendable" for a channel assumes a single HTLC: for additional HTLCs, - * the need to pay for fees (if we're the owner) reduces it */ -struct per_htlc_cost { - struct short_channel_id_dir scidd; - struct amount_msat per_htlc_cost; -}; +struct router_child { + /* Inside askrene->children */ + struct list_node list; + struct command *cmd; + struct timemono start; + int pid; + struct io_conn *log_conn; + struct io_conn *reply_conn; -static const struct short_channel_id_dir * -per_htlc_cost_key(const struct per_htlc_cost *phc) -{ - return &phc->scidd; -} + /* A whole msg read in for logging */ + u8 *log_msg; -static inline bool per_htlc_cost_eq_key(const struct per_htlc_cost *phc, - const struct short_channel_id_dir *scidd) -{ - return short_channel_id_dir_eq(scidd, &phc->scidd); -} + /* How much we've read so far */ + char *reply_buf; + size_t reply_bytes; -HTABLE_DEFINE_NODUPS_TYPE(struct per_htlc_cost, - per_htlc_cost_key, - hash_scidd, - per_htlc_cost_eq_key, - additional_cost_htable); + /* How much we just read (populated by io_read_partial) */ + size_t this_reply_len; +}; static bool have_layer(const char **layers, const char *name) { @@ -286,22 +289,12 @@ static struct layer *remove_small_channel_layer(const tal_t *ctx, return layer; } -struct amount_msat get_additional_per_htlc_cost(const struct route_query *rq, - const struct short_channel_id_dir *scidd) -{ - const struct per_htlc_cost *phc; - phc = additional_cost_htable_get(rq->additional_costs, scidd); - if (phc) - return phc->per_htlc_cost; - else - return AMOUNT_MSAT(0); -} - -const char *rq_log(const tal_t *ctx, - const struct route_query *rq, - enum log_level level, - const char *fmt, - ...) +PRINTF_FMT(4, 5) +static const char *cmd_log(const tal_t *ctx, + struct command *cmd, + enum log_level level, + const char *fmt, + ...) { va_list args; const char *msg; @@ -310,66 +303,17 @@ const char *rq_log(const tal_t *ctx, msg = tal_vfmt(ctx, fmt, args); va_end(args); - plugin_notify_message(rq->cmd, level, "%s", msg); + plugin_notify_message(cmd, level, "%s", msg); /* Notifications already get logged at debug. Otherwise reduce * severity. */ if (level != LOG_DBG) - plugin_log(rq->plugin, + plugin_log(cmd->plugin, level == LOG_BROKEN ? level : level - 1, - "%s: %s", rq->cmd->id, msg); + "%s: %s", cmd->id, msg); return msg; } -static const char *fmt_route(const tal_t *ctx, - const struct route *route, - struct amount_msat delivers, - u32 final_cltv) -{ - char *str = tal_strdup(ctx, ""); - - for (size_t i = 0; i < tal_count(route->hops); i++) { - struct short_channel_id_dir scidd; - scidd.scid = route->hops[i].scid; - scidd.dir = route->hops[i].direction; - tal_append_fmt(&str, "%s/%u %s -> ", - fmt_amount_msat(tmpctx, route->hops[i].amount), - route->hops[i].delay, - fmt_short_channel_id_dir(tmpctx, &scidd)); - } - tal_append_fmt(&str, "%s/%u", - fmt_amount_msat(tmpctx, delivers), final_cltv); - return str; -} - -const char *fmt_flow_full(const tal_t *ctx, - const struct route_query *rq, - const struct flow *flow) -{ - struct amount_msat amt = flow->delivers; - char *str = fmt_amount_msat(ctx, flow->delivers); - - for (int i = tal_count(flow->path) - 1; i >= 0; i--) { - struct short_channel_id_dir scidd; - struct amount_msat min, max; - scidd.scid = gossmap_chan_scid(rq->gossmap, flow->path[i]); - scidd.dir = flow->dirs[i]; - if (!amount_msat_add_fee(&amt, - flow->path[i]->half[scidd.dir].base_fee, - flow->path[i]->half[scidd.dir].proportional_fee)) - abort(); - get_constraints(rq, flow->path[i], scidd.dir, &min, &max); - tal_append_fmt(&str, " <- %s %s (cap=%s,fee=%u+%u,delay=%u)", - fmt_amount_msat(tmpctx, amt), - fmt_short_channel_id_dir(tmpctx, &scidd), - fmt_amount_msat(tmpctx, max), - flow->path[i]->half[scidd.dir].base_fee, - flow->path[i]->half[scidd.dir].proportional_fee, - flow->path[i]->half[scidd.dir].delay); - } - return str; -} - enum algorithm { /* Min. Cost Flow by successive shortests paths. */ ALGO_DEFAULT, @@ -395,6 +339,8 @@ param_algorithm(struct command *cmd, const char *name, const char *buffer, } struct getroutes_info { + /* We keep this around in askrene->waiting if we're busy */ + struct list_node list; struct command *cmd; struct node_id source, dest; struct amount_msat amount, maxfee; @@ -408,157 +354,162 @@ struct getroutes_info { u32 maxparts; }; -static void apply_layers(struct askrene *askrene, struct route_query *rq, - const struct node_id *source, - struct amount_msat amount, - struct gossmap_localmods *localmods, - const char **layers, - const struct layer *local_layer) +/* Gather layers, clear capacities where layers contains info */ +static const struct layer **apply_layers(const tal_t *ctx, + struct askrene *askrene, + struct command *cmd, + const struct node_id *source, + struct amount_msat amount, + struct gossmap_localmods *localmods, + const char **layernames, + const struct layer *local_layer, + fp16_t *capacities) { + const struct layer **layers = tal_arr(ctx, const struct layer *, 0); /* Layers must exist, but might be special ones! */ - for (size_t i = 0; i < tal_count(layers); i++) { - const struct layer *l = find_layer(askrene, layers[i]); + for (size_t i = 0; i < tal_count(layernames); i++) { + const struct layer *l = find_layer(askrene, layernames[i]); if (!l) { - if (streq(layers[i], "auto.localchans")) { - plugin_log(rq->plugin, LOG_DBG, "Adding auto.localchans"); + if (streq(layernames[i], "auto.localchans")) { + cmd_log(tmpctx, cmd, LOG_DBG, + "Adding auto.localchans"); l = local_layer; - } else if (streq(layers[i], "auto.no_mpp_support")) { - plugin_log(rq->plugin, LOG_DBG, "Adding auto.no_mpp_support, sorry"); - l = remove_small_channel_layer(layers, askrene, amount, localmods); + } else if (streq(layernames[i], "auto.no_mpp_support")) { + cmd_log(tmpctx, cmd, LOG_DBG, + "Adding auto.no_mpp_support, sorry"); + l = remove_small_channel_layer(layernames, askrene, amount, localmods); } else { - assert(streq(layers[i], "auto.sourcefree")); - plugin_log(rq->plugin, LOG_DBG, "Adding auto.sourcefree"); - l = source_free_layer(layers, askrene, source, localmods); + assert(streq(layernames[i], "auto.sourcefree")); + cmd_log(tmpctx, cmd, LOG_DBG, + "Adding auto.sourcefree"); + l = source_free_layer(layernames, askrene, source, localmods); } } - tal_arr_expand(&rq->layers, l); + tal_arr_expand(&layers, l); /* FIXME: Implement localmods_merge, and cache this in layer? */ - layer_add_localmods(l, rq->gossmap, localmods); + layer_add_localmods(l, askrene->gossmap, localmods); /* Clear any entries in capacities array if we * override them (incl local channels) */ - layer_clear_overridden_capacities(l, askrene->gossmap, rq->capacities); + layer_clear_overridden_capacities(l, askrene->gossmap, capacities); } + return layers; } -/* Convert back into routes, with delay and other information fixed */ -static struct route **convert_flows_to_routes(const tal_t *ctx, - struct route_query *rq, - u32 finalcltv, - struct flow **flows, - struct amount_msat **amounts) +static struct command_result *reap_child(struct router_child *child) { - struct route **routes; - routes = tal_arr(ctx, struct route *, tal_count(flows)); - *amounts = tal_arr(ctx, struct amount_msat, tal_count(flows)); - - for (size_t i = 0; i < tal_count(flows); i++) { - struct route *r; - struct amount_msat msat; - u32 delay; - - routes[i] = r = tal(routes, struct route); - r->success_prob = flow_probability(flows[i], rq); - r->hops = tal_arr(r, struct route_hop, tal_count(flows[i]->path)); - - /* Fill in backwards to calc amount and delay */ - msat = flows[i]->delivers; - delay = finalcltv; - - for (int j = tal_count(flows[i]->path) - 1; j >= 0; j--) { - struct route_hop *rh = &r->hops[j]; - struct gossmap_node *far_end; - const struct half_chan *h = flow_edge(flows[i], j); - - if (!amount_msat_add_fee(&msat, h->base_fee, h->proportional_fee)) - plugin_err(rq->plugin, "Adding fee to amount"); - delay += h->delay; - - rh->scid = gossmap_chan_scid(rq->gossmap, flows[i]->path[j]); - rh->direction = flows[i]->dirs[j]; - far_end = gossmap_nth_node(rq->gossmap, flows[i]->path[j], !flows[i]->dirs[j]); - gossmap_node_get_id(rq->gossmap, far_end, &rh->node_id); - rh->amount = msat; - rh->delay = delay; - } - (*amounts)[i] = flows[i]->delivers; - rq_log(tmpctx, rq, LOG_INFORM, "Flow %zu/%zu: %s", - i, tal_count(flows), - fmt_route(tmpctx, r, (*amounts)[i], finalcltv)); + int child_status; + struct timerel time_delta; + const char *err; + + waitpid(child->pid, &child_status, 0); + time_delta = timemono_between(time_mono(), child->start); + + /* log the time of computation */ + cmd_log(tmpctx, child->cmd, LOG_DBG, "get_routes %s %" PRIu64 " ms", + WEXITSTATUS(child_status) != 0 ? "failed after" : "completed in", + time_to_msec(time_delta)); + + if (WIFSIGNALED(child_status)) { + err = tal_fmt(tmpctx, "child died with signal %u", + WTERMSIG(child_status)); + goto fail_broken; } - return routes; + /* This is how it indicates an error message */ + if (WEXITSTATUS(child_status) != 0 && child->reply_bytes) { + err = tal_strndup(child, child->reply_buf, child->reply_bytes); + goto fail; + } + if (child->reply_bytes == 0) { + err = tal_fmt(child, "child produced no output (exited %i)?", + WEXITSTATUS(child_status)); + goto fail_broken; + } + + /* Frees child, since it's a child of cmd */ + return command_finish_rawstr(child->cmd, + child->reply_buf, child->reply_bytes); + +fail_broken: + plugin_log(child->cmd->plugin, LOG_BROKEN, "%s", err); +fail: + assert(err); + /* Frees child, since it's a child of cmd */ + return command_fail(child->cmd, PAY_ROUTE_NOT_FOUND, "%s", err); } -static void json_add_getroutes(struct json_stream *js, - struct route **routes, - const struct amount_msat *amounts, - double probability, - u32 final_cltv) +/* Last one out finalizes */ +static void log_closed(struct io_conn *conn, struct router_child *child) { - json_add_u64(js, "probability_ppm", (u64)(probability * 1000000)); - json_array_start(js, "routes"); - for (size_t i = 0; i < tal_count(routes); i++) { - json_object_start(js, NULL); - json_add_u64(js, "probability_ppm", - (u64)(routes[i]->success_prob * 1000000)); - json_add_amount_msat(js, "amount_msat", amounts[i]); - json_add_u32(js, "final_cltv", final_cltv); - json_array_start(js, "path"); - for (size_t j = 0; j < tal_count(routes[i]->hops); j++) { - struct short_channel_id_dir scidd; - const struct route_hop *r = &routes[i]->hops[j]; - json_object_start(js, NULL); - scidd.scid = r->scid; - scidd.dir = r->direction; - json_add_short_channel_id_dir( - js, "short_channel_id_dir", scidd); - json_add_node_id(js, "next_node_id", &r->node_id); - json_add_amount_msat(js, "amount_msat", r->amount); - json_add_u32(js, "delay", r->delay); - json_object_end(js); - } - json_array_end(js); - json_object_end(js); - } - json_array_end(js); + child->log_conn = NULL; + if (child->reply_conn == NULL) + reap_child(child); } -void get_constraints(const struct route_query *rq, - const struct gossmap_chan *chan, - int dir, - struct amount_msat *min, - struct amount_msat *max) +static void reply_closed(struct io_conn *conn, struct router_child *child) { - struct short_channel_id_dir scidd; - size_t idx = gossmap_chan_idx(rq->gossmap, chan); + child->reply_conn = NULL; + if (child->log_conn == NULL) + reap_child(child); +} - *min = AMOUNT_MSAT(0); +static struct io_plan *log_msg_in(struct io_conn *conn, + struct router_child *child) +{ + enum log_level level; + char *entry; + struct node_id *peer; - /* Fast path: no information known, no reserve. */ - if (idx < tal_count(rq->capacities) && rq->capacities[idx] != 0) { - *max = amount_msat(fp16_to_u64(rq->capacities[idx]) * 1000); - return; + if (fromwire_status_log(tmpctx, child->log_msg, &level, &peer, &entry)) + cmd_log(tmpctx, child->cmd, level, "%s", entry); + else { + cmd_log(tmpctx, child->cmd, LOG_BROKEN, + "unexpected non-log message %s", + tal_hex(tmpctx, child->log_msg)); } + return io_read_wire(conn, child, &child->log_msg, log_msg_in, child); +} - /* Naive implementation! */ - scidd.scid = gossmap_chan_scid(rq->gossmap, chan); - scidd.dir = dir; - *max = AMOUNT_MSAT(-1ULL); +static struct io_plan *child_log_init(struct io_conn *conn, + struct router_child *child) +{ + io_set_finish(conn, log_closed, child); + return io_read_wire(conn, child, &child->log_msg, log_msg_in, child); +} - /* Look through layers for any constraints (might be dummy - * ones, for created channels!) */ - for (size_t i = 0; i < tal_count(rq->layers); i++) - layer_apply_constraints(rq->layers[i], &scidd, min, max); +static size_t remaining_read_len(const struct router_child *child) +{ + return tal_bytelen(child->reply_buf) - child->reply_bytes; +} + +static struct io_plan *child_reply_in(struct io_conn *conn, + struct router_child *child) +{ + child->reply_bytes += child->this_reply_len; + if (remaining_read_len(child) < 64) + tal_resize(&child->reply_buf, tal_bytelen(child->reply_buf) * 2); + return io_read_partial(conn, + child->reply_buf + child->reply_bytes, + remaining_read_len(child), + &child->this_reply_len, + child_reply_in, child); +} - /* Might be here because it's reserved, but capacity is normal. */ - if (amount_msat_eq(*max, AMOUNT_MSAT(-1ULL))) - *max = gossmap_chan_get_capacity(rq->gossmap, chan); +static struct io_plan *child_reply_init(struct io_conn *conn, + struct router_child *child) +{ + io_set_finish(conn, reply_closed, child); + child->reply_buf = tal_arr(child, char, 64); + child->reply_bytes = 0; + child->this_reply_len = 0; + return child_reply_in(conn, child); +} - /* Finally, if any is in use, subtract that! */ - reserve_sub(rq->reserved, &scidd, rq->layers, min); - reserve_sub(rq->reserved, &scidd, rq->layers, max); +static void destroy_router_child(struct router_child *child) +{ + list_del(&child->list); } static struct command_result *do_getroutes(struct command *cmd, @@ -566,13 +517,13 @@ static struct command_result *do_getroutes(struct command *cmd, struct getroutes_info *info) { struct askrene *askrene = get_askrene(cmd->plugin); - struct route_query *rq = tal(cmd, struct route_query); const char *err; - double probability; - struct amount_msat *amounts; - struct route **routes; - struct flow **flows; - struct json_stream *response; + struct timemono deadline; + int replyfds[2], logfds[2]; + struct router_child *child; + const struct layer **layers; + s8 *biases; + fp16_t *capacities; /* update the gossmap */ if (gossmap_refresh(askrene->gossmap)) { @@ -582,50 +533,35 @@ static struct command_result *do_getroutes(struct command *cmd, get_capacities(askrene, askrene->plugin, askrene->gossmap); } - /* build this request structure */ - rq->cmd = cmd; - rq->plugin = cmd->plugin; - rq->gossmap = askrene->gossmap; - rq->reserved = askrene->reserved; - rq->layers = tal_arr(rq, const struct layer *, 0); - rq->capacities = tal_dup_talarr(rq, fp16_t, askrene->capacities); - /* FIXME: we still need to do something useful with these */ - rq->additional_costs = info->additional_costs; - rq->maxparts = info->maxparts; + capacities = tal_dup_talarr(cmd, fp16_t, askrene->capacities); /* apply selected layers to the localmods */ - apply_layers(askrene, rq, &info->source, info->amount, localmods, - info->layers, info->local_layer); + layers = apply_layers(cmd, askrene, cmd, + &info->source, info->amount, localmods, + info->layers, info->local_layer, capacities); /* Clear scids with reservations, too, so we don't have to look up * all the time! */ reserves_clear_capacities(askrene->reserved, askrene->gossmap, - rq->capacities); + capacities); /* we temporarily apply localmods */ gossmap_apply_localmods(askrene->gossmap, localmods); - /* I want to be able to disable channels while working on this query. - * Layers are for user interaction and cannot be used for this purpose. - */ - rq->disabled_chans = - tal_arrz(rq, bitmap, - 2 * BITMAP_NWORDS(gossmap_max_chan_idx(askrene->gossmap))); - /* localmods can add channels, so we need to allocate biases array * *afterwards* */ - rq->biases = - tal_arrz(rq, s8, gossmap_max_chan_idx(askrene->gossmap) * 2); + biases = tal_arrz(cmd, s8, gossmap_max_chan_idx(askrene->gossmap) * 2); /* Note any channel biases */ - for (size_t i = 0; i < tal_count(rq->layers); i++) - layer_apply_biases(rq->layers[i], askrene->gossmap, rq->biases); + for (size_t i = 0; i < tal_count(layers); i++) + layer_apply_biases(layers[i], askrene->gossmap, biases); /* checkout the source */ const struct gossmap_node *srcnode = gossmap_find_node(askrene->gossmap, &info->source); if (!srcnode) { - err = rq_log(tmpctx, rq, LOG_INFORM, "Unknown source node %s", + err = cmd_log(tmpctx, cmd, LOG_INFORM, + "Unknown source node %s", fmt_node_id(tmpctx, &info->source)); goto fail; } @@ -634,7 +570,7 @@ static struct command_result *do_getroutes(struct command *cmd, const struct gossmap_node *dstnode = gossmap_find_node(askrene->gossmap, &info->dest); if (!dstnode) { - err = rq_log(tmpctx, rq, LOG_INFORM, + err = cmd_log(tmpctx, cmd, LOG_INFORM, "Unknown destination node %s", fmt_node_id(tmpctx, &info->dest)); goto fail; @@ -644,61 +580,85 @@ static struct command_result *do_getroutes(struct command *cmd, if (have_layer(info->layers, "auto.no_mpp_support") && info->dev_algo != ALGO_SINGLE_PATH) { info->dev_algo = ALGO_SINGLE_PATH; - rq_log(tmpctx, rq, LOG_DBG, + cmd_log(tmpctx, cmd, LOG_DBG, "Layer no_mpp_support is active we switch to a " "single path algorithm."); } - if (rq->maxparts == 1 && + if (info->maxparts == 1 && info->dev_algo != ALGO_SINGLE_PATH) { info->dev_algo = ALGO_SINGLE_PATH; - rq_log(tmpctx, rq, LOG_DBG, + cmd_log(tmpctx, cmd, LOG_DBG, "maxparts == 1: switching to a single path algorithm."); } - /* Compute the routes. At this point we might select between multiple - * algorithms. Right now there is only one algorithm available. */ - struct timemono time_start = time_mono(); - struct timemono deadline = timemono_add(time_start, - time_from_sec(askrene->route_seconds)); - if (info->dev_algo == ALGO_SINGLE_PATH) { - err = single_path_routes(rq, rq, deadline, srcnode, dstnode, info->amount, - info->maxfee, info->finalcltv, - info->maxdelay, &flows, &probability); - } else { - assert(info->dev_algo == ALGO_DEFAULT); - err = default_routes(rq, rq, deadline, srcnode, dstnode, info->amount, - info->maxfee, info->finalcltv, - info->maxdelay, &flows, &probability); + child = tal(cmd, struct router_child); + child->start = time_mono(); + deadline = timemono_add(child->start, + time_from_sec(askrene->route_seconds)); + + if (pipe(replyfds) != 0) { + err = tal_fmt(tmpctx, "failed to create pipes: %s", strerror(errno)); + goto fail_broken; + } + if (pipe(logfds) != 0) { + err = tal_fmt(tmpctx, "failed to create pipes: %s", strerror(errno)); + close_noerr(replyfds[0]); + close_noerr(replyfds[1]); + goto fail_broken; + } + child->pid = fork(); + if (child->pid < 0) { + err = tal_fmt(tmpctx, "failed to fork: %s", strerror(errno)); + close_noerr(replyfds[0]); + close_noerr(replyfds[1]); + close_noerr(logfds[0]); + close_noerr(logfds[1]); + goto fail_broken; } - struct timerel time_delta = timemono_between(time_mono(), time_start); - /* log the time of computation */ - rq_log(tmpctx, rq, LOG_DBG, "get_routes %s %" PRIu64 " ms", - err ? "failed after" : "completed in", - time_to_msec(time_delta)); - if (err) - goto fail; + if (child->pid == 0) { + /* We are the child. Run the algo */ + close(logfds[0]); + close(replyfds[0]); + set_child_log_fd(logfds[1]); + + /* Make sure we don't stomp over plugin fds, even if we have a bug */ + for (int i = 0; i < min_u64(logfds[1], replyfds[1]); i++) { + /* stderr is maintained */ + if (i != 2) + close(i); + } - /* otherwise we continue */ - assert(tal_count(flows) > 0); - rq_log(tmpctx, rq, LOG_DBG, "Final answer has %zu flows", - tal_count(flows)); + /* Does not return! */ + run_child(askrene->gossmap, + layers, + biases, + info->additional_costs, + askrene->reserved, + take(capacities), + info->dev_algo == ALGO_SINGLE_PATH, + deadline, srcnode, dstnode, info->amount, + info->maxfee, info->finalcltv, info->maxdelay, + info->maxparts, cmd->id, cmd->filter, replyfds[1]); + abort(); + } - /* convert flows to routes */ - routes = convert_flows_to_routes(rq, rq, info->finalcltv, flows, - &amounts); - assert(tal_count(routes) == tal_count(flows)); - assert(tal_count(amounts) == tal_count(flows)); + close(logfds[1]); + close(replyfds[1]); - /* At last we remove the localmods from the gossmap. */ + /* We don't need this any more. */ gossmap_remove_localmods(askrene->gossmap, localmods); + child->reply_conn = io_new_conn(child, replyfds[0], + child_reply_init, child); + child->log_conn = io_new_conn(child, logfds[0], child_log_init, child); + child->cmd = cmd; - /* output the results */ - response = jsonrpc_stream_success(cmd); - json_add_getroutes(response, routes, amounts, probability, - info->finalcltv); - return command_finished(cmd, response); + list_add_tail(&askrene->children, &child->list); + tal_add_destructor(child, destroy_router_child); + return command_still_pending(cmd); +fail_broken: + plugin_log(cmd->plugin, LOG_BROKEN, "%s", err); fail: assert(err); gossmap_remove_localmods(askrene->gossmap, localmods); @@ -801,6 +761,49 @@ listpeerchannels_done(struct command *cmd, return do_getroutes(cmd, localmods, info); } +/* Mutual recursion */ +static struct command_result *begin_request(struct askrene *askrene, + struct getroutes_info *info); + +/* One is finished. Maybe wake up a waiter */ +static void destroy_live_command(struct command *cmd) +{ + struct askrene *askrene = get_askrene(cmd->plugin); + struct getroutes_info *info; + + assert(askrene->num_live_requests > 0); + askrene->num_live_requests--; + + if (askrene->num_live_requests >= askrene->max_children) + return; + + info = list_pop(&askrene->waiters, struct getroutes_info, list); + if (info) + begin_request(askrene, info); +} + +static struct command_result *begin_request(struct askrene *askrene, + struct getroutes_info *info) +{ + askrene->num_live_requests++; + + /* Wake any waiting ones when we're finished */ + tal_add_destructor(info->cmd, destroy_live_command); + + if (have_layer(info->layers, "auto.localchans")) { + struct out_req *req; + + req = jsonrpc_request_start(info->cmd, + "listpeerchannels", + listpeerchannels_done, + forward_error, info); + return send_outreq(req); + } else + info->local_layer = NULL; + + return do_getroutes(info->cmd, gossmap_localmods_new(info->cmd), info); +} + static struct command_result *json_getroutes(struct command *cmd, const char *buffer, const jsmntok_t *params) @@ -813,6 +816,7 @@ static struct command_result *json_getroutes(struct command *cmd, */ /* FIXME: Typo in spec for CLTV in descripton! But it breaks our spelling check, so we omit it above */ const u32 maxdelay_allowed = 2016; + struct askrene *askrene = get_askrene(cmd->plugin); const u32 default_maxparts = 100; struct getroutes_info *info = tal(cmd, struct getroutes_info); /* param functions require pointers */ @@ -867,22 +871,18 @@ static struct command_result *json_getroutes(struct command *cmd, info->finalcltv = *finalcltv; info->maxdelay = *maxdelay; info->dev_algo = *dev_algo; - info->additional_costs = tal(info, struct additional_cost_htable); - additional_cost_htable_init(info->additional_costs); + info->additional_costs = new_htable(info, additional_cost_htable); info->maxparts = *maxparts; - if (have_layer(info->layers, "auto.localchans")) { - struct out_req *req; - - req = jsonrpc_request_start(cmd, - "listpeerchannels", - listpeerchannels_done, - forward_error, info); - return send_outreq(req); - } else - info->local_layer = NULL; + if (askrene->num_live_requests >= askrene->max_children) { + cmd_log(tmpctx, cmd, LOG_INFORM, + "Too many running at once (%zu vs %u): waiting", + askrene->num_live_requests, askrene->max_children); + list_add_tail(&askrene->waiters, &info->list); + return command_still_pending(cmd); + } - return do_getroutes(cmd, gossmap_localmods_new(cmd), info); + return begin_request(askrene, info); } static struct command_result *json_askrene_reserve(struct command *cmd, @@ -1397,6 +1397,9 @@ static const char *init(struct command *init_cmd, askrene->plugin = plugin; list_head_init(&askrene->layers); + list_head_init(&askrene->children); + list_head_init(&askrene->waiters); + askrene->num_live_requests = 0; askrene->reserved = new_reserve_htable(askrene); askrene->gossmap = gossmap_load(askrene, GOSSIP_STORE_FILENAME, plugin_gossmap_logcb, plugin); @@ -1424,6 +1427,7 @@ int main(int argc, char *argv[]) askrene = tal(NULL, struct askrene); askrene->route_seconds = 10; + askrene->max_children = 4; plugin_main(argv, init, take(askrene), PLUGIN_RESTARTABLE, true, NULL, commands, ARRAY_SIZE(commands), NULL, 0, NULL, 0, NULL, 0, plugin_option_dynamic("askrene-timeout", @@ -1432,5 +1436,11 @@ int main(int argc, char *argv[]) " Defaults to 10 seconds", u32_option, u32_jsonfmt, &askrene->route_seconds), + plugin_option_dynamic("askrene-max-threads", + "int", + "How many routes to calculate at once." + " Defaults to 4", + u32_option, u32_jsonfmt, + &askrene->max_children), NULL); } diff --git a/plugins/askrene/askrene.h b/plugins/askrene/askrene.h index a9a809737329..a574538991b2 100644 --- a/plugins/askrene/askrene.h +++ b/plugins/askrene/askrene.h @@ -12,14 +12,6 @@ struct gossmap_chan; -/* A single route. */ -struct route { - /* Actual path to take */ - struct route_hop *hops; - /* Probability estimate (0-1) */ - double success_prob; -}; - /* Grab-bag of "globals" for this plugin */ struct askrene { struct plugin *plugin; @@ -36,71 +28,19 @@ struct askrene { struct command *layer_cmd; /* How long before we abort trying to find a route? */ u32 route_seconds; + /* Maximum number of routing children */ + u32 max_children; + /* How many requests live now? */ + size_t num_live_requests; + /* Routing children currently in flight. */ + struct list_head children; + /* Ones waiting */ + struct list_head waiters; }; -/* Information for a single route query. */ -struct route_query { - /* Command pointer, mainly for command id. */ - struct command *cmd; - - /* Plugin pointer, for logging mainly */ - struct plugin *plugin; - - /* This is *not* updated during a query! Has all layers applied. */ - const struct gossmap *gossmap; - - /* We need to take in-flight payments into account */ - const struct reserve_htable *reserved; - - /* Array of layers we're applying */ - const struct layer **layers; - - /* Cache of channel capacities for non-reserved, unknown channels. */ - fp16_t *capacities; - - /* Compact cache of biases */ - s8 *biases; - - /* Additional per-htlc cost for local channels */ - const struct additional_cost_htable *additional_costs; - - /* channels we disable during computation to meet constraints */ - bitmap *disabled_chans; - - /* limit the number of paths in the solution */ - u32 maxparts; -}; - -/* Given a gossmap channel, get the current known min/max */ -void get_constraints(const struct route_query *rq, - const struct gossmap_chan *chan, - int dir, - struct amount_msat *min, - struct amount_msat *max); - -/* Say something about this route_query */ -const char *rq_log(const tal_t *ctx, - const struct route_query *rq, - enum log_level level, - const char *fmt, - ...) - PRINTF_FMT(4, 5); - -/* Is there a known additional per-htlc cost for this channel? */ -struct amount_msat get_additional_per_htlc_cost(const struct route_query *rq, - const struct short_channel_id_dir *scidd); - /* Useful plugin->askrene mapping */ static inline struct askrene *get_askrene(struct plugin *plugin) { return plugin_get_data(plugin, struct askrene); } - -/* Convenience routine for hash tables */ -static inline size_t hash_scidd(const struct short_channel_id_dir *scidd) -{ - /* scids cost money to generate, so simple hash works here */ - return (scidd->scid.u64 >> 32) ^ (scidd->scid.u64 >> 16) ^ (scidd->scid.u64 << 1) ^ scidd->dir; -} - #endif /* LIGHTNING_PLUGINS_ASKRENE_ASKRENE_H */ diff --git a/plugins/askrene/child/additional_costs.h b/plugins/askrene/child/additional_costs.h new file mode 100644 index 000000000000..44eb0971f800 --- /dev/null +++ b/plugins/askrene/child/additional_costs.h @@ -0,0 +1,32 @@ +#ifndef LIGHTNING_PLUGINS_ASKRENE_CHILD_ADDITIONAL_COSTS_H +#define LIGHTNING_PLUGINS_ASKRENE_CHILD_ADDITIONAL_COSTS_H +#include "config.h" +#include +#include + +/* "spendable" for a channel assumes a single HTLC: for additional HTLCs, + * the need to pay for fees (if we're the owner) reduces it */ +struct per_htlc_cost { + struct short_channel_id_dir scidd; + struct amount_msat per_htlc_cost; +}; + +static inline const struct short_channel_id_dir * +per_htlc_cost_key(const struct per_htlc_cost *phc) +{ + return &phc->scidd; +} + +static inline bool per_htlc_cost_eq_key(const struct per_htlc_cost *phc, + const struct short_channel_id_dir *scidd) +{ + return short_channel_id_dir_eq(scidd, &phc->scidd); +} + +HTABLE_DEFINE_NODUPS_TYPE(struct per_htlc_cost, + per_htlc_cost_key, + hash_scidd, + per_htlc_cost_eq_key, + additional_cost_htable); + +#endif /* LIGHTNING_PLUGINS_ASKRENE_CHILD_ADDITIONAL_COSTS_H */ diff --git a/plugins/askrene/algorithm.c b/plugins/askrene/child/algorithm.c similarity index 99% rename from plugins/askrene/algorithm.c rename to plugins/askrene/child/algorithm.c index d253325a8b23..4da79e0cb095 100644 --- a/plugins/askrene/algorithm.c +++ b/plugins/askrene/child/algorithm.c @@ -1,8 +1,8 @@ #include "config.h" #include #include -#include -#include +#include +#include static const s64 INFINITE = INT64_MAX; diff --git a/plugins/askrene/algorithm.h b/plugins/askrene/child/algorithm.h similarity index 96% rename from plugins/askrene/algorithm.h rename to plugins/askrene/child/algorithm.h index 40010eb5cfd9..67bbff93ca3a 100644 --- a/plugins/askrene/algorithm.h +++ b/plugins/askrene/child/algorithm.h @@ -1,11 +1,11 @@ -#ifndef LIGHTNING_PLUGINS_ASKRENE_ALGORITHM_H -#define LIGHTNING_PLUGINS_ASKRENE_ALGORITHM_H +#ifndef LIGHTNING_PLUGINS_ASKRENE_CHILD_ALGORITHM_H +#define LIGHTNING_PLUGINS_ASKRENE_CHILD_ALGORITHM_H /* Implementation of network algorithms: shortests path, minimum cost flow, etc. */ #include "config.h" -#include +#include /* Search any path from source to destination using Breadth First Search. * @@ -176,4 +176,4 @@ bool mcf_refinement(const tal_t *ctx, const s64 *cost, s64 *potential); -#endif /* LIGHTNING_PLUGINS_ASKRENE_ALGORITHM_H */ +#endif /* LIGHTNING_PLUGINS_ASKRENE_CHILD_ALGORITHM_H */ diff --git a/plugins/askrene/child/child.c b/plugins/askrene/child/child.c new file mode 100644 index 000000000000..c4c308db67ee --- /dev/null +++ b/plugins/askrene/child/child.c @@ -0,0 +1,234 @@ +#include "config.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* A single route. */ +struct route { + /* Actual path to take */ + struct route_hop *hops; + /* Probability estimate (0-1) */ + double success_prob; +}; + +static const char *fmt_route(const tal_t *ctx, + const struct route *route, + struct amount_msat delivers, + u32 final_cltv) +{ + char *str = tal_strdup(ctx, ""); + + for (size_t i = 0; i < tal_count(route->hops); i++) { + struct short_channel_id_dir scidd; + scidd.scid = route->hops[i].scid; + scidd.dir = route->hops[i].direction; + tal_append_fmt(&str, "%s/%u %s -> ", + fmt_amount_msat(tmpctx, route->hops[i].amount), + route->hops[i].delay, + fmt_short_channel_id_dir(tmpctx, &scidd)); + } + tal_append_fmt(&str, "%s/%u", + fmt_amount_msat(tmpctx, delivers), final_cltv); + return str; +} + +/* Convert back into routes, with delay and other information fixed */ +static struct route **convert_flows_to_routes(const tal_t *ctx, + const struct route_query *rq, + u32 finalcltv, + struct flow **flows, + struct amount_msat **amounts) +{ + struct route **routes; + routes = tal_arr(ctx, struct route *, tal_count(flows)); + *amounts = tal_arr(ctx, struct amount_msat, tal_count(flows)); + + for (size_t i = 0; i < tal_count(flows); i++) { + struct route *r; + struct amount_msat msat; + u32 delay; + + routes[i] = r = tal(routes, struct route); + r->success_prob = flow_probability(flows[i], rq); + r->hops = tal_arr(r, struct route_hop, tal_count(flows[i]->path)); + + /* Fill in backwards to calc amount and delay */ + msat = flows[i]->delivers; + delay = finalcltv; + + for (int j = tal_count(flows[i]->path) - 1; j >= 0; j--) { + struct route_hop *rh = &r->hops[j]; + struct gossmap_node *far_end; + const struct half_chan *h = flow_edge(flows[i], j); + + if (!amount_msat_add_fee(&msat, h->base_fee, h->proportional_fee)) { + child_err("Adding fee to amount"); + } + delay += h->delay; + + rh->scid = gossmap_chan_scid(rq->gossmap, flows[i]->path[j]); + rh->direction = flows[i]->dirs[j]; + far_end = gossmap_nth_node(rq->gossmap, flows[i]->path[j], !flows[i]->dirs[j]); + gossmap_node_get_id(rq->gossmap, far_end, &rh->node_id); + rh->amount = msat; + rh->delay = delay; + } + (*amounts)[i] = flows[i]->delivers; + child_log(tmpctx, LOG_INFORM, "Flow %zu/%zu: %s", + i, tal_count(flows), + fmt_route(tmpctx, r, (*amounts)[i], finalcltv)); + } + + return routes; +} + +static void json_add_getroutes(struct json_stream *js, + struct route **routes, + const struct amount_msat *amounts, + double probability, + u32 final_cltv) +{ + json_add_u64(js, "probability_ppm", (u64)(probability * 1000000)); + json_array_start(js, "routes"); + for (size_t i = 0; i < tal_count(routes); i++) { + json_object_start(js, NULL); + json_add_u64(js, "probability_ppm", + (u64)(routes[i]->success_prob * 1000000)); + json_add_amount_msat(js, "amount_msat", amounts[i]); + json_add_u32(js, "final_cltv", final_cltv); + json_array_start(js, "path"); + for (size_t j = 0; j < tal_count(routes[i]->hops); j++) { + struct short_channel_id_dir scidd; + const struct route_hop *r = &routes[i]->hops[j]; + json_object_start(js, NULL); + scidd.scid = r->scid; + scidd.dir = r->direction; + json_add_short_channel_id_dir( + js, "short_channel_id_dir", scidd); + json_add_node_id(js, "next_node_id", &r->node_id); + json_add_amount_msat(js, "amount_msat", r->amount); + json_add_u32(js, "delay", r->delay); + json_object_end(js); + } + json_array_end(js); + json_object_end(js); + } + json_array_end(js); +} + +static struct route_query *new_route_query(const tal_t *ctx, + const struct gossmap *gossmap, + const char *cmd_id, + const struct layer **layers, + const s8 *biases, + const struct additional_cost_htable *additional_costs, + struct reserve_htable *reserved, + fp16_t *capacities TAKES) +{ + struct route_query *rq = tal(ctx, struct route_query); + + rq->gossmap = gossmap; + rq->cmd_id = tal_strdup(rq, cmd_id); + rq->layers = layers; + rq->biases = biases; + rq->additional_costs = additional_costs; + rq->reserved = reserved; + rq->capacities = tal_dup_talarr(rq, fp16_t, capacities); + rq->disabled_chans = + tal_arrz(rq, bitmap, + 2 * BITMAP_NWORDS(gossmap_max_chan_idx(gossmap))); + + return rq; +} + +void run_child(const struct gossmap *gossmap, + const struct layer **layers, + const s8 *biases, + const struct additional_cost_htable *additional_costs, + struct reserve_htable *reserved, + fp16_t *capacities TAKES, + bool single_path, + struct timemono deadline, + const struct gossmap_node *srcnode, + const struct gossmap_node *dstnode, + struct amount_msat amount, struct amount_msat maxfee, + u32 finalcltv, u32 maxdelay, size_t maxparts, + const char *cmd_id, + struct json_filter *cmd_filter, + int replyfd) +{ + double probability; + struct flow **flows; + struct route **routes; + struct amount_msat *amounts; + const char *err, *p; + size_t len; + struct route_query *rq; + + /* We exit below, so we don't bother freeing this */ + rq = new_route_query(NULL, gossmap, cmd_id, layers, + biases, additional_costs, + reserved, capacities); + if (single_path) { + err = single_path_routes(rq, rq, deadline, srcnode, dstnode, + amount, maxfee, finalcltv, + maxdelay, &flows, &probability); + } else { + err = default_routes(rq, rq, deadline, srcnode, dstnode, + amount, maxfee, finalcltv, maxdelay, + maxparts, &flows, &probability); + } + if (err) { + write_all(replyfd, err, strlen(err)); + /* Non-zero exit tells parent this is an error string. */ + exit(1); + } + + /* otherwise we continue */ + assert(tal_count(flows) > 0); + child_log(tmpctx, LOG_DBG, "Final answer has %zu flows", + tal_count(flows)); + + /* convert flows to routes */ + routes = convert_flows_to_routes(rq, rq, finalcltv, flows, + &amounts); + assert(tal_count(routes) == tal_count(flows)); + assert(tal_count(amounts) == tal_count(flows)); + + /* output the results */ + struct json_stream *js = new_json_stream(tmpctx, NULL, NULL); + json_object_start(js, NULL); + json_add_string(js, "jsonrpc", "2.0"); + json_add_id(js, cmd_id); + json_object_start(js, "result"); + if (cmd_filter) + json_stream_attach_filter(js, cmd_filter); + json_add_getroutes(js, routes, amounts, probability, finalcltv); + + /* Detach filter before it complains about closing object it never saw */ + if (cmd_filter) { + err = json_stream_detach_filter(tmpctx, js); + if (err) + json_add_string(js, "warning_parameter_filter", err); + } + /* "result" object */ + json_object_end(js); + /* Global object */ + json_object_end(js); + json_stream_close(js, NULL); + + p = json_out_contents(js->jout, &len); + if (!write_all(replyfd, p, len)) + abort(); + exit(0); +} + diff --git a/plugins/askrene/child/child.h b/plugins/askrene/child/child.h new file mode 100644 index 000000000000..1ad16164ab37 --- /dev/null +++ b/plugins/askrene/child/child.h @@ -0,0 +1,35 @@ +#ifndef LIGHTNING_PLUGINS_ASKRENE_CHILD_CHILD_H +#define LIGHTNING_PLUGINS_ASKRENE_CHILD_CHILD_H +#include "config.h" +#include +#include +#include +#include +#include +#include + +struct additional_cost_htable; +struct gossmap; +struct json_filter; +struct layer; +struct reserve_htable; + +/* This is the child. Do the thing. */ +void run_child(const struct gossmap *gossmap, + const struct layer **layers, + const s8 *biases, + const struct additional_cost_htable *additional_costs, + struct reserve_htable *reserved, + fp16_t *capacities TAKES, + bool single_path, + struct timemono deadline, + const struct gossmap_node *srcnode, + const struct gossmap_node *dstnode, + struct amount_msat amount, struct amount_msat maxfee, + u32 finalcltv, u32 maxdelay, size_t maxparts, + const char *cmd_id, + struct json_filter *cmd_filter, + int reply_fd) NORETURN; + +#endif /* LIGHTNING_PLUGINS_ASKRENE_CHILD_CHILD_H */ + diff --git a/plugins/askrene/child/child_log.c b/plugins/askrene/child/child_log.c new file mode 100644 index 000000000000..604573f9ab30 --- /dev/null +++ b/plugins/askrene/child/child_log.c @@ -0,0 +1,57 @@ +#include "config.h" +#include +#include +#include +#include +#include +#include +#include + +static int log_fd = -1; + +static const char *child_logv(const tal_t *ctx, + enum log_level level, + const char *fmt, + va_list ap) +{ + const char *str = tal_vfmt(ctx, fmt, ap); + u8 *msg; + + /* We reuse status_wire here */ + msg = towire_status_log(NULL, level, NULL, str); + if (!wire_sync_write(log_fd, take(msg))) + abort(); + return str; +} + +const char *child_log(const tal_t *ctx, + enum log_level level, + const char *fmt, + ...) +{ + va_list args; + const char *str; + + va_start(args, fmt); + str = child_logv(ctx, level, fmt, args); + va_end(args); + return str; +} + +void child_err(const char *fmt, ...) +{ + va_list args; + + va_start(args, fmt); + child_logv(tmpctx, LOG_BROKEN, fmt, args); + va_end(args); + + abort(); +} + +void set_child_log_fd(int fd) +{ + assert(log_fd == -1); + assert(fd != -1); + log_fd = fd; +} diff --git a/plugins/askrene/child/child_log.h b/plugins/askrene/child/child_log.h new file mode 100644 index 000000000000..aad4db062cd9 --- /dev/null +++ b/plugins/askrene/child/child_log.h @@ -0,0 +1,20 @@ +#ifndef LIGHTNING_PLUGINS_ASKRENE_CHILD_CHILD_LOG_H +#define LIGHTNING_PLUGINS_ASKRENE_CHILD_CHILD_LOG_H +#include "config.h" +#include +#include + +/* Logs this, and also returns the string allocated off ctx */ +const char *child_log(const tal_t *ctx, + enum log_level level, + const char *fmt, + ...) + PRINTF_FMT(3,4); + +/* BROKEN variant. */ +void child_err(const char *fmt, ...) + PRINTF_FMT(1, 2) NORETURN; + +/* At initialization, we set this to the fd for child_log() to write to */ +void set_child_log_fd(int fd); +#endif /* LIGHTNING_PLUGINS_ASKRENE_CHILD_CHILD_LOG_H */ diff --git a/plugins/askrene/dijkstra.c b/plugins/askrene/child/dijkstra.c similarity index 99% rename from plugins/askrene/dijkstra.c rename to plugins/askrene/child/dijkstra.c index b3f9d39deb7b..4e4ca61b50ce 100644 --- a/plugins/askrene/dijkstra.c +++ b/plugins/askrene/child/dijkstra.c @@ -1,6 +1,6 @@ #define NDEBUG 1 #include "config.h" -#include +#include /* In the heap we keep node idx, but in this structure we keep the distance * value associated to every node, and their position in the heap as a pointer diff --git a/plugins/askrene/dijkstra.h b/plugins/askrene/child/dijkstra.h similarity index 85% rename from plugins/askrene/dijkstra.h rename to plugins/askrene/child/dijkstra.h index f8ff62a8ad7f..60047626962c 100644 --- a/plugins/askrene/dijkstra.h +++ b/plugins/askrene/child/dijkstra.h @@ -1,5 +1,5 @@ -#ifndef LIGHTNING_PLUGINS_ASKRENE_DIJKSTRA_H -#define LIGHTNING_PLUGINS_ASKRENE_DIJKSTRA_H +#ifndef LIGHTNING_PLUGINS_ASKRENE_CHILD_DIJKSTRA_H +#define LIGHTNING_PLUGINS_ASKRENE_CHILD_DIJKSTRA_H #include "config.h" #include #include @@ -27,4 +27,4 @@ size_t dijkstra_size(const struct dijkstra *dijkstra); /* Maximum number of elements the heap can host */ size_t dijkstra_maxsize(const struct dijkstra *dijkstra); -#endif /* LIGHTNING_PLUGINS_ASKRENE_DIJKSTRA_H */ +#endif /* LIGHTNING_PLUGINS_ASKRENE_CHILD_DIJKSTRA_H */ diff --git a/plugins/askrene/explain_failure.c b/plugins/askrene/child/explain_failure.c similarity index 84% rename from plugins/askrene/explain_failure.c rename to plugins/askrene/child/explain_failure.c index f9720db0df1c..a87e2d7499de 100644 --- a/plugins/askrene/explain_failure.c +++ b/plugins/askrene/child/explain_failure.c @@ -3,8 +3,10 @@ #include #include #include -#include -#include +#include +#include +#include +#include #include #include @@ -133,33 +135,33 @@ static const char *check_capacity(const tal_t *ctx, node_stats(rq, node, node_direction, &stats); if (amount_msat_greater(amount, stats.total.capacity)) { - return rq_log(ctx, rq, LOG_DBG, - NO_USABLE_PATHS_STRING - " Total %s capacity is only %s" - " (in %zu channels).", - name, - fmt_amount_msat(tmpctx, stats.total.capacity), - stats.total.num_channels); + return child_log(ctx, LOG_DBG, + NO_USABLE_PATHS_STRING + " Total %s capacity is only %s" + " (in %zu channels).", + name, + fmt_amount_msat(tmpctx, stats.total.capacity), + stats.total.num_channels); } if (amount_msat_greater(amount, stats.gossip_known.capacity)) { - return rq_log(ctx, rq, LOG_DBG, - NO_USABLE_PATHS_STRING - " Missing gossip for %s: only known %zu/%zu channels, leaving capacity only %s of %s.", - name, - stats.gossip_known.num_channels, - stats.total.num_channels, - fmt_amount_msat(tmpctx, stats.gossip_known.capacity), - fmt_amount_msat(tmpctx, stats.total.capacity)); + return child_log(ctx, LOG_DBG, + NO_USABLE_PATHS_STRING + " Missing gossip for %s: only known %zu/%zu channels, leaving capacity only %s of %s.", + name, + stats.gossip_known.num_channels, + stats.total.num_channels, + fmt_amount_msat(tmpctx, stats.gossip_known.capacity), + fmt_amount_msat(tmpctx, stats.total.capacity)); } if (amount_msat_greater(amount, stats.enabled.capacity)) { - return rq_log(ctx, rq, LOG_DBG, - NO_USABLE_PATHS_STRING + return child_log(ctx, LOG_DBG, + NO_USABLE_PATHS_STRING " The %s has disabled %zu of %zu channels, leaving capacity only %s of %s.", - name, - stats.total.num_channels - stats.enabled.num_channels, - stats.total.num_channels, - fmt_amount_msat(tmpctx, stats.enabled.capacity), - fmt_amount_msat(tmpctx, stats.total.capacity)); + name, + stats.total.num_channels - stats.enabled.num_channels, + stats.total.num_channels, + fmt_amount_msat(tmpctx, stats.enabled.capacity), + fmt_amount_msat(tmpctx, stats.total.capacity)); } return NULL; } @@ -236,7 +238,7 @@ const char *explain_failure(const tal_t *ctx, hops = route_from_dijkstra(tmpctx, rq->gossmap, dij, srcnode, AMOUNT_MSAT(0), 0); if (!hops) - return rq_log(ctx, rq, LOG_INFORM, + return child_log(ctx, LOG_INFORM, "There is no connection between source and destination at all"); /* Description of shortest path */ @@ -282,16 +284,16 @@ const char *explain_failure(const tal_t *ctx, else continue; - return rq_log(ctx, rq, LOG_INFORM, - NO_USABLE_PATHS_STRING - " The shortest path is %s, but %s %s", - path, - fmt_short_channel_id_dir(tmpctx, &scidd), - explanation); + return child_log(ctx, LOG_INFORM, + NO_USABLE_PATHS_STRING + " The shortest path is %s, but %s %s", + path, + fmt_short_channel_id_dir(tmpctx, &scidd), + explanation); } - return rq_log(ctx, rq, LOG_BROKEN, - "Actually, I'm not sure why we didn't find the" - " obvious route %s: perhaps this is a bug?", - path); + return child_log(ctx, LOG_BROKEN, + "Actually, I'm not sure why we didn't find the" + " obvious route %s: perhaps this is a bug?", + path); } diff --git a/plugins/askrene/explain_failure.h b/plugins/askrene/child/explain_failure.h similarity index 65% rename from plugins/askrene/explain_failure.h rename to plugins/askrene/child/explain_failure.h index ed470f8298dc..2914b9eede54 100644 --- a/plugins/askrene/explain_failure.h +++ b/plugins/askrene/child/explain_failure.h @@ -1,5 +1,5 @@ -#ifndef LIGHTNING_PLUGINS_ASKRENE_EXPLAIN_FAILURE_H -#define LIGHTNING_PLUGINS_ASKRENE_EXPLAIN_FAILURE_H +#ifndef LIGHTNING_PLUGINS_ASKRENE_CHILD_EXPLAIN_FAILURE_H +#define LIGHTNING_PLUGINS_ASKRENE_CHILD_EXPLAIN_FAILURE_H #include "config.h" #include @@ -13,4 +13,4 @@ const char *explain_failure(const tal_t *ctx, const struct gossmap_node *dstnode, struct amount_msat amount); -#endif /* LIGHTNING_PLUGINS_ASKRENE_EXPLAIN_FAILURE_H */ +#endif /* LIGHTNING_PLUGINS_ASKRENE_CHILD_EXPLAIN_FAILURE_H */ diff --git a/plugins/askrene/flow.c b/plugins/askrene/child/flow.c similarity index 65% rename from plugins/askrene/flow.c rename to plugins/askrene/child/flow.c index 9431f7d5e001..5b21c27319cf 100644 --- a/plugins/askrene/flow.c +++ b/plugins/askrene/child/flow.c @@ -5,8 +5,9 @@ #include #include #include -#include -#include +#include +#include +#include #include #include @@ -17,16 +18,15 @@ #endif /* How much do we deliver to destination using this set of routes */ -struct amount_msat flowset_delivers(struct plugin *plugin, - struct flow **flows) +struct amount_msat flowset_delivers(struct flow **flows) { struct amount_msat final = AMOUNT_MSAT(0); for (size_t i = 0; i < tal_count(flows); i++) { if (!amount_msat_accumulate(&final, flows[i]->delivers)) { - plugin_err(plugin, "Could not add flowsat %s to %s (%zu/%zu)", - fmt_amount_msat(tmpctx, flows[i]->delivers), - fmt_amount_msat(tmpctx, final), - i, tal_count(flows)); + child_err("Could not add flowsat %s to %s (%zu/%zu)", + fmt_amount_msat(tmpctx, flows[i]->delivers), + fmt_amount_msat(tmpctx, final), + i, tal_count(flows)); } } return final; @@ -68,7 +68,7 @@ static double edge_probability(const struct route_query *rq, return 1.0 - amount_msat_ratio(numerator, denominator); } -struct amount_msat flow_spend(struct plugin *plugin, const struct flow *flow) +struct amount_msat flow_spend(const struct flow *flow) { const size_t pathlen = tal_count(flow->path); struct amount_msat spend = flow->delivers; @@ -77,38 +77,38 @@ struct amount_msat flow_spend(struct plugin *plugin, const struct flow *flow) const struct half_chan *h = flow_edge(flow, i); if (!amount_msat_add_fee(&spend, h->base_fee, h->proportional_fee)) { - plugin_err(plugin, "Could not add fee %u/%u to amount %s in %i/%zu", - h->base_fee, h->proportional_fee, - fmt_amount_msat(tmpctx, spend), - i, pathlen); + child_err("Could not add fee %u/%u to amount %s in %i/%zu", + h->base_fee, h->proportional_fee, + fmt_amount_msat(tmpctx, spend), + i, pathlen); } } return spend; } -struct amount_msat flow_fee(struct plugin *plugin, const struct flow *flow) +struct amount_msat flow_fee(const struct flow *flow) { - struct amount_msat spend = flow_spend(plugin, flow); + struct amount_msat spend = flow_spend(flow); struct amount_msat fee; if (!amount_msat_sub(&fee, spend, flow->delivers)) { - plugin_err(plugin, "Could not subtract %s from %s for fee", - fmt_amount_msat(tmpctx, flow->delivers), - fmt_amount_msat(tmpctx, spend)); + child_err("Could not subtract %s from %s for fee", + fmt_amount_msat(tmpctx, flow->delivers), + fmt_amount_msat(tmpctx, spend)); } return fee; } -struct amount_msat flowset_fee(struct plugin *plugin, struct flow **flows) +struct amount_msat flowset_fee(struct flow **flows) { struct amount_msat fee = AMOUNT_MSAT(0); for (size_t i = 0; i < tal_count(flows); i++) { - struct amount_msat this_fee = flow_fee(plugin, flows[i]); + struct amount_msat this_fee = flow_fee(flows[i]); if (!amount_msat_accumulate(&fee, this_fee)) { - plugin_err(plugin, "Could not add %s to %s for flowset fee", - fmt_amount_msat(tmpctx, this_fee), - fmt_amount_msat(tmpctx, fee)); + child_err("Could not add %s to %s for flowset fee", + fmt_amount_msat(tmpctx, this_fee), + fmt_amount_msat(tmpctx, fee)); } } return fee; @@ -144,10 +144,10 @@ double flow_probability(const struct flow *flow, if (!amount_msat_add_fee(&spend, h->base_fee, h->proportional_fee)) { - plugin_err(rq->plugin, "Could not add fee %u/%u to amount %s in %i/%zu", - h->base_fee, h->proportional_fee, - fmt_amount_msat(tmpctx, spend), - i, pathlen); + child_err("Could not add fee %u/%u to amount %s in %i/%zu", + h->base_fee, h->proportional_fee, + fmt_amount_msat(tmpctx, spend), + i, pathlen); } } @@ -184,6 +184,34 @@ const char *fmt_flows_step_scid(const tal_t *ctx, return fmt_short_channel_id_dir(ctx, &scidd); } +const char *fmt_flow_full(const tal_t *ctx, + const struct route_query *rq, + const struct flow *flow) +{ + struct amount_msat amt = flow->delivers; + char *str = fmt_amount_msat(ctx, flow->delivers); + + for (int i = tal_count(flow->path) - 1; i >= 0; i--) { + struct short_channel_id_dir scidd; + struct amount_msat min, max; + scidd.scid = gossmap_chan_scid(rq->gossmap, flow->path[i]); + scidd.dir = flow->dirs[i]; + if (!amount_msat_add_fee(&amt, + flow->path[i]->half[scidd.dir].base_fee, + flow->path[i]->half[scidd.dir].proportional_fee)) + abort(); + get_constraints(rq, flow->path[i], scidd.dir, &min, &max); + tal_append_fmt(&str, " <- %s %s (cap=%s,fee=%u+%u,delay=%u)", + fmt_amount_msat(tmpctx, amt), + fmt_short_channel_id_dir(tmpctx, &scidd), + fmt_amount_msat(tmpctx, max), + flow->path[i]->half[scidd.dir].base_fee, + flow->path[i]->half[scidd.dir].proportional_fee, + flow->path[i]->half[scidd.dir].delay); + } + return str; +} + #ifndef SUPERVERBOSE_ENABLED #undef SUPERVERBOSE #endif diff --git a/plugins/askrene/flow.h b/plugins/askrene/child/flow.h similarity index 80% rename from plugins/askrene/flow.h rename to plugins/askrene/child/flow.h index 28ac7627723a..eeec5a96fb44 100644 --- a/plugins/askrene/flow.h +++ b/plugins/askrene/child/flow.h @@ -1,5 +1,5 @@ -#ifndef LIGHTNING_PLUGINS_ASKRENE_FLOW_H -#define LIGHTNING_PLUGINS_ASKRENE_FLOW_H +#ifndef LIGHTNING_PLUGINS_ASKRENE_CHILD_FLOW_H +#define LIGHTNING_PLUGINS_ASKRENE_CHILD_FLOW_H #include "config.h" #include #include @@ -40,17 +40,16 @@ double flow_probability(const struct flow *flow, const struct route_query *rq); /* How much do we need to send to make this flow arrive. */ -struct amount_msat flow_spend(struct plugin *plugin, const struct flow *flow); +struct amount_msat flow_spend(const struct flow *flow); /* How much do we pay in fees to make this flow arrive. */ -struct amount_msat flow_fee(struct plugin *plugin, const struct flow *flow); +struct amount_msat flow_fee(const struct flow *flow); /* What fee to we pay for this entire flow set? */ -struct amount_msat flowset_fee(struct plugin *plugin, struct flow **flows); +struct amount_msat flowset_fee(struct flow **flows); /* How much does this entire flowset deliver? */ -struct amount_msat flowset_delivers(struct plugin *plugin, - struct flow **flows); +struct amount_msat flowset_delivers(struct flow **flows); /* How much CLTV does this flow require? */ u64 flow_delay(const struct flow *flow); @@ -66,4 +65,4 @@ const char *fmt_flows_step_scid(const tal_t *ctx, const char *fmt_flow_full(const tal_t *ctx, const struct route_query *rq, const struct flow *flow); -#endif /* LIGHTNING_PLUGINS_ASKRENE_FLOW_H */ +#endif /* LIGHTNING_PLUGINS_ASKRENE_CHILD_FLOW_H */ diff --git a/plugins/askrene/graph.c b/plugins/askrene/child/graph.c similarity index 98% rename from plugins/askrene/graph.c rename to plugins/askrene/child/graph.c index 973e6adcc365..7a0d9c1d92ff 100644 --- a/plugins/askrene/graph.c +++ b/plugins/askrene/child/graph.c @@ -1,5 +1,5 @@ #include "config.h" -#include +#include /* in the background add the actual arc or dual arc */ static void graph_push_outbound_arc(struct graph *graph, const struct arc arc, diff --git a/plugins/askrene/graph.h b/plugins/askrene/child/graph.h similarity index 97% rename from plugins/askrene/graph.h rename to plugins/askrene/child/graph.h index e84ed131b938..6107fd654be7 100644 --- a/plugins/askrene/graph.h +++ b/plugins/askrene/child/graph.h @@ -1,5 +1,5 @@ -#ifndef LIGHTNING_PLUGINS_ASKRENE_GRAPH_H -#define LIGHTNING_PLUGINS_ASKRENE_GRAPH_H +#ifndef LIGHTNING_PLUGINS_ASKRENE_CHILD_GRAPH_H +#define LIGHTNING_PLUGINS_ASKRENE_CHILD_GRAPH_H /* Defines a graph data structure. */ @@ -168,4 +168,4 @@ bool graph_add_arc(struct graph *graph, const struct arc arc, struct graph *graph_new(const tal_t *ctx, const size_t max_num_nodes, const size_t max_num_arcs, const size_t arc_dual_bit); -#endif /* LIGHTNING_PLUGINS_ASKRENE_GRAPH_H */ +#endif /* LIGHTNING_PLUGINS_ASKRENE_CHILD_GRAPH_H */ diff --git a/plugins/askrene/mcf.c b/plugins/askrene/child/mcf.c similarity index 91% rename from plugins/askrene/mcf.c rename to plugins/askrene/child/mcf.c index a59c6feb6b05..019e8ab9fe6e 100644 --- a/plugins/askrene/mcf.c +++ b/plugins/askrene/child/mcf.c @@ -9,15 +9,15 @@ #include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include /* # Optimal payments @@ -528,26 +528,6 @@ static double base_fee_penalty_estimate(struct amount_msat amount) return amount_msat_ratio(AMOUNT_MSAT(10000000), amount); } -struct amount_msat linear_flow_cost(const struct flow *flow, - struct amount_msat total_amount, - double delay_feefactor) -{ - struct amount_msat msat_cost; - s64 cost_ppm = 0; - double base_fee_penalty = base_fee_penalty_estimate(total_amount); - - for (size_t i = 0; i < tal_count(flow->path); i++) { - const struct half_chan *h = &flow->path[i]->half[flow->dirs[i]]; - - cost_ppm += - linear_fee_cost(h->base_fee, h->proportional_fee, h->delay, - base_fee_penalty, delay_feefactor); - } - if (!amount_msat_fee(&msat_cost, flow->delivers, 0, cost_ppm)) - abort(); - return msat_cost; -} - static void init_linear_network(const tal_t *ctx, const struct pay_parameters *params, struct graph **graph, double **arc_prob_cost, @@ -965,13 +945,13 @@ get_flow_singlepath(const tal_t *ctx, const struct pay_parameters *params, * Check that local channels have fee costs = 0 and bounds with certainty (min=max). */ // TODO(eduardo): we should LOG_DBG the process of finding the MCF while // adjusting the frugality factor. -struct flow **minflow(const tal_t *ctx, - const struct route_query *rq, - const struct gossmap_node *source, - const struct gossmap_node *target, - struct amount_msat amount, - u32 mu, - double delay_feefactor) +static struct flow **minflow(const tal_t *ctx, + const struct route_query *rq, + const struct gossmap_node *source, + const struct gossmap_node *target, + struct amount_msat amount, + u32 mu, + double delay_feefactor) { struct flow **flow_paths; /* We allocate everything off this, and free it at the end, @@ -1036,8 +1016,8 @@ struct flow **minflow(const tal_t *ctx, if (!simple_feasibleflow(working_ctx, graph, src, dst, arc_capacity, pay_amount)) { - rq_log(tmpctx, rq, LOG_INFORM, - "%s failed: unable to find a feasible flow.", __func__); + child_log(tmpctx, LOG_INFORM, + "%s failed: unable to find a feasible flow.", __func__); goto fail; } combine_cost_function(working_ctx, graph, arc_prob_cost, arc_fee_cost, @@ -1050,8 +1030,8 @@ struct flow **minflow(const tal_t *ctx, arc_capacity, arc_cost, node_potential)) { - rq_log(tmpctx, rq, LOG_BROKEN, - "%s: MCF optimization step failed", __func__); + child_log(tmpctx, LOG_BROKEN, + "%s: MCF optimization step failed", __func__); goto fail; } @@ -1060,10 +1040,10 @@ struct flow **minflow(const tal_t *ctx, * channel in the routes. */ flow_paths = get_flow_paths(ctx, working_ctx, params, graph, arc_capacity); - if(!flow_paths){ - rq_log(tmpctx, rq, LOG_BROKEN, - "%s: failed to extract flow paths from the MCF solution", - __func__); + if(!flow_paths) { + child_log(tmpctx, LOG_BROKEN, + "%s: failed to extract flow paths from the MCF solution", + __func__); goto fail; } tal_free(working_ctx); @@ -1167,12 +1147,27 @@ static void init_linear_network_single_path( } } -/* Similar to minflow but computes routes that have a single path. */ -struct flow **single_path_flow(const tal_t *ctx, const struct route_query *rq, - const struct gossmap_node *source, - const struct gossmap_node *target, - struct amount_msat amount, u32 mu, - double delay_feefactor) +/** + * API for min cost single path. + * @ctx: context to allocate returned flows from + * @rq: the route_query we're processing (for logging) + * @source: the source to start from + * @target: the target to pay + * @amount: the amount we want to reach @target + * @mu: 0 = corresponds to only probabilities, 100 corresponds to only fee. + * @delay_feefactor: convert 1 block delay into msat. + * + * @delay_feefactor converts 1 block delay into msat, as if it were an additional + * fee. So if a CLTV delay on a node is 5 blocks, that's treated as if it + * were a fee of 5 * @delay_feefactor. + * + * Returns an array with one flow which deliver amount to target, or NULL. + */ +static struct flow **single_path_flow(const tal_t *ctx, const struct route_query *rq, + const struct gossmap_node *source, + const struct gossmap_node *target, + struct amount_msat amount, u32 mu, + double delay_feefactor) { struct flow **flow_paths; /* We allocate everything off this, and free it at the end, @@ -1220,8 +1215,8 @@ struct flow **single_path_flow(const tal_t *ctx, const struct route_query *rq, distance)) { /* This might fail if we are unable to find a suitable route, it * doesn't mean the plugin is broken, that's why we LOG_INFORM. */ - rq_log(tmpctx, rq, LOG_INFORM, - "%s: could not find a feasible single path", __func__); + child_log(tmpctx, LOG_INFORM, + "%s: could not find a feasible single path", __func__); goto fail; } const u64 pay_amount = @@ -1233,17 +1228,16 @@ struct flow **single_path_flow(const tal_t *ctx, const struct route_query *rq, flow_paths = get_flow_singlepath(ctx, params, graph, rq->gossmap, src, dst, pay_amount, prev); if (!flow_paths) { - rq_log(tmpctx, rq, LOG_BROKEN, - "%s: failed to extract flow paths from the single-path " - "solution", - __func__); + child_log(tmpctx, LOG_BROKEN, + "%s: failed to extract flow paths from the single-path " + "solution", + __func__); goto fail; } if (tal_count(flow_paths) != 1) { - rq_log( - tmpctx, rq, LOG_BROKEN, - "%s: single-path solution returned a multi route solution", - __func__); + child_log(tmpctx, LOG_BROKEN, + "%s: single-path solution returned a multi route solution", + __func__); goto fail; } tal_free(working_ctx); @@ -1350,6 +1344,7 @@ linear_routes(const tal_t *ctx, struct route_query *rq, const struct gossmap_node *srcnode, const struct gossmap_node *dstnode, struct amount_msat amount, struct amount_msat maxfee, u32 finalcltv, u32 maxdelay, + size_t maxparts, struct flow ***flows, double *probability, struct flow **(*solver)(const tal_t *, const struct route_query *, const struct gossmap_node *, @@ -1378,9 +1373,9 @@ linear_routes(const tal_t *ctx, struct route_query *rq, while (!amount_msat_is_zero(amount_to_deliver)) { if (timemono_after(time_mono(), deadline)) { - error_message = rq_log(ctx, rq, LOG_BROKEN, - "%s: timed out after deadline", - __func__); + error_message = child_log(ctx, LOG_BROKEN, + "%s: timed out after deadline", + __func__); goto fail; } @@ -1417,7 +1412,7 @@ linear_routes(const tal_t *ctx, struct route_query *rq, goto fail; /* we finished removing flows and excess */ - all_deliver = flowset_delivers(rq->plugin, new_flows); + all_deliver = flowset_delivers(new_flows); if (amount_msat_is_zero(all_deliver)) { /* We removed all flows and we have not modified the * MCF parameters. We will not have an infinite loop @@ -1446,19 +1441,18 @@ linear_routes(const tal_t *ctx, struct route_query *rq, * flows deliver with respect to the total remaining amount, * ie. we avoid "consuming" all the feebudget if we still need * to run MCF again for some remaining amount. */ - struct amount_msat all_fees = - flowset_fee(rq->plugin, new_flows); + struct amount_msat all_fees = flowset_fee(new_flows); const double deliver_fraction = amount_msat_ratio(all_deliver, amount_to_deliver); struct amount_msat partial_feebudget; if (!amount_msat_scale(&partial_feebudget, feebudget, deliver_fraction)) { error_message = - rq_log(ctx, rq, LOG_BROKEN, - "%s: failed to scale the fee budget (%s) by " - "fraction (%lf)", - __func__, fmt_amount_msat(tmpctx, feebudget), - deliver_fraction); + child_log(ctx, LOG_BROKEN, + "%s: failed to scale the fee budget (%s) by " + "fraction (%lf)", + __func__, fmt_amount_msat(tmpctx, feebudget), + deliver_fraction); goto fail; } if (amount_msat_greater(all_fees, partial_feebudget)) { @@ -1470,8 +1464,8 @@ linear_routes(const tal_t *ctx, struct route_query *rq, else mu += 10; mu = MIN(mu, MU_MAX); - rq_log( - tmpctx, rq, LOG_INFORM, + child_log( + tmpctx, LOG_INFORM, "The flows had a fee of %s, greater than " "max of %s, retrying with mu of %u%%...", fmt_amount_msat(tmpctx, all_fees), @@ -1482,16 +1476,16 @@ linear_routes(const tal_t *ctx, struct route_query *rq, /* we cannot increase mu anymore and all_fees * already exceeds feebudget we fail. */ error_message = - rq_log(ctx, rq, LOG_UNUSUAL, - "Could not find route without " - "excessive cost"); + child_log(ctx, LOG_UNUSUAL, + "Could not find route without " + "excessive cost"); goto fail; } else { /* mu cannot be increased but at least all_fees * does not exceed feebudget, we give it a shot. */ - rq_log( - tmpctx, rq, LOG_UNUSUAL, + child_log( + tmpctx, LOG_UNUSUAL, "The flows had a fee of %s, greater than " "max of %s, but still within the fee " "budget %s, we accept those flows.", @@ -1505,18 +1499,18 @@ linear_routes(const tal_t *ctx, struct route_query *rq, if (finalcltv + flows_worst_delay(new_flows) > maxdelay) { if (delay_feefactor > 10) { error_message = - rq_log(ctx, rq, LOG_UNUSUAL, - "Could not find route without " - "excessive delays"); + child_log(ctx, LOG_UNUSUAL, + "Could not find route without " + "excessive delays"); goto fail; } delay_feefactor *= 2; - rq_log(tmpctx, rq, LOG_INFORM, - "The worst flow delay is %" PRIu64 - " (> %i), retrying with delay_feefactor %f...", - flows_worst_delay(new_flows), maxdelay - finalcltv, - delay_feefactor); + child_log(tmpctx, LOG_INFORM, + "The worst flow delay is %" PRIu64 + " (> %i), retrying with delay_feefactor %f...", + flows_worst_delay(new_flows), maxdelay - finalcltv, + delay_feefactor); continue; } @@ -1535,7 +1529,7 @@ linear_routes(const tal_t *ctx, struct route_query *rq, &all_deliver, new_flows[i]->delivers) || !amount_msat_accumulate( &all_fees, - flow_fee(rq->plugin, new_flows[i]))) + flow_fee(new_flows[i]))) abort(); } } @@ -1543,10 +1537,10 @@ linear_routes(const tal_t *ctx, struct route_query *rq, if (!amount_msat_deduct(&feebudget, all_fees) || !amount_msat_deduct(&amount_to_deliver, all_deliver)) { error_message = - rq_log(ctx, rq, LOG_BROKEN, - "%s: unexpected arithmetic operation " - "failure on amount_msat", - __func__); + child_log(ctx, LOG_BROKEN, + "%s: unexpected arithmetic operation " + "failure on amount_msat", + __func__); goto fail; } } @@ -1561,29 +1555,27 @@ linear_routes(const tal_t *ctx, struct route_query *rq, /* If we're over the number of parts, try to cram excess into the * largest-capacity parts */ - if (tal_count(*flows) > rq->maxparts) { + if (tal_count(*flows) > maxparts) { struct amount_msat fee; - error_message = reduce_num_flows(rq, rq, flows, amount, rq->maxparts); + error_message = reduce_num_flows(rq, rq, flows, amount, maxparts); if (error_message) { *flows = tal_free(*flows); return error_message; } /* Check fee budget! */ - fee = flowset_fee(rq->plugin, *flows); + fee = flowset_fee(*flows); if (amount_msat_greater(fee, maxfee)) { - error_message = rq_log(rq, rq, LOG_INFORM, - "After reducing the flows to %zu (i.e. maxparts)," - " we had a fee of %s, greater than " - "max of %s.", - tal_count(*flows), - fmt_amount_msat(tmpctx, fee), - fmt_amount_msat(tmpctx, maxfee)); - if (error_message) { - *flows = tal_free(*flows); - return error_message; - } + error_message = child_log(rq, LOG_INFORM, + "After reducing the flows to %zu (i.e. maxparts)," + " we had a fee of %s, greater than " + "max of %s.", + tal_count(*flows), + fmt_amount_msat(tmpctx, fee), + fmt_amount_msat(tmpctx, maxfee)); + *flows = tal_free(*flows); + return error_message; } } @@ -1596,24 +1588,23 @@ linear_routes(const tal_t *ctx, struct route_query *rq, * verify" */ if (!check_htlc_min_limits(rq, *flows)) { error_message = - rq_log(rq, rq, LOG_BROKEN, + child_log(rq, LOG_BROKEN, "%s: check_htlc_min_limits failed", __func__); *flows = tal_free(*flows); return error_message; } if (!check_htlc_max_limits(rq, *flows)) { *flows = tal_free(*flows); - return rq_log(rq, rq, LOG_BROKEN, - "%s: check_htlc_max_limits failed", __func__); + return child_log(rq, LOG_BROKEN, + "%s: check_htlc_max_limits failed", __func__); } - if (tal_count(*flows) > rq->maxparts) { + if (tal_count(*flows) > maxparts) { size_t num_flows = tal_count(*flows); *flows = tal_free(*flows); - return rq_log(rq, rq, LOG_BROKEN, - "%s: the number of flows (%zu) exceeds the limit set " - "on payment parts (%" PRIu32 - "), please submit a bug report", - __func__, num_flows, rq->maxparts); + return child_log(rq, LOG_BROKEN, + "%s: the number of flows (%zu) exceeds the limit set " + "on payment parts (%zu), please submit a bug report", + __func__, num_flows, maxparts); } return NULL; @@ -1630,11 +1621,12 @@ const char *default_routes(const tal_t *ctx, struct route_query *rq, const struct gossmap_node *srcnode, const struct gossmap_node *dstnode, struct amount_msat amount, struct amount_msat maxfee, - u32 finalcltv, u32 maxdelay, struct flow ***flows, + u32 finalcltv, u32 maxdelay, size_t maxparts, + struct flow ***flows, double *probability) { return linear_routes(ctx, rq, deadline, srcnode, dstnode, amount, maxfee, - finalcltv, maxdelay, flows, probability, minflow); + finalcltv, maxdelay, maxparts, flows, probability, minflow); } const char *single_path_routes(const tal_t *ctx, struct route_query *rq, @@ -1647,6 +1639,6 @@ const char *single_path_routes(const tal_t *ctx, struct route_query *rq, double *probability) { return linear_routes(ctx, rq, deadline, srcnode, dstnode, amount, maxfee, - finalcltv, maxdelay, flows, probability, + finalcltv, maxdelay, 1, flows, probability, single_path_flow); } diff --git a/plugins/askrene/child/mcf.h b/plugins/askrene/child/mcf.h new file mode 100644 index 000000000000..27538c522992 --- /dev/null +++ b/plugins/askrene/child/mcf.h @@ -0,0 +1,33 @@ +#ifndef LIGHTNING_PLUGINS_ASKRENE_CHILD_MCF_H +#define LIGHTNING_PLUGINS_ASKRENE_CHILD_MCF_H +/* Eduardo Quintela's (lagrang3@protonmail.com) Min Cost Flow implementation + * from renepay, as modified to fit askrene */ +#include "config.h" +#include +#include +#include + +struct route_query; + +/* A wrapper to the min. cost flow solver that actually takes into consideration + * the extra msats per channel needed to pay for fees. */ +const char *default_routes(const tal_t *ctx, struct route_query *rq, + struct timemono deadline, + const struct gossmap_node *srcnode, + const struct gossmap_node *dstnode, + struct amount_msat amount, + struct amount_msat maxfee, u32 finalcltv, + u32 maxdelay, size_t maxparts, struct flow ***flows, + double *probability); + +/* A wrapper to the single-path constrained solver. */ +const char *single_path_routes(const tal_t *ctx, struct route_query *rq, + struct timemono deadline, + const struct gossmap_node *srcnode, + const struct gossmap_node *dstnode, + struct amount_msat amount, + struct amount_msat maxfee, u32 finalcltv, + u32 maxdelay, struct flow ***flows, + double *probability); + +#endif /* LIGHTNING_PLUGINS_ASKRENE_CHILD_MCF_H */ diff --git a/plugins/askrene/priorityqueue.c b/plugins/askrene/child/priorityqueue.c similarity index 98% rename from plugins/askrene/priorityqueue.c rename to plugins/askrene/child/priorityqueue.c index 7f0a96cca699..3991fb11d8d1 100644 --- a/plugins/askrene/priorityqueue.c +++ b/plugins/askrene/child/priorityqueue.c @@ -1,6 +1,6 @@ #define NDEBUG 1 #include "config.h" -#include +#include /* priorityqueue: a data structure for pairs (key, value) with * 0<=key #include #include -#include -#include -#include +#include +#include +#include +#include +#include #include #include @@ -23,10 +25,10 @@ static void get_scidd(const struct gossmap *gossmap, scidd->dir = flow->dirs[i]; } -static void destroy_reservations(struct reserve_hop *rhops, struct askrene *askrene) +static void destroy_reservations(struct reserve_hop *rhops, struct reserve_htable *reserved) { for (size_t i = 0; i < tal_count(rhops); i++) - reserve_remove(askrene->reserved, &rhops[i]); + reserve_remove(reserved, &rhops[i]); } struct reserve_hop *new_reservations(const tal_t *ctx, @@ -35,7 +37,7 @@ struct reserve_hop *new_reservations(const tal_t *ctx, struct reserve_hop *rhops = tal_arr(ctx, struct reserve_hop, 0); /* Unreserve on free */ - tal_add_destructor2(rhops, destroy_reservations, get_askrene(rq->plugin)); + tal_add_destructor2(rhops, destroy_reservations, rq->reserved); return rhops; } @@ -58,16 +60,15 @@ static void add_reservation(struct reserve_hop **reservations, struct amount_msat amt) { struct reserve_hop rhop, *prev; - struct askrene *askrene = get_askrene(rq->plugin); size_t idx; /* Update in-place if possible */ prev = find_reservation(*reservations, scidd); if (prev) { - reserve_remove(askrene->reserved, prev); + reserve_remove(rq->reserved, prev); if (!amount_msat_accumulate(&prev->amount, amt)) abort(); - reserve_add(askrene->reserved, prev, rq->cmd->id); + reserve_add(rq->reserved, prev, rq->cmd_id); return; } rhop.scidd = *scidd; @@ -75,7 +76,7 @@ static void add_reservation(struct reserve_hop **reservations, /* We don't have to restrict it to a layer, since it's transitory: * nobody else will see this. */ rhop.layer = NULL; - reserve_add(askrene->reserved, &rhop, rq->cmd->id); + reserve_add(rq->reserved, &rhop, rq->cmd_id); /* Set capacities entry to 0 so it get_constraints() looks in reserve. */ idx = gossmap_chan_idx(rq->gossmap, chan); @@ -109,7 +110,7 @@ void create_flow_reservations(const struct route_query *rq, amount_to_reserve); if (!amount_msat_add_fee(&msat, h->base_fee, h->proportional_fee)) - plugin_err(rq->plugin, "Adding fee to amount"); + child_err("Adding fee to amount"); } } @@ -281,8 +282,8 @@ remove_htlc_min_violations(const tal_t *ctx, struct route_query *rq, + 2 * gossmap_chan_idx(rq->gossmap, flow->path[i]); get_scidd(rq->gossmap, flow, i, &scidd); - rq_log( - ctx, rq, LOG_INFORM, + child_log( + ctx, LOG_INFORM, "Sending %s across %s would violate htlc_min " "(~%s), disabling this channel", fmt_amount_msat(ctx, msat), @@ -295,8 +296,8 @@ remove_htlc_min_violations(const tal_t *ctx, struct route_query *rq, &msat, hc->base_fee, hc->proportional_fee)) { error_message = - rq_log(ctx, rq, LOG_BROKEN, - "%s: Adding fee to amount", __func__); + child_log(ctx, LOG_BROKEN, + "%s: Adding fee to amount", __func__); break; } } @@ -633,9 +634,9 @@ const char *reduce_num_flows(const tal_t *ctx, del_flow_from_arr(flows, tal_count(*flows) - 1); if (!increase_flows(rq, *flows, deliver, -1.0)) - return rq_log(ctx, rq, LOG_INFORM, - "Failed to reduce %zu flows down to maxparts (%zu)", - orig_num_flows, num_parts); + return child_log(ctx, LOG_INFORM, + "Failed to reduce %zu flows down to maxparts (%zu)", + orig_num_flows, num_parts); return NULL; } diff --git a/plugins/askrene/refine.h b/plugins/askrene/child/refine.h similarity index 90% rename from plugins/askrene/refine.h rename to plugins/askrene/child/refine.h index 7c1f1ecbd3d5..2726e591ea1e 100644 --- a/plugins/askrene/refine.h +++ b/plugins/askrene/child/refine.h @@ -1,5 +1,5 @@ -#ifndef LIGHTNING_PLUGINS_ASKRENE_REFINE_H -#define LIGHTNING_PLUGINS_ASKRENE_REFINE_H +#ifndef LIGHTNING_PLUGINS_ASKRENE_CHILD_REFINE_H +#define LIGHTNING_PLUGINS_ASKRENE_CHILD_REFINE_H #include "config.h" #include @@ -41,4 +41,4 @@ const char *reduce_num_flows(const tal_t *ctx, struct flow ***flows, struct amount_msat deliver, size_t num_parts); -#endif /* LIGHTNING_PLUGINS_ASKRENE_REFINE_H */ +#endif /* LIGHTNING_PLUGINS_ASKRENE_CHILD_REFINE_H */ diff --git a/plugins/askrene/child/route_query.c b/plugins/askrene/child/route_query.c new file mode 100644 index 000000000000..f6366eaf354d --- /dev/null +++ b/plugins/askrene/child/route_query.c @@ -0,0 +1,53 @@ +#include "config.h" +#include +#include +#include +#include +#include + +struct amount_msat get_additional_per_htlc_cost(const struct route_query *rq, + const struct short_channel_id_dir *scidd) +{ + const struct per_htlc_cost *phc; + phc = additional_cost_htable_get(rq->additional_costs, scidd); + if (phc) + return phc->per_htlc_cost; + else + return AMOUNT_MSAT(0); +} + +void get_constraints(const struct route_query *rq, + const struct gossmap_chan *chan, + int dir, + struct amount_msat *min, + struct amount_msat *max) +{ + struct short_channel_id_dir scidd; + size_t idx = gossmap_chan_idx(rq->gossmap, chan); + + *min = AMOUNT_MSAT(0); + + /* Fast path: no information known, no reserve. */ + if (idx < tal_count(rq->capacities) && rq->capacities[idx] != 0) { + *max = amount_msat(fp16_to_u64(rq->capacities[idx]) * 1000); + return; + } + + /* Naive implementation! */ + scidd.scid = gossmap_chan_scid(rq->gossmap, chan); + scidd.dir = dir; + *max = AMOUNT_MSAT(-1ULL); + + /* Look through layers for any constraints (might be dummy + * ones, for created channels!) */ + for (size_t i = 0; i < tal_count(rq->layers); i++) + layer_apply_constraints(rq->layers[i], &scidd, min, max); + + /* Might be here because it's reserved, but capacity is normal. */ + if (amount_msat_eq(*max, AMOUNT_MSAT(-1ULL))) + *max = gossmap_chan_get_capacity(rq->gossmap, chan); + + /* Finally, if any is in use, subtract that! */ + reserve_sub(rq->reserved, &scidd, rq->layers, min); + reserve_sub(rq->reserved, &scidd, rq->layers, max); +} diff --git a/plugins/askrene/child/route_query.h b/plugins/askrene/child/route_query.h new file mode 100644 index 000000000000..51c718a1173e --- /dev/null +++ b/plugins/askrene/child/route_query.h @@ -0,0 +1,46 @@ +#ifndef LIGHTNING_PLUGINS_ASKRENE_CHILD_ROUTE_QUERY_H +#define LIGHTNING_PLUGINS_ASKRENE_CHILD_ROUTE_QUERY_H +#include "config.h" +#include +/* Child-safe access routines for the route query. */ + +/* Information for a single route query. */ +struct route_query { + /* This is *not* updated during a query! Has all layers applied. */ + const struct gossmap *gossmap; + + /* command id to use for reservations we create. */ + const char *cmd_id; + + /* Array of layers we're applying */ + const struct layer **layers; + + /* Compact cache of biases */ + const s8 *biases; + + /* Additional per-htlc cost for local channels */ + const struct additional_cost_htable *additional_costs; + + /* We need to take in-flight payments into account (this is + * askrene->reserved, so make sure to undo changes! */ + struct reserve_htable *reserved; + + /* Cache of channel capacities for non-reserved, unknown channels. */ + fp16_t *capacities; + + /* channels we disable during computation to meet constraints */ + bitmap *disabled_chans; +}; + +/* Given a gossmap channel, get the current known min/max */ +void get_constraints(const struct route_query *rq, + const struct gossmap_chan *chan, + int dir, + struct amount_msat *min, + struct amount_msat *max); + +/* Is there a known additional per-htlc cost for this channel? */ +struct amount_msat get_additional_per_htlc_cost(const struct route_query *rq, + const struct short_channel_id_dir *scidd); + +#endif /* LIGHTNING_PLUGINS_ASKRENE_CHILD_ROUTE_QUERY_H */ diff --git a/plugins/askrene/layer.c b/plugins/askrene/layer.c index 8dccaaa28fde..34405f6bb117 100644 --- a/plugins/askrene/layer.c +++ b/plugins/askrene/layer.c @@ -81,12 +81,6 @@ local_channel_scid(const struct local_channel *lc) return lc->scid; } -static size_t hash_scid(const struct short_channel_id scid) -{ - /* scids cost money to generate, so simple hash works here */ - return (scid.u64 >> 32) ^ (scid.u64 >> 16) ^ scid.u64; -} - static inline bool local_channel_eq_scid(const struct local_channel *lc, const struct short_channel_id scid) { diff --git a/plugins/askrene/layer.h b/plugins/askrene/layer.h index 9f3f9966f0ef..54ce89dc6d70 100644 --- a/plugins/askrene/layer.h +++ b/plugins/askrene/layer.h @@ -14,6 +14,7 @@ #include struct askrene; +struct command; struct layer; struct json_stream; diff --git a/plugins/askrene/mcf.h b/plugins/askrene/mcf.h deleted file mode 100644 index 7d60159063d6..000000000000 --- a/plugins/askrene/mcf.h +++ /dev/null @@ -1,85 +0,0 @@ -#ifndef LIGHTNING_PLUGINS_ASKRENE_MCF_H -#define LIGHTNING_PLUGINS_ASKRENE_MCF_H -/* Eduardo Quintela's (lagrang3@protonmail.com) Min Cost Flow implementation - * from renepay, as modified to fit askrene */ -#include "config.h" -#include -#include - -struct route_query; - -/** - * optimal_payment_flow - API for min cost flow function(s). - * @ctx: context to allocate returned flows from - * @rq: the route_query we're processing (for logging) - * @source: the source to start from - * @target: the target to pay - * @amount: the amount we want to reach @target - * @mu: 0 = corresponds to only probabilities, 100 corresponds to only fee. - * @delay_feefactor: convert 1 block delay into msat. - * @single_part: don't do MCF at all, just create a single flow. - * - * @delay_feefactor converts 1 block delay into msat, as if it were an additional - * fee. So if a CLTV delay on a node is 5 blocks, that's treated as if it - * were a fee of 5 * @delay_feefactor. - * - * Return a series of subflows which deliver amount to target, or NULL. - */ -struct flow **minflow(const tal_t *ctx, - const struct route_query *rq, - const struct gossmap_node *source, - const struct gossmap_node *target, - struct amount_msat amount, - u32 mu, - double delay_feefactor); - -/** - * API for min cost single path. - * @ctx: context to allocate returned flows from - * @rq: the route_query we're processing (for logging) - * @source: the source to start from - * @target: the target to pay - * @amount: the amount we want to reach @target - * @mu: 0 = corresponds to only probabilities, 100 corresponds to only fee. - * @delay_feefactor: convert 1 block delay into msat. - * - * @delay_feefactor converts 1 block delay into msat, as if it were an additional - * fee. So if a CLTV delay on a node is 5 blocks, that's treated as if it - * were a fee of 5 * @delay_feefactor. - * - * Returns an array with one flow which deliver amount to target, or NULL. - */ -struct flow **single_path_flow(const tal_t *ctx, const struct route_query *rq, - const struct gossmap_node *source, - const struct gossmap_node *target, - struct amount_msat amount, u32 mu, - double delay_feefactor); - -/* To sanity check: this is the approximation mcf uses for the cost - * of each channel. */ -struct amount_msat linear_flow_cost(const struct flow *flow, - struct amount_msat total_amount, - double delay_feefactor); - -/* A wrapper to the min. cost flow solver that actually takes into consideration - * the extra msats per channel needed to pay for fees. */ -const char *default_routes(const tal_t *ctx, struct route_query *rq, - struct timemono deadline, - const struct gossmap_node *srcnode, - const struct gossmap_node *dstnode, - struct amount_msat amount, - struct amount_msat maxfee, u32 finalcltv, - u32 maxdelay, struct flow ***flows, - double *probability); - -/* A wrapper to the single-path constrained solver. */ -const char *single_path_routes(const tal_t *ctx, struct route_query *rq, - struct timemono deadline, - const struct gossmap_node *srcnode, - const struct gossmap_node *dstnode, - struct amount_msat amount, - struct amount_msat maxfee, u32 finalcltv, - u32 maxdelay, struct flow ***flows, - double *probability); - -#endif /* LIGHTNING_PLUGINS_ASKRENE_MCF_H */ diff --git a/plugins/askrene/reserve.h b/plugins/askrene/reserve.h index cac7589236c1..c58ff72ca1ac 100644 --- a/plugins/askrene/reserve.h +++ b/plugins/askrene/reserve.h @@ -8,6 +8,8 @@ #include #include +struct json_stream; + /* Initialize hash table for reservations */ struct reserve_htable *new_reserve_htable(const tal_t *ctx); diff --git a/plugins/askrene/test/run-bfs.c b/plugins/askrene/test/run-bfs.c index 5c5f0a045a89..3f15e6313a52 100644 --- a/plugins/askrene/test/run-bfs.c +++ b/plugins/askrene/test/run-bfs.c @@ -3,11 +3,11 @@ #include #include #include -#include +#include #include #define ASKRENE_UNITTEST -#include "../algorithm.c" +#include "../child/algorithm.c" #define MAX_NODES 256 #define MAX_ARCS 256 diff --git a/plugins/askrene/test/run-dijkstra.c b/plugins/askrene/test/run-dijkstra.c index 3c82e48c03e1..18c32d0bf634 100644 --- a/plugins/askrene/test/run-dijkstra.c +++ b/plugins/askrene/test/run-dijkstra.c @@ -3,11 +3,11 @@ #include #include #include -#include +#include #include #define ASKRENE_UNITTEST -#include "../algorithm.c" +#include "../child/algorithm.c" // 1->2 7 // 1->3 9 diff --git a/plugins/askrene/test/run-flow.c b/plugins/askrene/test/run-flow.c index 0f1eaba8f8ff..a4e541505a72 100644 --- a/plugins/askrene/test/run-flow.c +++ b/plugins/askrene/test/run-flow.c @@ -3,11 +3,11 @@ #include #include #include -#include +#include #include #define ASKRENE_UNITTEST -#include "../algorithm.c" +#include "../child/algorithm.c" #define MAX_NODES 256 #define MAX_ARCS 256 diff --git a/plugins/askrene/test/run-graph.c b/plugins/askrene/test/run-graph.c index c1017e66619e..cbbe067f51ff 100644 --- a/plugins/askrene/test/run-graph.c +++ b/plugins/askrene/test/run-graph.c @@ -7,7 +7,7 @@ #include #define ASKRENE_UNITTEST -#include "../graph.c" +#include "../child/graph.c" #define MAX_NODES 10 #define MAX_ARCS 256 diff --git a/plugins/askrene/test/run-mcf-large.c b/plugins/askrene/test/run-mcf-large.c index ad521956fd59..2b7ffc16608f 100644 --- a/plugins/askrene/test/run-mcf-large.c +++ b/plugins/askrene/test/run-mcf-large.c @@ -3,11 +3,11 @@ #include #include #include -#include +#include #include #define ASKRENE_UNITTEST -#include "../algorithm.c" +#include "../child/algorithm.c" #ifdef HAVE_ZLIB #include diff --git a/plugins/askrene/test/run-mcf.c b/plugins/askrene/test/run-mcf.c index aacb51c92ea0..55c647350763 100644 --- a/plugins/askrene/test/run-mcf.c +++ b/plugins/askrene/test/run-mcf.c @@ -3,11 +3,11 @@ #include #include #include -#include +#include #include #define ASKRENE_UNITTEST -#include "../algorithm.c" +#include "../child/algorithm.c" #define CHECK(arg) if(!(arg)){fprintf(stderr, "failed CHECK at line %d: %s\n", __LINE__, #arg); abort();} diff --git a/plugins/askrene/test/run-pqueue.c b/plugins/askrene/test/run-pqueue.c index bd7850b935a3..e22c30eefc78 100644 --- a/plugins/askrene/test/run-pqueue.c +++ b/plugins/askrene/test/run-pqueue.c @@ -6,7 +6,7 @@ #include #define ASKRENE_UNITTEST -#include "../priorityqueue.c" +#include "../child/priorityqueue.c" #define CHECK(arg) if(!(arg)){fprintf(stderr, "failed CHECK at line %d: %s\n", __LINE__, #arg); abort();} diff --git a/plugins/channel_hint.c b/plugins/channel_hint.c index 4862832d8cbd..eb83d564f95d 100644 --- a/plugins/channel_hint.c +++ b/plugins/channel_hint.c @@ -3,15 +3,6 @@ #include #include -size_t channel_hint_hash(const struct short_channel_id_dir *out) -{ - struct siphash24_ctx ctx; - siphash24_init(&ctx, siphash_seed()); - siphash24_update(&ctx, &out->scid.u64, sizeof(u64)); - siphash24_update(&ctx, &out->dir, sizeof(int)); - return siphash24_done(&ctx); -} - const struct short_channel_id_dir *channel_hint_keyof(const struct channel_hint *out) { return &out->scid; @@ -20,8 +11,7 @@ const struct short_channel_id_dir *channel_hint_keyof(const struct channel_hint bool channel_hint_eq(const struct channel_hint *a, const struct short_channel_id_dir *b) { - return short_channel_id_eq(a->scid.scid, b->scid) && - a->scid.dir == b->dir; + return short_channel_id_dir_eq(&a->scid, b); } void channel_hint_to_json(const char *name, const struct channel_hint *hint, diff --git a/plugins/channel_hint.h b/plugins/channel_hint.h index db94c5b3ecff..44e3bc1c102e 100644 --- a/plugins/channel_hint.h +++ b/plugins/channel_hint.h @@ -40,15 +40,13 @@ struct channel_hint { struct amount_msat capacity; }; -size_t channel_hint_hash(const struct short_channel_id_dir *out); - const struct short_channel_id_dir *channel_hint_keyof(const struct channel_hint *out); bool channel_hint_eq(const struct channel_hint *a, const struct short_channel_id_dir *b); HTABLE_DEFINE_NODUPS_TYPE(struct channel_hint, channel_hint_keyof, - channel_hint_hash, channel_hint_eq, channel_hint_map) + hash_scidd, channel_hint_eq, channel_hint_map) /* A collection of channel_hint instances, allowing us to handle and * update them more easily. */ diff --git a/plugins/libplugin.c b/plugins/libplugin.c index 22d4995402e2..72f209984a16 100644 --- a/plugins/libplugin.c +++ b/plugins/libplugin.c @@ -473,11 +473,9 @@ struct json_stream *jsonrpc_stream_fail_data(struct command *cmd, return js; } -static struct command_result *command_complete(struct command *cmd, - struct json_stream *result) +static struct command_result *command_complete_nojson(struct command *cmd, + struct json_stream *result) { - /* Global object */ - json_object_end(result); json_stream_close(result, cmd); ld_send(cmd->plugin, result); tal_free(cmd); @@ -485,6 +483,14 @@ static struct command_result *command_complete(struct command *cmd, return &complete; } +static struct command_result *command_complete(struct command *cmd, + struct json_stream *result) +{ + /* Global object */ + json_object_end(result); + return command_complete_nojson(cmd, result); +} + struct command_result *command_finished(struct command *cmd, struct json_stream *response) { @@ -506,6 +512,20 @@ struct command_result *command_finished(struct command *cmd, return command_complete(cmd, response); } +struct command_result *command_finish_rawstr(struct command *cmd, + const char *json, + size_t json_len) +{ + struct json_stream *js = new_json_stream(cmd, cmd, NULL); + char *raw; + + assert(cmd->type == COMMAND_TYPE_NORMAL + || cmd->type == COMMAND_TYPE_HOOK); + raw = json_out_direct(js->jout, json_len); + memcpy(raw, json, json_len); + return command_complete_nojson(cmd, js); +} + struct command_result *WARN_UNUSED_RESULT command_still_pending(struct command *cmd) { diff --git a/plugins/libplugin.h b/plugins/libplugin.h index bcc68dc03948..ca8455050d8a 100644 --- a/plugins/libplugin.h +++ b/plugins/libplugin.h @@ -363,6 +363,13 @@ WARN_UNUSED_RESULT struct command_result *command_still_pending(struct command *cmd) NON_NULL_ARGS(1); +/* Forward this raw JSON string as the command response */ +WARN_UNUSED_RESULT +struct command_result *command_finish_rawstr(struct command *cmd, + const char *json, + size_t json_len) + NO_NULL_ARGS; + /* Helper to create a zero or single-value JSON object; if @str is NULL, * object is empty. */ struct json_out *json_out_obj(const tal_t *ctx, diff --git a/plugins/renepay/chan_extra.h b/plugins/renepay/chan_extra.h index d7213ad52cad..69a206e93ed3 100644 --- a/plugins/renepay/chan_extra.h +++ b/plugins/renepay/chan_extra.h @@ -50,7 +50,8 @@ static inline bool chan_extra_eq_scid(const struct chan_extra *cd, return short_channel_id_eq(scid, cd->scid); } -HTABLE_DEFINE_NODUPS_TYPE(struct chan_extra, chan_extra_scid, short_channel_id_hash, +HTABLE_DEFINE_NODUPS_TYPE(struct chan_extra, chan_extra_scid, + hash_scid, chan_extra_eq_scid, chan_extra_map); diff --git a/plugins/renepay/disabledmap.c b/plugins/renepay/disabledmap.c index badeac868637..7854bb8b95f8 100644 --- a/plugins/renepay/disabledmap.c +++ b/plugins/renepay/disabledmap.c @@ -47,7 +47,7 @@ void disabledmap_add_channel(struct disabledmap *p, struct short_channel_id_dir scidd) { struct short_channel_id_dir *ptr_scidd = - scidd_map_get(p->disabled_map, scidd); + scidd_map_get(p->disabled_map, &scidd); if (ptr_scidd) { /* htable allows for duplicates, but we don't want duplicates. */ @@ -64,7 +64,7 @@ void disabledmap_warn_channel(struct disabledmap *p, struct short_channel_id_dir scidd) { struct short_channel_id_dir *ptr_scidd = - scidd_map_get(p->warned_map, scidd); + scidd_map_get(p->warned_map, &scidd); if (ptr_scidd) { /* htable allows for duplicates, but we don't want duplicates. */ @@ -84,7 +84,7 @@ void disabledmap_add_node(struct disabledmap *p, struct node_id node) bool disabledmap_channel_is_warned(struct disabledmap *p, struct short_channel_id_dir scidd) { - return scidd_map_get(p->warned_map, scidd) != NULL; + return scidd_map_get(p->warned_map, &scidd) != NULL; } bitmap *tal_disabledmap_get_bitmap(const tal_t *ctx, struct disabledmap *p, diff --git a/plugins/renepay/disabledmap.h b/plugins/renepay/disabledmap.h index 7bd0487bd23d..1f273f0bd758 100644 --- a/plugins/renepay/disabledmap.h +++ b/plugins/renepay/disabledmap.h @@ -8,31 +8,16 @@ #include #include -static inline size_t hash_scidd(const struct short_channel_id_dir scidd) -{ - /* scids cost money to generate, so simple hash works here. Letting same - * scid with two directions collide. */ - return (scidd.scid.u64 >> 32) ^ (scidd.scid.u64 >> 16) ^ scidd.scid.u64; -} - -static inline struct short_channel_id_dir +static inline const struct short_channel_id_dir * self_scidd(const struct short_channel_id_dir *self) { - return *self; -} - -static inline bool -my_short_channel_id_dir_eq(const struct short_channel_id_dir *scidd_a, - const struct short_channel_id_dir scidd_b) -{ - return short_channel_id_eq(scidd_a->scid, scidd_b.scid) && - scidd_a->dir == scidd_b.dir; + return self; } /* A htable for short_channel_id_dir, the structure itself is the element key. */ HTABLE_DEFINE_NODUPS_TYPE(struct short_channel_id_dir, self_scidd, hash_scidd, - my_short_channel_id_dir_eq, scidd_map); + short_channel_id_dir_eq, scidd_map); struct disabledmap { /* Channels we decided to disable for various reasons. */ diff --git a/tests/test_askrene.py b/tests/test_askrene.py index bf87287e9b05..f8e03f382e28 100644 --- a/tests/test_askrene.py +++ b/tests/test_askrene.py @@ -1405,7 +1405,7 @@ def test_min_htlc_after_excess(node_factory, bitcoind): @pytest.mark.slow_test -def test_real_data(node_factory, bitcoind): +def test_real_data(node_factory, bitcoind, executor): # Route from Rusty's node to the top nodes # From tests/data/gossip-store-2024-09-22-node-map.xz: # Me: 3301:024b9a1fa8e006f1e3937f65f66c408e6da8e1ca728ea43222a7381df1cc449605:BLUEIRON @@ -1451,48 +1451,53 @@ def test_real_data(node_factory, bitcoind): limit = 100 expected = (9, 96, 6565466, 668476, 90) - fees = {} + # 0.5% is the norm + MAX_FEE = AMOUNT // 200 + + # Do these in parallel. + futs = {} for n in range(0, limit): - print(f"XXX: {n}") - # 0.5% is the norm - MAX_FEE = AMOUNT // 200 + futs[n] = executor.submit(l1.rpc.getroutes, + source=l1.info['id'], + destination=nodeids[n], + amount_msat=AMOUNT, + layers=['auto.sourcefree', 'auto.localchans'], + maxfee_msat=MAX_FEE, + final_cltv=18) + fees = {} + prevs = {} + for n in range(0, limit): + fees[n] = [] if n in badnodes: with pytest.raises(RpcError, match=badnodes[n]): - l1.rpc.getroutes(source=l1.info['id'], - destination=nodeids[n], - amount_msat=AMOUNT, - layers=['auto.sourcefree', 'auto.localchans'], - maxfee_msat=MAX_FEE, - final_cltv=18) - fees[n] = [] - continue - - try: - prev = l1.rpc.getroutes(source=l1.info['id'], - destination=nodeids[n], - amount_msat=AMOUNT, - layers=['auto.sourcefree', 'auto.localchans'], - maxfee_msat=MAX_FEE, - final_cltv=18) - except RpcError: - fees[n] = [] + futs[n].result(TIMEOUT) continue - # Now stress it, by asking it to spend 1msat less! - fees[n] = [sum([r['path'][0]['amount_msat'] for r in prev['routes']]) - AMOUNT] + prevs[n] = futs[n].result(TIMEOUT) + + # Stress it by asking harder for each one which succeeded + while prevs != {}: + futs = {} + for n, prev in prevs.items(): + # Record fees + fees[n].append(sum([r['path'][0]['amount_msat'] for r in prev['routes']]) - AMOUNT) + # Now stress it, by asking it to spend 1msat less! + futs[n] = executor.submit(l1.rpc.getroutes, + source=l1.info['id'], + destination=nodeids[n], + amount_msat=AMOUNT, + layers=['auto.sourcefree', 'auto.localchans'], + maxfee_msat=fees[n][-1] - 1, + final_cltv=18) - while True: - # Keep making it harder... + for n, fut in futs.items(): try: - routes = l1.rpc.getroutes(source=l1.info['id'], - destination=nodeids[n], - amount_msat=AMOUNT, - layers=['auto.sourcefree', 'auto.localchans'], - maxfee_msat=fees[n][-1] - 1, - final_cltv=18) + routes = fut.result(TIMEOUT) except RpcError: - break + # Too much, this one is one. + del prevs[n] + continue fee = sum([r['path'][0]['amount_msat'] for r in routes['routes']]) - AMOUNT # Should get less expensive @@ -1500,10 +1505,8 @@ def test_real_data(node_factory, bitcoind): # Should get less likely (Note! This is violated because once we care # about fees, the total is reduced, leading to better prob!). -# assert routes['probability_ppm'] < prev['probability_ppm'] - - fees[n].append(fee) - prev = routes +# assert routes['probability_ppm'] < prevs[n]['probability_ppm'] + prevs[n] = routes # Which succeeded in improving improved = [n for n in fees if len(fees[n]) > 1] @@ -1521,10 +1524,12 @@ def test_real_data(node_factory, bitcoind): best = n assert (len(fees[best]), len(improved), total_first_fee, total_final_fee, percent_fee_reduction) == expected + # askrene will have restricted how many we run + assert l1.daemon.is_in_log(r"Too many running at once \(4 vs 4\): waiting") @pytest.mark.slow_test -def test_real_biases(node_factory, bitcoind): +def test_real_biases(node_factory, bitcoind, executor): # Route from Rusty's node to the top 100. # From tests/data/gossip-store-2024-09-22-node-map.xz: # Me: 3301:024b9a1fa8e006f1e3937f65f66c408e6da8e1ca728ea43222a7381df1cc449605:BLUEIRON @@ -1572,22 +1577,34 @@ def test_real_biases(node_factory, bitcoind): num_changed = {} bias_ineffective = 0 - for bias in (1, 2, 4, 8, 16, 32, 64, 100): - num_changed[bias] = 0 - for n in range(0, limit): - # 0.5% is the norm - MAX_FEE = AMOUNT // 200 - - if n in badnodes: - continue + # 0.5% is the norm + MAX_FEE = AMOUNT // 200 - route = l1.rpc.getroutes(source=l1.info['id'], + # To exercise parallelism, do bases all at once. + futures = {} + for n in range(0, limit): + if n in badnodes: + continue + futures[n] = executor.submit(l1.rpc.getroutes, + source=l1.info['id'], destination=nodeids[n], amount_msat=AMOUNT, layers=['auto.sourcefree', 'auto.localchans'], maxfee_msat=MAX_FEE, final_cltv=18) + base_routes = {} + for n in range(0, limit): + if n in badnodes: + continue + base_routes[n] = futures[n].result(TIMEOUT) + + for bias in (1, 2, 4, 8, 16, 32, 64, 100): + num_changed[bias] = 0 + for n in range(0, limit): + if n in badnodes: + continue + route = base_routes[n] # Now add bias against final channel, see if it changes. chan = route['routes'][0]['path'][-1]['short_channel_id_dir']