1- # SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD
1+ # SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD
22# SPDX-License-Identifier: Apache-2.0
33import argparse
44import os
55import re
66import struct
7- import time
8-
9- parsed_num = 0
10- all_line_num = 0
117
128
139def create_new_bt_snoop_file (filename : str ) -> None :
@@ -20,113 +16,113 @@ def create_new_bt_snoop_file(filename: str) -> None:
2016 f .write (header )
2117
2218
23- def append_hci_to_bt_snoop_file (filename : str , direction : int , data : str ) -> None :
19+ def append_hci_to_bt_snoop_file (filename : str , direction : int , data : str , timestamp_us : int ) -> None :
2420 if os .path .exists (filename ):
2521 print (f'Appending to existing file: { filename } ' )
2622 else :
2723 print (f'Creating new file: { filename } ' )
2824 create_new_bt_snoop_file (filename )
29-
30- # Ensure data is converted to bytearray from hex string
31- data_bytes = bytearray .fromhex (data ) # Convert hex string to bytearray
32-
33- with open (filename , 'ab' ) as f : # 'ab' mode for appending binary data
34- global parsed_num
35- parsed_num += 1
36- data_len = len (data_bytes )
37- orig_len = data_len
38- incl_len = data_len
39- packet_flags = direction + (data_bytes [0 ] != 1 or data_bytes [0 ] != 3 ) * 2
25+ data_bytes = bytearray .fromhex (data )
26+ with open (filename , 'ab' ) as f :
27+ orig_len = incl_len = len (data_bytes )
28+ packet_flags = direction + (data_bytes [0 ] != 1 and data_bytes [0 ] != 3 ) * 2
4029 cumulative_drops = 0
41- # Calculate the timestamp with an offset from a predefined reference time(0x00DCDDB30F2F8000).
42- timestamp_us = int (round (time .time () * 1000000 )) + 0x00DCDDB30F2F8000
43-
44- # Writing structured data followed by the byte array data
4530 f .write (struct .pack ('>IIIIQ' , orig_len , incl_len , packet_flags , cumulative_drops , timestamp_us ))
46- f .write (data_bytes ) # Write bytearray (binary data) to file
31+ f .write (data_bytes )
4732
4833
4934def log_data_clean (data : str ) -> str :
5035 cleaned = re .sub (r'.*?<pre>' , '' , data )
5136 return cleaned
5237
5338
54- def is_adv_report (data : str ) -> bool :
55- is_binary = all (re .fullmatch (r'[0-9a-fA-F]{2}' , part ) for part in data .split ())
56- return is_binary
57-
58-
5939def parse_log (input_path : str , output_tag : str ) -> None :
60- """
61- Parses the specified log file and saves the results to an output file.
62-
63- Args:
64- input_path (str): Path to the input log file.
65- output_tag (str): Name tag for the output file.
66-
67- Returns:
68- None
69- """
70- global parsed_num
71- global all_line_num
7240 if not os .path .exists (input_path ):
7341 print (f"Error: The file '{ input_path } ' does not exist." )
7442 return
75-
76- # Define the output directory and create it if it doesn't exist
7743 output_dir = './parsed_logs'
7844 os .makedirs (output_dir , exist_ok = True )
79-
80- # Define the output file name based on the tag
8145 output_file = os .path .join (output_dir , f'parsed_log_{ output_tag } .btsnoop.log' )
82-
46+ parsed_num = 0
47+ all_line_num = 0
8348 with open (input_path , 'r' , encoding = 'utf-8' ) as infile :
84- # Example: Parse each line and save processed results
8549 for line in infile :
8650 try :
8751 all_line_num += 1
88- line = log_data_clean (line )
89- line = line .replace ('C:' , '01 ' )
90- line = line .replace ('E:' , '04 ' )
91- line = line [3 :]
92- if line [0 ] == 'H' :
93- line = line .replace ('H:' , '02 ' )
94- append_hci_to_bt_snoop_file (output_file , 0 , line )
95- continue
96- if line [0 ] == 'D' :
97- line = line .replace ('D:' , '02 ' )
98- append_hci_to_bt_snoop_file (output_file , 1 , line )
52+ line = log_data_clean (line ).strip ()
53+ if not line :
9954 continue
100- if line .startswith ( '01' ):
101- append_hci_to_bt_snoop_file ( output_file , 0 , line )
55+ parts = line .split ()
56+ if len ( parts ) < 10 :
10257 continue
103- if line .startswith ('04' ):
104- append_hci_to_bt_snoop_file (output_file , 1 , line )
105- continue
106- if is_adv_report (line ):
107- line = '04 3e ' + line
108- append_hci_to_bt_snoop_file (output_file , 1 , line )
58+ parts_wo_ln = parts [1 :]
59+ # Check for literal in the first token
60+ literal = None
61+ if ':' in parts_wo_ln [0 ]:
62+ literal_part , sep , ts_byte = parts_wo_ln [0 ].partition (':' )
63+ if sep == ':' and literal_part in ['C' , 'D' , 'E' , 'H' ] and len (ts_byte ) == 2 :
64+ literal = literal_part + ':'
65+ parts_wo_ln = [literal , ts_byte ] + parts_wo_ln [1 :]
66+ else :
67+ literal = None
68+ if literal :
69+ # Parse timestamp
70+ try :
71+ timestamp_bytes = bytes (int (b , 16 ) for b in parts_wo_ln [1 :9 ])
72+ except Exception :
73+ continue
74+ timestamp_us = int .from_bytes (timestamp_bytes , byteorder = 'little' , signed = False )
75+ hci_data = ' ' .join (parts_wo_ln [9 :])
76+ if not hci_data :
77+ continue
78+ # Determine indicator and direction
79+ if literal == 'C:' :
80+ hci_data = '01 ' + hci_data
81+ direction = 0
82+ elif literal == 'E:' :
83+ hci_data = '04 ' + hci_data
84+ direction = 1
85+ elif literal == 'H:' :
86+ hci_data = '02 ' + hci_data
87+ direction = 0
88+ elif literal == 'D:' :
89+ hci_data = '02 ' + hci_data
90+ direction = 1
91+ else :
92+ continue
93+ append_hci_to_bt_snoop_file (output_file , direction , hci_data , timestamp_us )
94+ parsed_num += 1
95+ else :
96+ # No literal: treat as advertising report
97+ try :
98+ timestamp_bytes = bytes (int (b , 16 ) for b in parts [1 :9 ])
99+ except Exception :
100+ continue
101+ timestamp_us = int .from_bytes (timestamp_bytes , byteorder = 'little' , signed = False )
102+ adv_data = ' ' .join (parts [9 :])
103+ if not adv_data :
104+ continue
105+ hci_data = '04 3e ' + adv_data
106+ direction = 1
107+ append_hci_to_bt_snoop_file (output_file , direction , hci_data , timestamp_us )
108+ parsed_num += 1
109109 except Exception as e :
110110 print (f'Exception: { e } ' )
111-
112111 if parsed_num > 0 :
113- print (f'Log parsing completed, parsed_num { parsed_num } , all_line_num { all_line_num } .\n Output saved to: { output_file } ' )
112+ print (
113+ f'Parsing completed, parsed_num { parsed_num } , all_line_num { all_line_num } .\n Output saved to: { output_file } '
114+ )
114115 else :
115116 print ('No data could be parsed.' )
116117
117118
118119def main () -> None :
119- # Define command-line arguments
120120 parser = argparse .ArgumentParser (description = 'Log Parsing Tool' )
121121 parser .add_argument ('-p' , '--path' , required = True , help = 'Path to the input log file' )
122122 parser .add_argument ('-o' , '--output' , required = True , help = 'Name tag for the output file' )
123-
124- # Parse arguments
125123 args = parser .parse_args ()
126124 input_path = args .path
127125 output_tag = args .output
128-
129- # Call the log parsing function
130126 parse_log (input_path , output_tag )
131127
132128
0 commit comments