Skip to content

Commit 05a5b0e

Browse files
committed
node: Name spawned tasks, fix some hanging tasks
1 parent 14a845d commit 05a5b0e

File tree

3 files changed

+84
-53
lines changed

3 files changed

+84
-53
lines changed

lexe-ln/src/bitcoind/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use bitcoin::consensus::encode;
1111
use bitcoin::hash_types::{BlockHash, Txid};
1212
use bitcoin::util::address::Address;
1313
use common::cli::{BitcoindRpcInfo, Network};
14+
use common::shutdown::ShutdownChannel;
1415
use common::task::LxTask;
1516
use lightning::chain::chaininterface::{
1617
BroadcasterInterface, ConfirmationTarget, FeeEstimator,
@@ -36,12 +37,14 @@ pub struct LexeBitcoind {
3637
background_fees: Arc<AtomicU32>,
3738
normal_fees: Arc<AtomicU32>,
3839
high_prio_fees: Arc<AtomicU32>,
40+
shutdown: ShutdownChannel,
3941
}
4042

4143
impl LexeBitcoind {
4244
pub async fn init(
4345
bitcoind_rpc: BitcoindRpcInfo,
4446
network: Network,
47+
shutdown: ShutdownChannel,
4548
) -> anyhow::Result<Self> {
4649
debug!(%network, "Initializing bitcoind client");
4750

@@ -61,6 +64,7 @@ impl LexeBitcoind {
6164
background_fees,
6265
normal_fees,
6366
high_prio_fees,
67+
shutdown,
6468
};
6569

6670
// Make an initial test call to check that the RPC client is working
@@ -95,12 +99,17 @@ impl LexeBitcoind {
9599
let background_fees = self.background_fees.clone();
96100
let normal_fees = self.normal_fees.clone();
97101
let high_prio_fees = self.high_prio_fees.clone();
102+
let shutdown = self.shutdown.clone();
98103

104+
// TODO(max): Instrument with shutdown
99105
LxTask::spawn(async move {
100106
let mut poll_interval = time::interval(POLL_FEE_ESTIMATE_INTERVAL);
101107

102108
loop {
103-
poll_interval.tick().await;
109+
tokio::select! {
110+
_ = poll_interval.tick() => {}
111+
() = shutdown.recv() => break,
112+
}
104113

105114
let poll_res = Self::refresh_fees(
106115
background_fees.as_ref(),

node/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ argh = "0.1"
5454
async-trait = "0.1"
5555
# Working with bytes
5656
bytes = "1"
57-
# Used to manually poll a future in ldk-sample's networking code TODO remove
57+
# Utils for working with futures
5858
futures = "0.3"
5959
# Used to specify the HTTP methods used to interact with the API
6060
http = "0.2"

node/src/init.rs

Lines changed: 73 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use common::rng::Crng;
1818
use common::shutdown::ShutdownChannel;
1919
use common::task::LxTask;
2020
use futures::future;
21+
use futures::stream::{FuturesUnordered, StreamExt};
2122
use lexe_ln::alias::{
2223
BlockSourceType, BroadcasterType, FeeEstimatorType, WalletType,
2324
};
@@ -55,13 +56,15 @@ use crate::{api, command};
5556
pub const DEFAULT_CHANNEL_SIZE: usize = 256;
5657
// TODO(max): p2p stuff should probably go in its own module
5758
const P2P_RECONNECT_INTERVAL: Duration = Duration::from_secs(60);
58-
const SHUTDOWN_JOIN_TIMEOUT: Duration = Duration::from_secs(15);
59+
/// The amount of time tasks have to finish after a graceful shutdown was
60+
/// initiated before the program exits.
61+
const SHUTDOWN_TIME_LIMIT: Duration = Duration::from_secs(15);
5962

6063
pub struct UserNode {
6164
// --- General --- //
6265
args: RunArgs,
6366
pub peer_port: Port,
64-
handles: Vec<LxTask<()>>,
67+
tasks: Vec<(&'static str, LxTask<()>)>,
6568

6669
// --- Actors --- //
6770
pub channel_manager: NodeChannelManager,
@@ -110,9 +113,19 @@ impl UserNode {
110113
let min_cpusvn = MIN_SGX_CPUSVN;
111114
let api = init_api(&args);
112115

116+
// Init channels
117+
let (activity_tx, activity_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
118+
let shutdown = ShutdownChannel::new();
119+
let (channel_monitor_updated_tx, channel_monitor_updated_rx) =
120+
mpsc::channel(DEFAULT_CHANNEL_SIZE);
121+
113122
// Initialize LexeBitcoind, fetch provisioned data
114123
let (bitcoind_res, fetch_res) = tokio::join!(
115-
LexeBitcoind::init(args.bitcoind_rpc.clone(), args.network),
124+
LexeBitcoind::init(
125+
args.bitcoind_rpc.clone(),
126+
args.network,
127+
shutdown.clone(),
128+
),
116129
fetch_provisioned_secrets(
117130
api.as_ref(),
118131
user_pk,
@@ -128,12 +141,11 @@ impl UserNode {
128141
let root_seed = &provisioned_secrets.root_seed;
129142
let bitcoind = Arc::new(bitcoind);
130143

131-
// Collect all handles to spawn tasks
132-
let mut handles = Vec::with_capacity(10);
144+
// Collect all handles to spawned tasks
145+
let mut tasks = Vec::with_capacity(10);
133146

134147
// Spawn task to refresh feerates
135-
let refresh_fees_handle = bitcoind.spawn_refresh_fees_task();
136-
handles.push(refresh_fees_handle);
148+
tasks.push(("refresh fees", bitcoind.spawn_refresh_fees_task()));
137149

138150
// Build LexeKeysManager from node init data
139151
let keys_manager = LexeKeysManager::init(rng, &node.node_pk, root_seed)
@@ -149,12 +161,6 @@ impl UserNode {
149161
let fee_estimator = bitcoind.clone();
150162
let broadcaster = bitcoind.clone();
151163

152-
// Init Tokio channels
153-
let (activity_tx, activity_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
154-
let shutdown = ShutdownChannel::new();
155-
let (channel_monitor_updated_tx, channel_monitor_updated_rx) =
156-
mpsc::channel(DEFAULT_CHANNEL_SIZE);
157-
158164
// Initialize Persister
159165
let persister = NodePersister::new(
160166
api.clone(),
@@ -174,14 +180,14 @@ impl UserNode {
174180
));
175181

176182
// Set up the persister -> chain monitor channel
177-
let channel_monitor_updated_handle = spawn_channel_monitor_updated_task(
183+
let channel_monitor_updated_task = spawn_channel_monitor_updated_task(
178184
chain_monitor.clone(),
179185
channel_monitor_updated_rx,
180186
shutdown.clone(),
181187
);
182-
handles.push(channel_monitor_updated_handle);
188+
tasks.push(("channel monitor updated", channel_monitor_updated_task));
183189

184-
// Read the `ChannelMonitor`s and initialize the `P2PGossipSync`
190+
// Read the `ChannelMonitor`s and init `P2PGossipSync`
185191
let (channel_monitors_res, gossip_sync_res) = tokio::join!(
186192
channel_monitors(&persister, keys_manager.clone()),
187193
gossip_sync(args.network, &persister, logger.clone())
@@ -227,22 +233,22 @@ impl UserNode {
227233
);
228234

229235
// Set up listening for inbound P2P connections
230-
let (p2p_listener_handle, peer_port) = spawn_p2p_listener(
236+
let (p2p_listener_task, peer_port) = spawn_p2p_listener(
231237
peer_manager.clone(),
232238
args.peer_port,
233239
shutdown.clone(),
234240
)
235241
.await;
236-
handles.push(p2p_listener_handle);
242+
tasks.push(("p2p listener", p2p_listener_task));
237243

238244
// Spawn a task to regularly reconnect to channel peers
239-
let p2p_reconnector_handle = spawn_p2p_reconnector(
245+
let p2p_reconnector_task = spawn_p2p_reconnector(
240246
channel_manager.clone(),
241247
peer_manager.clone(),
242248
persister.clone(),
243249
shutdown.clone(),
244250
);
245-
handles.push(p2p_reconnector_handle);
251+
tasks.push(("p2p reconnectooor", p2p_reconnector_task));
246252

247253
// Build owner service TLS config for authenticating owner
248254
let node_dns = args.node_dns_name.clone();
@@ -256,32 +262,40 @@ impl UserNode {
256262
network_graph.clone(),
257263
activity_tx,
258264
);
265+
let owner_shutdown = shutdown.clone();
259266
let (owner_addr, owner_service_fut) = warp::serve(owner_routes)
260267
.tls()
261268
.preconfigured_tls(owner_tls)
262269
// A value of 0 indicates that the OS will assign a port for us
263-
.bind_ephemeral(([127, 0, 0, 1], args.owner_port.unwrap_or(0)));
270+
.bind_with_graceful_shutdown(
271+
([127, 0, 0, 1], args.owner_port.unwrap_or(0)),
272+
async move { owner_shutdown.recv().await },
273+
);
264274
let owner_port = owner_addr.port();
265275
info!("Owner service listening on port {}", owner_port);
266-
let owner_service_handle = LxTask::spawn(async move {
276+
let owner_service_task = LxTask::spawn(async move {
267277
owner_service_fut.await;
268278
});
269-
handles.push(owner_service_handle);
279+
tasks.push(("owner service", owner_service_task));
270280

271281
// TODO(phlip9): authenticate host<->node
272282
// Start warp service for host
273283
let host_routes =
274284
command::server::host_routes(args.user_pk, shutdown.clone());
285+
let host_shutdown = shutdown.clone();
275286
let (host_addr, host_service_fut) = warp::serve(host_routes)
276287
// A value of 0 indicates that the OS will assign a port for us
277-
.try_bind_ephemeral(([127, 0, 0, 1], args.host_port.unwrap_or(0)))
288+
.try_bind_with_graceful_shutdown(
289+
([127, 0, 0, 1], args.host_port.unwrap_or(0)),
290+
async move { host_shutdown.recv().await },
291+
)
278292
.context("Failed to bind warp")?;
279293
let host_port = host_addr.port();
280294
info!("Host service listening on port {}", host_port);
281-
let host_service_handle = LxTask::spawn(async move {
295+
let host_service_task = LxTask::spawn(async move {
282296
host_service_fut.await;
283297
});
284-
handles.push(host_service_handle);
298+
tasks.push(("host service", host_service_task));
285299

286300
// Let the runner know that we're ready
287301
info!("Node is ready to accept commands; notifying runner");
@@ -323,7 +337,7 @@ impl UserNode {
323337
));
324338

325339
// Start Background Processing
326-
let bg_processor_handle = LexeBackgroundProcessor::start(
340+
let bg_processor_task = LexeBackgroundProcessor::start(
327341
channel_manager.clone(),
328342
peer_manager.clone(),
329343
persister.clone(),
@@ -333,7 +347,7 @@ impl UserNode {
333347
scorer.clone(),
334348
shutdown.clone(),
335349
);
336-
handles.push(bg_processor_handle);
350+
tasks.push(("background processor", bg_processor_task));
337351

338352
// Construct (but don't start) the inactivity timer
339353
let inactivity_timer = InactivityTimer::new(
@@ -348,7 +362,7 @@ impl UserNode {
348362
// General
349363
args,
350364
peer_port,
351-
handles,
365+
tasks,
352366

353367
// Actors
354368
channel_manager,
@@ -396,21 +410,21 @@ impl UserNode {
396410
.await
397411
.context("Could not sync channel listeners")?;
398412

399-
// Populate / feed the chain monitor and spawn the SPV client
400-
let spv_client_handle = synced_chain_listeners
413+
// Populate the chain monitor and spawn the SPV client
414+
let spv_client_task = synced_chain_listeners
401415
.feed_chain_monitor_and_spawn_spv(
402416
self.chain_monitor.clone(),
403417
self.shutdown.clone(),
404418
)
405419
.context("Error wrapping up sync")?;
406-
self.handles.push(spv_client_handle);
420+
self.tasks.push(("spv client", spv_client_task));
407421

408422
// Sync is complete; start the inactivity timer.
409423
debug!("Starting inactivity timer");
410-
let inactivity_timer_handle = LxTask::spawn(async move {
424+
let inactivity_timer_task = LxTask::spawn(async move {
411425
self.inactivity_timer.start().await;
412426
});
413-
self.handles.push(inactivity_timer_handle);
427+
self.tasks.push(("inactivity timer", inactivity_timer_task));
414428

415429
// --- Run --- //
416430

@@ -445,20 +459,28 @@ impl UserNode {
445459
self.peer_manager.disconnect_all_peers();
446460

447461
info!("Waiting on all tasks to finish");
448-
let join_all_with_timeout = time::timeout(
449-
SHUTDOWN_JOIN_TIMEOUT,
450-
future::join_all(self.handles),
451-
);
452-
match join_all_with_timeout.await {
453-
Ok(results) => {
454-
for res in results {
455-
if let Err(e) = res {
456-
error!("Spawned task panicked: {:#}", e);
462+
let mut tasks = self
463+
.tasks
464+
.into_iter()
465+
.map(|(name, task)| async move {
466+
let join_res = task.await;
467+
(name, join_res)
468+
})
469+
.collect::<FuturesUnordered<_>>();
470+
let timeout = tokio::time::sleep(SHUTDOWN_TIME_LIMIT);
471+
tokio::pin!(timeout);
472+
while !tasks.is_empty() {
473+
tokio::select! {
474+
Some((name, join_res)) = tasks.next() => {
475+
match join_res {
476+
Ok(()) => info!("'{name}' task finished"),
477+
Err(e) => error!("'{name}' task panicked: {e:#}"),
457478
}
458479
}
459-
}
460-
Err(e) => {
461-
error!("Joining on all spawned tasks timed out: {:#}", e);
480+
() = &mut timeout => {
481+
warn!("{} tasks failed to finish", tasks.len());
482+
break;
483+
}
462484
}
463485
}
464486

@@ -610,7 +632,7 @@ async fn spawn_p2p_listener(
610632
info!("Listening for LN P2P connections on port {}", peer_port);
611633

612634
let handle = LxTask::spawn(async move {
613-
let mut child_handles = Vec::with_capacity(1);
635+
let mut child_tasks = Vec::with_capacity(1);
614636

615637
loop {
616638
tokio::select! {
@@ -633,7 +655,7 @@ async fn spawn_p2p_listener(
633655

634656
// Spawn a task to await on the connection
635657
let peer_manager_clone = peer_manager.as_arc_inner();
636-
let child_handle = LxTask::spawn(async move {
658+
let child_task = LxTask::spawn(async move {
637659
// `setup_inbound()` returns a future that completes
638660
// when the connection is closed. The main thread calls
639661
// peer_manager.disconnect_all_peers() once it receives
@@ -646,15 +668,15 @@ async fn spawn_p2p_listener(
646668
connection_closed.await;
647669
});
648670

649-
child_handles.push(child_handle);
671+
child_tasks.push(child_task);
650672
}
651673
_ = shutdown.recv() =>
652674
break info!("LN P2P listen task shutting down"),
653675
}
654676
}
655677

656678
// Wait on all child tasks to finish (i.e. all connections close).
657-
for res in future::join_all(child_handles).await {
679+
for res in future::join_all(child_tasks).await {
658680
if let Err(e) = res {
659681
error!("P2P task panicked: {:#}", e);
660682
}
@@ -682,7 +704,7 @@ fn spawn_p2p_reconnector(
682704
// Prevents race condition where we initiate a reconnect *after*
683705
// a shutdown signal was received, causing this task to hang
684706
biased;
685-
_ = shutdown.recv() => break info!("P2P reconnector shutting down"),
707+
_ = shutdown.recv() => break,
686708
_ = interval.tick() => {}
687709
}
688710

0 commit comments

Comments (0)