Skip to content

Commit c52e4e7

Browse files
committed
Switch to async traits for bitcoind and KVStore outside of monitors
With LDK 0.2, most traits now offer async variants. In the previous commit we ignored these but here we migrate to them everywhere except in the `ChannelMonitor` persistence.
1 parent d9e7a95 commit c52e4e7

File tree

5 files changed

+199
-209
lines changed

5 files changed

+199
-209
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ lightning-block-sync = { version = "0.2.0-beta1", features = [ "rpc-client", "to
1313
lightning-dns-resolver = { version = "0.3.0-beta1" }
1414
lightning-invoice = { version = "0.34.0-beta1" }
1515
lightning-net-tokio = { version = "0.2.0-beta1" }
16-
lightning-persister = { version = "0.2.0-beta1" }
16+
lightning-persister = { version = "0.2.0-beta1", features = [ "tokio" ] }
1717
lightning-background-processor = { version = "0.2.0-beta1" }
1818
lightning-rapid-gossip-sync = { version = "0.2.0-beta1" }
1919
lightning-macros = { version = "0.2.0-beta1" }

src/bitcoind_client.rs

Lines changed: 59 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ use bitcoin::key::XOnlyPublicKey;
1616
use bitcoin::psbt::Psbt;
1717
use bitcoin::{Network, OutPoint, TxOut, WPubkeyHash};
1818
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
19-
use lightning::events::bump_transaction::Utxo;
20-
use lightning::events::bump_transaction::sync::WalletSourceSync;
19+
use lightning::events::bump_transaction::{Utxo, WalletSource};
2120
use lightning::log_error;
22-
use lightning::sign::ChangeDestinationSourceSync;
21+
use lightning::sign::ChangeDestinationSource;
22+
use lightning::util::async_poll::AsyncResult;
2323
use lightning::util::logger::Logger;
2424
use lightning_block_sync::http::HttpEndpoint;
2525
use lightning_block_sync::rpc::RpcClient;
@@ -32,7 +32,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
3232
use std::sync::Arc;
3333
use std::time::Duration;
3434

35-
use tokio::runtime::{self, Runtime};
35+
use tokio::runtime::Handle;
3636

3737
pub struct BitcoindClient {
3838
pub(crate) bitcoind_rpc_client: Arc<RpcClient>,
@@ -42,8 +42,7 @@ pub struct BitcoindClient {
4242
rpc_user: String,
4343
rpc_password: String,
4444
fees: Arc<HashMap<ConfirmationTarget, AtomicU32>>,
45-
main_runtime_handle: runtime::Handle,
46-
inner_runtime: Arc<Runtime>,
45+
main_runtime_handle: Handle,
4746
logger: Arc<FilesystemLogger>,
4847
}
4948

@@ -71,7 +70,7 @@ const MIN_FEERATE: u32 = 253;
7170
impl BitcoindClient {
7271
pub(crate) async fn new(
7372
host: String, port: u16, rpc_user: String, rpc_password: String, network: Network,
74-
handle: runtime::Handle, logger: Arc<FilesystemLogger>,
73+
handle: Handle, logger: Arc<FilesystemLogger>,
7574
) -> std::io::Result<Self> {
7675
let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port);
7776
let rpc_credentials =
@@ -100,15 +99,6 @@ impl BitcoindClient {
10099
fees.insert(ConfirmationTarget::ChannelCloseMinimum, AtomicU32::new(MIN_FEERATE));
101100
fees.insert(ConfirmationTarget::OutputSpendingFee, AtomicU32::new(MIN_FEERATE));
102101

103-
let mut builder = runtime::Builder::new_multi_thread();
104-
let runtime =
105-
builder.enable_all().worker_threads(1).thread_name("rpc-worker").build().unwrap();
106-
let inner_runtime = Arc::new(runtime);
107-
// Tokio will panic if we drop a runtime while in another runtime. Because the entire
108-
// application runs inside a tokio runtime, we have to ensure this runtime is never
109-
// `drop`'d, which we do by leaking an Arc reference.
110-
std::mem::forget(Arc::clone(&inner_runtime));
111-
112102
let client = Self {
113103
bitcoind_rpc_client: Arc::new(bitcoind_rpc_client),
114104
host,
@@ -118,7 +108,6 @@ impl BitcoindClient {
118108
network,
119109
fees: Arc::new(fees),
120110
main_runtime_handle: handle.clone(),
121-
inner_runtime,
122111
logger,
123112
};
124113
BitcoindClient::poll_for_fee_estimates(
@@ -131,7 +120,7 @@ impl BitcoindClient {
131120

132121
fn poll_for_fee_estimates(
133122
fees: Arc<HashMap<ConfirmationTarget, AtomicU32>>, rpc_client: Arc<RpcClient>,
134-
handle: tokio::runtime::Handle,
123+
handle: Handle,
135124
) {
136125
handle.spawn(async move {
137126
loop {
@@ -241,39 +230,6 @@ impl BitcoindClient {
241230
});
242231
}
243232

244-
fn run_future_in_blocking_context<F: Future + Send + 'static>(&self, future: F) -> F::Output
245-
where
246-
F::Output: Send + 'static,
247-
{
248-
// Tokio deliberately makes it nigh impossible to block on a future in a sync context that
249-
// is running in an async task (which makes it really hard to interact with sync code that
250-
// has callbacks in an async project).
251-
//
252-
// Reading the docs, it *seems* like
253-
// `tokio::task::block_in_place(tokio::runtime::Handle::spawn(future))` should do the
254-
// trick, and 99.999% of the time it does! But tokio has a "non-stealable I/O driver" - if
255-
// the task we're running happens to, by sheer luck, be holding the "I/O driver" when we go
256-
// into a `block_in_place` call, and the inner future requires I/O (which of course it
257-
// does, its a future!), the whole thing will come to a grinding halt as no other thread is
258-
// allowed to poll I/O until the blocked one finishes.
259-
//
260-
// This is, of course, nuts, and an almost trivial performance penalty of occasional
261-
// additional wakeups would solve this, but tokio refuses to do so because any performance
262-
// penalty at all would be too much (tokio issue #4730).
263-
//
264-
// Instead, we have to do a rather insane dance - we have to spawn the `future` we want to
265-
// run on a *different* (threaded) tokio runtime (doing the `block_in_place` dance to avoid
266-
// blocking too many threads on the main runtime). We want to block on that `future` being
267-
// run on the other runtime's threads, but tokio only provides `block_on` to do so, which
268-
// runs the `future` itself on the current thread, panicing if this thread is already a
269-
// part of a tokio runtime (which in this case it is - the main tokio runtime). Thus, we
270-
// have to `spawn` the `future` on the secondary runtime and then `block_on` the resulting
271-
// `JoinHandle` on the main runtime.
272-
tokio::task::block_in_place(move || {
273-
self.main_runtime_handle.block_on(self.inner_runtime.spawn(future)).unwrap()
274-
})
275-
}
276-
277233
pub fn get_new_rpc_client(&self) -> RpcClient {
278234
let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port);
279235
let rpc_credentials = base64::encode(format!("{}:{}", self.rpc_user, self.rpc_password));
@@ -406,60 +362,64 @@ impl BroadcasterInterface for BitcoindClient {
406362
}
407363
}
408364

409-
impl ChangeDestinationSourceSync for BitcoindClient {
410-
fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
411-
let future = self.get_new_address();
412-
Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() }))
365+
impl ChangeDestinationSource for BitcoindClient {
366+
fn get_change_destination_script<'a>(&'a self) -> AsyncResult<'a, ScriptBuf> {
367+
Box::pin(async move {
368+
Ok(self.get_new_address().await.script_pubkey())
369+
})
413370
}
414371
}
415372

416-
impl WalletSourceSync for BitcoindClient {
417-
fn list_confirmed_utxos(&self) -> Result<Vec<Utxo>, ()> {
418-
let future = self.list_unspent();
419-
let utxos = self.run_future_in_blocking_context(async move { future.await.0 });
420-
Ok(utxos
421-
.into_iter()
422-
.filter_map(|utxo| {
423-
let outpoint = OutPoint { txid: utxo.txid, vout: utxo.vout };
424-
let value = bitcoin::Amount::from_sat(utxo.amount);
425-
match utxo.address.witness_program() {
426-
Some(prog) if prog.is_p2wpkh() => {
427-
WPubkeyHash::from_slice(prog.program().as_bytes())
428-
.map(|wpkh| Utxo::new_v0_p2wpkh(outpoint, value, &wpkh))
429-
.ok()
430-
},
431-
Some(prog) if prog.is_p2tr() => {
432-
// TODO: Add `Utxo::new_v1_p2tr` upstream.
433-
XOnlyPublicKey::from_slice(prog.program().as_bytes())
434-
.map(|_| Utxo {
435-
outpoint,
436-
output: TxOut {
437-
value,
438-
script_pubkey: utxo.address.script_pubkey(),
439-
},
440-
satisfaction_weight: 1 /* empty script_sig */ * WITNESS_SCALE_FACTOR as u64 +
441-
1 /* witness items */ + 1 /* schnorr sig len */ + 64, /* schnorr sig */
442-
})
443-
.ok()
444-
},
445-
_ => None,
446-
}
447-
})
448-
.collect())
373+
impl WalletSource for BitcoindClient {
374+
fn list_confirmed_utxos<'a>(&'a self) -> AsyncResult<'a, Vec<Utxo>> {
375+
Box::pin(async move {
376+
let utxos = self.list_unspent().await.0;
377+
Ok(utxos
378+
.into_iter()
379+
.filter_map(|utxo| {
380+
let outpoint = OutPoint { txid: utxo.txid, vout: utxo.vout };
381+
let value = bitcoin::Amount::from_sat(utxo.amount);
382+
match utxo.address.witness_program() {
383+
Some(prog) if prog.is_p2wpkh() => {
384+
WPubkeyHash::from_slice(prog.program().as_bytes())
385+
.map(|wpkh| Utxo::new_v0_p2wpkh(outpoint, value, &wpkh))
386+
.ok()
387+
},
388+
Some(prog) if prog.is_p2tr() => {
389+
// TODO: Add `Utxo::new_v1_p2tr` upstream.
390+
XOnlyPublicKey::from_slice(prog.program().as_bytes())
391+
.map(|_| Utxo {
392+
outpoint,
393+
output: TxOut {
394+
value,
395+
script_pubkey: utxo.address.script_pubkey(),
396+
},
397+
satisfaction_weight: 1 /* empty script_sig */ * WITNESS_SCALE_FACTOR as u64 +
398+
1 /* witness items */ + 1 /* schnorr sig len */ + 64, /* schnorr sig */
399+
})
400+
.ok()
401+
},
402+
_ => None,
403+
}
404+
})
405+
.collect())
406+
})
449407
}
450408

451-
fn get_change_script(&self) -> Result<ScriptBuf, ()> {
452-
let future = self.get_new_address();
453-
Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() }))
409+
fn get_change_script<'a>(&'a self) -> AsyncResult<'a, ScriptBuf> {
410+
Box::pin(async move {
411+
Ok(self.get_new_address().await.script_pubkey())
412+
})
454413
}
455414

456-
fn sign_psbt(&self, tx: Psbt) -> Result<Transaction, ()> {
457-
let mut tx_bytes = Vec::new();
458-
let _ = tx.unsigned_tx.consensus_encode(&mut tx_bytes).map_err(|_| ());
459-
let tx_hex = hex_utils::hex_str(&tx_bytes);
460-
let future = self.sign_raw_transaction_with_wallet(tx_hex);
461-
let signed_tx = self.run_future_in_blocking_context(async move { future.await });
462-
let signed_tx_bytes = hex_utils::to_vec(&signed_tx.hex).ok_or(())?;
463-
Transaction::consensus_decode(&mut signed_tx_bytes.as_slice()).map_err(|_| ())
415+
fn sign_psbt<'a>(&'a self, tx: Psbt) -> AsyncResult<'a, Transaction> {
416+
Box::pin(async move {
417+
let mut tx_bytes = Vec::new();
418+
let _ = tx.unsigned_tx.consensus_encode(&mut tx_bytes).map_err(|_| ());
419+
let tx_hex = hex_utils::hex_str(&tx_bytes);
420+
let signed_tx = self.sign_raw_transaction_with_wallet(tx_hex).await;
421+
let signed_tx_bytes = hex_utils::to_vec(&signed_tx.hex).ok_or(())?;
422+
Transaction::consensus_decode(&mut signed_tx_bytes.as_slice()).map_err(|_| ())
423+
})
464424
}
465425
}

0 commit comments

Comments
 (0)