33"""
44import ipaddress
55import netifaces
6- from scapy .all import arping , ARP , get_if_addr , srp , Ether # pylint: disable=E0611
6+ from scapy .all import ARP , get_if_addr , srp , Ether # pylint: disable=E0611
77from PySide6 .QtWidgets import ( # pylint: disable=E0611
88 QMainWindow ,
99 QVBoxLayout ,
@@ -68,11 +68,11 @@ def __init__(self, interface, oui_url, timeout=1000, parent=None):
6868 self .setup_ui_elements ()
6969
7070 # Add a progress bar to the UI
71- self .progressBar = QProgressBar (self )
72- self .progressBar .setRange (0 , 100 )
73- self .progressBar .setValue (0 )
71+ self .progress_bar = QProgressBar (self )
72+ self .progress_bar .setRange (0 , 100 )
73+ self .progress_bar .setValue (0 )
7474 # Add it to the vertical layout (or any layout of your choice)
75- self ._ui .verticalLayout .addWidget (self .progressBar )
75+ self ._ui .verticalLayout .addWidget (self .progress_bar )
7676
7777 # Initialize scanner and device info storage
7878 self .scanner_timer = None
@@ -141,7 +141,8 @@ def setup_font_for_list_widgets(self):
141141
142142 @Slot (int )
143143 def update_progress (self , progress ):
144- self .progressBar .setValue (progress )
144+ """Update progress"""
145+ self .progress_bar .setValue (progress )
145146
146147 @Slot (QListWidgetItem )
147148 def open_device_details (self , item ):
@@ -190,13 +191,18 @@ def setup_scanner_timer(self):
190191
191192 @Slot ()
192193 def start_scan (self ):
194+ """Starts scanning the network."""
193195 # Check if there's already a running scan, and don't start another one
194196 if self .arp_scanner_thread is not None and self .arp_scanner_thread .isRunning ():
195197 print ("Scan is already in progress." )
196198 return
197199
198200 # Create and start a new ARP scan thread
199- self .arp_scanner_thread = ARPScannerThread (self .interface , self .mac_vendor_lookup , self .timeout / 1000 )
201+ self .arp_scanner_thread = ARPScannerThread (
202+ self .interface ,
203+ self .mac_vendor_lookup ,
204+ self .timeout / 1000
205+ )
200206 self .arp_scanner_thread .partialResults .connect (self .handle_partial_results )
201207 self .arp_scanner_thread .finished .connect (self .handle_scan_results )
202208 self .arp_scanner_thread .progressChanged .connect (self .update_progress ) # New connection
@@ -205,7 +211,8 @@ def start_scan(self):
205211
206212 @Slot (list )
207213 def handle_partial_results (self , partial_results ):
208- for ip_address , mac , hostname , device_vendor , packet in partial_results :
214+ """Update partials"""
215+ for ip_address , mac , hostname , device_vendor , packet in partial_results : # pylint: disable=unused-variable
209216 self .add_device_to_list (ip_address , mac , hostname , device_vendor )
210217
211218 @Slot (list )
@@ -247,7 +254,8 @@ def quit_application(self):
247254 self .arp_scanner_thread .wait ()
248255 self .close ()
249256
250- class ARPScannerThread (QThread ):
257+ class ARPScannerThread (QThread ): # pylint: disable=too-few-public-methods
258+ """ARP scanner"""
251259 finished = Signal (list ) # Final results
252260 partialResults = Signal (list ) # Intermediate results
253261 progressChanged = Signal (int ) # New signal for progress (0-100%)
@@ -269,7 +277,7 @@ def _scan_ip_native(self, src_ip, target_ip):
269277 int (self .timeout * 1000 ) # Convert to ms
270278 )
271279 return target_ip , result
272- except Exception as e :
280+ except Exception as e : # pylint: disable=broad-exception-caught
273281 print (f"Error scanning { target_ip } : { e } " )
274282 return target_ip , None
275283
@@ -281,7 +289,7 @@ def _create_arp_response(self, ip_addr, mac):
281289 })()
282290
283291 def run (self ):
284-
292+ """Run the ARP scan thread."""
285293 src_ip = get_if_addr (self .interface )
286294 try :
287295 netmask = netifaces .ifaddresses (self .interface )[netifaces .AF_INET ][0 ]['netmask' ]
@@ -290,45 +298,61 @@ def run(self):
290298 self .finished .emit ([])
291299 return
292300
293- arp_results = []
294301 network = ipaddress .IPv4Network (network_cidr )
302+ arp_results = self ._scan_network (src_ip , network )
303+ self .finished .emit (arp_results )
304+
305+ def _scan_network (self , src_ip , network ):
306+ """Scan the given network and return ARP results."""
307+ hosts = [str (ip ) for ip in network .hosts () if str (ip ) != src_ip ]
295308 if self .use_native :
296309 print ("Using native ARP scanner" )
297- hosts = [str (ip ) for ip in network .hosts () if str (ip ) != src_ip ]
298- total = len (hosts )
299- count = 0
300- for ip in hosts :
301- target_ip , result = self ._scan_ip_native (src_ip , ip )
302- if result :
303- device_vendor = self .mac_vendor_lookup .lookup_vendor (result ['mac' ])
304- hostname = net .get_hostname (target_ip )
305- arp_response = self ._create_arp_response (target_ip , result ['mac' ])
306- arp_results .append ((target_ip , result ['mac' ], hostname , device_vendor , arp_response ))
307- count += 1
308- self .progressChanged .emit (int ((count / total ) * 100 ))
309- if count % 10 == 0 :
310- self .partialResults .emit (arp_results )
311- self .finished .emit (arp_results )
312- else :
313- print ("Using Scapy ARP scanner with progress updates" )
314- hosts = [str (ip ) for ip in network .hosts () if str (ip ) != src_ip ]
315- total = len (hosts )
316- count = 0
317- for ip in hosts :
318- try :
319- ans , _ = srp (Ether (dst = "ff:ff:ff:ff:ff:ff" )/ ARP (pdst = ip ),
320- timeout = self .timeout , verbose = 0 )
321- if ans :
322- for sent , received in ans :
323- ip_addr = received .psrc
324- mac = received .hwsrc
325- device_vendor = self .mac_vendor_lookup .lookup_vendor (mac )
326- hostname = net .get_hostname (ip_addr )
327- arp_results .append ((ip_addr , mac , hostname , device_vendor , received ))
328- except Exception as e :
329- print (f"Error scanning { ip } : { e } " )
330- count += 1
331- self .progressChanged .emit (int ((count / total ) * 100 ))
332- if count % 10 == 0 :
333- self .partialResults .emit (arp_results )
334- self .finished .emit (arp_results )
310+ return self ._run_native_scan (src_ip , hosts )
311+ print ("Using Scapy ARP scanner with progress updates" )
312+ return self ._run_scapy_scan (hosts )
313+
314+ def _run_native_scan (self , src_ip , hosts ):
315+ """Perform native ARP scanning on a list of hosts."""
316+ arp_results = []
317+ total = len (hosts )
318+ for count , ip in enumerate (hosts , start = 1 ):
319+ target_ip , result = self ._scan_ip_native (src_ip , ip )
320+ if result :
321+ device_vendor = self .mac_vendor_lookup .lookup_vendor (result ['mac' ])
322+ hostname = net .get_hostname (target_ip )
323+ arp_response = self ._create_arp_response (target_ip , result ['mac' ])
324+ arp_results .append (target_ip ,
325+ result ['mac' ],
326+ hostname ,
327+ device_vendor ,
328+ arp_response
329+ )
330+ self ._update_progress (count , total , arp_results )
331+ return arp_results
332+
333+ def _run_scapy_scan (self , hosts ):
334+ """Perform Scapy ARP scanning on a list of hosts."""
335+ arp_results = []
336+ total = len (hosts )
337+ for count , ip in enumerate (hosts , start = 1 ):
338+ try :
339+ ans , _ = srp (Ether (dst = "ff:ff:ff:ff:ff:ff" ) / ARP (pdst = ip ),
340+ timeout = self .timeout , verbose = 0 )
341+ if ans :
342+ for _ , received in ans :
343+ ip_addr = received .psrc
344+ mac = received .hwsrc
345+ device_vendor = self .mac_vendor_lookup .lookup_vendor (mac )
346+ hostname = net .get_hostname (ip_addr )
347+ arp_results .append ((ip_addr , mac , hostname , device_vendor , received ))
348+ except Exception as e : # pylint: disable=broad-exception-caught
349+ print (f"Error scanning { ip } : { e } " )
350+ self ._update_progress (count , total , arp_results )
351+ return arp_results
352+
353+ def _update_progress (self , count , total , arp_results ):
354+ """Update progress and emit partial results every 10 hosts."""
355+ progress = int ((count / total ) * 100 )
356+ self .progressChanged .emit (progress )
357+ if count % 10 == 0 :
358+ self .partialResults .emit (arp_results )
0 commit comments