Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions packages/ns-ha/files/ns-ha-enable
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,56 @@ if not logger.handlers:
logger.addHandler(handler)
logger.setLevel(logging.INFO)

def find_device_from_config(interface):
# Read device from UCI config because sometimes ubus network dump is not updated yet
u = EUci()
try:
section = u.get_all('network', interface)
except:
return None
device = section.get('device', '')
if device.startswith('@'):
try:
parent = u.get_all('network', device[1:])
except:
return None
return parent.get('device')
return device

def is_device_up(device):
# Use JSON output to reliably inspect link state
proc = subprocess.run(["/sbin/ip", "-j", "link", "show", "dev", device], capture_output=True, text=True)
try:
link_json = json.loads(proc.stdout) if proc.stdout else []
except json.JSONDecodeError:
link_json = []

if isinstance(link_json, list) and link_json:
li = link_json[0]
if li.get('operstate') == 'UP' or 'UP' in li.get('flags', []):
return True
return False

def get_device_ips(device):
ipv4 = []
ipv6 = []
# Use JSON output to reliably inspect addresses
addr_proc = subprocess.run(["/sbin/ip", "-j", "addr", "show", "dev", device], capture_output=True, text=True)
try:
addrs = json.loads(addr_proc.stdout) if addr_proc.stdout else []
except json.JSONDecodeError:
addrs = []

for ent in addrs:
for info in ent.get('addr_info', []):
family = info.get('family')
local = info.get('local', '')
if family == 'inet':
ipv4.append(local)
elif family == 'inet6':
ipv6.append(local)
return ipv4, ipv6

def enable_interfaces(file):
u = EUci()
with open(os.path.join(out_dir, file), 'r') as f:
Expand All @@ -39,6 +89,55 @@ def enable_interfaces(file):
# Return code of ifup is not reliable, so we do not check it nor log it
logger.info("Bringing up interface %s", interface)

def send_gratuitous_arp(file):
# Get the mapping interface -> device
proc = subprocess.run(["ubus", "-v", "call", "network.interface", "dump"], capture_output=True, text=True)
try:
network_dump = json.loads(proc.stdout)
except json.JSONDecodeError:
logger.error("Can't send gratuitous ARP: failed to decode JSON from network dump")
return
device_map = {}
for iface in network_dump.get('interface', []):
if 'device' in iface and 'interface' in iface:
device_map[iface['interface']] = iface['device']
# Load the file with the interfaces to send gratuitous ARP for
with open(os.path.join(out_dir, file), 'r') as f:
interfaces = json.load(f)
for interface in interfaces:
device = device_map.get(interface, find_device_from_config(interface))
if not device:
logger.error("Can't send gratuitous ARP: no device found for interface %s", interface)
continue
# Check if device is up
max_attempts = 10
ready = False
# First check if device is up
for _ in range(max_attempts):
if is_device_up(device):
ready = True
break
time.sleep(0.5)
if not ready:
logger.error("Can't send gratuitous ARP: device %s for interface %s is down", device, interface)
continue
# Check if device has IP address
for _ in range(max_attempts):
# Ignore IPv6 which has a different ARP tools (ndisc6)
ipv4, _ = get_device_ips(device)
if len(ipv4) > 0:
for ip in ipv4:
# Send gratuitous ARP to update switches ARP tables
# Wait for the device to be up and to have the IP address (timeout ~10s)
proca = subprocess.run(["/usr/bin/arping", "-U", "-I", device, "-c", "1", ip], capture_output=True, text=True)
if proca.returncode == 0:
logger.info("Sent gratuitous ARP for IP %s on interface %s: success", ip, interface)
else:
logger.info("Sent gratuitous ARP for IP %s on interface %s: fail, %s", ip, interface, proca.stderr.strip())
return
time.sleep(0.5)


def enable_hotspot_mac():
u = EUci()
devices = utils.get_all_by_type(u, 'network', 'device')
Expand All @@ -60,4 +159,5 @@ if __name__ == "__main__":
enable_interfaces('wg_interfaces')
enable_interfaces('ipsec_interfaces')
enable_hotspot_mac()
send_gratuitous_arp('wan_interfaces')
subprocess.run(["/sbin/reload_config"], capture_output=True)
Loading