Skip to content

Commit d8f967c

Browse files
committed
Listen on all provided addresses
Previously ldk-node would start binding after the first successful bind to an address.
1 parent 2b09f98 commit d8f967c

File tree

3 files changed

+56
-52
lines changed

3 files changed

+56
-52
lines changed

src/builder.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ use std::default::Default;
7575
use std::fmt;
7676
use std::fs;
7777
use std::path::PathBuf;
78-
use std::sync::atomic::AtomicBool;
7978
use std::sync::{Arc, Mutex, Once, RwLock};
8079
use std::time::SystemTime;
8180
use vss_client::headers::{FixedHeaders, LnurlAuthToJwtProvider, VssHeaderProvider};
@@ -1105,7 +1104,6 @@ fn build_with_store_internal(
11051104
}
11061105

11071106
// Initialize the status fields.
1108-
let is_listening = Arc::new(AtomicBool::new(false));
11091107
let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) {
11101108
Ok(metrics) => Arc::new(RwLock::new(metrics)),
11111109
Err(e) => {
@@ -1679,7 +1677,6 @@ fn build_with_store_internal(
16791677
peer_store,
16801678
payment_store,
16811679
is_running,
1682-
is_listening,
16831680
node_metrics,
16841681
})
16851682
}

src/lib.rs

Lines changed: 54 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@ use rand::Rng;
167167

168168
use std::default::Default;
169169
use std::net::ToSocketAddrs;
170-
use std::sync::atomic::{AtomicBool, Ordering};
171170
use std::sync::{Arc, Mutex, RwLock};
172171
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
173172

@@ -203,7 +202,6 @@ pub struct Node {
203202
peer_store: Arc<PeerStore<Arc<Logger>>>,
204203
payment_store: Arc<PaymentStore>,
205204
is_running: Arc<RwLock<bool>>,
206-
is_listening: Arc<AtomicBool>,
207205
node_metrics: Arc<RwLock<NodeMetrics>>,
208206
}
209207

@@ -305,9 +303,7 @@ impl Node {
305303
if let Some(listening_addresses) = &self.config.listening_addresses {
306304
// Setup networking
307305
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
308-
let mut stop_listen = self.stop_sender.subscribe();
309306
let listening_logger = Arc::clone(&self.logger);
310-
let listening_indicator = Arc::clone(&self.is_listening);
311307

312308
let mut bind_addrs = Vec::with_capacity(listening_addresses.len());
313309

@@ -325,46 +321,62 @@ impl Node {
325321
bind_addrs.extend(resolved_address);
326322
}
327323

328-
let runtime = Arc::clone(&self.runtime);
329-
self.runtime.spawn_cancellable_background_task(async move {
330-
{
331-
let listener =
332-
tokio::net::TcpListener::bind(&*bind_addrs).await
333-
.unwrap_or_else(|e| {
334-
log_error!(listening_logger, "Failed to bind to listen addresses/ports - is something else already listening on it?: {}", e);
335-
panic!(
336-
"Failed to bind to listen address/port - is something else already listening on it?",
337-
);
338-
});
339-
340-
listening_indicator.store(true, Ordering::Release);
341-
342-
loop {
343-
let peer_mgr = Arc::clone(&peer_manager_connection_handler);
344-
tokio::select! {
345-
_ = stop_listen.changed() => {
346-
log_debug!(
347-
listening_logger,
348-
"Stopping listening to inbound connections."
324+
let logger = Arc::clone(&listening_logger);
325+
let listeners = self.runtime.block_on(async move {
326+
let mut listeners = Vec::new();
327+
328+
// Try to bind to all addresses
329+
for addr in &*bind_addrs {
330+
match tokio::net::TcpListener::bind(addr).await {
331+
Ok(listener) => {
332+
log_trace!(logger, "Listener bound to {}", addr);
333+
listeners.push(listener);
334+
},
335+
Err(e) => {
336+
log_error!(
337+
logger,
338+
"Failed to bind to {}: {} - is something else already listening?",
339+
addr,
340+
e
349341
);
350-
break;
351-
}
352-
res = listener.accept() => {
353-
let tcp_stream = res.unwrap().0;
354-
runtime.spawn_cancellable_background_task(async move {
355-
lightning_net_tokio::setup_inbound(
356-
Arc::clone(&peer_mgr),
357-
tcp_stream.into_std().unwrap(),
358-
)
359-
.await;
360-
});
361-
}
342+
return Err(Error::InvalidSocketAddress);
343+
},
362344
}
363345
}
364-
}
365346

366-
listening_indicator.store(false, Ordering::Release);
367-
});
347+
Ok(listeners)
348+
})?;
349+
350+
for listener in listeners {
351+
let logger = Arc::clone(&listening_logger);
352+
let peer_mgr = Arc::clone(&peer_manager_connection_handler);
353+
let mut stop_listen = self.stop_sender.subscribe();
354+
let runtime = Arc::clone(&self.runtime);
355+
self.runtime.spawn_cancellable_background_task(async move {
356+
loop {
357+
tokio::select! {
358+
_ = stop_listen.changed() => {
359+
log_debug!(
360+
logger,
361+
"Stopping listening to inbound connections."
362+
);
363+
break;
364+
}
365+
res = listener.accept() => {
366+
let tcp_stream = res.unwrap().0;
367+
let peer_mgr = Arc::clone(&peer_mgr);
368+
runtime.spawn_cancellable_background_task(async move {
369+
lightning_net_tokio::setup_inbound(
370+
Arc::clone(&peer_mgr),
371+
tcp_stream.into_std().unwrap(),
372+
)
373+
.await;
374+
});
375+
}
376+
}
377+
}
378+
});
379+
}
368380
}
369381

370382
// Regularly reconnect to persisted peers.
@@ -676,7 +688,8 @@ impl Node {
676688
/// Returns the status of the [`Node`].
677689
pub fn status(&self) -> NodeStatus {
678690
let is_running = *self.is_running.read().unwrap();
679-
let is_listening = self.is_listening.load(Ordering::Acquire);
691+
let is_listening =
692+
is_running && self.config.listening_addresses.as_ref().map_or(false, |v| !v.is_empty());
680693
let current_best_block = self.channel_manager.current_best_block().into();
681694
let locked_node_metrics = self.node_metrics.read().unwrap();
682695
let latest_lightning_wallet_sync_timestamp =

tests/integration_tests_rust.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -837,10 +837,7 @@ fn do_connection_restart_behavior(persist: bool) {
837837
let node_id_b = node_b.node_id();
838838

839839
let node_addr_b = node_b.listening_addresses().unwrap().first().unwrap().clone();
840-
841-
while !node_b.status().is_listening {
842-
std::thread::sleep(std::time::Duration::from_millis(10));
843-
}
840+
assert!(node_b.status().is_listening);
844841

845842
node_a.connect(node_id_b, node_addr_b, persist).unwrap();
846843

@@ -890,10 +887,7 @@ fn concurrent_connections_succeed() {
890887

891888
let node_id_b = node_b.node_id();
892889
let node_addr_b = node_b.listening_addresses().unwrap().first().unwrap().clone();
893-
894-
while !node_b.status().is_listening {
895-
std::thread::sleep(std::time::Duration::from_millis(10));
896-
}
890+
assert!(node_b.status().is_listening);
897891

898892
let mut handles = Vec::new();
899893
for _ in 0..10 {

0 commit comments

Comments
 (0)