Skip to content

Commit 636c357

Browse files
committed
Introduce Runtime object allowng to detect outer runtime context
Instead of holding an `Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>` and dealing with stuff like `tokio::task::block_in_place` at all callsites, we introduce a `Runtime` object that takes care of the state transitions, and allows to detect and reuse an outer runtime context. We also adjust the `with_runtime` API to take a `tokio::runtime::Handle` rather than an `Arc<Runtime>`.
1 parent a147ad0 commit 636c357

File tree

14 files changed

+322
-258
lines changed

14 files changed

+322
-258
lines changed

bindings/ldk_node.udl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ dictionary LogRecord {
6464

6565
[Trait, WithForeign]
6666
interface LogWriter {
67-
void log(LogRecord record);
67+
void log(LogRecord record);
6868
};
6969

7070
interface Builder {
@@ -161,8 +161,8 @@ interface Node {
161161

162162
[Enum]
163163
interface Bolt11InvoiceDescription {
164-
Hash(string hash);
165-
Direct(string description);
164+
Hash(string hash);
165+
Direct(string description);
166166
};
167167

168168
interface Bolt11Payment {
@@ -335,6 +335,7 @@ enum BuildError {
335335
"InvalidListeningAddresses",
336336
"InvalidAnnouncementAddresses",
337337
"InvalidNodeAlias",
338+
"RuntimeSetupFailed",
338339
"ReadFailed",
339340
"WriteFailed",
340341
"StoragePathAccessFailed",

src/builder.rs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::liquidity::{
2828
use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger};
2929
use crate::message_handler::NodeCustomMessageHandler;
3030
use crate::peer_store::PeerStore;
31+
use crate::runtime::Runtime;
3132
use crate::tx_broadcaster::TransactionBroadcaster;
3233
use crate::types::{
3334
ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
@@ -168,6 +169,8 @@ pub enum BuildError {
168169
InvalidAnnouncementAddresses,
169170
/// The provided alias is invalid.
170171
InvalidNodeAlias,
172+
/// An attempt to setup a runtime has failed.
173+
RuntimeSetupFailed,
171174
/// We failed to read data from the [`KVStore`].
172175
///
173176
/// [`KVStore`]: lightning::util::persist::KVStore
@@ -205,6 +208,7 @@ impl fmt::Display for BuildError {
205208
Self::InvalidAnnouncementAddresses => {
206209
write!(f, "Given announcement addresses are invalid.")
207210
},
211+
Self::RuntimeSetupFailed => write!(f, "Failed to setup a runtime."),
208212
Self::ReadFailed => write!(f, "Failed to read from store."),
209213
Self::WriteFailed => write!(f, "Failed to write to store."),
210214
Self::StoragePathAccessFailed => write!(f, "Failed to access the given storage path."),
@@ -236,6 +240,7 @@ pub struct NodeBuilder {
236240
gossip_source_config: Option<GossipSourceConfig>,
237241
liquidity_source_config: Option<LiquiditySourceConfig>,
238242
log_writer_config: Option<LogWriterConfig>,
243+
runtime_handle: Option<tokio::runtime::Handle>,
239244
}
240245

241246
impl NodeBuilder {
@@ -252,16 +257,28 @@ impl NodeBuilder {
252257
let gossip_source_config = None;
253258
let liquidity_source_config = None;
254259
let log_writer_config = None;
260+
let runtime_handle = None;
255261
Self {
256262
config,
257263
entropy_source_config,
258264
chain_data_source_config,
259265
gossip_source_config,
260266
liquidity_source_config,
261267
log_writer_config,
268+
runtime_handle,
262269
}
263270
}
264271

272+
/// Configures the [`Node`] instance to (re-)use a specific `tokio` runtime.
273+
///
274+
/// If not provided, the node will spawn its own runtime or reuse any outer runtime context it
275+
/// can detect.
276+
#[cfg_attr(feature = "uniffi", allow(dead_code))]
277+
pub fn set_runtime(&mut self, runtime_handle: tokio::runtime::Handle) -> &mut Self {
278+
self.runtime_handle = Some(runtime_handle);
279+
self
280+
}
281+
265282
/// Configures the [`Node`] instance to source its wallet entropy from a seed file on disk.
266283
///
267284
/// If the given file does not exist a new random seed file will be generated and
@@ -650,6 +667,15 @@ impl NodeBuilder {
650667
) -> Result<Node, BuildError> {
651668
let logger = setup_logger(&self.log_writer_config, &self.config)?;
652669

670+
let runtime = if let Some(handle) = self.runtime_handle.as_ref() {
671+
Arc::new(Runtime::with_handle(handle.clone()))
672+
} else {
673+
Arc::new(Runtime::new().map_err(|e| {
674+
log_error!(logger, "Failed to setup tokio runtime: {}", e);
675+
BuildError::RuntimeSetupFailed
676+
})?)
677+
};
678+
653679
let seed_bytes = seed_bytes_from_config(
654680
&self.config,
655681
self.entropy_source_config.as_ref(),
@@ -678,6 +704,7 @@ impl NodeBuilder {
678704
self.gossip_source_config.as_ref(),
679705
self.liquidity_source_config.as_ref(),
680706
seed_bytes,
707+
runtime,
681708
logger,
682709
Arc::new(vss_store),
683710
)
@@ -687,6 +714,15 @@ impl NodeBuilder {
687714
pub fn build_with_store(&self, kv_store: Arc<DynStore>) -> Result<Node, BuildError> {
688715
let logger = setup_logger(&self.log_writer_config, &self.config)?;
689716

717+
let runtime = if let Some(handle) = self.runtime_handle.as_ref() {
718+
Arc::new(Runtime::with_handle(handle.clone()))
719+
} else {
720+
Arc::new(Runtime::new().map_err(|e| {
721+
log_error!(logger, "Failed to setup tokio runtime: {}", e);
722+
BuildError::RuntimeSetupFailed
723+
})?)
724+
};
725+
690726
let seed_bytes = seed_bytes_from_config(
691727
&self.config,
692728
self.entropy_source_config.as_ref(),
@@ -700,6 +736,7 @@ impl NodeBuilder {
700736
self.gossip_source_config.as_ref(),
701737
self.liquidity_source_config.as_ref(),
702738
seed_bytes,
739+
runtime,
703740
logger,
704741
kv_store,
705742
)
@@ -1049,7 +1086,7 @@ fn build_with_store_internal(
10491086
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
10501087
gossip_source_config: Option<&GossipSourceConfig>,
10511088
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
1052-
logger: Arc<Logger>, kv_store: Arc<DynStore>,
1089+
runtime: Arc<Runtime>, logger: Arc<Logger>, kv_store: Arc<DynStore>,
10531090
) -> Result<Node, BuildError> {
10541091
if let Err(err) = may_announce_channel(&config) {
10551092
if config.announcement_addresses.is_some() {
@@ -1239,8 +1276,6 @@ fn build_with_store_internal(
12391276
},
12401277
};
12411278

1242-
let runtime = Arc::new(RwLock::new(None));
1243-
12441279
// Initialize the ChainMonitor
12451280
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
12461281
Some(Arc::clone(&chain_source)),
@@ -1633,6 +1668,8 @@ fn build_with_store_internal(
16331668
let (stop_sender, _) = tokio::sync::watch::channel(());
16341669
let background_processor_task = Mutex::new(None);
16351670

1671+
let is_running = Arc::new(RwLock::new(false));
1672+
16361673
Ok(Node {
16371674
runtime,
16381675
stop_sender,
@@ -1658,6 +1695,7 @@ fn build_with_store_internal(
16581695
scorer,
16591696
peer_store,
16601697
payment_store,
1698+
is_running,
16611699
is_listening,
16621700
node_metrics,
16631701
})

src/chain/electrum.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::fee_estimator::{
1818
};
1919
use crate::io::utils::write_node_metrics;
2020
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
21+
use crate::runtime::Runtime;
2122
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
2223
use crate::NodeMetrics;
2324

@@ -89,7 +90,7 @@ impl ElectrumChainSource {
8990
}
9091
}
9192

92-
pub(super) fn start(&self, runtime: Arc<tokio::runtime::Runtime>) -> Result<(), Error> {
93+
pub(super) fn start(&self, runtime: Arc<Runtime>) -> Result<(), Error> {
9394
self.electrum_runtime_status.write().unwrap().start(
9495
self.server_url.clone(),
9596
Arc::clone(&runtime),
@@ -345,7 +346,7 @@ impl ElectrumRuntimeStatus {
345346
}
346347

347348
pub(super) fn start(
348-
&mut self, server_url: String, runtime: Arc<tokio::runtime::Runtime>, config: Arc<Config>,
349+
&mut self, server_url: String, runtime: Arc<Runtime>, config: Arc<Config>,
349350
logger: Arc<Logger>,
350351
) -> Result<(), Error> {
351352
match self {
@@ -409,15 +410,14 @@ struct ElectrumRuntimeClient {
409410
electrum_client: Arc<ElectrumClient>,
410411
bdk_electrum_client: Arc<BdkElectrumClient<ElectrumClient>>,
411412
tx_sync: Arc<ElectrumSyncClient<Arc<Logger>>>,
412-
runtime: Arc<tokio::runtime::Runtime>,
413+
runtime: Arc<Runtime>,
413414
config: Arc<Config>,
414415
logger: Arc<Logger>,
415416
}
416417

417418
impl ElectrumRuntimeClient {
418419
fn new(
419-
server_url: String, runtime: Arc<tokio::runtime::Runtime>, config: Arc<Config>,
420-
logger: Arc<Logger>,
420+
server_url: String, runtime: Arc<Runtime>, config: Arc<Config>, logger: Arc<Logger>,
421421
) -> Result<Self, Error> {
422422
let electrum_config = ElectrumConfigBuilder::new()
423423
.retry(ELECTRUM_CLIENT_NUM_RETRIES)
@@ -550,7 +550,6 @@ impl ElectrumRuntimeClient {
550550

551551
let spawn_fut =
552552
self.runtime.spawn_blocking(move || electrum_client.transaction_broadcast(&tx));
553-
554553
let timeout_fut =
555554
tokio::time::timeout(Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), spawn_fut);
556555

src/chain/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::config::{
1919
use crate::fee_estimator::OnchainFeeEstimator;
2020
use crate::io::utils::write_node_metrics;
2121
use crate::logger::{log_info, log_trace, LdkLogger, Logger};
22+
use crate::runtime::Runtime;
2223
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
2324
use crate::{Error, NodeMetrics};
2425

@@ -188,7 +189,7 @@ impl ChainSource {
188189
Self { kind, logger }
189190
}
190191

191-
pub(crate) fn start(&self, runtime: Arc<tokio::runtime::Runtime>) -> Result<(), Error> {
192+
pub(crate) fn start(&self, runtime: Arc<Runtime>) -> Result<(), Error> {
192193
match &self.kind {
193194
ChainSourceKind::Electrum(electrum_chain_source) => {
194195
electrum_chain_source.start(runtime)?

src/event.rs

Lines changed: 32 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use crate::io::{
2929
};
3030
use crate::logger::{log_debug, log_error, log_info, LdkLogger};
3131

32+
use crate::runtime::Runtime;
33+
3234
use lightning::events::bump_transaction::BumpTransactionEvent;
3335
use lightning::events::{ClosureReason, PaymentPurpose, ReplayEvent};
3436
use lightning::events::{Event as LdkEvent, PaymentFailureReason};
@@ -53,7 +55,7 @@ use core::future::Future;
5355
use core::task::{Poll, Waker};
5456
use std::collections::VecDeque;
5557
use std::ops::Deref;
56-
use std::sync::{Arc, Condvar, Mutex, RwLock};
58+
use std::sync::{Arc, Condvar, Mutex};
5759
use std::time::Duration;
5860

5961
/// An event emitted by [`Node`], which should be handled by the user.
@@ -451,7 +453,7 @@ where
451453
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
452454
payment_store: Arc<PaymentStore>,
453455
peer_store: Arc<PeerStore<L>>,
454-
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
456+
runtime: Arc<Runtime>,
455457
logger: L,
456458
config: Arc<Config>,
457459
}
@@ -466,8 +468,8 @@ where
466468
channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
467469
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
468470
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
469-
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
470-
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>, logger: L, config: Arc<Config>,
471+
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>, runtime: Arc<Runtime>,
472+
logger: L, config: Arc<Config>,
471473
) -> Self {
472474
Self {
473475
event_queue,
@@ -1049,17 +1051,14 @@ where
10491051
let forwarding_channel_manager = self.channel_manager.clone();
10501052
let min = time_forwardable.as_millis() as u64;
10511053

1052-
let runtime_lock = self.runtime.read().unwrap();
1053-
debug_assert!(runtime_lock.is_some());
1054+
let future = async move {
1055+
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
1056+
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
10541057

1055-
if let Some(runtime) = runtime_lock.as_ref() {
1056-
runtime.spawn(async move {
1057-
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
1058-
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
1058+
forwarding_channel_manager.process_pending_htlc_forwards();
1059+
};
10591060

1060-
forwarding_channel_manager.process_pending_htlc_forwards();
1061-
});
1062-
}
1061+
self.runtime.spawn(future);
10631062
},
10641063
LdkEvent::SpendableOutputs { outputs, channel_id } => {
10651064
match self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None) {
@@ -1421,31 +1420,27 @@ where
14211420
debug_assert!(false, "We currently don't handle BOLT12 invoices manually, so this event should never be emitted.");
14221421
},
14231422
LdkEvent::ConnectionNeeded { node_id, addresses } => {
1424-
let runtime_lock = self.runtime.read().unwrap();
1425-
debug_assert!(runtime_lock.is_some());
1426-
1427-
if let Some(runtime) = runtime_lock.as_ref() {
1428-
let spawn_logger = self.logger.clone();
1429-
let spawn_cm = Arc::clone(&self.connection_manager);
1430-
runtime.spawn(async move {
1431-
for addr in &addresses {
1432-
match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await {
1433-
Ok(()) => {
1434-
return;
1435-
},
1436-
Err(e) => {
1437-
log_error!(
1438-
spawn_logger,
1439-
"Failed to establish connection to peer {}@{}: {}",
1440-
node_id,
1441-
addr,
1442-
e
1443-
);
1444-
},
1445-
}
1423+
let spawn_logger = self.logger.clone();
1424+
let spawn_cm = Arc::clone(&self.connection_manager);
1425+
let future = async move {
1426+
for addr in &addresses {
1427+
match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await {
1428+
Ok(()) => {
1429+
return;
1430+
},
1431+
Err(e) => {
1432+
log_error!(
1433+
spawn_logger,
1434+
"Failed to establish connection to peer {}@{}: {}",
1435+
node_id,
1436+
addr,
1437+
e
1438+
);
1439+
},
14461440
}
1447-
});
1448-
}
1441+
}
1442+
};
1443+
self.runtime.spawn(future);
14491444
},
14501445
LdkEvent::BumpTransaction(bte) => {
14511446
match bte {

0 commit comments

Comments
 (0)