Skip to content
Open
116 changes: 116 additions & 0 deletions components/addressmanager/src/dyndns_extender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use std::{net::IpAddr, sync::{Arc}, time::Duration};
use parking_lot::Mutex;
use kaspa_core::{info, debug, warn, trace, task::{tick::{TickService, TickReason}, service::{AsyncService, AsyncServiceFuture}}};
use crate::{NetAddress, AddressManager};
use kaspa_utils::networking::IpAddress;
use kaspa_consensus_core::config::{Config, IpVersionMode};
use std::net::ToSocketAddrs;

pub const SERVICE_NAME: &str = "dyndns-extender";

// Simplistic resolver trait to allow mocking in tests later
trait DynResolver: Send + Sync {
fn resolve(&self, host: &str) -> std::io::Result<Vec<IpAddr>>;
}

struct DefaultResolver;
impl DynResolver for DefaultResolver {
fn resolve(&self, host: &str) -> std::io::Result<Vec<IpAddr>> { Ok((host, 0).to_socket_addrs()?.map(|sa| sa.ip()).collect()) }
}

pub struct DynDnsExtender {
tick_service: Arc<TickService>,
address_manager: Arc<Mutex<AddressManager>>,
host: String,
min_refresh: Duration,
max_refresh: Duration,
ip_mode: IpVersionMode,
resolver: Box<dyn DynResolver>,
last_ip: Arc<Mutex<Option<IpAddr>>>,
}

impl DynDnsExtender {
pub fn new(config: Arc<Config>, am: Arc<Mutex<AddressManager>>, tick_service: Arc<TickService>, initial_external_ip: Option<IpAddr>) -> Option<Self> {
let host = config.external_dyndns_host.clone()?; // only build if host provided

let instance = Self {
tick_service,
address_manager: am.clone(),
min_refresh: Duration::from_secs(config.external_dyndns_min_refresh_sec),
max_refresh: Duration::from_secs(config.external_dyndns_max_refresh_sec),
ip_mode: config.external_dyndns_ip_version,
host,
resolver: Box::new(DefaultResolver),
last_ip: Arc::new(Mutex::new(initial_external_ip)),
};

Some(instance)
}

fn pick_ip(&self, mut ips: Vec<IpAddr>) -> Option<IpAddr> {
// filter publics
ips.retain(|ip| IpAddress::new(*ip).is_publicly_routable());
if ips.is_empty() { return None; }
match self.ip_mode {
IpVersionMode::Ipv4 => ips.into_iter().find(|ip| matches!(ip, IpAddr::V4(_))),
IpVersionMode::Ipv6 => ips.into_iter().find(|ip| matches!(ip, IpAddr::V6(_))),
IpVersionMode::Auto => {
if let Some(v4) = ips.iter().cloned().find(|ip| matches!(ip, IpAddr::V4(_))) { return Some(v4); }
ips.into_iter().next()
}
}
}

async fn worker(&self) {
info!("[DynDNS] Starting dyndns resolver for host {}", self.host);
let mut interval = self.min_refresh; // adaptive later
loop {
match self.tick_service.tick(interval).await {
TickReason::Shutdown => break,
TickReason::Wakeup => {}
}
match self.resolver.resolve(&self.host) {
Ok(ips) => {
debug!("[DynDNS] Resolved {} -> {:?}", self.host, ips);
let picked = self.pick_ip(ips);
if let Some(new_ip) = picked {
let mut last_guard = self.last_ip.lock();
if Some(new_ip) != *last_guard {
let old_ip = *last_guard;
*last_guard = Some(new_ip);
drop(last_guard);
self.apply_new_ip(new_ip, old_ip);
}
interval = self.min_refresh; // reset
} else {
warn!("[DynDNS] No public IP obtained for {}", self.host);
interval = std::cmp::min(interval * 2, self.max_refresh);
}
}
Err(e) => {
warn!("[DynDNS] Resolve failed for {}: {e}", self.host);
interval = std::cmp::min(interval * 2, self.max_refresh);
}
}
}
trace!("{SERVICE_NAME} worker exiting");
}

fn apply_new_ip(&self, new_ip: IpAddr, old_ip: Option<IpAddr>) {
info!("[DynDNS] External IP changed {:?} -> {}", old_ip, new_ip);
let mut am = self.address_manager.lock();
let port = am.best_local_address().map(|a| a.port).unwrap_or_else(|| am.config.default_p2p_port());
let net = NetAddress::new(new_ip.into(), port);
am.set_best_local_address(net);
let sinks = am.clone_external_ip_change_sinks();
drop(am);
for sink in sinks { let s = sink.clone(); tokio::spawn(async move { s.on_external_ip_changed(new_ip, old_ip); }); }
}
}

