Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 78 additions & 11 deletions custom_components/network_scanner/sensor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@

import logging
import nmap
import re
import socket
from datetime import timedelta
from homeassistant.helpers.entity import Entity
from .const import DOMAIN
Expand All @@ -24,6 +27,32 @@ def __init__(self, hass, ip_range, mac_mapping):
self.nm = nmap.PortScanner()
_LOGGER.info("Network Scanner initialized")


# ---------------------- helpers (Option A) ----------------------
@staticmethod
def _short_label(name: str) -> str | None:
"""Return lowercase host label before first dot, cleaned, or None."""
if not name:
return None
short = name.strip().rstrip(".").split(".", 1)[0].lower()
short = re.sub(r"[^a-z0-9_-]", "", short)
return short or None

@staticmethod
def _fast_rdns(ip: str, timeout: float = 0.3) -> str | None:
"""Reverse-DNS with a very short timeout; returns cleaned short label or None."""
old_to = socket.getdefaulttimeout()
socket.setdefaulttimeout(timeout)
try:
host, _, _ = socket.gethostbyaddr(ip)
return NetworkScanner._short_label(host)
except Exception:
return None
finally:
# restore previous default timeout to avoid impacting HA internals
socket.setdefaulttimeout(old_to)


@property
def should_poll(self):
"""Return True as updates are needed via polling."""
Expand Down Expand Up @@ -70,20 +99,51 @@ def get_device_info_from_mac(self, mac_address):
return self.mac_mapping.get(mac_address.lower(), ("Unknown Device", "Unknown Device"))

def scan_network(self):
"""Scan the network and return device information."""
self.nm.scan(hosts=self.ip_range, arguments='-sn')
"""Scan the network and return device information.
Option A: fast discovery with -n, then resolve only active hosts with short timeout.
"""
# Fast discovery: no DNS (-n), TCP SYN pings on common ports, aggressive timing
# Adjust --min-rate to taste; keep it conservative for NAS CPUs
args = '-sn -n -T4'
try:
self.nm.scan(hosts=self.ip_range, arguments=args)
except Exception as e:
_LOGGER.error("nmap scan failed with args '%s': %s", args, e)
return []

devices = []

for host in self.nm.all_hosts():
_LOGGER.debug("Found Host: %s", host)
if 'mac' in self.nm[host]['addresses']:
_LOGGER.debug("Found Mac: %s", self.nm[host]['addresses'])
ip = self.nm[host]['addresses']['ipv4']
mac = self.nm[host]['addresses']['mac']
try:
addrs = self.nm[host].get('addresses', {})
if 'mac' not in addrs or 'ipv4' not in addrs:
continue

ip = addrs['ipv4']
mac = addrs['mac']

# Vendor (from nmap OUI db if available)
vendor = "Unknown"
if 'vendor' in self.nm[host] and mac in self.nm[host]['vendor']:
vendor = self.nm[host]['vendor'][mac]
hostname = self.nm[host].hostname()
vendor_map = self.nm[host].get('vendor', {})
if mac in vendor_map:
vendor = vendor_map[mac]

# Hostname from nmap result (no DNS done here due to -n)
raw_hostname = self.nm[host].hostname() or ""
if not raw_hostname:
# Sometimes present in 'hostnames' list
for h in self.nm[host].get('hostnames', []):
n = h.get('name')
if n:
raw_hostname = n
break

hostname = self._short_label(raw_hostname)

# If still missing, do a very fast reverse-DNS just for this active IP
if not hostname:
hostname = self._fast_rdns(ip, timeout=0.3)

device_name, device_type = self.get_device_info_from_mac(mac)
devices.append({
"ip": ip,
Expand All @@ -93,9 +153,16 @@ def scan_network(self):
"vendor": vendor,
"hostname": hostname
})
except Exception as e:
_LOGGER.debug("Error parsing host %s: %s", host, e)
continue

# Sort the devices by IP address
devices.sort(key=lambda x: [int(num) for num in x['ip'].split('.')])
try:
devices.sort(key=lambda x: [int(num) for num in x['ip'].split('.')])
except Exception:
pass

return devices

async def async_setup_entry(hass, config_entry, async_add_entities):
Expand Down