33# See https://scapy.net/ for more information
44# Copyright (C) Nils Weiss <[email protected] > 55# Copyright (C) Alexander Schroeder <[email protected] > 6- import itertools
7- import json
6+
87# scapy.contrib.description = ISO-TP (ISO 15765-2) Scanner Utility
98# scapy.contrib.status = library
9+
10+ import itertools
11+ import json
1012import logging
1113import time
12-
1314from threading import Event
14-
15- from scapy .packet import Packet
16- from scapy .compat import orb
17- from scapy .layers .can import CAN
18- from scapy .supersocket import SuperSocket
19- from scapy .contrib .cansocket import PYTHON_CAN
20- from scapy .contrib .isotp .isotp_packet import ISOTPHeader , ISOTPHeaderEA , \
21- ISOTP_FF , ISOTP
22-
2315# Typing imports
2416from typing import (
2517 Any ,
2820 List ,
2921 Optional ,
3022 Tuple ,
31- Union ,
23+ Union , Type ,
3224)
3325
26+ from scapy .compat import orb
27+ from scapy .contrib .cansocket import PYTHON_CAN
28+ from scapy .contrib .isotp import ISOTPHeader_FD
29+ from scapy .contrib .isotp .isotp_packet import ISOTPHeader , ISOTPHeaderEA , \
30+ ISOTP_FF , ISOTP , ISOTPHeaderEA_FD
31+ from scapy .layers .can import CAN , CANFD
32+ from scapy .packet import Packet
33+ from scapy .supersocket import SuperSocket
34+
3435log_isotp = logging .getLogger ("scapy.contrib.isotp" )
3536
3637
@@ -55,22 +56,29 @@ def send_multiple_ext(sock, ext_id, packet, number_of_packets):
5556 sock .send (packet )
5657
5758
58- def get_isotp_packet (identifier = 0x0 , extended = False , extended_can_id = False ):
59- # type: (int, bool, bool) -> Packet
59+ def get_isotp_packet (identifier = 0x0 , extended = False , extended_can_id = False , fd = False ):
60+ # type: (int, bool, bool, bool ) -> Packet
6061 """Craft ISO-TP packet
6162
6263 :param identifier: identifier of crafted packet
6364 :param extended: boolean if packet uses extended address
6465 :param extended_can_id: boolean if CAN should use extended Ids
66+ :param fd: boolean if CANFD packets should be used
6567 :return: Crafted Packet
6668 """
6769
6870 if extended :
69- pkt = ISOTPHeaderEA () / ISOTP_FF () # type: Packet
71+ if fd :
72+ pkt = ISOTPHeaderEA_FD () / ISOTP_FF () # type: Packet
73+ else :
74+ pkt = ISOTPHeaderEA () / ISOTP_FF ()
7075 pkt .extended_address = 0
7176 pkt .data = b'\x00 \x00 \x00 \x00 \x00 '
7277 else :
73- pkt = ISOTPHeader () / ISOTP_FF ()
78+ if fd :
79+ pkt = ISOTPHeader_FD () / ISOTP_FF ()
80+ else :
81+ pkt = ISOTPHeader () / ISOTP_FF ()
7482 pkt .data = b'\x00 \x00 \x00 \x00 \x00 \x00 '
7583 if extended_can_id :
7684 pkt .flags = "extended"
@@ -170,7 +178,8 @@ def scan(sock, # type: SuperSocket
170178 sniff_time = 0.1 , # type: float
171179 extended_can_id = False , # type: bool
172180 verify_results = True , # type: bool
173- stop_event = None # type: Optional[Event]
181+ stop_event = None , # type: Optional[Event]
182+ fd = False # type: bool
174183 ): # type: (...) -> Dict[int, Tuple[Packet, int]]
175184 """Scan and return dictionary of detections
176185
@@ -187,6 +196,7 @@ def scan(sock, # type: SuperSocket
187196 :param verify_results: Verify scan results. This will cause a second scan
188197 of all possible candidates for ISOTP Sockets
189198 :param stop_event: Event object to asynchronously stop the scan
199+ :param fd: Use CANFD packets for scan
190200 :return: Dictionary with all found packets
191201 """
192202 return_values = dict () # type: Dict[int, Tuple[Packet, int]]
@@ -195,7 +205,7 @@ def scan(sock, # type: SuperSocket
195205 break
196206 if noise_ids and value in noise_ids :
197207 continue
198- sock .send (get_isotp_packet (value , False , extended_can_id ))
208+ sock .send (get_isotp_packet (value , False , extended_can_id , fd ))
199209 sock .sniff (prn = lambda pkt : get_isotp_fc (value , return_values ,
200210 noise_ids , False , pkt ),
201211 timeout = sniff_time , store = False )
@@ -210,7 +220,7 @@ def scan(sock, # type: SuperSocket
210220 for value in retest_ids :
211221 if stop_event is not None and stop_event .is_set ():
212222 break
213- sock .send (get_isotp_packet (value , False , extended_can_id ))
223+ sock .send (get_isotp_packet (value , False , extended_can_id , fd ))
214224 sock .sniff (prn = lambda pkt : get_isotp_fc (value , cleaned_ret_val ,
215225 noise_ids , False , pkt ),
216226 timeout = sniff_time * 10 , store = False )
@@ -225,7 +235,8 @@ def scan_extended(sock, # type: SuperSocket
225235 noise_ids = None , # type: Optional[List[int]]
226236 sniff_time = 0.1 , # type: float
227237 extended_can_id = False , # type: bool
228- stop_event = None # type: Optional[Event]
238+ stop_event = None , # type: Optional[Event]
239+ fd = False # type: bool
229240 ): # type: (...) -> Dict[int, Tuple[Packet, int]]
230241 """Scan with ISOTP extended addresses and return dictionary of detections
231242
@@ -243,6 +254,7 @@ def scan_extended(sock, # type: SuperSocket
243254 after sending a first frame
244255 :param extended_can_id: Send extended can frames
245256 :param stop_event: Event object to asynchronously stop the scan
257+ :param fd: Use CANFD packets for scan
246258 :return: Dictionary with all found packets
247259 """
248260 return_values = dict () # type: Dict[int, Tuple[Packet, int]]
@@ -254,7 +266,7 @@ def scan_extended(sock, # type: SuperSocket
254266 continue
255267
256268 pkt = get_isotp_packet (
257- value , extended = True , extended_can_id = extended_can_id )
269+ value , extended = True , extended_can_id = extended_can_id , fd = fd )
258270 id_list = [] # type: List[int]
259271 for ext_isotp_id in range (r [0 ], r [- 1 ], scan_block_size ):
260272 if stop_event is not None and stop_event .is_set ():
@@ -298,7 +310,8 @@ def isotp_scan(sock, # type: SuperSocket
298310 extended_can_id = False , # type: bool
299311 verify_results = True , # type: bool
300312 verbose = False , # type: bool
301- stop_event = None # type: Optional[Event]
313+ stop_event = None , # type: Optional[Event]
314+ fd = False # type: bool
302315 ):
303316 # type: (...) -> Union[str, List[SuperSocket]]
304317 """Scan for ISOTP Sockets on a bus and return findings
@@ -329,6 +342,7 @@ def isotp_scan(sock, # type: SuperSocket
329342 of all possible candidates for ISOTP Sockets
330343 :param verbose: displays information during scan
331344 :param stop_event: Event object to asynchronously stop the scan
345+ :param fd: Create CANFD frames
332346 :return:
333347 """
334348 if verbose :
@@ -337,9 +351,13 @@ def isotp_scan(sock, # type: SuperSocket
337351 log_isotp .info ("Filtering background noise..." )
338352
339353 # Send dummy packet. In most cases, this triggers activity on the bus.
354+ if fd :
355+ dummy_pkt_cls = CANFD # type: Union[Type[CAN], Type[CANFD]]
356+ else :
357+ dummy_pkt_cls = CAN
340358
341- dummy_pkt = CAN (identifier = 0x123 ,
342- data = b'\xaa \xbb \xcc \xdd \xee \xff \xaa \xbb ' )
359+ dummy_pkt = dummy_pkt_cls (identifier = 0x123 ,
360+ data = b'\xaa \xbb \xcc \xdd \xee \xff \xaa \xbb ' )
343361
344362 background_pkts = sock .sniff (
345363 timeout = noise_listen_time ,
@@ -353,14 +371,16 @@ def isotp_scan(sock, # type: SuperSocket
353371 noise_ids = noise_ids ,
354372 sniff_time = sniff_time ,
355373 extended_can_id = extended_can_id ,
356- stop_event = stop_event )
374+ stop_event = stop_event ,
375+ fd = fd )
357376 else :
358377 found_packets = scan (sock , scan_range ,
359378 noise_ids = noise_ids ,
360379 sniff_time = sniff_time ,
361380 extended_can_id = extended_can_id ,
362381 verify_results = verify_results ,
363- stop_event = stop_event )
382+ stop_event = stop_event ,
383+ fd = fd )
364384
365385 filter_periodic_packets (found_packets )
366386
@@ -379,14 +399,15 @@ def isotp_scan(sock, # type: SuperSocket
379399 extended_addressing )
380400
381401
382- def generate_text_output (found_packets , extended_addressing = False ):
383- # type: (Dict[int, Tuple[Packet, int]], bool) -> str
402+ def generate_text_output (found_packets , extended_addressing = False , fd = False ):
403+ # type: (Dict[int, Tuple[Packet, int]], bool, bool ) -> str
384404 """Generate a human readable output from the result of the `scan` or the
385405 `scan_extended` function.
386406
387407 :param found_packets: result of the `scan` or `scan_extended` function
388408 :param extended_addressing: print results from a scan with
389409 ISOTP extended addressing
410+ :param fd: set CANFD flag in output
390411 :return: human readable scan results
391412 """
392413 if not found_packets :
@@ -420,13 +441,16 @@ def generate_text_output(found_packets, extended_addressing=False):
420441 else :
421442 text += "\n No Padding"
422443
444+ if fd :
445+ text += "\n CANFD enabled"
446+
423447 text += "\n "
424448 return text
425449
426450
427451def generate_code_output (found_packets , can_interface = "iface" ,
428- extended_addressing = False ):
429- # type: (Dict[int, Tuple[Packet, int]], Optional[str], bool) -> str
452+ extended_addressing = False , fd = False ):
453+ # type: (Dict[int, Tuple[Packet, int]], Optional[str], bool, bool ) -> str
430454 """Generate a copy&past-able output from the result of the `scan` or
431455 the `scan_extended` function.
432456
@@ -435,6 +459,7 @@ def generate_code_output(found_packets, can_interface="iface",
435459 used for the creation of the output.
436460 :param extended_addressing: print results from a scan with ISOTP
437461 extended addressing
462+ :param fd: set CANFD flag in output
438463 :return: Python-code as string to generate all found sockets
439464 """
440465 result = ""
@@ -452,26 +477,29 @@ def generate_code_output(found_packets, can_interface="iface",
452477 send_ext = pack - (send_id * 256 )
453478 ext_id = orb (found_packets [pack ][0 ].data [0 ])
454479 result += "ISOTPSocket(%s, tx_id=0x%x, rx_id=0x%x, padding=%s, " \
455- "ext_address=0x%x, rx_ext_address=0x%x, " \
480+ "ext_address=0x%x, rx_ext_address=0x%x, fd=%s, " \
456481 "basecls=ISOTP)\n " % \
457482 (can_interface , send_id ,
458483 int (found_packets [pack ][0 ].identifier ),
459484 found_packets [pack ][0 ].length == 8 ,
460485 send_ext ,
461- ext_id )
486+ ext_id ,
487+ fd )
462488
463489 else :
464- result += "ISOTPSocket(%s, tx_id=0x%x, rx_id=0x%x, padding=%s, " \
490+ result += "ISOTPSocket(%s, tx_id=0x%x, rx_id=0x%x, padding=%s, fd=%s, " \
465491 "basecls=ISOTP)\n " % \
466492 (can_interface , pack ,
467493 int (found_packets [pack ][0 ].identifier ),
468- found_packets [pack ][0 ].length == 8 )
494+ found_packets [pack ][0 ].length == 8 ,
495+ fd )
469496 return header + result
470497
471498
472499def generate_json_output (found_packets , # type: Dict[int, Tuple[Packet, int]]
473500 can_interface = "iface" , # type: Optional[str]
474- extended_addressing = False # type: bool
501+ extended_addressing = False , # type: bool
502+ fd = False # type: bool
475503 ):
476504 # type: (...) -> str
477505 """Generate a list of ISOTPSocket objects from the result of the `scan` or
@@ -482,6 +510,7 @@ def generate_json_output(found_packets, # type: Dict[int, Tuple[Packet, int]]
482510 used for the creation of the output.
483511 :param extended_addressing: print results from a scan with ISOTP
484512 extended addressing
513+ :param fd: set CANFD flag in output
485514 :return: A list of all found ISOTPSockets
486515 """
487516 socket_list = [] # type: List[Dict[str, Any]]
@@ -501,20 +530,23 @@ def generate_json_output(found_packets, # type: Dict[int, Tuple[Packet, int]]
501530 "rx_id" : dest_id ,
502531 "rx_ext_address" : dest_ext ,
503532 "padding" : pad ,
533+ "fd" : fd ,
504534 "basecls" : ISOTP .__name__ })
505535 else :
506536 source_id = pack
507537 socket_list .append ({"iface" : can_interface ,
508538 "tx_id" : source_id ,
509539 "rx_id" : dest_id ,
510540 "padding" : pad ,
541+ "fd" : fd ,
511542 "basecls" : ISOTP .__name__ })
512543 return json .dumps (socket_list )
513544
514545
515546def generate_isotp_list (found_packets , # type: Dict[int, Tuple[Packet, int]]
516547 can_interface , # type: Union[SuperSocket, str]
517- extended_addressing = False # type: bool
548+ extended_addressing = False , # type: bool
549+ fd = False # type: bool
518550 ):
519551 # type: (...) -> List[SuperSocket]
520552 """Generate a list of ISOTPSocket objects from the result of the `scan` or
@@ -525,6 +557,7 @@ def generate_isotp_list(found_packets, # type: Dict[int, Tuple[Packet, int]]
525557 used for the creation of the output.
526558 :param extended_addressing: print results from a scan with ISOTP
527559 extended addressing
560+ :param fd: set CANFD flag in output
528561 :return: A list of all found ISOTPSockets
529562 """
530563 from scapy .contrib .isotp import ISOTPSocket
@@ -545,10 +578,12 @@ def generate_isotp_list(found_packets, # type: Dict[int, Tuple[Packet, int]]
545578 rx_id = dest_id ,
546579 rx_ext_address = dest_ext ,
547580 padding = pad ,
581+ fd = fd ,
548582 basecls = ISOTP ))
549583 else :
550584 source_id = pack
551585 socket_list .append (ISOTPSocket (can_interface , tx_id = source_id ,
552586 rx_id = dest_id , padding = pad ,
587+ fd = fd ,
553588 basecls = ISOTP ))
554589 return socket_list
0 commit comments