impl AsyncService for DynDnsExtender {
fn ident(self: Arc<Self>) -> &'static str { SERVICE_NAME }
fn start(self: Arc<Self>) -> AsyncServiceFuture { Box::pin(async move { self.worker().await; Ok(()) }) }
fn signal_exit(self: Arc<Self>) { trace!("sending an exit signal to {}", SERVICE_NAME); }
fn stop(self: Arc<Self>) -> AsyncServiceFuture { Box::pin(async move { trace!("{} stopped", SERVICE_NAME); Ok(()) }) }
}
141 changes: 123 additions & 18 deletions components/addressmanager/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
mod dyndns_extender;
mod port_mapping_extender;
mod stores;
extern crate self as address_manager;

use std::{collections::HashSet, iter, net::SocketAddr, sync::Arc, time::Duration};

use address_manager::port_mapping_extender::Extender;
use dyndns_extender::DynDnsExtender;
use igd_next::{
self as igd, aio::tokio::Tokio, AddAnyPortError, AddPortError, Gateway, GetExternalIpError, GetGenericPortMappingEntryError,
SearchError,
Expand All @@ -13,7 +15,7 @@ use itertools::{
Either::{Left, Right},
Itertools,
};
use kaspa_consensus_core::config::Config;
use kaspa_consensus_core::config::{Config, IpVersionMode};
use kaspa_core::{debug, info, task::tick::TickService, time::unix_now, warn};
use kaspa_database::prelude::{CachePolicy, StoreResultExtensions, DB};
use kaspa_utils::networking::IpAddress;
Expand All @@ -24,6 +26,10 @@ use thiserror::Error;

pub use stores::NetAddress;

pub trait ExternalIpChangeSink: Send + Sync {
fn on_external_ip_changed(&self, new_ip: std::net::IpAddr, old_ip: Option<std::net::IpAddr>);
}

const MAX_ADDRESSES: usize = 4096;
const MAX_CONNECTION_FAILED_COUNT: u64 = 3;

Expand Down Expand Up @@ -56,35 +62,112 @@ pub struct AddressManager {
address_store: address_store_with_cache::Store,
config: Arc<Config>,
local_net_addresses: Vec<NetAddress>,
external_ip_change_sinks: Vec<Arc<dyn ExternalIpChangeSink>>,
}

impl AddressManager {
pub fn new(config: Arc<Config>, db: Arc<DB>, tick_service: Arc<TickService>) -> (Arc<Mutex<Self>>, Option<Extender>) {
let mut instance = Self {
pub fn new(
config: Arc<Config>,
db: Arc<DB>,
tick_service: Arc<TickService>,
) -> (Arc<Mutex<Self>>, Option<Extender>, Option<DynDnsExtender>) {
let instance = Self {
banned_address_store: DbBannedAddressesStore::new(db.clone(), CachePolicy::Count(MAX_ADDRESSES)),
address_store: address_store_with_cache::new(db),
local_net_addresses: Vec::new(),
external_ip_change_sinks: Vec::new(),
config,
};

let extender = instance.init_local_addresses(tick_service);
let am = Arc::new(Mutex::new(instance));

let extender = Self::init_local_addresses(&am, tick_service.clone());
let dyndns_extender = if extender.is_none() {
Self::try_initial_dyndns_resolve(&am, tick_service.clone())
} else {
debug!("[AddrMan] UPnP extender active; DynDNS skipped");
None
};
// Note: initial DynDNS seed handled above when applicable
debug!(
"[AddrMan] Exit AddressManager::new (upnp_extender={} dyndns_extender={})",
extender.is_some(),
dyndns_extender.is_some()
);
(am, extender, dyndns_extender)
}

fn try_initial_dyndns_resolve(am: &Arc<Mutex<Self>>, tick_service: Arc<TickService>) -> Option<DynDnsExtender> {
// Only attempt if a DynDNS host is configured
let host_opt = am.lock().config.external_dyndns_host.clone();
let Some(host) = host_opt else { return None };
let ip_mode = am.lock().config.external_dyndns_ip_version;
let mut initial_ip: Option<std::net::IpAddr> = None;

debug!("[AddrMan] Performing minimal initial DynDNS resolve for host {}", host);
// DNS resolution outside lock
let result: std::io::Result<Vec<std::net::IpAddr>> = (|| {
use std::net::ToSocketAddrs;
(host.as_str(), 0).to_socket_addrs().map(|it| it.map(|sa| sa.ip()).collect())
})();
match result {
Ok(mut ips) => {
ips.retain(|ip| IpAddress::new(*ip).is_publicly_routable());
let selected = match ip_mode {
IpVersionMode::Ipv4 => ips.iter().cloned().find(|ip| matches!(ip, std::net::IpAddr::V4(_))),
IpVersionMode::Ipv6 => ips.iter().cloned().find(|ip| matches!(ip, std::net::IpAddr::V6(_))),
IpVersionMode::Auto => {
if let Some(v4) = ips.iter().cloned().find(|ip| matches!(ip, std::net::IpAddr::V4(_))) {
Some(v4)
} else {
ips.get(0).cloned()
}
}
};
if let Some(ip) = selected {
initial_ip = Some(ip);
let mut guard = am.lock();
let port = guard.config.default_p2p_port();
guard.set_best_local_address(NetAddress::new(ip.into(), port));
debug!("[AddrMan] Initial DynDNS external IP set to {}:{}", ip, port);
} else {
debug!("[AddrMan] Initial DynDNS resolve returned no public addresses");
}
}
Err(e) => debug!("[AddrMan] Initial DynDNS resolve failed: {e}"),
}

let extender = DynDnsExtender::new(am.lock().config.clone(), am.clone(), tick_service, initial_ip);
if extender.is_some() {
debug!("[AddrMan] DynDnsExtender constructed");
} else {
debug!("[AddrMan] DynDnsExtender NOT constructed (unexpected None)");
}
extender
}

pub fn register_external_ip_change_sink(&mut self, sink: Arc<dyn ExternalIpChangeSink>) {
self.external_ip_change_sinks.push(sink);
}

(Arc::new(Mutex::new(instance)), extender)
pub fn clone_external_ip_change_sinks(&self) -> Vec<Arc<dyn ExternalIpChangeSink>> {
self.external_ip_change_sinks.clone()
}

fn init_local_addresses(&mut self, tick_service: Arc<TickService>) -> Option<Extender> {
self.local_net_addresses = self.local_addresses().collect();
fn init_local_addresses(this: &Arc<Mutex<Self>>, tick_service: Arc<TickService>) -> Option<Extender> {
let mut me = this.lock();
me.local_net_addresses = me.local_addresses().collect();

let extender = if self.local_net_addresses.is_empty() && !self.config.disable_upnp {
let (net_address, ExtendHelper { gateway, local_addr, external_port }) = match self.upnp() {
let extender = if me.local_net_addresses.is_empty() && !me.config.disable_upnp {
let (net_address, ExtendHelper { gateway, local_addr, external_port }) = match me.upnp() {
Err(err) => {
warn!("[UPnP] Error adding port mapping: {err}");
return None;
}
Ok(None) => return None,
Ok(Some((net_address, extend_helper))) => (net_address, extend_helper),
};
self.local_net_addresses.push(net_address);
me.local_net_addresses.push(net_address);

let gateway: igd_next::aio::Gateway<Tokio> = igd_next::aio::Gateway {
addr: gateway.addr,
Expand All @@ -101,12 +184,14 @@ impl AddressManager {
gateway,
external_port,
local_addr,
Arc::clone(this),
Some(net_address.ip.into()),
))
} else {
None
};

self.local_net_addresses.iter().for_each(|net_addr| {
me.local_net_addresses.iter().for_each(|net_addr| {
info!("Publicly routable local address {} added to store", net_addr);
});
extender
Expand Down Expand Up @@ -142,12 +227,24 @@ impl AddressManager {
return Left(Right(iter::empty()));
};
// TODO: Add Check IPv4 or IPv6 match from Go code
Right(network_interfaces.into_iter().map(|(_, ip)| IpAddress::from(ip)).filter(|&ip| ip.is_publicly_routable()).map(
|ip| {
info!("Publicly routable local address found: {}", ip);
NetAddress::new(ip, self.config.default_p2p_port())
},
))
Right(
network_interfaces
.into_iter()
.map(|(_, ip)| IpAddress::from(ip))
.filter(|ip| {
if self.config.disable_ipv6_interface_discovery {
// Skip IPv6 during automatic discovery if the flag is set
!matches!(**ip, std::net::IpAddr::V6(_))
} else {
true
}
})
.filter(|&ip| ip.is_publicly_routable())
.map(|ip| {
info!("Publicly routable local address found: {}", ip);
NetAddress::new(ip, self.config.default_p2p_port())
}),
)
} else {
Left(Right(iter::empty()))
}
Expand Down Expand Up @@ -251,6 +348,14 @@ impl AddressManager {
}
}

pub fn set_best_local_address(&mut self, address: NetAddress) {
if self.local_net_addresses.is_empty() {
self.local_net_addresses.push(address);
} else {
self.local_net_addresses[0] = address;
}
}

pub fn add_address(&mut self, address: NetAddress) {
if address.ip.is_loopback() || address.ip.is_unspecified() {
debug!("[Address manager] skipping local address {}", address.ip);
Expand Down Expand Up @@ -548,7 +653,7 @@ mod address_store_with_cache {

let db = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let config = Config::new(SIMNET_PARAMS);
let (am, _) = AddressManager::new(Arc::new(config), db.1, Arc::new(TickService::default()));
let (am, _, _) = AddressManager::new(Arc::new(config), db.1, Arc::new(TickService::default()));

let mut am_guard = am.lock();

Expand Down
Loading
Loading