Skip to content

Commit 2cca650

Browse files
committed
Add an async resolution option to ChainAccess::get_utxo
For those operating in an async environment, requiring `ChainAccess::get_utxo` return information about the requested UTXO synchronously is incredibly painful. Requesting information about a random UTXO is likely to go over the network, and likely to be a rather slow request. Thus, here, we change the return type of `get_utxo` to have both a synchronous and asynchronous form. The asynchronous form requires the user construct a `AccessFuture` which they `clone` and pass back to us. Internally, an `AccessFuture` has an `Arc` to the `channel_announcement` message which we need to process. When the user completes their lookup, they call `resolve` on their `AccessFuture` which we pull the `channel_announcement` from and then apply to the network graph.
1 parent 1e8553f commit 2cca650

File tree

4 files changed

+180
-28
lines changed

4 files changed

+180
-28
lines changed

fuzz/src/router.rs

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use lightning::chain::transaction::OutPoint;
1515
use lightning::ln::channelmanager::{self, ChannelDetails, ChannelCounterparty};
1616
use lightning::ln::msgs;
1717
use lightning::routing::gossip::{NetworkGraph, RoutingFees};
18-
use lightning::routing::utxo::{UtxoLookup, UtxoLookupError};
18+
use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult};
1919
use lightning::routing::router::{find_route, PaymentParameters, RouteHint, RouteHintHop, RouteParameters};
2020
use lightning::routing::scoring::FixedPenaltyScorer;
2121
use lightning::util::config::UserConfig;
@@ -81,17 +81,36 @@ impl InputData {
8181
}
8282
}
8383

84-
struct FuzzChainSource {
84+
struct FuzzChainSource<'a, 'b, Out: test_logger::Output> {
8585
input: Arc<InputData>,
86+
net_graph: &'a NetworkGraph<&'b test_logger::TestLogger<Out>>,
8687
}
87-
impl UtxoLookup for FuzzChainSource {
88-
fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> Result<TxOut, UtxoLookupError> {
89-
match self.input.get_slice(2) {
90-
Some(&[0, _]) => Err(UtxoLookupError::UnknownChain),
91-
Some(&[1, _]) => Err(UtxoLookupError::UnknownTx),
92-
Some(&[_, x]) => Ok(TxOut { value: 0, script_pubkey: Builder::new().push_int(x as i64).into_script().to_v0_p2wsh() }),
93-
None => Err(UtxoLookupError::UnknownTx),
94-
_ => unreachable!(),
88+
impl<Out: test_logger::Output> UtxoLookup for FuzzChainSource<'_, '_, Out> {
89+
fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult {
90+
let input_slice = self.input.get_slice(2);
91+
if input_slice.is_none() { return UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); }
92+
let input_slice = input_slice.unwrap();
93+
let txo_res = TxOut {
94+
value: if input_slice[0] % 2 == 0 { 1_000_000 } else { 1_000 },
95+
script_pubkey: Builder::new().push_int(input_slice[1] as i64).into_script().to_v0_p2wsh(),
96+
};
97+
match input_slice {
98+
&[0, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)),
99+
&[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)),
100+
&[2, _] => {
101+
let future = UtxoFuture::new();
102+
future.resolve(self.net_graph, Ok(txo_res));
103+
UtxoResult::Async(future.clone())
104+
},
105+
&[3, _] => {
106+
let future = UtxoFuture::new();
107+
future.resolve(self.net_graph, Err(UtxoLookupError::UnknownTx));
108+
UtxoResult::Async(future.clone())
109+
},
110+
&[4, _] => {
111+
UtxoResult::Async(UtxoFuture::new()) // the future will never resolve
112+
},
113+
&[..] => UtxoResult::Sync(Ok(txo_res)),
95114
}
96115
}
97116
}
@@ -171,6 +190,10 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
171190

172191
let our_pubkey = get_pubkey!();
173192
let net_graph = NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), &logger);
193+
let chain_source = FuzzChainSource {
194+
input: Arc::clone(&input),
195+
net_graph: &net_graph,
196+
};
174197

