diff --git a/components/addressmanager/src/lib.rs b/components/addressmanager/src/lib.rs index b220a8ab1b..e2915ea2b0 100644 --- a/components/addressmanager/src/lib.rs +++ b/components/addressmanager/src/lib.rs @@ -24,6 +24,10 @@ use thiserror::Error; pub use stores::NetAddress; +pub trait ExternalIpChangeSink: Send + Sync { + fn on_external_ip_changed(&self, new_ip: std::net::IpAddr, old_ip: Option); +} + const MAX_ADDRESSES: usize = 4096; const MAX_CONNECTION_FAILED_COUNT: u64 = 3; @@ -56,27 +60,39 @@ pub struct AddressManager { address_store: address_store_with_cache::Store, config: Arc, local_net_addresses: Vec, + external_ip_change_sinks: Vec>, } impl AddressManager { pub fn new(config: Arc, db: Arc, tick_service: Arc) -> (Arc>, Option) { - let mut instance = Self { + let instance = Self { banned_address_store: DbBannedAddressesStore::new(db.clone(), CachePolicy::Count(MAX_ADDRESSES)), address_store: address_store_with_cache::new(db), local_net_addresses: Vec::new(), + external_ip_change_sinks: Vec::new(), config, }; - let extender = instance.init_local_addresses(tick_service); + let am = Arc::new(Mutex::new(instance)); + let extender = Self::init_local_addresses(&am, tick_service); - (Arc::new(Mutex::new(instance)), extender) + (am, extender) } - fn init_local_addresses(&mut self, tick_service: Arc) -> Option { - self.local_net_addresses = self.local_addresses().collect(); + pub fn register_external_ip_change_sink(&mut self, sink: Arc) { + self.external_ip_change_sinks.push(sink); + } - let extender = if self.local_net_addresses.is_empty() && !self.config.disable_upnp { - let (net_address, ExtendHelper { gateway, local_addr, external_port }) = match self.upnp() { + pub fn clone_external_ip_change_sinks(&self) -> Vec> { + self.external_ip_change_sinks.clone() + } + + fn init_local_addresses(this: &Arc>, tick_service: Arc) -> Option { + let mut me = this.lock(); + me.local_net_addresses = me.local_addresses().collect(); + + let extender = if me.local_net_addresses.is_empty() && !me.config.disable_upnp { + let (net_address, ExtendHelper { gateway, local_addr, external_port }) = match me.upnp() { Err(err) => { warn!("[UPnP] Error adding port mapping: {err}"); return None; @@ -84,7 +100,7 @@ impl AddressManager { Ok(None) => return None, Ok(Some((net_address, extend_helper))) => (net_address, extend_helper), }; - self.local_net_addresses.push(net_address); + me.local_net_addresses.push(net_address); let gateway: igd_next::aio::Gateway = igd_next::aio::Gateway { addr: gateway.addr, @@ -101,12 +117,14 @@ impl AddressManager { gateway, external_port, local_addr, + Arc::clone(this), + Some(net_address.ip.into()), )) } else { None }; - self.local_net_addresses.iter().for_each(|net_addr| { + me.local_net_addresses.iter().for_each(|net_addr| { info!("Publicly routable local address {} added to store", net_addr); }); extender @@ -142,7 +160,19 @@ impl AddressManager { return Left(Right(iter::empty())); }; // TODO: Add Check IPv4 or IPv6 match from Go code - Right(network_interfaces.into_iter().map(|(_, ip)| IpAddress::from(ip)).filter(|&ip| ip.is_publicly_routable()).map( + Right(network_interfaces + .into_iter() + .map(|(_, ip)| IpAddress::from(ip)) + .filter(|ip| { + if self.config.disable_ipv6_interface_discovery { + // Skip IPv6 during automatic discovery if the flag is set + !matches!(**ip, std::net::IpAddr::V6(_)) + } else { + true + } + }) + .filter(|&ip| ip.is_publicly_routable()) + .map( |ip| { info!("Publicly routable local address found: {}", ip); NetAddress::new(ip, self.config.default_p2p_port()) @@ -251,6 +281,14 @@ impl AddressManager { } } + pub fn set_best_local_address(&mut self, address: NetAddress) { + if self.local_net_addresses.is_empty() { + self.local_net_addresses.push(address); + } else { + self.local_net_addresses[0] = address; + } + } + pub fn add_address(&mut self, address: NetAddress) { if address.ip.is_loopback() || address.ip.is_unspecified() { debug!("[Address manager] skipping local address {}", address.ip); diff --git a/components/addressmanager/src/port_mapping_extender.rs b/components/addressmanager/src/port_mapping_extender.rs index 11651bab93..0b8436beb2 100644 --- a/components/addressmanager/src/port_mapping_extender.rs +++ b/components/addressmanager/src/port_mapping_extender.rs @@ -10,9 +10,13 @@ use kaspa_core::{ use std::{net::SocketAddr, sync::Arc, time::Duration}; use crate::UPNP_REGISTRATION_NAME; +use crate::{AddressManager, NetAddress}; +use kaspa_utils::networking::IpAddress; +use parking_lot::Mutex; pub const SERVICE_NAME: &str = "port-mapping-extender"; +#[derive(Clone)] pub struct Extender { tick_service: Arc, fetch_interval: Duration, @@ -20,6 +24,8 @@ pub struct Extender { gateway: igd_next::aio::Gateway, external_port: u16, local_addr: SocketAddr, + address_manager: Arc>, + last_known_external_ip: Arc>>, } impl Extender { @@ -30,14 +36,31 @@ impl Extender { gateway: igd_next::aio::Gateway, external_port: u16, local_addr: SocketAddr, + address_manager: Arc>, + initial_external_ip: Option, ) -> Self { - Self { tick_service, fetch_interval, deadline_sec, gateway, external_port, local_addr } + // Log the initial IP for debugging + if let Some(initial_ip) = initial_external_ip { + debug!("[UPnP] Extender initialized with initial external IP: {}", initial_ip); + } + + Self { + tick_service, + fetch_interval, + deadline_sec, + gateway, + external_port, + local_addr, + address_manager, + last_known_external_ip: Arc::new(Mutex::new(initial_external_ip)), + } } } impl Extender { pub async fn worker(&self) -> Result<(), AddPortError> { while let TickReason::Wakeup = self.tick_service.tick(self.fetch_interval).await { + if let Err(e) = self .gateway .add_port( @@ -53,12 +76,62 @@ impl Extender { } else { debug!("[UPnP] Extend external ip mapping"); } + + let external_ip_result = self.gateway.get_external_ip().await; + if let Err(e) = &external_ip_result { + warn!("[UPnP] Fetch external ip err: {e:?}"); + } else { + debug!("[UPnP] Fetched external ip"); + } + + if let Ok(current_ip) = external_ip_result { + // Check if IP has changed + let ip_changed = { + let mut last_ip_guard = self.last_known_external_ip.lock(); + if *last_ip_guard != Some(current_ip) { + let old_ip = *last_ip_guard; + *last_ip_guard = Some(current_ip); + Some((current_ip, old_ip)) + } else { + None + } + }; // MutexGuard is dropped here + + if let Some((new_ip, old_ip)) = ip_changed { + self.handle_ip_change(new_ip, old_ip).await; + } + } + + } // Let the system print final logs before exiting tokio::time::sleep(Duration::from_millis(500)).await; trace!("{SERVICE_NAME} worker exiting"); Ok(()) } + + async fn handle_ip_change(&self, new_ip: std::net::IpAddr, old_ip: Option) { + info!("[UPnP] External IP changed from {:?} to {}", old_ip, new_ip); + + // Update best_local_address + let mut am_guard = self.address_manager.lock(); + let ip = IpAddress::new(new_ip); + let net_addr = NetAddress { ip, port: self.external_port }; + am_guard.set_best_local_address(net_addr); + debug!("[UPnP] Updated best local address to {}", net_addr); + + // Notify registered sinks (fire-and-forget). We offload each sync callback into its + // own lightweight task so the extender loop isn't delayed by sink logic. + let sinks = am_guard.clone_external_ip_change_sinks(); + drop(am_guard); + for sink in sinks { + let s = sink.clone(); + tokio::spawn(async move { + // Trait is sync; we just invoke inside an async task (fire-and-forget). + s.on_external_ip_changed(new_ip, old_ip); + }); + } + } } impl AsyncService for Extender { diff --git a/components/connectionmanager/src/lib.rs b/components/connectionmanager/src/lib.rs index 2146ec62d1..f741a776b1 100644 --- a/components/connectionmanager/src/lib.rs +++ b/components/connectionmanager/src/lib.rs @@ -9,7 +9,7 @@ use std::{ use duration_string::DurationString; use futures_util::future::{join_all, try_join_all}; use itertools::Itertools; -use kaspa_addressmanager::{AddressManager, NetAddress}; +use kaspa_addressmanager::{AddressManager, NetAddress, ExternalIpChangeSink}; use kaspa_core::{debug, info, warn}; use kaspa_p2p_lib::{common::ProtocolError, ConnectionError, Peer}; use kaspa_utils::triggers::SingleTrigger; @@ -93,6 +93,46 @@ impl ConnectionManager { }); } + /// Synchronously trigger a staggered outbound reconnect (terminates peers one by one with 30s delays) + pub fn trigger_outbound_reconnect(&self) { + let outbound_peers: Vec<_> = self.p2p_adaptor.active_peers() + .into_iter() + .filter(|p| p.is_outbound()) + .collect(); + + if outbound_peers.is_empty() { + info!("No outbound peers to reconnect"); + return; + } + + let peer_count = outbound_peers.len(); + info!("Starting staggered outbound reconnect: {} peers will be renewed with 30s delays", peer_count); + + // Spawn async task for staggered renewal + let p2p_adaptor = self.p2p_adaptor.clone(); + let force_sender = self.force_next_iteration.clone(); + + tokio::spawn(async move { + for (i, peer) in outbound_peers.into_iter().enumerate() { + // Terminate peer + p2p_adaptor.terminate(peer.key()).await; + info!("Terminated outbound peer {} ({}/{})", peer.net_address(), i+1, peer_count); + + // Trigger reconnection (except for the last peer) + if i < peer_count - 1 { + force_sender.send(()).unwrap(); + + // Wait 30 seconds + tokio::time::sleep(Duration::from_secs(30)).await; + } + } + + // Final trigger for the last renewal + force_sender.send(()).unwrap(); + info!("Staggered outbound reconnect completed"); + }); + } + async fn handle_event(self: Arc) { debug!("Starting connection loop iteration"); let peers = self.p2p_adaptor.active_peers(); @@ -338,3 +378,9 @@ impl ConnectionManager { self.connection_requests.lock().await.iter().any(|(address, request)| request.is_permanent && address.ip() == ip) } } + +impl ExternalIpChangeSink for ConnectionManager { + fn on_external_ip_changed(&self, _new_ip: std::net::IpAddr, _old_ip: Option) { + self.trigger_outbound_reconnect(); + } +} diff --git a/consensus/core/src/config/mod.rs b/consensus/core/src/config/mod.rs index bc41cde562..04581b13ca 100644 --- a/consensus/core/src/config/mod.rs +++ b/consensus/core/src/config/mod.rs @@ -66,6 +66,9 @@ pub struct Config { pub disable_upnp: bool, + /// Disable IPv6 during automatic local address discovery (explicit IPv6 in config is still honored) + pub disable_ipv6_interface_discovery: bool, + /// A scale factor to apply to memory allocation bounds pub ram_scale: f64, @@ -97,6 +100,7 @@ impl Config { #[cfg(feature = "devnet-prealloc")] initial_utxo_set: Default::default(), disable_upnp: false, + disable_ipv6_interface_discovery: false, ram_scale: 1.0, retention_period_days: None, } diff --git a/kaspad/src/args.rs b/kaspad/src/args.rs index 9a58bc6dbf..dd4f7c39fb 100644 --- a/kaspad/src/args.rs +++ b/kaspad/src/args.rs @@ -85,6 +85,7 @@ pub struct Args { pub prealloc_amount: u64, pub disable_upnp: bool, + pub disable_ipv6_interface_discovery: bool, #[serde(rename = "nodnsseed")] pub disable_dns_seeding: bool, #[serde(rename = "nogrpc")] @@ -138,6 +139,7 @@ impl Default for Args { prealloc_amount: 10_000_000_000, disable_upnp: false, + disable_ipv6_interface_discovery: false, disable_dns_seeding: false, disable_grpc: false, ram_scale: 1.0, @@ -150,6 +152,7 @@ impl Args { pub fn apply_to_config(&self, config: &mut Config) { config.utxoindex = self.utxoindex; config.disable_upnp = self.disable_upnp; + config.disable_ipv6_interface_discovery = self.disable_ipv6_interface_discovery; config.unsafe_rpc = self.unsafe_rpc; config.enable_unsynced_mining = self.enable_unsynced_mining; config.enable_mainnet_mining = self.enable_mainnet_mining; @@ -362,6 +365,7 @@ Setting to 0 prevents the preallocation and sets the maximum to {}, leading to 0 .help("Interval in seconds for performance metrics collection."), ) .arg(arg!(--"disable-upnp" "Disable upnp")) + .arg(arg!(--"disable-ipv6-interface-discovery" "Disable IPv6 during automatic local address discovery (explicit IPv6 in config is still honored)")) .arg(arg!(--"nodnsseed" "Disable DNS seeding for peers")) .arg(arg!(--"nogrpc" "Disable gRPC server")) .arg( @@ -455,6 +459,7 @@ impl Args { // Note: currently used programmatically by benchmarks and not exposed to CLI users block_template_cache_lifetime: defaults.block_template_cache_lifetime, disable_upnp: arg_match_unwrap_or::(&m, "disable-upnp", defaults.disable_upnp), + disable_ipv6_interface_discovery: arg_match_unwrap_or::(&m, "disable-ipv6-interface-discovery", defaults.disable_ipv6_interface_discovery), disable_dns_seeding: arg_match_unwrap_or::(&m, "nodnsseed", defaults.disable_dns_seeding), disable_grpc: arg_match_unwrap_or::(&m, "nogrpc", defaults.disable_grpc), ram_scale: arg_match_unwrap_or::(&m, "ram-scale", defaults.ram_scale), diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs index 943a2b98ac..0cb0b65e30 100644 --- a/kaspad/src/daemon.rs +++ b/kaspad/src/daemon.rs @@ -611,7 +611,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm notify_service.notifier(), index_service.as_ref().map(|x| x.notifier()), mining_manager, - flow_context, + flow_context.clone(), subscription_context, index_service.as_ref().map(|x| x.utxoindex().unwrap()), config.clone(), @@ -646,13 +646,14 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm if let Some(index_service) = index_service { async_runtime.register(index_service) }; + if let Some(port_mapping_extender_svc) = port_mapping_extender_svc { async_runtime.register(Arc::new(port_mapping_extender_svc)) }; async_runtime.register(rpc_core_service.clone()); if let Some(grpc_service) = grpc_service { async_runtime.register(grpc_service) - } + }; async_runtime.register(p2p_service); async_runtime.register(consensus_monitor); async_runtime.register(mining_monitor); diff --git a/protocol/flows/src/service.rs b/protocol/flows/src/service.rs index 1633e26db0..3d5dc8a0da 100644 --- a/protocol/flows/src/service.rs +++ b/protocol/flows/src/service.rs @@ -80,6 +80,12 @@ impl AsyncService for P2pService { self.flow_context.address_manager.clone(), ); + // Register as sink for external IP changes + self.flow_context + .address_manager + .lock() + .register_external_ip_change_sink(connection_manager.clone()); + self.flow_context.set_connection_manager(connection_manager.clone()); self.flow_context.start_async_services();