11import asyncio
22import hashlib
33import io
4+ import lzma
45import posixpath
56import re
67from collections .abc import Iterator , Mapping
78from contextlib import redirect_stdout
9+ from ctypes import sizeof
810from functools import partial
911from pathlib import Path
1012from typing import IO , Any , BinaryIO , Optional , Union
2931from reolinkfw .tmpfile import TempFile
3032from reolinkfw .typedefs import Buffer , Files , StrPath , StrPathURL
3133from reolinkfw .ubifs import UBIFS
32- from reolinkfw .uboot import LegacyImageHeader , get_arch_name
34+ from reolinkfw .uboot import Compression , LegacyImageHeader , get_arch_name
3335from reolinkfw .util import (
3436 ONEMIB ,
3537 FileType ,
5153ROOTFS_SECTIONS = ("fs" , "rootfs" )
5254FS_SECTIONS = ROOTFS_SECTIONS + ("app" ,)
5355
56+ RE_COMPLINK = re .compile (b"\x00 ([^\x00 ]+?-linux-.+? \(.+?\) [0-9].+?)\n \x00 +(.+?)\n \x00 " )
57+ # Pattern for a legacy image header with these properties:
58+ # OS: U-Boot / firmware (0x11)
59+ # Type: kernel (0x02)
60+ # Only used for MStar/SigmaStar cameras (Lumus and RLC-410W IPC_30K128M4MP)
61+ RE_MSTAR = re .compile (FileType .UIMAGE .value + b".{24}\x11 .\x02 .{33}" , re .DOTALL )
62+ RE_UBOOT = re .compile (b"U-Boot [0-9]{4}\.[0-9]{2}.*? \(.+?\)" )
63+
5464
5565class ReolinkFirmware (PAK ):
5666
5767 def __init__ (self , fd : BinaryIO , offset : int = 0 , closefd : bool = True ) -> None :
5868 super ().__init__ (fd , offset , closefd )
5969 self ._uboot_section_name = self ._get_uboot_section_name ()
70+ self ._uboot_section = None
71+ self ._uboot = None
6072 self ._kernel_section_name = self ._get_kernel_section_name ()
6173 self ._sdict = {s .name : s for s in self }
6274 self ._open_files = 1
@@ -79,6 +91,26 @@ def __getitem__(self, key: Union[int, str]) -> Section:
7991 def __iter__ (self ) -> Iterator [Section ]:
8092 yield from self .sections
8193
94+ @property
95+ def uboot_section (self ) -> bytes :
96+ """Return the firmware's U-Boot section as bytes."""
97+ if self ._uboot_section is not None :
98+ return self ._uboot_section
99+ self ._uboot_section = self .extract_section (self ["uboot" ])
100+ return self ._uboot_section
101+
102+ @property
103+ def uboot (self ) -> bytes :
104+ """Return the firmware's decompressed U-Boot as bytes.
105+
106+ If the U-Boot is not compressed this gives the same result
107+ as the `uboot_section` property.
108+ """
109+ if self ._uboot is not None :
110+ return self ._uboot
111+ self ._uboot = self ._decompress_uboot ()
112+ return self ._uboot
113+
82114 def _fdclose (self , fd : BinaryIO ) -> None :
83115 self ._open_files -= 1
84116 if self ._closefd and not self ._open_files :
@@ -96,6 +128,23 @@ def _get_kernel_section_name(self) -> str:
96128 return section .name
97129 raise Exception ("Kernel section not found" )
98130
131+ def _decompress_uboot (self ) -> bytes :
132+ uboot = self .uboot_section
133+ if uboot .startswith (pybcl .BCL_MAGIC_BYTES ):
134+ # Sometimes section.len - sizeof(hdr) is 1 to 3 bytes larger
135+ # than hdr.size. The extra bytes are 0xff (padding?). This
136+ # could explain why the compressed size is added to the header.
137+ hdr = pybcl .HeaderVariant .from_buffer_copy (uboot )
138+ compressed = uboot [sizeof (hdr ):sizeof (hdr )+ hdr .size ]
139+ return pybcl .decompress (compressed , hdr .algo , hdr .outsize )
140+ if (match := RE_MSTAR .search (uboot )) is not None :
141+ hdr = LegacyImageHeader .from_buffer_copy (uboot , match .start ())
142+ start = match .start () + sizeof (hdr )
143+ if hdr .comp == Compression .LZMA :
144+ return lzma .decompress (uboot [start :start + hdr .size ])
145+ raise Exception (f"Unexpected compression { hdr .comp } " )
146+ return uboot # Assume no compression
147+
99148 def open (self , section : Section ) -> SectionFile :
100149 self ._open_files += 1
101150 return SectionFile (self ._fd , section , self ._fdclose )
@@ -112,22 +161,15 @@ def sha256(self) -> str:
112161 sha .update (block )
113162 return sha .hexdigest ()
114163
115- def get_uboot_version (self ) -> Optional [str ]:
116- for section in self :
117- if section .len and "uboot" in section .name .lower ():
118- # This section is always named 'uboot' or 'uboot1'.
119- with self .open (section ) as f :
120- if f .peek (len (pybcl .BCL_MAGIC_BYTES )) == pybcl .BCL_MAGIC_BYTES :
121- # Sometimes section.len - sizeof(hdr) is 1 to 3 bytes larger
122- # than hdr.size. The extra bytes are 0xff (padding?). This
123- # could explain why the compressed size is added to the header.
124- hdr = pybcl .HeaderVariant .from_fd (f )
125- data = pybcl .decompress (f .read (hdr .size ), hdr .algo , hdr .outsize )
126- else :
127- data = f .read (section .len )
128- match = re .search (b"U-Boot [0-9]{4}\.[0-9]{2}.*? \(.*?\)" , data )
129- return match .group ().decode () if match is not None else None
130- return None
164+ def get_uboot_info (self ) -> tuple [Optional [str ], Optional [str ], Optional [str ]]:
165+ # Should never be None.
166+ match_ub = RE_UBOOT .search (self .uboot )
167+ version = match_ub .group ().decode () if match_ub is not None else None
168+ # Should only be None for HiSilicon devices.
169+ match_cl = RE_COMPLINK .search (self .uboot )
170+ compiler = match_cl .group (1 ).decode () if match_cl is not None else None
171+ linker = match_cl .group (2 ).decode () if match_cl is not None else None
172+ return version , compiler , linker
131173
132174 def get_uimage_header (self ) -> LegacyImageHeader :
133175 for section in self :
@@ -165,12 +207,15 @@ async def get_info(self) -> dict[str, Any]:
165207 else :
166208 return {"error" : "Unrecognized image type" , "sha256" : ha }
167209 uimage = self .get_uimage_header ()
210+ uboot_version , compiler , linker = self .get_uboot_info ()
168211 return {
169212 ** get_info_from_files (files ),
170213 "os" : "Linux" if uimage .os == 5 else "Unknown" ,
171214 "architecture" : get_arch_name (uimage .arch ),
172215 "kernel_image_name" : uimage .name ,
173- "uboot_version" : self .get_uboot_version (),
216+ "uboot_version" : uboot_version ,
217+ "uboot_compiler" : compiler ,
218+ "uboot_linker" : linker ,
174219 "filesystems" : self .get_fs_info (),
175220 "sha256" : ha
176221 }
@@ -208,13 +253,15 @@ def extract(self, dest: Optional[Path] = None, force: bool = False) -> None:
208253 dest = (Path .cwd () / "reolink_firmware" ) if dest is None else dest
209254 dest .mkdir (parents = True , exist_ok = force )
210255 rootfsdir = [s .name for s in self if s .name in ROOTFS_SECTIONS ][0 ]
211- for section in self :
212- if section .name in FS_SECTIONS :
213- if section .name == "app" :
214- outpath = dest / rootfsdir / "mnt" / "app"
215- else :
216- outpath = dest / rootfsdir
217- self .extract_file_system (section , outpath )
256+ for section in self ._fs_sections :
257+ if section .name == "app" :
258+ outpath = dest / rootfsdir / "mnt" / "app"
259+ else :
260+ outpath = dest / rootfsdir
261+ self .extract_file_system (section , outpath )
262+ mode = "wb" if force else "xb"
263+ with open (dest / "uboot" , mode ) as f :
264+ f .write (self .uboot )
218265
219266
220267async def download (url : StrOrURL ) -> Union [bytes , int ]:
0 commit comments