44import lzma
55import posixpath
66import re
7+ import zlib
78from collections .abc import Iterator , Mapping
89from contextlib import redirect_stdout
910from ctypes import sizeof
4041 get_cache_file ,
4142 get_fs_from_ubi ,
4243 has_cache ,
44+ lz4_legacy_decompress ,
4345 make_cache_file ,
4446)
4547
5355ROOTFS_SECTIONS = ("fs" , "rootfs" )
5456FS_SECTIONS = ROOTFS_SECTIONS + ("app" ,)
5557
58+ RE_BANNER = re .compile (b"\x00 (Linux version .+? \(.+?@.+?\) \(.+?\) .+?)\n \x00 " )
5659RE_COMPLINK = re .compile (b"\x00 ([^\x00 ]+?-linux-.+? \(.+?\) [0-9].+?)\n \x00 +(.+?)\n \x00 " )
60+ RE_KERNEL_COMP = re .compile (
61+ b"(?P<lz4>" + FileType .LZ4_LEGACY_FRAME .value + b')'
62+ b"|(?P<xz>\xFD \x37 \x7A \x58 \x5A \x00 \x00 .(?!XZ))"
63+ b"|(?P<lzma>.{5}\xff {8})"
64+ b"|(?P<gzip>\x1f \x8b \x08 \x00 \x00 \x00 \x00 \x00 \x02 \x03 )"
65+ )
66+ RE_LZMA_OR_XZ = re .compile (b".{5}\xff {8}|\xFD \x37 \x7A \x58 \x5A \x00 \x00 " )
5767# Pattern for a legacy image header with these properties:
5868# OS: U-Boot / firmware (0x11)
5969# Type: kernel (0x02)
@@ -70,6 +80,8 @@ def __init__(self, fd: BinaryIO, offset: int = 0, closefd: bool = True) -> None:
7080 self ._uboot_section = None
7181 self ._uboot = None
7282 self ._kernel_section_name = self ._get_kernel_section_name ()
83+ self ._kernel_section = None
84+ self ._kernel = None
7385 self ._sdict = {s .name : s for s in self }
7486 self ._open_files = 1
7587 self ._fs_sections = [s for s in self if s .name in FS_SECTIONS ]
@@ -111,6 +123,22 @@ def uboot(self) -> bytes:
111123 self ._uboot = self ._decompress_uboot ()
112124 return self ._uboot
113125
126+ @property
127+ def kernel_section (self ) -> bytes :
128+ """Return the firmware's kernel section as bytes."""
129+ if self ._kernel_section is not None :
130+ return self ._kernel_section
131+ self ._kernel_section = self .extract_section (self ["kernel" ])
132+ return self ._kernel_section
133+
134+ @property
135+ def kernel (self ) -> bytes :
136+ """Return the firmware's decompressed kernel as bytes."""
137+ if self ._kernel is not None :
138+ return self ._kernel
139+ self ._kernel = self ._decompress_kernel ()
140+ return self ._kernel
141+
114142 def _fdclose (self , fd : BinaryIO ) -> None :
115143 self ._open_files -= 1
116144 if self ._closefd and not self ._open_files :
@@ -145,6 +173,31 @@ def _decompress_uboot(self) -> bytes:
145173 raise Exception (f"Unexpected compression { hdr .comp } " )
146174 return uboot # Assume no compression
147175
176+ def _decompress_kernel (self ) -> bytes :
177+ # Use lzma.LZMADecompressor instead of lzma.decompress
178+ # because we know there's only one stream.
179+ data = self .kernel_section
180+ uimage_hdr_size = sizeof (LegacyImageHeader )
181+ # RLN36 kernel image headers report no compression
182+ # so don't bother reading the header and just look for
183+ # a compression magic.
184+ if RE_LZMA_OR_XZ .match (data , uimage_hdr_size ):
185+ return lzma .LZMADecompressor ().decompress (data [uimage_hdr_size :])
186+ if (halt := data .find (b" -- System halted" )) == - 1 :
187+ raise Exception ("'System halted' string not found" )
188+ match = RE_KERNEL_COMP .search (data , halt )
189+ if match is None :
190+ raise Exception ("No known compression found in kernel" )
191+ start = match .start ()
192+ if match .lastgroup == "lz4" :
193+ return lz4_legacy_decompress (io .BytesIO (data [start :]))
194+ elif match .lastgroup in ("xz" , "lzma" ):
195+ return lzma .LZMADecompressor ().decompress (data [start :])
196+ elif match .lastgroup == "gzip" :
197+ # wbits=31 because only one member to decompress.
198+ return zlib .decompress (data [start :], wbits = 31 )
199+ raise Exception ("unreachable" )
200+
148201 def open (self , section : Section ) -> SectionFile :
149202 self ._open_files += 1
150203 return SectionFile (self ._fd , section , self ._fdclose )
@@ -171,13 +224,23 @@ def get_uboot_info(self) -> tuple[Optional[str], Optional[str], Optional[str]]:
171224 linker = match_cl .group (2 ).decode () if match_cl is not None else None
172225 return version , compiler , linker
173226
174- def get_uimage_header (self ) -> LegacyImageHeader :
175- for section in self :
176- with self .open (section ) as f :
177- if section .len and FileType .from_magic (f .peek (4 )) == FileType .UIMAGE :
178- # This section is always named 'KERNEL' or 'kernel'.
179- return LegacyImageHeader .from_fd (f )
180- raise Exception ("No kernel section found" )
227+ def get_kernel_image_header (self ) -> Optional [LegacyImageHeader ]:
228+ with self .open (self ["kernel" ]) as f :
229+ data = f .read (sizeof (LegacyImageHeader ))
230+ if FileType .from_magic (data [:4 ]) == FileType .UIMAGE :
231+ return LegacyImageHeader .from_buffer_copy (data )
232+ return None
233+
234+ def get_kernel_image_header_info (self ) -> tuple [Optional [str ], Optional [str ], Optional [str ]]:
235+ hdr = self .get_kernel_image_header ()
236+ if hdr is None :
237+ return None , None , None
238+ os = "Linux" if hdr .os == 5 else "Unknown"
239+ return os , get_arch_name (hdr .arch ), hdr .name
240+
241+ def get_linux_banner (self ) -> Optional [str ]:
242+ match = RE_BANNER .search (self .kernel )
243+ return match .group (1 ).decode () if match is not None else None
181244
182245 def get_fs_info (self ) -> list [dict [str , str ]]:
183246 result = []
@@ -206,16 +269,17 @@ async def get_info(self) -> dict[str, Any]:
206269 files = await asyncio .to_thread (get_files_from_squashfs , f , 0 , False )
207270 else :
208271 return {"error" : "Unrecognized image type" , "sha256" : ha }
209- uimage = self .get_uimage_header ()
272+ os , architecture , kernel_image_name = self .get_kernel_image_header_info ()
210273 uboot_version , compiler , linker = self .get_uboot_info ()
211274 return {
212275 ** get_info_from_files (files ),
213- "os" : "Linux" if uimage . os == 5 else "Unknown" ,
214- "architecture" : get_arch_name ( uimage . arch ) ,
215- "kernel_image_name" : uimage . name ,
276+ "os" : os ,
277+ "architecture" : architecture ,
278+ "kernel_image_name" : kernel_image_name ,
216279 "uboot_version" : uboot_version ,
217280 "uboot_compiler" : compiler ,
218281 "uboot_linker" : linker ,
282+ "linux_banner" : self .get_linux_banner (),
219283 "filesystems" : self .get_fs_info (),
220284 "sha256" : ha
221285 }
@@ -262,6 +326,8 @@ def extract(self, dest: Optional[Path] = None, force: bool = False) -> None:
262326 mode = "wb" if force else "xb"
263327 with open (dest / "uboot" , mode ) as f :
264328 f .write (self .uboot )
329+ with open (dest / "kernel" , mode ) as f :
330+ f .write (self .kernel )
265331
266332
267333async def download (url : StrOrURL ) -> Union [bytes , int ]:
0 commit comments