diff --git a/gossipd/gossmap_manage.c b/gossipd/gossmap_manage.c index 379851be6a75..5a4331a4d6e8 100644 --- a/gossipd/gossmap_manage.c +++ b/gossipd/gossmap_manage.c @@ -1242,6 +1242,12 @@ void gossmap_manage_channel_spent(struct gossmap_manage *gm, return; } + /* Is it already dying? It's lightningd re-telling us */ + for (size_t i = 0; i < tal_count(gm->dying_channels); i++) { + if (short_channel_id_eq(gm->dying_channels[i].scid, scid)) + return; + } + /* BOLT #7: * - once its funding output has been spent OR reorganized out: * - SHOULD forget a channel after a 12-block delay. diff --git a/lightningd/channel.c b/lightningd/channel.c index 59c5be0a2d0e..93e4bd35256a 100644 --- a/lightningd/channel.c +++ b/lightningd/channel.c @@ -310,6 +310,7 @@ struct channel *new_unsaved_channel(struct peer *peer, channel->ignore_fee_limits = ld->config.ignore_fee_limits; channel->last_stable_connection = 0; channel->stable_conn_timer = NULL; + channel->onchaind_replay_watches = NULL; /* Nothing happened yet */ memset(&channel->stats, 0, sizeof(channel->stats)); channel->state_changes = tal_arr(channel, struct channel_state_change *, 0); @@ -607,6 +608,8 @@ struct channel *new_channel(struct peer *peer, u64 dbid, channel->ignore_fee_limits = ignore_fee_limits; channel->last_stable_connection = last_stable_connection; channel->stable_conn_timer = NULL; + channel->onchaind_replay_watches = NULL; + channel->num_onchain_spent_calls = 0; channel->stats = *stats; channel->state_changes = tal_steal(channel, state_changes); diff --git a/lightningd/channel.h b/lightningd/channel.h index 4b40356990c9..cb74323166e0 100644 --- a/lightningd/channel.h +++ b/lightningd/channel.h @@ -194,6 +194,13 @@ struct channel { /* Watch we have on funding output. */ struct txowatch *funding_spend_watch; + /* If we're doing a replay for onchaind, here are the txids it's watching */ + struct replay_tx_hash *onchaind_replay_watches; + /* Number of outstanding onchaind_spent calls */ + size_t num_onchain_spent_calls; + /* Height we're replaying at (if onchaind_replay_watches set) */ + u32 onchaind_replay_height; + /* Our original funds, in funding amount */ struct amount_sat our_funds; diff --git a/lightningd/gossip_control.c b/lightningd/gossip_control.c index 13ec4e5ed44c..dff0ae62652a 100644 --- a/lightningd/gossip_control.c +++ b/lightningd/gossip_control.c @@ -222,6 +222,10 @@ static void gossipd_new_blockheight_reply(struct subd *gossipd, /* Now, finally update getinfo's blockheight */ gossipd->ld->gossip_blockheight = ptr2int(blockheight); + + /* And use that to trim old entries in the UTXO set */ + wallet_utxoset_prune(gossipd->ld->wallet, + gossipd->ld->gossip_blockheight); } void gossip_notify_new_block(struct lightningd *ld, u32 blockheight) @@ -247,9 +251,25 @@ static void gossipd_init_done(struct subd *gossipd, const int *fds, void *unused) { + struct lightningd *ld = gossipd->ld; + u32 oldspends; + /* Any channels without channel_updates, we populate now: gossipd * might have lost its gossip_store. */ - channel_gossip_init_done(gossipd->ld); + channel_gossip_init_done(ld); + + /* Tell it about any closures it might have missed! */ + oldspends = wallet_utxoset_oldest_spentheight(tmpctx, ld->wallet); + if (oldspends) { + while (oldspends <= get_block_height(ld->topology)) { + const struct short_channel_id *scids; + + scids = wallet_utxoset_get_spent(tmpctx, ld->wallet, + oldspends); + gossipd_notify_spends(ld, oldspends, scids); + oldspends++; + } + } /* Break out of loop, so we can begin */ log_debug(gossipd->ld->log, "io_break: %s", __func__); diff --git a/lightningd/onchain_control.c b/lightningd/onchain_control.c index 41d52f558bef..b3ecf22da9af 100644 --- a/lightningd/onchain_control.c +++ b/lightningd/onchain_control.c @@ -26,6 +26,27 @@ #include #include +/* If we're restarting, we keep a per-channel copy of watches, and replay */ +struct replay_tx { + u32 blockheight; + struct bitcoin_txid txid; + struct bitcoin_tx *tx; +}; + +static const struct bitcoin_txid *replay_tx_keyof(const struct replay_tx *rtx) +{ + return &rtx->txid; +} + +static bool replay_tx_eq_txid(const struct replay_tx *rtx, + const struct bitcoin_txid *txid) +{ + return bitcoin_txid_eq(&rtx->txid, txid); +} + +HTABLE_DEFINE_TYPE(struct replay_tx, replay_tx_keyof, txid_hash, replay_tx_eq_txid, + replay_tx_hash); + /* We dump all the known preimages when onchaind starts up. */ static void onchaind_tell_fulfill(struct channel *channel) { @@ -187,9 +208,8 @@ static enum watch_result onchain_tx_watched(struct lightningd *ld, return KEEP_WATCHING; } - /* Store the channeltx so we can replay later */ - wallet_channeltxs_add(ld->wallet, channel, - WIRE_ONCHAIND_DEPTH, txid, 0, blockheight); + /* Store so we remember if we crash, and can replay later */ + wallet_insert_funding_spend(ld->wallet, channel, txid, 0, blockheight); onchain_tx_depth(channel, txid, depth); return KEEP_WATCHING; @@ -197,6 +217,53 @@ static enum watch_result onchain_tx_watched(struct lightningd *ld, static void watch_tx_and_outputs(struct channel *channel, const struct bitcoin_tx *tx); +static void onchaind_replay(struct channel *channel); + +static void replay_unwatch_txid(struct channel *channel, + const struct bitcoin_txid *txid) +{ + replay_tx_hash_delkey(channel->onchaind_replay_watches, txid); +} + +static void onchaind_spent_reply(struct subd *onchaind, const u8 *msg, + const int *fds, + struct bitcoin_txid *txid) +{ + bool interested; + struct txwatch *txw; + struct channel *channel = onchaind->channel; + + if (!fromwire_onchaind_spent_reply(msg, &interested)) + channel_internal_error(channel, "Invalid onchaind_spent_reply %s", + tal_hex(tmpctx, msg)); + + channel->num_onchain_spent_calls--; + + /* Only delete watch if it says it doesn't care */ + if (interested) + goto out; + + /* If we're doing replay: */ + if (channel->onchaind_replay_watches) { + replay_unwatch_txid(channel, txid); + goto out; + } + + /* Frees the txo watches, too: see watch_tx_and_outputs() */ + txw = find_txwatch(channel->peer->ld->topology, txid, + onchain_tx_watched, channel); + if (!txw) + log_unusual(channel->log, "Can't unwatch txid %s", + fmt_bitcoin_txid(tmpctx, txid)); + tal_free(txw); + +out: + /* If that's the last request, continue asking for blocks */ + if (channel->onchaind_replay_watches + && channel->num_onchain_spent_calls == 0) { + onchaind_replay(channel); + } +} /** * Notify onchaind that an output was spent and register new watches. @@ -204,6 +271,7 @@ static void watch_tx_and_outputs(struct channel *channel, static void onchain_txo_spent(struct channel *channel, const struct bitcoin_tx *tx, size_t input_num, u32 blockheight) { u8 *msg; + struct bitcoin_txid *txid; /* Onchaind needs all inputs, since it uses those to compare * with existing spends (which can vary, with feerate changes). */ struct tx_parts *parts = tx_parts_from_wally_tx(tmpctx, tx->wtx, @@ -211,9 +279,14 @@ static void onchain_txo_spent(struct channel *channel, const struct bitcoin_tx * watch_tx_and_outputs(channel, tx); - msg = towire_onchaind_spent(channel, parts, input_num, blockheight); - subd_send_msg(channel->owner, take(msg)); + /* Reply will need this if we want to unwatch */ + txid = tal(NULL, struct bitcoin_txid); + bitcoin_txid(tx, txid); + msg = towire_onchaind_spent(channel, parts, input_num, blockheight); + subd_req(channel->owner, channel->owner, take(msg), -1, 0, + onchaind_spent_reply, take(txid)); + channel->num_onchain_spent_calls++; } /** @@ -224,14 +297,6 @@ static enum watch_result onchain_txo_watched(struct channel *channel, size_t input_num, const struct block *block) { - struct bitcoin_txid txid; - bitcoin_txid(tx, &txid); - - /* Store the channeltx so we can replay later */ - wallet_channeltxs_add(channel->peer->ld->wallet, channel, - WIRE_ONCHAIND_SPENT, &txid, input_num, - block->height); - onchain_txo_spent(channel, tx, input_num, block->height); /* We don't need to keep watching: If this output is double-spent @@ -280,23 +345,94 @@ static void handle_onchain_log_coin_move(struct channel *channel, const u8 *msg) tal_free(mvt); } -static void handle_onchain_unwatch_tx(struct channel *channel, const u8 *msg) +static void replay_watch_tx(struct channel *channel, + u32 blockheight, + const struct bitcoin_tx *tx TAKES) { - struct bitcoin_txid txid; - struct txwatch *txw; + struct replay_tx *rtx = tal(channel->onchaind_replay_watches, struct replay_tx); + bitcoin_txid(tx, &rtx->txid); + rtx->blockheight = blockheight; + rtx->tx = clone_bitcoin_tx(rtx, tx); + + replay_tx_hash_add(channel->onchaind_replay_watches, rtx); +} + +/* We've finished replaying, turn any txs left into live watches */ +static void convert_replay_txs(struct channel *channel) +{ + struct replay_tx *rtx; + struct replay_tx_hash_iter rit; + struct replay_tx_hash *watches; + + /* Set to NULL so these are queued as real watches */ + watches = tal_steal(tmpctx, channel->onchaind_replay_watches); + channel->onchaind_replay_watches = NULL; + for (rtx = replay_tx_hash_first(watches, &rit); + rtx; + rtx = replay_tx_hash_next(watches, &rit)) { + watch_tx_and_outputs(channel, rtx->tx); + } +} + +static void replay_block(struct bitcoind *bitcoind, + u32 height, + struct bitcoin_blkid *blkid, + struct bitcoin_block *blk, + struct channel *channel) +{ + struct replay_tx *rtx; + struct replay_tx_hash_iter rit; + + /* Tell onchaind that all existing txs have reached a new depth */ + for (rtx = replay_tx_hash_first(channel->onchaind_replay_watches, &rit); + rtx; + rtx = replay_tx_hash_next(channel->onchaind_replay_watches, &rit)) { + /* Note: if you're in this block, that's depth 1! */ + onchain_tx_depth(channel, &rtx->txid, height - rtx->blockheight + 1); + } + + /* See if we add any new txs which spend a watched one */ + for (size_t i = 0; i < tal_count(blk->tx); i++) { + for (size_t j = 0; j < blk->tx[i]->wtx->num_inputs; j++) { + struct bitcoin_txid spent; + bitcoin_tx_input_get_txid(blk->tx[i], j, &spent); + rtx = replay_tx_hash_get(channel->onchaind_replay_watches, &spent); + if (rtx) { + /* Note: for efficiency, blk->tx's don't have + * PSBTs, so add one now */ + if (!blk->tx[i]->psbt) + blk->tx[i]->psbt = new_psbt(blk->tx[i], blk->tx[i]->wtx); + onchain_txo_spent(channel, blk->tx[i], j, height); + /* Watch this and all the children too. */ + replay_watch_tx(channel, height, blk->tx[i]); + } + } + } - if (!fromwire_onchaind_unwatch_tx(msg, &txid)) { - channel_internal_error(channel, "Invalid onchain_unwatch_tx"); + /* Replay finished? Now we'll get fed real blocks */ + if (height == get_block_height(bitcoind->ld->topology)) { + convert_replay_txs(channel); return; } - /* Frees the txo watches, too: see watch_tx_and_outputs() */ - txw = find_txwatch(channel->peer->ld->topology, &txid, - onchain_tx_watched, channel); - if (!txw) - log_unusual(channel->log, "Can't unwatch txid %s", - fmt_bitcoin_txid(tmpctx, &txid)); - tal_free(txw); + /* Ready for next block */ + channel->onchaind_replay_height = height + 1; + + /* Otherwise, wait for those to be resolved (in case onchaind is slow, + * e.g. waiting for HSM). */ + if (channel->num_onchain_spent_calls == 0) + onchaind_replay(channel); +} + +static void onchaind_replay(struct channel *channel) +{ + assert(channel->onchaind_replay_watches); + assert(channel->num_onchain_spent_calls == 0); + + bitcoind_getrawblockbyheight(channel, + channel->peer->ld->topology->bitcoind, + channel->onchaind_replay_height, + replay_block, channel); } static void handle_extracted_preimage(struct channel *channel, const u8 *msg) @@ -1447,10 +1583,6 @@ static unsigned int onchain_msg(struct subd *sd, const u8 *msg, const int *fds U handle_onchain_init_reply(sd->channel, msg); break; - case WIRE_ONCHAIND_UNWATCH_TX: - handle_onchain_unwatch_tx(sd->channel, msg); - break; - case WIRE_ONCHAIND_EXTRACTED_PREIMAGE: handle_extracted_preimage(sd->channel, msg); break; @@ -1516,6 +1648,7 @@ static unsigned int onchain_msg(struct subd *sd, const u8 *msg, const int *fds U case WIRE_ONCHAIND_SPEND_CREATED: case WIRE_ONCHAIND_DEV_MEMLEAK: case WIRE_ONCHAIND_DEV_MEMLEAK_REPLY: + case WIRE_ONCHAIND_SPENT_REPLY: break; } @@ -1674,7 +1807,12 @@ enum watch_result onchaind_funding_spent(struct channel *channel, feerate_min(ld, NULL)); subd_send_msg(channel->owner, take(msg)); - watch_tx_and_outputs(channel, tx); + /* If we're replaying, we just watch this */ + if (channel->onchaind_replay_watches) { + replay_watch_tx(channel, blockheight, tx); + } else { + watch_tx_and_outputs(channel, tx); + } /* We keep watching until peer finally deleted, for reorgs. */ return KEEP_WATCHING; @@ -1682,43 +1820,42 @@ enum watch_result onchaind_funding_spent(struct channel *channel, void onchaind_replay_channels(struct lightningd *ld) { - u32 *onchaind_ids; - struct channeltx *txs; - struct channel *chan; + struct peer *peer; + struct peer_node_id_map_iter it; + /* We don't hold a db tx for all of init */ db_begin_transaction(ld->wallet->db); - onchaind_ids = wallet_onchaind_channels(tmpctx, ld->wallet); - - for (size_t i = 0; i < tal_count(onchaind_ids); i++) { - log_info(ld->log, "Restarting onchaind for channel %d", - onchaind_ids[i]); - - txs = wallet_channeltxs_get(onchaind_ids, ld->wallet, - onchaind_ids[i]); - chan = channel_by_dbid(ld, onchaind_ids[i]); - - for (size_t j = 0; j < tal_count(txs); j++) { - if (txs[j].type == WIRE_ONCHAIND_INIT) { - onchaind_funding_spent(chan, txs[j].tx, - txs[j].blockheight); - - } else if (txs[j].type == WIRE_ONCHAIND_SPENT) { - onchain_txo_spent(chan, txs[j].tx, - txs[j].input_num, - txs[j].blockheight); - - } else if (txs[j].type == WIRE_ONCHAIND_DEPTH) { - onchain_tx_depth(chan, &txs[j].txid, - txs[j].depth); - - } else { - fatal("unknown message of type %d during " - "onchaind replay", - txs[j].type); - } + + /* For each channel, if we've recorded a spend, it's onchaind time! */ + for (peer = peer_node_id_map_first(ld->peers, &it); + peer; + peer = peer_node_id_map_next(ld->peers, &it)) { + struct channel *channel; + + list_for_each(&peer->channels, channel, list) { + struct bitcoin_tx *tx; + u32 blockheight; + + if (channel_state_uncommitted(channel->state)) + continue; + + tx = wallet_get_funding_spend(tmpctx, ld->wallet, channel->dbid, + &blockheight); + if (!tx) + continue; + + log_info(channel->log, + "Restarting onchaind (%s): closed in block %u", + channel_state_name(channel), blockheight); + + /* We're in replay mode */ + channel->onchaind_replay_watches = tal(channel, struct replay_tx_hash); + channel->onchaind_replay_height = blockheight; + replay_tx_hash_init(channel->onchaind_replay_watches); + + onchaind_funding_spent(channel, tx, blockheight); + onchaind_replay(channel); } - tal_free(txs); } - db_commit_transaction(ld->wallet->db); } diff --git a/lightningd/peer_control.c b/lightningd/peer_control.c index 902d5b87444b..480921bab88c 100644 --- a/lightningd/peer_control.c +++ b/lightningd/peer_control.c @@ -2208,8 +2208,8 @@ static enum watch_result funding_spent(struct channel *channel, } } - wallet_channeltxs_add(channel->peer->ld->wallet, channel, - WIRE_ONCHAIND_INIT, &txid, 0, block->height); + wallet_insert_funding_spend(channel->peer->ld->wallet, channel, + &txid, 0, block->height); return onchaind_funding_spent(channel, tx, block->height); } diff --git a/lightningd/subd.c b/lightningd/subd.c index 6a4445d7529d..778811456b37 100644 --- a/lightningd/subd.c +++ b/lightningd/subd.c @@ -142,13 +142,15 @@ static struct subd_req *add_req(const tal_t *ctx, struct subd *sd, int type, size_t num_fds_in, void (*replycb)(struct subd *, const u8 *, const int *, void *), - void *replycb_data) + void *replycb_data TAKES) { struct subd_req *sr = tal(sd, struct subd_req); sr->type = type; sr->replycb = replycb; sr->replycb_data = replycb_data; + if (taken(replycb_data)) + tal_steal(sr, replycb_data); sr->num_reply_fds = num_fds_in; /* We don't allocate sr off ctx, because we still have to handle the @@ -852,7 +854,7 @@ struct subd_req *subd_req_(const tal_t *ctx, const u8 *msg_out, int fd_out, size_t num_fds_in, void (*replycb)(struct subd *, const u8 *, const int *, void *), - void *replycb_data) + void *replycb_data TAKES) { /* Grab type now in case msg_out is taken() */ int type = fromwire_peektype(msg_out); diff --git a/lightningd/subd.h b/lightningd/subd.h index d59009d23002..c572a77d05b8 100644 --- a/lightningd/subd.h +++ b/lightningd/subd.h @@ -178,7 +178,7 @@ void subd_send_fd(struct subd *sd, int fd); * @fd_out: if >=0 fd to pass at the end of the message (closed after) * @num_fds_in: how many fds to read in to hand to @replycb if it's a reply. * @replycb: callback (inside db transaction) when reply comes in (can free subd) - * @replycb_data: final arg to hand to @replycb + * @replycb_data: final arg to hand to @replycb (can be TAKE()) * * @replycb cannot free @sd, so it returns false to remove it. * Note that @replycb is called for replies of type @msg_out + SUBD_REPLY_OFFSET @@ -196,7 +196,7 @@ struct subd_req *subd_req_(const tal_t *ctx, const u8 *msg_out, int fd_out, size_t num_fds_in, void (*replycb)(struct subd *, const u8 *, const int *, void *), - void *replycb_data); + void *replycb_data TAKES); /** * subd_release_channel - shut down a subdaemon which no longer owns the channel. diff --git a/lightningd/test/run-invoice-select-inchan.c b/lightningd/test/run-invoice-select-inchan.c index fe8e125958e7..f2037b333d3c 100644 --- a/lightningd/test/run-invoice-select-inchan.c +++ b/lightningd/test/run-invoice-select-inchan.c @@ -939,7 +939,7 @@ struct subd_req *subd_req_(const tal_t *ctx UNNEEDED, const u8 *msg_out UNNEEDED, int fd_out UNNEEDED, size_t num_fds_in UNNEEDED, void (*replycb)(struct subd * UNNEEDED, const u8 * UNNEEDED, const int * UNNEEDED, void *) UNNEEDED, - void *replycb_data UNNEEDED) + void *replycb_data TAKES UNNEEDED) { fprintf(stderr, "subd_req_ called!\n"); abort(); } /* Generated stub for subd_send_fd */ void subd_send_fd(struct subd *sd UNNEEDED, int fd UNNEEDED) @@ -1025,11 +1025,6 @@ const char *version(void) /* Generated stub for wallet_channel_save */ void wallet_channel_save(struct wallet *w UNNEEDED, struct channel *chan UNNEEDED) { fprintf(stderr, "wallet_channel_save called!\n"); abort(); } -/* Generated stub for wallet_channeltxs_add */ -void wallet_channeltxs_add(struct wallet *w UNNEEDED, struct channel *chan UNNEEDED, - const int type UNNEEDED, const struct bitcoin_txid *txid UNNEEDED, - const u32 input_num UNNEEDED, const u32 blockheight UNNEEDED) -{ fprintf(stderr, "wallet_channeltxs_add called!\n"); abort(); } /* Generated stub for wallet_delete_peer_if_unused */ void wallet_delete_peer_if_unused(struct wallet *w UNNEEDED, u64 peer_dbid UNNEEDED) { fprintf(stderr, "wallet_delete_peer_if_unused called!\n"); abort(); } @@ -1053,6 +1048,12 @@ bool wallet_htlcs_load_out_for_channel(struct wallet *wallet UNNEEDED, /* Generated stub for wallet_init_channels */ bool wallet_init_channels(struct wallet *w UNNEEDED) { fprintf(stderr, "wallet_init_channels called!\n"); abort(); } +/* Generated stub for wallet_insert_funding_spend */ +void wallet_insert_funding_spend(struct wallet *w UNNEEDED, + const struct channel *chan UNNEEDED, + const struct bitcoin_txid *txid UNNEEDED, + const u32 input_num UNNEEDED, const u32 blockheight UNNEEDED) +{ fprintf(stderr, "wallet_insert_funding_spend called!\n"); abort(); } /* Generated stub for wallet_offer_find */ char *wallet_offer_find(const tal_t *ctx UNNEEDED, struct wallet *w UNNEEDED, diff --git a/onchaind/onchaind.c b/onchaind/onchaind.c index 7e8c57a84d3d..3d573baffe12 100644 --- a/onchaind/onchaind.c +++ b/onchaind/onchaind.c @@ -978,14 +978,6 @@ static void billboard_update(struct tracked_output **outs) output_type_name(best->output_type), best->depth); } -static void unwatch_txid(const struct bitcoin_txid *txid) -{ - u8 *msg; - - msg = towire_onchaind_unwatch_tx(NULL, txid); - wire_sync_write(REQ_FD, take(msg)); -} - static void handle_htlc_onchain_fulfill(struct tracked_output *out, const struct tx_parts *tx_parts, const struct bitcoin_outpoint *htlc_outpoint) @@ -1194,23 +1186,30 @@ static void onchain_annotate_txin(const struct bitcoin_txid *txid, u32 innum, tmpctx, txid, innum, type))); } -/* An output has been spent: see if it resolves something we care about. */ -static void output_spent(struct tracked_output ***outs, +/* An output has been spent: see if it resolves something we care about. + * Return true if it's useful to know about, false to suppress this and any + * child transactions. + */ +static bool output_spent(struct tracked_output ***outs, const struct tx_parts *tx_parts, u32 input_num, u32 tx_blockheight) { + bool interesting = false; + for (size_t i = 0; i < tal_count(*outs); i++) { struct tracked_output *out = (*outs)[i]; struct bitcoin_outpoint htlc_outpoint; - if (out->resolved) - continue; - if (!wally_tx_input_spends(tx_parts->inputs[input_num], &out->outpoint)) continue; + interesting = true; + + if (out->resolved) + continue; + /* Was this our resolution? */ if (resolved_by_proposal(out, tx_parts)) { /* If it's our htlc tx, we need to resolve that, too. */ @@ -1221,7 +1220,7 @@ static void output_spent(struct tracked_output ***outs, record_coin_movements(out, tx_blockheight, &tx_parts->txid); - return; + break; } htlc_outpoint.txid = tx_parts->txid; @@ -1342,17 +1341,18 @@ static void output_spent(struct tracked_output ***outs, tx_type_name(out->tx_type), output_type_name(out->output_type)); } - return; } - struct bitcoin_txid txid; - wally_tx_input_get_txid(tx_parts->inputs[input_num], &txid); - /* Not interesting to us, so unwatch the tx and all its outputs */ - status_debug("Notified about tx %s output %u spend, but we don't care", - fmt_bitcoin_txid(tmpctx, &txid), - tx_parts->inputs[input_num]->index); + if (!interesting) { + struct bitcoin_txid txid; + wally_tx_input_get_txid(tx_parts->inputs[input_num], &txid); - unwatch_txid(&tx_parts->txid); + status_debug("Notified about tx %s output %u spend, but we don't care", + fmt_bitcoin_txid(tmpctx, &txid), + tx_parts->inputs[input_num]->index); + } + + return interesting; } static void update_resolution_depth(struct tracked_output *out, u32 depth) @@ -1610,12 +1610,16 @@ static void handle_onchaind_spent(struct tracked_output ***outs, const u8 *msg) { struct tx_parts *tx_parts; u32 input_num, tx_blockheight; + bool interesting; if (!fromwire_onchaind_spent(msg, msg, &tx_parts, &input_num, &tx_blockheight)) master_badmsg(WIRE_ONCHAIND_SPENT, msg); - output_spent(outs, tx_parts, input_num, tx_blockheight); + interesting = output_spent(outs, tx_parts, input_num, tx_blockheight); + + /* Tell lightningd if it was interesting */ + wire_sync_write(REQ_FD, take(towire_onchaind_spent_reply(NULL, interesting))); } static void handle_onchaind_known_preimage(struct tracked_output ***outs, @@ -1675,7 +1679,7 @@ static void wait_for_resolved(struct tracked_output **outs) /* We send these, not receive! */ case WIRE_ONCHAIND_INIT_REPLY: - case WIRE_ONCHAIND_UNWATCH_TX: + case WIRE_ONCHAIND_SPENT_REPLY: case WIRE_ONCHAIND_EXTRACTED_PREIMAGE: case WIRE_ONCHAIND_MISSING_HTLC_OUTPUT: case WIRE_ONCHAIND_HTLC_TIMEOUT: diff --git a/onchaind/onchaind_wire.csv b/onchaind/onchaind_wire.csv index d75b3ab65e86..bc7b3da91557 100644 --- a/onchaind/onchaind_wire.csv +++ b/onchaind/onchaind_wire.csv @@ -67,15 +67,15 @@ msgdata,onchaind_spent,tx,tx_parts, msgdata,onchaind_spent,input_num,u32, msgdata,onchaind_spent,blockheight,u32, +# onchaind->master: do we want to continue watching this? +msgtype,onchaind_spent_reply,5104 +msgdata,onchaind_spent_reply,interested,bool, + # master->onchaind: We will receive more than one of these, as depth changes. msgtype,onchaind_depth,5005 msgdata,onchaind_depth,txid,bitcoin_txid, msgdata,onchaind_depth,depth,u32, -# onchaind->master: We don't want to watch this tx, or its outputs -msgtype,onchaind_unwatch_tx,5006 -msgdata,onchaind_unwatch_tx,txid,bitcoin_txid, - # master->onchaind: We know HTLC preimage msgtype,onchaind_known_preimage,5007 msgdata,onchaind_known_preimage,preimage,preimage, diff --git a/onchaind/test/run-grind_feerate.c b/onchaind/test/run-grind_feerate.c index bc10c960ad44..91295e425cd5 100644 --- a/onchaind/test/run-grind_feerate.c +++ b/onchaind/test/run-grind_feerate.c @@ -334,9 +334,9 @@ u8 *towire_onchaind_spend_penalty(const tal_t *ctx UNNEEDED, const struct bitcoi /* Generated stub for towire_onchaind_spend_to_us */ u8 *towire_onchaind_spend_to_us(const tal_t *ctx UNNEEDED, const struct bitcoin_outpoint *outpoint UNNEEDED, struct amount_sat outpoint_amount UNNEEDED, u32 minblock UNNEEDED, u64 commit_num UNNEEDED, const u8 *wscript UNNEEDED) { fprintf(stderr, "towire_onchaind_spend_to_us called!\n"); abort(); } -/* Generated stub for towire_onchaind_unwatch_tx */ -u8 *towire_onchaind_unwatch_tx(const tal_t *ctx UNNEEDED, const struct bitcoin_txid *txid UNNEEDED) -{ fprintf(stderr, "towire_onchaind_unwatch_tx called!\n"); abort(); } +/* Generated stub for towire_onchaind_spent_reply */ +u8 *towire_onchaind_spent_reply(const tal_t *ctx UNNEEDED, bool interested UNNEEDED) +{ fprintf(stderr, "towire_onchaind_spent_reply called!\n"); abort(); } /* Generated stub for towire_secp256k1_ecdsa_signature */ void towire_secp256k1_ecdsa_signature(u8 **pptr UNNEEDED, const secp256k1_ecdsa_signature *signature UNNEEDED) diff --git a/tests/test_closing.py b/tests/test_closing.py index 0f261b9df7e0..a71dca13539d 100644 --- a/tests/test_closing.py +++ b/tests/test_closing.py @@ -1751,7 +1751,7 @@ def test_onchain_first_commit(node_factory, bitcoind): l1.daemon.wait_for_log('onchaind complete, forgetting peer') -def test_onchain_unwatch(node_factory, bitcoind): +def test_onchain_unwatch(node_factory, bitcoind, chainparams): """Onchaind should not watch random spends""" # We track channel balances, to verify that accounting is ok. coin_mvt_plugin = os.path.join(os.getcwd(), 'tests/plugins/coin_movements.py') @@ -1784,7 +1784,12 @@ def test_onchain_unwatch(node_factory, bitcoind): # Daemon gets told about wallet; says it doesn't care. l1.rpc.withdraw(l1.rpc.newaddr()['bech32'], 'all') bitcoind.generate_block(1) - l1.daemon.wait_for_log("but we don't care") + + # We see *two* of these: one for anchor spend as well! + if chainparams['elements']: + l1.daemon.wait_for_log("but we don't care") + else: + l1.daemon.wait_for_logs(["but we don't care"] * 2) # And lightningd should respect that! assert not l1.daemon.is_in_log("Can't unwatch txid") @@ -1848,7 +1853,7 @@ def test_onchaind_replay(node_factory, bitcoind): l1.restart() # Can't wait for it, it's after the "Server started" wait in restart() - assert l1.daemon.is_in_log(r'Restarting onchaind for channel') + assert l1.daemon.is_in_log(r'Restarting onchaind \(ONCHAIN\): closed in block 109') # l1 should still notice that the funding was spent and that we should react to it _, txid, blocks = l1.wait_for_onchaind_tx('OUR_DELAYED_RETURN_TO_WALLET', diff --git a/wallet/db.c b/wallet/db.c index 66597428d850..53852e529c56 100644 --- a/wallet/db.c +++ b/wallet/db.c @@ -1021,6 +1021,7 @@ static struct migration dbmigrations[] = { {SQL("ALTER TABLE channels ADD remote_htlc_minimum_msat BIGINT DEFAULT NULL;"), NULL}, {SQL("ALTER TABLE channels ADD last_stable_connection BIGINT DEFAULT 0;"), NULL}, {NULL, migrate_initialize_alias_local}, + /* FIXME: Remove now-unused type column from channeltxs */ }; /** diff --git a/wallet/test/run-wallet.c b/wallet/test/run-wallet.c index 15664715b4eb..af380769b72b 100644 --- a/wallet/test/run-wallet.c +++ b/wallet/test/run-wallet.c @@ -985,7 +985,7 @@ struct subd_req *subd_req_(const tal_t *ctx UNNEEDED, const u8 *msg_out UNNEEDED, int fd_out UNNEEDED, size_t num_fds_in UNNEEDED, void (*replycb)(struct subd * UNNEEDED, const u8 * UNNEEDED, const int * UNNEEDED, void *) UNNEEDED, - void *replycb_data UNNEEDED) + void *replycb_data TAKES UNNEEDED) { fprintf(stderr, "subd_req_ called!\n"); abort(); } /* Generated stub for subd_send_fd */ void subd_send_fd(struct subd *sd UNNEEDED, int fd UNNEEDED) diff --git a/wallet/wallet.c b/wallet/wallet.c index 149778f684f1..c233424b2562 100644 --- a/wallet/wallet.c +++ b/wallet/wallet.c @@ -4198,7 +4198,7 @@ bool wallet_sanity_check(struct wallet *w) /** * wallet_utxoset_prune -- Remove spent UTXO entries that are old */ -static void wallet_utxoset_prune(struct wallet *w, const u32 blockheight) +void wallet_utxoset_prune(struct wallet *w, u32 blockheight) { struct db_stmt *stmt; @@ -4236,9 +4236,6 @@ void wallet_block_add(struct wallet *w, struct block *b) db_bind_null(stmt); } db_exec_prepared_v2(take(stmt)); - - /* Now cleanup UTXOs that we don't care about anymore */ - wallet_utxoset_prune(w, b->height); } void wallet_block_remove(struct wallet *w, struct block *b) @@ -4465,6 +4462,26 @@ wallet_utxoset_get_spent(const tal_t *ctx, struct wallet *w, return db_scids(ctx, stmt); } +u32 wallet_utxoset_oldest_spentheight(const tal_t *ctx, struct wallet *w) +{ + struct db_stmt *stmt; + u32 height; + stmt = db_prepare_v2(w->db, SQL("SELECT" + " spendheight " + "FROM utxoset " + "WHERE spendheight IS NOT NULL " + "ORDER BY spendheight ASC " + "LIMIT 1")); + db_query_prepared(stmt); + + if (db_step(stmt)) + height = db_col_int(stmt, "spendheight"); + else + height = 0; + tal_free(stmt); + return height; +} + const struct short_channel_id * wallet_utxoset_get_created(const tal_t *ctx, struct wallet *w, u32 blockheight) @@ -4659,9 +4676,10 @@ struct bitcoin_txid *wallet_transactions_by_height(const tal_t *ctx, return txids; } -void wallet_channeltxs_add(struct wallet *w, struct channel *chan, - const int type, const struct bitcoin_txid *txid, - const u32 input_num, const u32 blockheight) +void wallet_insert_funding_spend(struct wallet *w, + const struct channel *chan, + const struct bitcoin_txid *txid, + const u32 input_num, const u32 blockheight) { struct db_stmt *stmt; stmt = db_prepare_v2(w->db, SQL("INSERT INTO channeltxs (" @@ -4672,70 +4690,43 @@ void wallet_channeltxs_add(struct wallet *w, struct channel *chan, ", blockheight" ") VALUES (?, ?, ?, ?, ?);")); db_bind_int(stmt, chan->dbid); - db_bind_int(stmt, type); - db_bind_sha256(stmt, &txid->shad.sha); + /* FIXME: This is WIRE_ONCHAIND_INIT, accidentally leaked into db! */ + db_bind_int(stmt, 5001); + db_bind_txid(stmt, txid); db_bind_int(stmt, input_num); db_bind_int(stmt, blockheight); db_exec_prepared_v2(take(stmt)); } -u32 *wallet_onchaind_channels(const tal_t *ctx, struct wallet *w) +struct bitcoin_tx *wallet_get_funding_spend(const tal_t *ctx, + struct wallet *w, + u64 channel_id, + u32 *blockheight) { struct db_stmt *stmt; - size_t count = 0; - u32 *channel_ids = tal_arr(ctx, u32, 0); - stmt = db_prepare_v2( - w->db, - SQL("SELECT DISTINCT(channel_id) FROM channeltxs WHERE type = ?;")); - db_bind_int(stmt, WIRE_ONCHAIND_INIT); - db_query_prepared(stmt); - - while (db_step(stmt)) { - count++; - tal_resize(&channel_ids, count); - channel_ids[count-1] = db_col_u64(stmt, "DISTINCT(channel_id)"); - } - tal_free(stmt); - - return channel_ids; -} + struct bitcoin_tx *tx; -struct channeltx *wallet_channeltxs_get(const tal_t *ctx, struct wallet *w, - u32 channel_id) -{ - struct db_stmt *stmt; - size_t count = 0; - struct channeltx *res = tal_arr(ctx, struct channeltx, 0); - stmt = db_prepare_v2( - w->db, SQL("SELECT" - " c.type" - ", c.blockheight" - ", t.rawtx" - ", c.input_num" - ", c.blockheight - t.blockheight + 1 AS depth" - ", t.id as txid " - "FROM channeltxs c " - "JOIN transactions t ON t.id = c.transaction_id " - "WHERE c.channel_id = ? " - "ORDER BY c.id ASC;")); + stmt = db_prepare_v2(w->db, + SQL("SELECT" + " t.blockheight" + ", t.rawtx" + " FROM channeltxs c" + " JOIN transactions t ON t.id = c.transaction_id" + " WHERE c.channel_id = ? AND t.blockheight IS NOT NULL AND c.type = ?" + " ORDER BY c.id ASC;")); db_bind_int(stmt, channel_id); + db_bind_int(stmt, WIRE_ONCHAIND_INIT); db_query_prepared(stmt); - while (db_step(stmt)) { - count++; - tal_resize(&res, count); - - res[count-1].channel_id = channel_id; - res[count-1].type = db_col_int(stmt, "c.type"); - res[count-1].blockheight = db_col_int(stmt, "c.blockheight"); - res[count-1].tx = db_col_tx(ctx, stmt, "t.rawtx"); - res[count-1].input_num = db_col_int(stmt, "c.input_num"); - res[count-1].depth = db_col_int(stmt, "depth"); - db_col_txid(stmt, "txid", &res[count-1].txid); - } + if (db_step(stmt)) { + tx = db_col_tx(ctx, stmt, "t.rawtx"); + *blockheight = db_col_int(stmt, "t.blockheight"); + } else + tx = NULL; tal_free(stmt); - return res; + + return tx; } static bool wallet_forwarded_payment_update(struct wallet *w, diff --git a/wallet/wallet.h b/wallet/wallet.h index 9329e5a37750..e556bebd2a89 100644 --- a/wallet/wallet.h +++ b/wallet/wallet.h @@ -1135,6 +1135,12 @@ void wallet_utxoset_add(struct wallet *w, const struct short_channel_id * wallet_utxoset_get_spent(const tal_t *ctx, struct wallet *w, u32 blockheight); +/* Prune all UTXO entries spent (far) below this block height */ +void wallet_utxoset_prune(struct wallet *w, u32 blockheight); + +/* Get oldest spendheight (or 0 if none), to catch up */ +u32 wallet_utxoset_oldest_spentheight(const tal_t *ctx, struct wallet *w); + /** * Retrieve all UTXO entries that were created at a given blockheight. */ @@ -1181,22 +1187,21 @@ struct bitcoin_txid *wallet_transactions_by_height(const tal_t *ctx, const u32 blockheight); /** - * Store transactions of interest in the database to replay on restart + * Store funding txid spend to start replay on restart + * Note that tx should already be saved by wallet_transaction_add! */ -void wallet_channeltxs_add(struct wallet *w, struct channel *chan, - const int type, const struct bitcoin_txid *txid, - const u32 input_num, const u32 blockheight); - -/** - * List channels for which we had an onchaind running - */ -u32 *wallet_onchaind_channels(const tal_t *ctx, struct wallet *w); +void wallet_insert_funding_spend(struct wallet *w, + const struct channel *chan, + const struct bitcoin_txid *txid, + const u32 input_num, const u32 blockheight); /** - * Get transactions that we'd like to replay for a channel. + * Get the transaction which spend funding for this channel, if any. */ -struct channeltx *wallet_channeltxs_get(const tal_t *ctx, struct wallet *w, - u32 channel_id); +struct bitcoin_tx *wallet_get_funding_spend(const tal_t *ctx, + struct wallet *w, + u64 channel_id, + u32 *blockheight); /** * Add of update a forwarded_payment