Skip to content

Commit ca1e5a7

Browse files
authored
Merge pull request #644 from joostjager/multi-listen-fix
Listen on all provided addresses
2 parents 09eacfa + 0f621ff commit ca1e5a7

File tree

5 files changed

+68
-58
lines changed

5 files changed

+68
-58
lines changed

bindings/ldk_node.udl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,6 @@ enum NodeError {
325325

326326
dictionary NodeStatus {
327327
boolean is_running;
328-
boolean is_listening;
329328
BestBlock current_best_block;
330329
u64? latest_lightning_wallet_sync_timestamp;
331330
u64? latest_onchain_wallet_sync_timestamp;

src/builder.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use std::collections::HashMap;
99
use std::convert::TryInto;
1010
use std::default::Default;
1111
use std::path::PathBuf;
12-
use std::sync::atomic::AtomicBool;
1312
use std::sync::{Arc, Mutex, Once, RwLock};
1413
use std::time::SystemTime;
1514
use std::{fmt, fs};
@@ -1133,7 +1132,6 @@ fn build_with_store_internal(
11331132
}
11341133

11351134
// Initialize the status fields.
1136-
let is_listening = Arc::new(AtomicBool::new(false));
11371135
let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) {
11381136
Ok(metrics) => Arc::new(RwLock::new(metrics)),
11391137
Err(e) => {
@@ -1734,7 +1732,6 @@ fn build_with_store_internal(
17341732
peer_store,
17351733
payment_store,
17361734
is_running,
1737-
is_listening,
17381735
node_metrics,
17391736
om_mailbox,
17401737
async_payments_role,

src/lib.rs

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ mod wallet;
101101

102102
use std::default::Default;
103103
use std::net::ToSocketAddrs;
104-
use std::sync::atomic::{AtomicBool, Ordering};
105104
use std::sync::{Arc, Mutex, RwLock};
106105
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
107106

@@ -189,7 +188,6 @@ pub struct Node {
189188
peer_store: Arc<PeerStore<Arc<Logger>>>,
190189
payment_store: Arc<PaymentStore>,
191190
is_running: Arc<RwLock<bool>>,
192-
is_listening: Arc<AtomicBool>,
193191
node_metrics: Arc<RwLock<NodeMetrics>>,
194192
om_mailbox: Option<Arc<OnionMessageMailbox>>,
195193
async_payments_role: Option<AsyncPaymentsRole>,
@@ -293,9 +291,7 @@ impl Node {
293291
if let Some(listening_addresses) = &self.config.listening_addresses {
294292
// Setup networking
295293
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
296-
let mut stop_listen = self.stop_sender.subscribe();
297294
let listening_logger = Arc::clone(&self.logger);
298-
let listening_indicator = Arc::clone(&self.is_listening);
299295

300296
let mut bind_addrs = Vec::with_capacity(listening_addresses.len());
301297

@@ -313,45 +309,62 @@ impl Node {
313309
bind_addrs.extend(resolved_address);
314310
}
315311

316-
self.runtime.spawn_cancellable_background_task(async move {
317-
{
318-
let listener =
319-
tokio::net::TcpListener::bind(&*bind_addrs).await
320-
.unwrap_or_else(|e| {
321-
log_error!(listening_logger, "Failed to bind to listen addresses/ports - is something else already listening on it?: {}", e);
322-
panic!(
323-
"Failed to bind to listen address/port - is something else already listening on it?",
324-
);
325-
});
326-
327-
listening_indicator.store(true, Ordering::Release);
328-
329-
loop {
330-
let peer_mgr = Arc::clone(&peer_manager_connection_handler);
331-
tokio::select! {
332-
_ = stop_listen.changed() => {
333-
log_debug!(
334-
listening_logger,
335-
"Stopping listening to inbound connections."
312+
let logger = Arc::clone(&listening_logger);
313+
let listeners = self.runtime.block_on(async move {
314+
let mut listeners = Vec::new();
315+
316+
// Try to bind to all addresses
317+
for addr in &*bind_addrs {
318+
match tokio::net::TcpListener::bind(addr).await {
319+
Ok(listener) => {
320+
log_trace!(logger, "Listener bound to {}", addr);
321+
listeners.push(listener);
322+
},
323+
Err(e) => {
324+
log_error!(
325+
logger,
326+
"Failed to bind to {}: {} - is something else already listening?",
327+
addr,
328+
e
336329
);
337-
break;
338-
}
339-
res = listener.accept() => {
340-
let tcp_stream = res.unwrap().0;
341-
tokio::spawn(async move {
342-
lightning_net_tokio::setup_inbound(
343-
Arc::clone(&peer_mgr),
344-
tcp_stream.into_std().unwrap(),
345-
)
346-
.await;
347-
});
348-
}
330+
return Err(Error::InvalidSocketAddress);
331+
},
349332
}
350333
}
351-
}
352334

353-
listening_indicator.store(false, Ordering::Release);
354-
});
335+
Ok(listeners)
336+
})?;
337+
338+
for listener in listeners {
339+
let logger = Arc::clone(&listening_logger);
340+
let peer_mgr = Arc::clone(&peer_manager_connection_handler);
341+
let mut stop_listen = self.stop_sender.subscribe();
342+
let runtime = Arc::clone(&self.runtime);
343+
self.runtime.spawn_cancellable_background_task(async move {
344+
loop {
345+
tokio::select! {
346+
_ = stop_listen.changed() => {
347+
log_debug!(
348+
logger,
349+
"Stopping listening to inbound connections."
350+
);
351+
break;
352+
}
353+
res = listener.accept() => {
354+
let tcp_stream = res.unwrap().0;
355+
let peer_mgr = Arc::clone(&peer_mgr);
356+
runtime.spawn_cancellable_background_task(async move {
357+
lightning_net_tokio::setup_inbound(
358+
Arc::clone(&peer_mgr),
359+
tcp_stream.into_std().unwrap(),
360+
)
361+
.await;
362+
});
363+
}
364+
}
365+
}
366+
});
367+
}
355368
}
356369

357370
// Regularly reconnect to persisted peers.
@@ -666,7 +679,6 @@ impl Node {
666679
/// Returns the status of the [`Node`].
667680
pub fn status(&self) -> NodeStatus {
668681
let is_running = *self.is_running.read().unwrap();
669-
let is_listening = self.is_listening.load(Ordering::Acquire);
670682
let current_best_block = self.channel_manager.current_best_block().into();
671683
let locked_node_metrics = self.node_metrics.read().unwrap();
672684
let latest_lightning_wallet_sync_timestamp =
@@ -684,7 +696,6 @@ impl Node {
684696

685697
NodeStatus {
686698
is_running,
687-
is_listening,
688699
current_best_block,
689700
latest_lightning_wallet_sync_timestamp,
690701
latest_onchain_wallet_sync_timestamp,
@@ -1495,9 +1506,6 @@ impl Drop for Node {
14951506
pub struct NodeStatus {
14961507
/// Indicates whether the [`Node`] is running.
14971508
pub is_running: bool,
1498-
/// Indicates whether the [`Node`] is listening for incoming connections on the addresses
1499-
/// configured via [`Config::listening_addresses`].
1500-
pub is_listening: bool,
15011509
/// The best block to which our Lightning wallet is currently synced.
15021510
pub current_best_block: BestBlock,
15031511
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced

tests/common/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ pub(crate) fn random_storage_path() -> PathBuf {
195195

196196
pub(crate) fn random_port() -> u16 {
197197
let mut rng = thread_rng();
198-
rng.gen_range(5000..65535)
198+
rng.gen_range(5000..32768)
199199
}
200200

201201
pub(crate) fn random_listening_addresses() -> Vec<SocketAddress> {

tests/integration_tests_rust.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,21 @@ fn sign_verify_msg() {
817817
assert!(node.verify_signature(msg, sig.as_str(), &pkey));
818818
}
819819

820+
#[test]
821+
fn connection_multi_listen() {
822+
let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd();
823+
let chain_source = TestChainSource::Esplora(&electrsd);
824+
let (node_a, node_b) = setup_two_nodes(&chain_source, false, false, false);
825+
826+
let node_id_b = node_b.node_id();
827+
828+
let node_addrs_b = node_b.listening_addresses().unwrap();
829+
for node_addr_b in &node_addrs_b {
830+
node_a.connect(node_id_b, node_addr_b.clone(), false).unwrap();
831+
node_a.disconnect(node_id_b).unwrap();
832+
}
833+
}
834+
820835
#[test]
821836
fn connection_restart_behavior() {
822837
do_connection_restart_behavior(true);
@@ -832,11 +847,6 @@ fn do_connection_restart_behavior(persist: bool) {
832847
let node_id_b = node_b.node_id();
833848

834849
let node_addr_b = node_b.listening_addresses().unwrap().first().unwrap().clone();
835-
836-
while !node_b.status().is_listening {
837-
std::thread::sleep(std::time::Duration::from_millis(10));
838-
}
839-
840850
node_a.connect(node_id_b, node_addr_b, persist).unwrap();
841851

842852
let peer_details_a = node_a.list_peers().first().unwrap().clone();
@@ -886,10 +896,6 @@ fn concurrent_connections_succeed() {
886896
let node_id_b = node_b.node_id();
887897
let node_addr_b = node_b.listening_addresses().unwrap().first().unwrap().clone();
888898

889-
while !node_b.status().is_listening {
890-
std::thread::sleep(std::time::Duration::from_millis(10));
891-
}
892-
893899
let mut handles = Vec::new();
894900
for _ in 0..10 {
895901
let thread_node = Arc::clone(&node_a);

0 commit comments

Comments
 (0)