Skip to content

Commit 3976ecf

Browse files
asybagmarull
authored and committed
pbl: add flash log download and dehash tools
Implement 'pbl flash_logs' to download raw logs from the watch flash and 'tools/dehash_flash_logs.py' to parse and dehash them. Signed-off-by: Federico Bechini <federico.bechini@gmail.com>
1 parent 121adc5 commit 3976ecf

File tree

3 files changed

+297
-0
lines changed

3 files changed

+297
-0
lines changed

python_libs/pbl/pbl/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from .commands import install_lang
88
from .commands import test
99
from .commands import install_firmware
10+
from .commands import flash_logs
1011

1112
# TODO: unopened logging ports cause super noisy logs, fix this in the
1213
# pulse package then remove this
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# SPDX-FileCopyrightText: 2025 Federico Bechini
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
from __future__ import absolute_import, print_function
5+
6+
from libpebble2.services.getbytes import GetBytesService
7+
from libpebble2.exceptions import GetBytesError
8+
from libpebble2.protocol.transfers import GetBytesInfoResponse
9+
10+
from pebble_tool.commands.base import PebbleCommand
11+
from pebble_tool.exceptions import ToolError
12+
13+
import os
14+
15+
class FlashLogsCommand(PebbleCommand):
    """Dump flash logs (PBL_LOG) from the watch and save them to a .bin file."""
    command = 'flash_logs'

    # Map board names to (start_address, size) of the flash log region.
    # Hoisted to a class attribute so the table is built once instead of on
    # every invocation. Sizes are mostly 128KB (0x20000).
    FLASH_LOG_REGIONS = {
        # Legacy Platforms
        'aplite': (0x3E0000, 0x20000),
        'tintin': (0x3E0000, 0x20000),

        # Snowy / Spalding (Bottom Boot)
        'basalt': (0x000000, 0x20000),
        'snowy': (0x000000, 0x20000),
        'chalk': (0x000000, 0x20000),
        'spalding': (0x000000, 0x20000),

        # Silk / Diorite
        'diorite': (0x280000, 0x20000),
        'silk': (0x280000, 0x20000),

        # Robert / Calculus
        'robert': (0x480000, 0x20000),
        'calculus': (0x480000, 0x20000),

        # Asterix
        'asterix': (0x1FD0000, 0x20000),

        # Obelix / Getafix
        'obelix': (0x1FCF000, 0x20000),
        'getafix': (0x1FCF000, 0x20000),
    }

    @classmethod
    def add_parser(cls, parser):
        """Register the flash_logs sub-command and its --board argument.

        --board is lower-cased automatically (type=str.lower), so users may
        pass any capitalization.
        """
        parser = super(FlashLogsCommand, cls).add_parser(parser)
        parser.add_argument('--board', required=True, type=str.lower,
                            help='Board name (e.g., aplite, basalt, asterix)')
        return parser

    def __call__(self, args):
        """Read the board's flash log region over GetBytes and save it to disk.

        Raises:
            ToolError: if the board name is unknown, or the flash region
                cannot be read (may require non-release firmware).
        """
        super(FlashLogsCommand, self).__call__(args)
        get_bytes = GetBytesService(self.pebble)

        board = args.board
        region = self.FLASH_LOG_REGIONS.get(board)
        if not region:
            # Raise ToolError instead of print-and-return so the CLI exits
            # with a failure status, consistent with the tool's other error
            # paths (e.g. the unreadable-region case below).
            raise ToolError("Unknown board '{}'. Supported boards: {}".format(
                board, ", ".join(sorted(self.FLASH_LOG_REGIONS.keys()))))

        flash_log_start, flash_log_size = region

        print("Board: {}".format(board))
        print("Reading flash log region: 0x{:X} - 0x{:X} ({} KB)".format(
            flash_log_start, flash_log_start + flash_log_size, flash_log_size // 1024))

        try:
            flash_data = get_bytes.get_flash_region(flash_log_start, flash_log_size)
            print("Read {} bytes from flash".format(len(flash_data)))

            # Save the raw dump to a timestamped file in the current directory.
            import datetime
            filename = datetime.datetime.now().strftime(
                "flash_logs_{}_%Y-%m-%d_%H-%M-%S.bin".format(board))
            filepath = os.path.abspath(filename)
            with open(filename, "wb") as log_file:
                log_file.write(flash_data)
            print("Saved flash logs to {}".format(filepath))

            print("\nTo parse and dehash the logs:")
            print(" tools/dehash_flash_logs.py {}".format(filename))

        except GetBytesError as ex:
            if ex.code == GetBytesInfoResponse.ErrorCode.DoesNotExist:
                raise ToolError('Could not read flash region. This may require non-release firmware.')
            else:
                raise
96+

tools/dehash_flash_logs.py

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
#!/usr/bin/env python3
2+
# SPDX-FileCopyrightText: 2025 Federico Bechini
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
"""
6+
Parser for Pebble flash logs (PBL_LOG).
7+
Parses the binary circular buffer format and extracts log messages.
8+
"""
9+
10+
import argparse
11+
import struct
12+
import sys
13+
import json
14+
import os
15+
from datetime import datetime
16+
17+
# Setup paths for dehash libraries.
# NOTE(review): '..' steps up from this script's directory — presumably the
# script lives in tools/ at the repo root, so these resolve to the in-tree
# python_libs/ and tools/log_hashing/ directories. Confirm if relocated.
PYTHON_LIBS_PATH = os.path.join(os.path.dirname(__file__), '..', 'python_libs')
LOG_HASHING_PATH = os.path.join(os.path.dirname(__file__), '..', 'tools', 'log_hashing')

# Prepend so the in-tree copies take precedence over any installed versions.
if PYTHON_LIBS_PATH not in sys.path:
    sys.path.insert(0, PYTHON_LIBS_PATH)
if LOG_HASHING_PATH not in sys.path:
    sys.path.insert(0, LOG_HASHING_PATH)

# Dehashing is optional: without logdehash the script still parses records,
# it just cannot expand hashed messages (get_dehasher returns None).
try:
    import logdehash
    DEHASH_AVAILABLE = True
except ImportError:
    DEHASH_AVAILABLE = False
31+
32+
# Firmware Constants — must match the on-flash layout written by the firmware.
LOG_MAGIC = 0x21474F4C  # "LOG!" — little-endian magic at the start of each page
LOG_VERSION = 0x1  # only this page-header version is parsed
LOG_FLAGS_VALID = 0x1  # record flag bit; the parser only decodes records with it cleared
LOG_PAGE_SIZE = 0x2000  # 8KB — pages are located on these boundaries
MAX_MSG_LEN = 253  # record lengths above this end the page scan (erased flash reads 0xFF)

# Page header: magic(4) + version(1) + build_id(20) + file_id(1) + chunk_id(1) + 1
FLASH_LOGGING_HEADER_SIZE = 4 + 1 + 20 + 1 + 1 + 1  # 28 bytes
LOG_RECORD_HEADER_SIZE = 2  # per-record: flags(1) + length(1)
# Fixed record-body prefix: timestamp(4) + level(1) + 1 + line(2) + filename(16).
# The unnamed byte at body offset 5 holds the message-text length
# (read in parse_flash_logs before calling parse_log_binary_message).
LOG_BINARY_MESSAGE_BASE_SIZE = 4 + 1 + 1 + 2 + 16  # 24 bytes
42+
43+
def parse_flash_logging_header(data, offset):
    """Decode the flash-logging page header at `offset`.

    Returns a dict with build_id (hex string), log_file_id, log_chunk_id
    and the page offset, or None when the header is truncated, has the
    wrong magic, or an unsupported version.
    """
    if offset + FLASH_LOGGING_HEADER_SIZE > len(data):
        return None

    (magic,) = struct.unpack_from('<I', data, offset)
    if magic != LOG_MAGIC:
        return None

    # Version byte follows the 4-byte magic.
    if data[offset + 4] != LOG_VERSION:
        return None

    build_id = data[offset + 5:offset + 25]  # 20-byte build id
    return {
        'build_id': build_id.hex(),
        'log_file_id': data[offset + 25],
        'log_chunk_id': data[offset + 26],
        'offset': offset,
    }
61+
62+
def parse_log_binary_message(data, offset, msg_length):
    """Decode one binary log record starting at `offset`.

    Returns a dict with timestamp, log_level, line_number, filename and
    message, or None when the record would run past the end of `data`.
    """
    if offset + LOG_BINARY_MESSAGE_BASE_SIZE + msg_length > len(data):
        return None

    # Fixed header fields are big-endian.
    (timestamp,) = struct.unpack_from('>I', data, offset)
    level = data[offset + 4]
    (line_no,) = struct.unpack_from('>H', data, offset + 6)

    # 16-byte, NUL-padded source filename; keep everything before the first NUL.
    name_field = data[offset + 8:offset + 24]
    filename = name_field.partition(b'\x00')[0].decode('utf-8', errors='ignore')

    # Variable-length message text follows the 24-byte fixed prefix.
    text = data[offset + 24:offset + 24 + msg_length]
    message = text.decode('utf-8', errors='ignore').rstrip('\x00')

    return {
        'timestamp': timestamp,
        'log_level': level,
        'line_number': line_no,
        'filename': filename,
        'message': message,
    }
86+
87+
def parse_flash_logs(flash_data):
    """Extract all log records from a raw flash dump.

    Pages are located on LOG_PAGE_SIZE boundaries by their valid header,
    ordered by (log_file_id, log_chunk_id) to replay them in write order,
    then scanned record by record.
    """
    # Collect (page_start, header) for every page whose header parses.
    pages = [
        (start, hdr)
        for start in range(0, len(flash_data), LOG_PAGE_SIZE)
        for hdr in (parse_flash_logging_header(flash_data, start),)
        if hdr
    ]
    pages.sort(key=lambda entry: (entry[1]['log_file_id'], entry[1]['log_chunk_id']))

    records = []
    for start, _hdr in pages:
        cursor = start + FLASH_LOGGING_HEADER_SIZE
        page_end = start + LOG_PAGE_SIZE
        while cursor < page_end:
            if cursor + LOG_RECORD_HEADER_SIZE > len(flash_data):
                break

            flags = flash_data[cursor]
            length = flash_data[cursor + 1]

            # A zero or oversized length marks the end of written data on
            # this page (erased flash reads as 0xFF).
            if length == 0 or length > MAX_MSG_LEN:
                break

            # Only records with the VALID bit cleared are decoded; a set
            # bit ends the scan of this page.
            if flags & LOG_FLAGS_VALID:
                break

            body = cursor + LOG_RECORD_HEADER_SIZE
            if body + length <= len(flash_data):
                # Byte 5 of the record body holds the message-text length.
                decoded = parse_log_binary_message(
                    flash_data, body, flash_data[body + 5])
                if decoded:
                    records.append(decoded)
            cursor = body + length
    return records
123+
124+
# Module-level cache: the dictionary is loaded at most once per process.
_dehasher = None

def get_dehasher(loghash_dict_path):
    """Return a cached LogDehash instance, building it on first use.

    Returns None when dehashing is unavailable, no dictionary path was
    given, or loading the dictionary fails (a warning is printed).
    """
    global _dehasher
    if _dehasher is not None or not DEHASH_AVAILABLE or not loghash_dict_path:
        return _dehasher
    try:
        candidate = logdehash.LogDehash('', monitor_dict_file=False)
        with open(loghash_dict_path, 'r') as dict_file:
            candidate.load_log_strings_from_dict(json.load(dict_file))
        _dehasher = candidate
    except Exception as e:
        print(f"Warning: Failed to load dehash dictionary: {e}")
        _dehasher = None
    return _dehasher
137+
138+
def format_log_message(log_msg, dehasher=None):
    """Render one parsed log record as a single text line.

    Args:
        log_msg: dict produced by parse_log_binary_message.
        dehasher: optional LogDehash instance used to expand hashed
            ('NL:'-prefixed) messages into their original format strings.

    Returns:
        "<level> <time> <filename>:<line>> <message>"
    """
    try:
        dt = datetime.fromtimestamp(log_msg['timestamp'])
        ts = dt.strftime("%H:%M:%S.%f")[:-3]
    except (ValueError, OverflowError, OSError):
        # fromtimestamp rejects out-of-range timestamps (previously a bare
        # except, which also swallowed unrelated errors); fall back to hex.
        ts = f"0x{log_msg['timestamp']:08X}"

    level = {0: 'A', 1: 'E', 2: 'W', 3: 'I', 4: 'D'}.get(log_msg['log_level'], '?')
    msg = log_msg['message']

    # Hashed messages are wrapped in the ":0> " prefix the dehasher expects.
    if dehasher and msg.startswith('NL:'):
        result = dehasher.dehash(f":0> {msg}")
        if result and 'formatted_msg' in result:
            msg = result['formatted_msg']

    # Bug fix: the computed filename was previously ignored and the literal
    # "(unknown)" was printed for every record.
    filename = log_msg['filename'] or 'unknown'
    return f"{level} {ts} {filename}:{log_msg['line_number']}> {msg}"
155+
156+
def main():
    """CLI entry point: parse a flash-log dump, optionally dehash, write a report.

    Reads the binary dump named on the command line, decodes its records,
    optionally filters them, writes the formatted lines to a text file and
    (with --show) echoes them to stdout.
    """
    parser = argparse.ArgumentParser(description="Parse Pebble flash logs")
    parser.add_argument('file', help='Binary flash log file')
    parser.add_argument('--filter', help='Filter messages containing text')
    parser.add_argument('--output', help='Output file')
    parser.add_argument('--show', action='store_true', help='Show logs in stdout')
    parser.add_argument('--dehash', help='Path to loghash_dict.json')
    args = parser.parse_args()

    if not os.path.exists(args.file):
        print(f"Error: File {args.file} does not exist")
        sys.exit(1)

    with open(args.file, 'rb') as dump:
        raw = dump.read()

    # Fall back to the build tree's dictionary when --dehash is not given.
    dict_path = args.dehash
    if not dict_path:
        candidate = os.path.join(os.path.dirname(__file__), '..', 'build',
                                 'src', 'fw', 'tintin_fw_loghash_dict.json')
        if os.path.exists(candidate):
            dict_path = candidate

    records = parse_flash_logs(raw)
    dehasher = get_dehasher(dict_path)

    if args.filter:
        records = [rec for rec in records if args.filter in rec['message']]

    lines = [format_log_message(rec, dehasher) for rec in records]
    report = '\n'.join(lines)

    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    destination = args.output or os.path.splitext(args.file)[0] + f"_parsed_{stamp}.txt"
    with open(destination, 'w', encoding='utf-8') as out:
        out.write(report)

    print(f"Successfully parsed {len(records)} messages. Saved to: {destination}")

    if args.show:
        for line in lines:
            print(line)

if __name__ == '__main__':
    main()

0 commit comments

Comments
 (0)