175198
let mut node_pks = HashSet::new();
176199
let mut scid = 42;
@@ -191,13 +214,14 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
191214
let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4);
192215
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1));
193216
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2));
194-
let _ = net_graph.update_channel_from_unsigned_announcement::<&FuzzChainSource>(&msg, &None);
217+
let _ = net_graph.update_channel_from_unsigned_announcement::
218+
<&FuzzChainSource<'_, '_, Out>>(&msg, &None);
195219
},
196220
2 => {
197221
let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4);
198222
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1));
199223
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2));
200-
let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&FuzzChainSource { input: Arc::clone(&input) }));
224+
let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&chain_source));
201225
},
202226
3 => {
203227
let _ = net_graph.update_channel_unsigned(&decode_msg!(msgs::UnsignedChannelUpdate, 72));

lightning/src/routing/gossip.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ pub struct NetworkGraph<L: Deref> where L::Target: Logger {
155155
/// resync them from gossip. Each `NodeId` is mapped to the time (in seconds) it was removed so
156156
/// that once some time passes, we can potentially resync it from gossip again.
157157
removed_nodes: Mutex<HashMap<NodeId, Option<u64>>>,
158+
/// Announcement messages which are awaiting an on-chain lookup to be processed.
159+
pub(super) pending_checks: utxo::PendingChecks,
158160
}
159161

160162
/// A read-only view of [`NetworkGraph`].
@@ -1200,6 +1202,7 @@ impl<L: Deref> ReadableArgs<L> for NetworkGraph<L> where L::Target: Logger {
12001202
last_rapid_gossip_sync_timestamp: Mutex::new(last_rapid_gossip_sync_timestamp),
12011203
removed_nodes: Mutex::new(HashMap::new()),
12021204
removed_channels: Mutex::new(HashMap::new()),
1205+
pending_checks: utxo::PendingChecks::new(),
12031206
})
12041207
}
12051208
}
@@ -1239,6 +1242,7 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
12391242
last_rapid_gossip_sync_timestamp: Mutex::new(None),
12401243
removed_channels: Mutex::new(HashMap::new()),
12411244
removed_nodes: Mutex::new(HashMap::new()),
1245+
pending_checks: utxo::PendingChecks::new(),
12421246
}
12431247
}
12441248

@@ -1494,7 +1498,8 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
14941498
}
14951499
}
14961500

1497-
let utxo_value = utxo::check_channel_announcement(utxo_lookup, msg)?;
1501+
let utxo_value = self.pending_checks.check_channel_announcement(
1502+
utxo_lookup, msg, full_msg)?;
14981503

14991504
#[allow(unused_mut, unused_assignments)]
15001505
let mut announcement_received_time = 0;

lightning/src/routing/utxo.rs

Lines changed: 134 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@ use bitcoin::hashes::hex::ToHex;
1818

1919
use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
2020
use crate::ln::msgs::{self, LightningError, ErrorAction};
21+
use crate::routing::gossip::{NetworkGraph, NodeId};
22+
use crate::util::logger::{Level, Logger};
2123
use crate::util::ser::Writeable;
2224

2325
use crate::prelude::*;
2426

27+
use alloc::sync::{Arc, Weak};
28+
use crate::sync::Mutex;
2529
use core::ops::Deref;
2630

2731
/// An error when accessing the chain via [`UtxoLookup`].
@@ -34,26 +38,117 @@ pub enum UtxoLookupError {
3438
UnknownTx,
3539
}
3640

41+
/// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously,
42+
/// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async`
43+
/// variant.
44+
pub enum UtxoResult {
45+
/// A result which was resolved synchronously. It either includes a [`TxOut`] for the output
46+
/// requested or a [`UtxoLookupError`].
47+
Sync(Result<TxOut, UtxoLookupError>),
48+
/// A result which will be resolved asynchronously. It includes a [`UtxoFuture`], a `clone` of
49+
/// which you must keep locally and call [`UtxoFuture::resolve`] on once the lookup completes.
50+
///
51+
/// Note that in order to avoid runaway memory usage, the number of parallel checks is limited,
52+
/// but only fairly loosely. Because a pending checks block all message processing, leaving
53+
/// checks pending for an extended time may cause DoS of other functions. It is recommended you
54+
/// keep a tight timeout on lookups, on the order of a few seconds.
55+
Async(UtxoFuture),
56+
}
57+
3758
/// The `UtxoLookup` trait defines behavior for accessing on-chain UTXOs.
3859
pub trait UtxoLookup {
3960
/// Returns the transaction output of a funding transaction encoded by [`short_channel_id`].
4061
/// Returns an error if `genesis_hash` is for a different chain or if such a transaction output
4162
/// is unknown.
4263
///
4364
/// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id
44-
fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result<TxOut, UtxoLookupError>;
65+
fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult;
66+
}
67+
68+
enum ChannelAnnouncement {
69+
Full(msgs::ChannelAnnouncement),
70+
Unsigned(msgs::UnsignedChannelAnnouncement),
71+
}
72+
73+
struct UtxoMessages {
74+
complete: Option<Result<TxOut, UtxoLookupError>>,
75+
channel_announce: Option<ChannelAnnouncement>,
76+
}
77+
78+
/// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async.
79+
///
80+
/// See [`UtxoResult::Async`] and [`UtxoFuture::resolve`] for more info.
81+
#[derive(Clone)]
82+
pub struct UtxoFuture {
83+
state: Arc<Mutex<UtxoMessages>>,
84+
}
85+
86+
/// A trivial implementation of [`UtxoLookup`] which is used to call back into the network graph
87+
/// once we have a concrete resolution of a request.
88+
struct UtxoResolver(Result<TxOut, UtxoLookupError>);
89+
impl UtxoLookup for UtxoResolver {
90+
fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult {
91+
UtxoResult::Sync(self.0.clone())
92+
}
93+
}
94+
95+
impl UtxoFuture {
96+
/// Builds a new future for later resolution.
97+
pub fn new() -> Self {
98+
Self { state: Arc::new(Mutex::new(UtxoMessages {
99+
complete: None,
100+
channel_announce: None,
101+
}))}
102+
}
103+
104+
/// Resolves this future against the given `graph` and with the given `result`.
105+
pub fn resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
106+
where L::Target: Logger {
107+
let announcement = {
108+
let mut async_messages = self.state.lock().unwrap();
109+
110+
if async_messages.channel_announce.is_none() {
111+
// We raced returning to `check_channel_announcement` which hasn't updated
112+
// `channel_announce` yet. That's okay, we can set the `complete` field which it will
113+
// check once it gets control again.
114+
async_messages.complete = Some(result);
115+
return;
116+
}
117+
118+
async_messages.channel_announce.take().unwrap()
119+
};
120+
121+
// Now that we've updated our internal state, pass the pending messages back through the
122+
// network graph with a different `UtxoLookup` which will resolve immediately.
123+
// Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do
124+
// with them.
125+
let resolver = UtxoResolver(result);
126+
match announcement {
127+
ChannelAnnouncement::Full(signed_msg) => {
128+
let _ = graph.update_channel_from_announcement(&signed_msg, &Some(&resolver));
129+
},
130+
ChannelAnnouncement::Unsigned(msg) => {
131+
let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
132+
},
133+
}
134+
}
135+
}
136+
137+
/// A set of messages which are pending UTXO lookups for processing.
138+
pub(super) struct PendingChecks {
45139
}
46140

