Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ concurrency:
jobs:
build:
strategy:
fail-fast: false
matrix:
platform: [
ubuntu-latest,
Expand Down
3 changes: 0 additions & 3 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ use std::default::Default;
use std::fmt;
use std::fs;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, Once, RwLock};
use std::time::SystemTime;
use vss_client::headers::{FixedHeaders, LnurlAuthToJwtProvider, VssHeaderProvider};
Expand Down Expand Up @@ -1105,7 +1104,6 @@ fn build_with_store_internal(
}

// Initialize the status fields.
let is_listening = Arc::new(AtomicBool::new(false));
let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) {
Ok(metrics) => Arc::new(RwLock::new(metrics)),
Err(e) => {
Expand Down Expand Up @@ -1679,7 +1677,6 @@ fn build_with_store_internal(
peer_store,
payment_store,
is_running,
is_listening,
node_metrics,
})
}
Expand Down
120 changes: 80 additions & 40 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ use rand::Rng;

use std::default::Default;
use std::net::ToSocketAddrs;
use std::sync::atomic::{AtomicBool, Ordering};
use std::process::Command;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

Expand Down Expand Up @@ -203,7 +203,6 @@ pub struct Node {
peer_store: Arc<PeerStore<Arc<Logger>>>,
payment_store: Arc<PaymentStore>,
is_running: Arc<RwLock<bool>>,
is_listening: Arc<AtomicBool>,
node_metrics: Arc<RwLock<NodeMetrics>>,
}

