Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 48 additions & 10 deletions components/addressmanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ use thiserror::Error;

pub use stores::NetAddress;

pub trait ExternalIpChangeSink: Send + Sync {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this called Sink?

fn on_external_ip_changed(&self, new_ip: std::net::IpAddr, old_ip: Option<std::net::IpAddr>);
}

const MAX_ADDRESSES: usize = 4096;
const MAX_CONNECTION_FAILED_COUNT: u64 = 3;

Expand Down Expand Up @@ -56,35 +60,47 @@ pub struct AddressManager {
address_store: address_store_with_cache::Store,
config: Arc<Config>,
local_net_addresses: Vec<NetAddress>,
external_ip_change_sinks: Vec<Arc<dyn ExternalIpChangeSink>>,
}

impl AddressManager {
pub fn new(config: Arc<Config>, db: Arc<DB>, tick_service: Arc<TickService>) -> (Arc<Mutex<Self>>, Option<Extender>) {
let mut instance = Self {
let instance = Self {
banned_address_store: DbBannedAddressesStore::new(db.clone(), CachePolicy::Count(MAX_ADDRESSES)),
address_store: address_store_with_cache::new(db),
local_net_addresses: Vec::new(),
external_ip_change_sinks: Vec::new(),
config,
};

let extender = instance.init_local_addresses(tick_service);
let am = Arc::new(Mutex::new(instance));
let extender = Self::init_local_addresses(&am, tick_service);

(Arc::new(Mutex::new(instance)), extender)
(am, extender)
}

fn init_local_addresses(&mut self, tick_service: Arc<TickService>) -> Option<Extender> {
self.local_net_addresses = self.local_addresses().collect();
pub fn register_external_ip_change_sink(&mut self, sink: Arc<dyn ExternalIpChangeSink>) {
self.external_ip_change_sinks.push(sink);
}

let extender = if self.local_net_addresses.is_empty() && !self.config.disable_upnp {
let (net_address, ExtendHelper { gateway, local_addr, external_port }) = match self.upnp() {
pub fn clone_external_ip_change_sinks(&self) -> Vec<Arc<dyn ExternalIpChangeSink>> {
self.external_ip_change_sinks.clone()
}

fn init_local_addresses(this: &Arc<Mutex<Self>>, tick_service: Arc<TickService>) -> Option<Extender> {
let mut me = this.lock();
me.local_net_addresses = me.local_addresses().collect();

let extender = if me.local_net_addresses.is_empty() && !me.config.disable_upnp {
let (net_address, ExtendHelper { gateway, local_addr, external_port }) = match me.upnp() {
Err(err) => {
warn!("[UPnP] Error adding port mapping: {err}");
return None;
}
Ok(None) => return None,
Ok(Some((net_address, extend_helper))) => (net_address, extend_helper),
};
self.local_net_addresses.push(net_address);
me.local_net_addresses.push(net_address);

let gateway: igd_next::aio::Gateway<Tokio> = igd_next::aio::Gateway {
addr: gateway.addr,
Expand All @@ -101,12 +117,14 @@ impl AddressManager {
gateway,
external_port,
local_addr,
Arc::clone(this),
Some(net_address.ip.into()),
))
} else {
None
};

self.local_net_addresses.iter().for_each(|net_addr| {
me.local_net_addresses.iter().for_each(|net_addr| {
info!("Publicly routable local address {} added to store", net_addr);
});
extender
Expand Down Expand Up @@ -142,7 +160,19 @@ impl AddressManager {
return Left(Right(iter::empty()));
};
// TODO: Add Check IPv4 or IPv6 match from Go code
Right(network_interfaces.into_iter().map(|(_, ip)| IpAddress::from(ip)).filter(|&ip| ip.is_publicly_routable()).map(
Right(network_interfaces
.into_iter()
.map(|(_, ip)| IpAddress::from(ip))
.filter(|ip| {
if self.config.disable_ipv6_interface_discovery {
// Skip IPv6 during automatic discovery if the flag is set
!matches!(**ip, std::net::IpAddr::V6(_))
} else {
true
}
})
.filter(|&ip| ip.is_publicly_routable())
.map(
|ip| {
info!("Publicly routable local address found: {}", ip);
NetAddress::new(ip, self.config.default_p2p_port())
Expand Down Expand Up @@ -251,6 +281,14 @@ impl AddressManager {
}
}

pub fn set_best_local_address(&mut self, address: NetAddress) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it's better to handle the TODO in pub fn best_local_address(&mut self) -> Option<NetAddress> { instead

if self.local_net_addresses.is_empty() {
self.local_net_addresses.push(address);
} else {
self.local_net_addresses[0] = address;
}
}

pub fn add_address(&mut self, address: NetAddress) {
if address.ip.is_loopback() || address.ip.is_unspecified() {
debug!("[Address manager] skipping local address {}", address.ip);
Expand Down
75 changes: 74 additions & 1 deletion components/addressmanager/src/port_mapping_extender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,22 @@ use kaspa_core::{
use std::{net::SocketAddr, sync::Arc, time::Duration};

use crate::UPNP_REGISTRATION_NAME;
use crate::{AddressManager, NetAddress};
use kaspa_utils::networking::IpAddress;
use parking_lot::Mutex;

pub const SERVICE_NAME: &str = "port-mapping-extender";

#[derive(Clone)]
pub struct Extender {
tick_service: Arc<TickService>,
fetch_interval: Duration,
deadline_sec: u64,
gateway: igd_next::aio::Gateway<Tokio>,
external_port: u16,
local_addr: SocketAddr,
address_manager: Arc<Mutex<AddressManager>>,
last_known_external_ip: Arc<Mutex<Option<std::net::IpAddr>>>,
}

impl Extender {
Expand All @@ -30,14 +36,31 @@ impl Extender {
gateway: igd_next::aio::Gateway<Tokio>,
external_port: u16,
local_addr: SocketAddr,
address_manager: Arc<Mutex<AddressManager>>,
initial_external_ip: Option<std::net::IpAddr>,
) -> Self {
Self { tick_service, fetch_interval, deadline_sec, gateway, external_port, local_addr }
// Log the initial IP for debugging
if let Some(initial_ip) = initial_external_ip {
debug!("[UPnP] Extender initialized with initial external IP: {}", initial_ip);
}

Self {
tick_service,
fetch_interval,
deadline_sec,
gateway,
external_port,
local_addr,
address_manager,
last_known_external_ip: Arc::new(Mutex::new(initial_external_ip)),
}
}
}

impl Extender {
pub async fn worker(&self) -> Result<(), AddPortError> {
while let TickReason::Wakeup = self.tick_service.tick(self.fetch_interval).await {

if let Err(e) = self
.gateway
.add_port(
Expand All @@ -53,12 +76,62 @@ impl Extender {
} else {
debug!("[UPnP] Extend external ip mapping");
}

let external_ip_result = self.gateway.get_external_ip().await;
if let Err(e) = &external_ip_result {
warn!("[UPnP] Fetch external ip err: {e:?}");
} else {
debug!("[UPnP] Fetched external ip");
}

if let Ok(current_ip) = external_ip_result {
// Check if IP has changed
let ip_changed = {
let mut last_ip_guard = self.last_known_external_ip.lock();
if *last_ip_guard != Some(current_ip) {
let old_ip = *last_ip_guard;
*last_ip_guard = Some(current_ip);
Some((current_ip, old_ip))
} else {
None
}
}; // MutexGuard is dropped here

if let Some((new_ip, old_ip)) = ip_changed {
self.handle_ip_change(new_ip, old_ip).await;
}
}


}
// Let the system print final logs before exiting
tokio::time::sleep(Duration::from_millis(500)).await;
trace!("{SERVICE_NAME} worker exiting");
Ok(())
}

async fn handle_ip_change(&self, new_ip: std::net::IpAddr, old_ip: Option<std::net::IpAddr>) {
info!("[UPnP] External IP changed from {:?} to {}", old_ip, new_ip);

// Update best_local_address
let mut am_guard = self.address_manager.lock();
let ip = IpAddress::new(new_ip);
let net_addr = NetAddress { ip, port: self.external_port };
am_guard.set_best_local_address(net_addr);
debug!("[UPnP] Updated best local address to {}", net_addr);

// Notify registered sinks (fire-and-forget). We offload each sync callback into its
// own lightweight task so the extender loop isn't delayed by sink logic.
let sinks = am_guard.clone_external_ip_change_sinks();
drop(am_guard);
for sink in sinks {
let s = sink.clone();
tokio::spawn(async move {
// Trait is sync; we just invoke inside an async task (fire-and-forget).
s.on_external_ip_changed(new_ip, old_ip);
});
}
}
}

impl AsyncService for Extender {
Expand Down
48 changes: 47 additions & 1 deletion components/connectionmanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use duration_string::DurationString;
use futures_util::future::{join_all, try_join_all};
use itertools::Itertools;
use kaspa_addressmanager::{AddressManager, NetAddress};
use kaspa_addressmanager::{AddressManager, NetAddress, ExternalIpChangeSink};
use kaspa_core::{debug, info, warn};
use kaspa_p2p_lib::{common::ProtocolError, ConnectionError, Peer};
use kaspa_utils::triggers::SingleTrigger;
Expand Down Expand Up @@ -93,6 +93,46 @@ impl ConnectionManager {
});
}

/// Synchronously trigger a staggered outbound reconnect (terminates peers one by one with 30s delays)
pub fn trigger_outbound_reconnect(&self) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this necessary? When your external IP changes, wouldn't a disconnection already have happened (you disconnect from all your peers) and when you try to reconnect to your peers they would then know your new IP?

let outbound_peers: Vec<_> = self.p2p_adaptor.active_peers()
.into_iter()
.filter(|p| p.is_outbound())
.collect();

if outbound_peers.is_empty() {
info!("No outbound peers to reconnect");
return;
}

let peer_count = outbound_peers.len();
info!("Starting staggered outbound reconnect: {} peers will be renewed with 30s delays", peer_count);

// Spawn async task for staggered renewal
let p2p_adaptor = self.p2p_adaptor.clone();
let force_sender = self.force_next_iteration.clone();

tokio::spawn(async move {
for (i, peer) in outbound_peers.into_iter().enumerate() {
// Terminate peer
p2p_adaptor.terminate(peer.key()).await;
info!("Terminated outbound peer {} ({}/{})", peer.net_address(), i+1, peer_count);

// Trigger reconnection (except for the last peer)
if i < peer_count - 1 {
force_sender.send(()).unwrap();

// Wait 30 seconds
tokio::time::sleep(Duration::from_secs(30)).await;
}
}

// Final trigger for the last renewal
force_sender.send(()).unwrap();
info!("Staggered outbound reconnect completed");
});
}

async fn handle_event(self: Arc<Self>) {
debug!("Starting connection loop iteration");
let peers = self.p2p_adaptor.active_peers();
Expand Down Expand Up @@ -338,3 +378,9 @@ impl ConnectionManager {
self.connection_requests.lock().await.iter().any(|(address, request)| request.is_permanent && address.ip() == ip)
}
}

impl ExternalIpChangeSink for ConnectionManager {
fn on_external_ip_changed(&self, _new_ip: std::net::IpAddr, _old_ip: Option<std::net::IpAddr>) {
self.trigger_outbound_reconnect();
}
}
4 changes: 4 additions & 0 deletions consensus/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub struct Config {

pub disable_upnp: bool,

/// Disable IPv6 during automatic local address discovery (explicit IPv6 in config is still honored)
pub disable_ipv6_interface_discovery: bool,

/// A scale factor to apply to memory allocation bounds
pub ram_scale: f64,

Expand Down Expand Up @@ -97,6 +100,7 @@ impl Config {
#[cfg(feature = "devnet-prealloc")]
initial_utxo_set: Default::default(),
disable_upnp: false,
disable_ipv6_interface_discovery: false,
ram_scale: 1.0,
retention_period_days: None,
}
Expand Down
5 changes: 5 additions & 0 deletions kaspad/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub struct Args {
pub prealloc_amount: u64,

pub disable_upnp: bool,
pub disable_ipv6_interface_discovery: bool,
#[serde(rename = "nodnsseed")]
pub disable_dns_seeding: bool,
#[serde(rename = "nogrpc")]
Expand Down Expand Up @@ -138,6 +139,7 @@ impl Default for Args {
prealloc_amount: 10_000_000_000,

disable_upnp: false,
disable_ipv6_interface_discovery: false,
disable_dns_seeding: false,
disable_grpc: false,
ram_scale: 1.0,
Expand All @@ -150,6 +152,7 @@ impl Args {
pub fn apply_to_config(&self, config: &mut Config) {
config.utxoindex = self.utxoindex;
config.disable_upnp = self.disable_upnp;
config.disable_ipv6_interface_discovery = self.disable_ipv6_interface_discovery;
config.unsafe_rpc = self.unsafe_rpc;
config.enable_unsynced_mining = self.enable_unsynced_mining;
config.enable_mainnet_mining = self.enable_mainnet_mining;
Expand Down Expand Up @@ -362,6 +365,7 @@ Setting to 0 prevents the preallocation and sets the maximum to {}, leading to 0
.help("Interval in seconds for performance metrics collection."),
)
.arg(arg!(--"disable-upnp" "Disable upnp"))
.arg(arg!(--"disable-ipv6-interface-discovery" "Disable IPv6 during automatic local address discovery (explicit IPv6 in config is still honored)"))
.arg(arg!(--"nodnsseed" "Disable DNS seeding for peers"))
.arg(arg!(--"nogrpc" "Disable gRPC server"))
.arg(
Expand Down Expand Up @@ -455,6 +459,7 @@ impl Args {
// Note: currently used programmatically by benchmarks and not exposed to CLI users
block_template_cache_lifetime: defaults.block_template_cache_lifetime,
disable_upnp: arg_match_unwrap_or::<bool>(&m, "disable-upnp", defaults.disable_upnp),
disable_ipv6_interface_discovery: arg_match_unwrap_or::<bool>(&m, "disable-ipv6-interface-discovery", defaults.disable_ipv6_interface_discovery),
disable_dns_seeding: arg_match_unwrap_or::<bool>(&m, "nodnsseed", defaults.disable_dns_seeding),
disable_grpc: arg_match_unwrap_or::<bool>(&m, "nogrpc", defaults.disable_grpc),
ram_scale: arg_match_unwrap_or::<f64>(&m, "ram-scale", defaults.ram_scale),
Expand Down
5 changes: 3 additions & 2 deletions kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
notify_service.notifier(),
index_service.as_ref().map(|x| x.notifier()),
mining_manager,
flow_context,
flow_context.clone(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this clone necessary?

subscription_context,
index_service.as_ref().map(|x| x.utxoindex().unwrap()),
config.clone(),
Expand Down Expand Up @@ -646,13 +646,14 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
if let Some(index_service) = index_service {
async_runtime.register(index_service)
};

if let Some(port_mapping_extender_svc) = port_mapping_extender_svc {
async_runtime.register(Arc::new(port_mapping_extender_svc))
};
async_runtime.register(rpc_core_service.clone());
if let Some(grpc_service) = grpc_service {
async_runtime.register(grpc_service)
}
};
async_runtime.register(p2p_service);
async_runtime.register(consensus_monitor);
async_runtime.register(mining_monitor);
Expand Down
6 changes: 6 additions & 0 deletions protocol/flows/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ impl AsyncService for P2pService {
self.flow_context.address_manager.clone(),
);

// Register as sink for external IP changes
self.flow_context
.address_manager
.lock()
.register_external_ip_change_sink(connection_manager.clone());

self.flow_context.set_connection_manager(connection_manager.clone());
self.flow_context.start_async_services();

Expand Down
Loading