47-
pub(crate) fn check_channel_announcement<U: Deref>(
48-
utxo_lookup: &Option<U>, msg: &msgs::UnsignedChannelAnnouncement
49-
) -> Result<Option<u64>, msgs::LightningError> where U::Target: UtxoLookup {
50-
match utxo_lookup {
51-
&None => {
52-
// Tentatively accept, potentially exposing us to DoS attacks
53-
Ok(None)
54-
},
55-
&Some(ref utxo_lookup) => {
56-
match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
141+
impl PendingChecks {
142+
pub(super) fn new() -> Self {
143+
PendingChecks {}
144+
}
145+
146+
pub(super) fn check_channel_announcement<U: Deref>(&self,
147+
utxo_lookup: &Option<U>, msg: &msgs::UnsignedChannelAnnouncement,
148+
full_msg: Option<&msgs::ChannelAnnouncement>
149+
) -> Result<Option<u64>, msgs::LightningError> where U::Target: UtxoLookup {
150+
let handle_result = |res| {
151+
match res {
57152
Ok(TxOut { value, script_pubkey }) => {
58153
let expected_script =
59154
make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_slice(), msg.bitcoin_key_2.as_slice()).to_v0_p2wsh();
@@ -80,6 +175,34 @@ pub(crate) fn check_channel_announcement<U: Deref>(
80175
})
81176
},
82177
}
178+
};
179+
180+
match utxo_lookup {
181+
&None => {
182+
// Tentatively accept, potentially exposing us to DoS attacks
183+
Ok(None)
184+
},
185+
&Some(ref utxo_lookup) => {
186+
match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
187+
UtxoResult::Sync(res) => handle_result(res),
188+
UtxoResult::Async(future) => {
189+
let mut async_messages = future.state.lock().unwrap();
190+
if let Some(res) = async_messages.complete.take() {
191+
// In the unlikely event the future resolved before we managed to get it,
192+
// handle the result in-line.
193+
handle_result(res)
194+
} else {
195+
async_messages.channel_announce = Some(
196+
if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
197+
else { ChannelAnnouncement::Unsigned(msg.clone()) });
198+
Err(LightningError {
199+
err: "Channel being checked async".to_owned(),
200+
action: ErrorAction::IgnoreAndLog(Level::Gossip),
201+
})
202+
}
203+
},
204+
}
205+
}
83206
}
84207
}
85208
}

lightning/src/util/test_utils.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::ln::{msgs, wire};
2323
use crate::ln::msgs::LightningError;
2424
use crate::ln::script::ShutdownScript;
2525
use crate::routing::gossip::{NetworkGraph, NodeId};
26-
use crate::routing::utxo::{UtxoLookup, UtxoLookupError};
26+
use crate::routing::utxo::{UtxoLookup, UtxoLookupError, UtxoResult};
2727
use crate::routing::router::{find_route, InFlightHtlcs, Route, RouteHop, RouteParameters, Router, ScorerAccountingForInFlightHtlcs};
2828
use crate::routing::scoring::FixedPenaltyScorer;
2929
use crate::util::config::UserConfig;
@@ -857,12 +857,12 @@ impl TestChainSource {
857857
}
858858

859859
impl UtxoLookup for TestChainSource {
860-
fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> Result<TxOut, UtxoLookupError> {
860+
fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult {
861861
if self.genesis_hash != *genesis_hash {
862-
return Err(UtxoLookupError::UnknownChain);
862+
return UtxoResult::Sync(Err(UtxoLookupError::UnknownChain));
863863
}
864864

865-
self.utxo_ret.lock().unwrap().clone()
865+
UtxoResult::Sync(self.utxo_ret.lock().unwrap().clone())
866866
}
867867
}
868868

0 commit comments

Comments
 (0)