Expand Down Expand Up @@ -305,9 +304,7 @@ impl Node {
if let Some(listening_addresses) = &self.config.listening_addresses {
// Setup networking
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
let mut stop_listen = self.stop_sender.subscribe();
let listening_logger = Arc::clone(&self.logger);
let listening_indicator = Arc::clone(&self.is_listening);

let mut bind_addrs = Vec::with_capacity(listening_addresses.len());

Expand All @@ -325,45 +322,79 @@ impl Node {
bind_addrs.extend(resolved_address);
}

self.runtime.spawn_cancellable_background_task(async move {
{
let listener =
tokio::net::TcpListener::bind(&*bind_addrs).await
.unwrap_or_else(|e| {
log_error!(listening_logger, "Failed to bind to listen addresses/ports - is something else already listening on it?: {}", e);
panic!(
"Failed to bind to listen address/port - is something else already listening on it?",
);
});

listening_indicator.store(true, Ordering::Release);

loop {
let peer_mgr = Arc::clone(&peer_manager_connection_handler);
tokio::select! {
_ = stop_listen.changed() => {
log_debug!(
listening_logger,
"Stopping listening to inbound connections."
let logger = Arc::clone(&listening_logger);
let listeners = self.runtime.block_on(async move {
let mut listeners = Vec::new();

// Try to bind to all addresses
for addr in &*bind_addrs {
match tokio::net::TcpListener::bind(addr).await {
Ok(listener) => {
log_trace!(logger, "Listener bound to {}", addr);
listeners.push(listener);
},
Err(e) => {
let pid = std::process::id();
println!("Failed to bind to port {}: {} (this pid: {})", addr, e, pid);

let output = Command::new("lsof")
.args(&["-i", &format!(":{}", &addr.port())])
.output()
.expect("failed to execute lsof");

println!("LSOF output: {}", String::from_utf8_lossy(&output.stdout));

let output = Command::new("netstat")
.args(&["-an"])
.output()
.expect("failed to execute netstat");

println!("Netstat output: {}", String::from_utf8_lossy(&output.stdout));

log_error!(
logger,
"Failed to bind to {}: {} - is something else already listening?",
addr,
e
);
break;
}
res = listener.accept() => {
let tcp_stream = res.unwrap().0;
tokio::spawn(async move {
lightning_net_tokio::setup_inbound(
Arc::clone(&peer_mgr),
tcp_stream.into_std().unwrap(),
)
.await;
});
}
return Err(Error::InvalidSocketAddress);
},
}
}
}

listening_indicator.store(false, Ordering::Release);
});
Ok(listeners)
})?;

for listener in listeners {
let logger = Arc::clone(&listening_logger);
let peer_mgr = Arc::clone(&peer_manager_connection_handler);
let mut stop_listen = self.stop_sender.subscribe();
let runtime = Arc::clone(&self.runtime);
self.runtime.spawn_cancellable_background_task(async move {
loop {
tokio::select! {
_ = stop_listen.changed() => {
log_debug!(
logger,
"Stopping listening to inbound connections."
);
break;
}
res = listener.accept() => {
let tcp_stream = res.unwrap().0;
let peer_mgr = Arc::clone(&peer_mgr);
runtime.spawn_cancellable_background_task(async move {
lightning_net_tokio::setup_inbound(
Arc::clone(&peer_mgr),
tcp_stream.into_std().unwrap(),
)
.await;
});
}
}
}
});
}
}

// Regularly reconnect to persisted peers.
Expand Down Expand Up @@ -675,7 +706,8 @@ impl Node {
/// Returns the status of the [`Node`].
pub fn status(&self) -> NodeStatus {
let is_running = *self.is_running.read().unwrap();
let is_listening = self.is_listening.load(Ordering::Acquire);
let is_listening =
is_running && self.config.listening_addresses.as_ref().map_or(false, |v| !v.is_empty());
let current_best_block = self.channel_manager.current_best_block().into();
let locked_node_metrics = self.node_metrics.read().unwrap();
let latest_lightning_wallet_sync_timestamp =
Expand Down Expand Up @@ -981,6 +1013,14 @@ impl Node {

// We need to use our main runtime here as a local runtime might not be around to poll
// connection futures going forward.

log_info!(
self.logger,
"Attempt connection to peer {}@{}..",
peer_info.node_id,
peer_info.address
);

self.runtime.block_on(async move {
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
})?;
Expand Down
50 changes: 30 additions & 20 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ pub(crate) fn setup_bitcoind_and_electrsd() -> (BitcoinD, ElectrsD) {
let mut bitcoind_conf = corepc_node::Conf::default();
bitcoind_conf.network = "regtest";
bitcoind_conf.args.push("-rest");
bitcoind_conf.view_stdout = true;
let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap();

let electrs_exe = env::var("ELECTRS_EXE")
Expand All @@ -188,6 +189,7 @@ pub(crate) fn setup_bitcoind_and_electrsd() -> (BitcoinD, ElectrsD) {
let mut electrsd_conf = electrsd::Conf::default();
electrsd_conf.http_enabled = true;
electrsd_conf.network = "regtest";
electrsd_conf.view_stderr = true;
let electrsd = ElectrsD::with_conf(electrs_exe, &bitcoind, &electrsd_conf).unwrap();
(bitcoind, electrsd)
}
Expand All @@ -202,7 +204,7 @@ pub(crate) fn random_storage_path() -> PathBuf {

pub(crate) fn random_port() -> u16 {
let mut rng = thread_rng();
rng.gen_range(5000..65535)
rng.gen_range(5000..32768)
}

pub(crate) fn random_listening_addresses() -> Vec<SocketAddress> {
Expand All @@ -227,7 +229,7 @@ pub(crate) fn random_node_alias() -> Option<NodeAlias> {
Some(NodeAlias(bytes))
}

pub(crate) fn random_config(anchor_channels: bool) -> TestConfig {
pub(crate) fn random_config(anchor_channels: bool, node_id: String) -> TestConfig {
let mut node_config = Config::default();

if !anchor_channels {
Expand All @@ -249,7 +251,9 @@ pub(crate) fn random_config(anchor_channels: bool) -> TestConfig {
println!("Setting random LDK node alias: {:?}", alias);
node_config.node_alias = alias;

TestConfig { node_config, ..Default::default() }
let log_writer = TestLogWriter::Custom(Arc::new(MultiNodeLogger::new(node_id)));

TestConfig { node_config, log_writer }
}

#[cfg(feature = "uniffi")]
Expand All @@ -274,24 +278,28 @@ pub(crate) struct TestConfig {
macro_rules! setup_builder {
($builder: ident, $config: expr) => {
#[cfg(feature = "uniffi")]
let $builder = Builder::from_config($config.clone());
let mut $builder = Builder::from_config($config.node_config.clone());
#[cfg(not(feature = "uniffi"))]
let mut $builder = Builder::from_config($config.clone());
let mut $builder = Builder::from_config($config.node_config.clone());

crate::common::set_builder_log_writer(&mut $builder, &$config);
};
}

pub(crate) use setup_builder;

use crate::common::logging::MultiNodeLogger;

pub(crate) fn setup_two_nodes(
chain_source: &TestChainSource, allow_0conf: bool, anchor_channels: bool,
anchors_trusted_no_reserve: bool,
) -> (TestNode, TestNode) {
println!("== Node A ==");
let config_a = random_config(anchor_channels);
let config_a = random_config(anchor_channels, "node_a".to_string());
let node_a = setup_node(chain_source, config_a, None);

println!("\n== Node B ==");
let mut config_b = random_config(anchor_channels);
let mut config_b = random_config(anchor_channels, "node_b".to_string());
if allow_0conf {
config_b.node_config.trusted_peers_0conf.push(node_a.node_id());
}
Expand All @@ -311,7 +319,7 @@ pub(crate) fn setup_two_nodes(
pub(crate) fn setup_node(
chain_source: &TestChainSource, config: TestConfig, seed_bytes: Option<Vec<u8>>,
) -> TestNode {
setup_builder!(builder, config.node_config);
setup_builder!(builder, config);
match chain_source {
TestChainSource::Esplora(electrsd) => {
let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap());
Expand Down Expand Up @@ -350,18 +358,6 @@ pub(crate) fn setup_node(
},
}

match &config.log_writer {
TestLogWriter::FileWriter => {
builder.set_filesystem_logger(None, None);
},
TestLogWriter::LogFacade => {
builder.set_log_facade_logger();
},
TestLogWriter::Custom(custom_log_writer) => {
builder.set_custom_logger(Arc::clone(custom_log_writer));
},
}

if let Some(seed) = seed_bytes {
#[cfg(feature = "uniffi")]
{
Expand All @@ -383,6 +379,20 @@ pub(crate) fn setup_node(
node
}

pub(crate) fn set_builder_log_writer(builder: &mut Builder, config: &TestConfig) {
match &config.log_writer {
TestLogWriter::FileWriter => {
builder.set_filesystem_logger(None, None);
},
TestLogWriter::LogFacade => {
builder.set_log_facade_logger();
},
TestLogWriter::Custom(custom_log_writer) => {
builder.set_custom_logger(Arc::clone(custom_log_writer));
},
}
}

pub(crate) fn generate_blocks_and_wait<E: ElectrumApi>(
bitcoind: &BitcoindClient, electrs: &E, num: usize,
) {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests_cln.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ fn test_cln() {
common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 1);

// Setup LDK Node
let config = common::random_config(true);
let config = common::random_config(true, "ldk_node".to_string());
let mut builder = Builder::from_config(config.node_config);
builder.set_chain_source_esplora("http://127.0.0.1:3002".to_string(), None);

Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests_lnd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn test_lnd() {
common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 1);

// Setup LDK Node
let config = common::random_config(true);
let config = common::random_config(true, "ldk_node".to_string());
let mut builder = Builder::from_config(config.node_config);
builder.set_chain_source_esplora("http://127.0.0.1:3002".to_string(), None);

Expand Down
Loading
Loading