33"""
44import ipaddress
55import netifaces
6- from scapy .all import arping , ARP , get_if_addr # pylint: disable=E0611
6+ from scapy .all import ARP , get_if_addr , srp , Ether # pylint: disable=E0611
77from PySide6 .QtWidgets import ( # pylint: disable=E0611
88 QMainWindow ,
99 QVBoxLayout ,
1010 QLabel ,
1111 QWidget ,
1212 QDialog ,
13- QListWidgetItem
13+ QListWidgetItem ,
14+ QProgressBar
1415)
1516from PySide6 .QtGui import QIcon , QFont , QColor # pylint: disable=E0611
1617from PySide6 .QtCore import Slot , Qt , QTimer # pylint: disable=E0611
@@ -54,17 +55,25 @@ def __init__(self, ip_address, mac_address, hostname, device_vendor):
5455
5556class DeviceDiscoveryDialog (QDialog ): # pylint: disable=too-many-instance-attributes
5657 """Device Discovery"""
57- def __init__ (self , interface , oui_url , parent = None ):
58+ def __init__ (self , interface , oui_url , timeout = 1000 , parent = None ):
5859 super ().__init__ (parent )
5960 self .interface = interface
6061 self .mac_vendor_lookup = vendor .MacVendorLookup (oui_url )
62+ self .timeout = timeout
6163
6264 self ._ui = Ui_DeviceDiscovery ()
6365 self ._ui .setupUi (self )
6466
6567 # Initialize the UI and connection setup
6668 self .setup_ui_elements ()
6769
70+ # Add a progress bar to the UI
71+ self .progress_bar = QProgressBar (self )
72+ self .progress_bar .setRange (0 , 100 )
73+ self .progress_bar .setValue (0 )
74+ # Add it to the vertical layout (or any layout of your choice)
75+ self ._ui .verticalLayout .addWidget (self .progress_bar )
76+
6877 # Initialize scanner and device info storage
6978 self .scanner_timer = None
7079 self .device_info = {} # Store dynamic device info here
@@ -118,13 +127,23 @@ def add_static_ui_labels(self):
118127 self ._ui .verticalLayout .addWidget (default_gateway_label )
119128 self ._ui .verticalLayout .addWidget (local_mac_label )
120129
130+ # Add timeout information
131+ timeout_label = QLabel (f"Scan Timeout: { self .timeout } ms" )
132+ timeout_label .setStyleSheet ("color: black" )
133+ self ._ui .verticalLayout .addWidget (timeout_label )
134+
121135 def setup_font_for_list_widgets (self ):
122136 """Sets up a uniform font for list widgets."""
123137 font = QFont ()
124138 font .setPointSize (12 )
125139 self ._ui .devices .setFont (font )
126140 self ._ui .responses .setFont (font )
127141
142+ @Slot (int )
143+ def update_progress (self , progress ):
144+ """Update progress"""
145+ self .progress_bar .setValue (progress )
146+
128147 @Slot (QListWidgetItem )
129148 def open_device_details (self , item ):
130149 """Click on a device opens another window with details."""
@@ -179,15 +198,21 @@ def start_scan(self):
179198 return
180199
181200 # Create and start a new ARP scan thread
182- self .arp_scanner_thread = ARPScannerThread (self .interface , self .mac_vendor_lookup )
201+ self .arp_scanner_thread = ARPScannerThread (
202+ self .interface ,
203+ self .mac_vendor_lookup ,
204+ self .timeout / 1000
205+ )
183206 self .arp_scanner_thread .partialResults .connect (self .handle_partial_results )
184207 self .arp_scanner_thread .finished .connect (self .handle_scan_results )
208+ self .arp_scanner_thread .progressChanged .connect (self .update_progress ) # New connection
185209 self .arp_scanner_thread .start ()
186- print ("Started ARP scan. " )
187-
210+ print (f "Started ARP scan with timeout: { self . timeout } ms " )
211+
188212 @Slot (list )
189213 def handle_partial_results (self , partial_results ):
190- for ip_address , mac , hostname , device_vendor , packet in partial_results :
214+ """Update partials"""
215+ for ip_address , mac , hostname , device_vendor , packet in partial_results : # pylint: disable=unused-variable
191216 self .add_device_to_list (ip_address , mac , hostname , device_vendor )
192217
193218 @Slot (list )
@@ -229,9 +254,11 @@ def quit_application(self):
229254 self .arp_scanner_thread .wait ()
230255 self .close ()
231256
232- class ARPScannerThread (QThread ):
257+ class ARPScannerThread (QThread ): # pylint: disable=too-few-public-methods
258+ """ARP scanner"""
233259 finished = Signal (list ) # Final results
234260 partialResults = Signal (list ) # Intermediate results
261+ progressChanged = Signal (int ) # New signal for progress (0-100%)
235262
236263 def __init__ (self , interface , mac_vendor_lookup , timeout = 1 ):
237264 super ().__init__ ()
@@ -247,10 +274,10 @@ def _scan_ip_native(self, src_ip, target_ip):
247274 self .interface ,
248275 str (src_ip ),
249276 str (target_ip ),
250- int (self .timeout * 300 ) # 300ms timeout per scan
277+ int (self .timeout * 1000 ) # Convert to ms
251278 )
252279 return target_ip , result
253- except Exception as e :
280+ except Exception as e : # pylint: disable=broad-exception-caught
254281 print (f"Error scanning { target_ip } : { e } " )
255282 return target_ip , None
256283
@@ -262,6 +289,7 @@ def _create_arp_response(self, ip_addr, mac):
262289 })()
263290
264291 def run (self ):
292+ """Run the ARP scan thread."""
265293 src_ip = get_if_addr (self .interface )
266294 try :
267295 netmask = netifaces .ifaddresses (self .interface )[netifaces .AF_INET ][0 ]['netmask' ]
@@ -270,38 +298,61 @@ def run(self):
270298 self .finished .emit ([])
271299 return
272300
273- arp_results = []
301+ network = ipaddress .IPv4Network (network_cidr )
302+ arp_results = self ._scan_network (src_ip , network )
303+ self .finished .emit (arp_results )
304+
305+ def _scan_network (self , src_ip , network ):
306+ """Scan the given network and return ARP results."""
307+ hosts = [str (ip ) for ip in network .hosts () if str (ip ) != src_ip ]
274308 if self .use_native :
275309 print ("Using native ARP scanner" )
276- network = ipaddress .IPv4Network (network_cidr )
277- count = 0
278- for ip in network .hosts ():
279- if str (ip ) == src_ip :
280- continue # Skip scanning our own IP
281- target_ip , result = self ._scan_ip_native (src_ip , str (ip ))
282- if result :
283- device_vendor = self .mac_vendor_lookup .lookup_vendor (result ['mac' ])
284- hostname = net .get_hostname (target_ip )
285- arp_response = self ._create_arp_response (target_ip , result ['mac' ])
286- arp_results .append ((target_ip , result ['mac' ], hostname , device_vendor , arp_response ))
287- count += 1
288- # Every 10 IPs (or any chosen interval), emit partial results
289- if count % 10 == 0 :
290- self .partialResults .emit (arp_results )
291- self .finished .emit (arp_results )
292- else :
293- print ("Using Scapy ARP scanner" )
310+ return self ._run_native_scan (src_ip , hosts )
311+ print ("Using Scapy ARP scanner with progress updates" )
312+ return self ._run_scapy_scan (hosts )
313+
314+ def _run_native_scan (self , src_ip , hosts ):
315+ """Perform native ARP scanning on a list of hosts."""
316+ arp_results = []
317+ total = len (hosts )
318+ for count , ip in enumerate (hosts , start = 1 ):
319+ target_ip , result = self ._scan_ip_native (src_ip , ip )
320+ if result :
321+ device_vendor = self .mac_vendor_lookup .lookup_vendor (result ['mac' ])
322+ hostname = net .get_hostname (target_ip )
323+ arp_response = self ._create_arp_response (target_ip , result ['mac' ])
324+ arp_results .append (target_ip ,
325+ result ['mac' ],
326+ hostname ,
327+ device_vendor ,
328+ arp_response
329+ )
330+ self ._update_progress (count , total , arp_results )
331+ return arp_results
332+
333+ def _run_scapy_scan (self , hosts ):
334+ """Perform Scapy ARP scanning on a list of hosts."""
335+ arp_results = []
336+ total = len (hosts )
337+ for count , ip in enumerate (hosts , start = 1 ):
294338 try :
295- arp_packets = arping (network_cidr , timeout = self .timeout , verbose = 1 )[0 ]
296- except Exception as e :
297- print (f"Error during ARP scan: { e } " )
298- self .finished .emit ([])
299- return
300-
301- for packet in arp_packets :
302- ip_addr = packet [1 ][ARP ].psrc
303- mac = packet [1 ][ARP ].hwsrc
304- device_vendor = self .mac_vendor_lookup .lookup_vendor (mac )
305- hostname = net .get_hostname (ip_addr )
306- arp_results .append ((ip_addr , mac , hostname , device_vendor , packet [1 ][ARP ]))
307- self .finished .emit (arp_results )
339+ ans , _ = srp (Ether (dst = "ff:ff:ff:ff:ff:ff" ) / ARP (pdst = ip ),
340+ timeout = self .timeout , verbose = 0 )
341+ if ans :
342+ for _ , received in ans :
343+ ip_addr = received .psrc
344+ mac = received .hwsrc
345+ device_vendor = self .mac_vendor_lookup .lookup_vendor (mac )
346+ hostname = net .get_hostname (ip_addr )
347+ arp_results .append ((ip_addr , mac , hostname , device_vendor , received ))
348+ except Exception as e : # pylint: disable=broad-exception-caught
349+ print (f"Error scanning { ip } : { e } " )
350+ self ._update_progress (count , total , arp_results )
351+ return arp_results
352+
353+ def _update_progress (self , count , total , arp_results ):
354+ """Update progress and emit partial results every 10 hosts."""
355+ progress = int ((count / total ) * 100 )
356+ self .progressChanged .emit (progress )
357+ if count % 10 == 0 :
358+ self .partialResults .emit (arp_results )
0 commit comments