66import posixpath
77import re
88import zlib
9+ from ast import literal_eval
910from collections .abc import Iterator , Mapping
1011from contextlib import redirect_stdout
1112from ctypes import sizeof
2324from pakler import PAK , Section , is_pak_file
2425from pycramfs import Cramfs
2526from pycramfs .extract import extract_dir as extract_cramfs
27+ from pyfdt .pyfdt import Fdt , FdtBlobParse
2628from PySquashfsImage import SquashFsImage
2729from PySquashfsImage .extract import extract_dir as extract_squashfs
2830from ubireader .ubifs import ubifs as ubifs_
2931from ubireader .ubifs .output import extract_files as extract_ubifs
3032from ubireader .ubi_io import ubi_file
3133from ubireader .utils import guess_leb_size
3234
35+ from reolinkfw .fdt import RE_FDT_HEADER , FDTHeader
3336from reolinkfw .tmpfile import TempFile
3437from reolinkfw .typedefs import Buffer , Files , StrPath , StrPathURL
3538from reolinkfw .ubifs import UBIFS
7376RE_MSTAR = re .compile (FileType .UIMAGE .value + b".{24}\x11 .\x02 .{33}" , re .DOTALL )
7477RE_UBOOT = re .compile (b"U-Boot [0-9]{4}\.[0-9]{2}.*? \(.+?\)" )
7578
79+ DUMMY = object ()
80+
7681
7782class ReolinkFirmware (PAK ):
7883
@@ -84,6 +89,7 @@ def __init__(self, fd: BinaryIO, offset: int = 0, closefd: bool = True) -> None:
8489 self ._kernel_section_name = self ._get_kernel_section_name ()
8590 self ._kernel_section = None
8691 self ._kernel = None
92+ self ._fdt = DUMMY
8793 self ._sdict = {s .name : s for s in self }
8894 self ._open_files = 1
8995 self ._fs_sections = [s for s in self if s .name in FS_SECTIONS ]
@@ -141,6 +147,19 @@ def kernel(self) -> bytes:
141147 self ._kernel = self ._decompress_kernel ()
142148 return self ._kernel
143149
150+ @property
151+ def fdt (self ) -> Optional [Fdt ]:
152+ if self ._fdt is not DUMMY :
153+ return self ._fdt # type: ignore
154+ self ._fdt = self ._get_fdt ()
155+ return self ._fdt
156+
157+ @property
158+ def fdt_json (self ) -> Optional [dict [str , Any ]]:
159+ if self .fdt is not None :
160+ return literal_eval (self .fdt .to_json ().replace ("null" , "None" ))
161+ return None
162+
144163 def _fdclose (self , fd : BinaryIO ) -> None :
145164 self ._open_files -= 1
146165 if self ._closefd and not self ._open_files :
@@ -200,6 +219,32 @@ def _decompress_kernel(self) -> bytes:
200219 return zlib .decompress (data [start :], wbits = 31 )
201220 raise Exception ("unreachable" )
202221
222+ def _get_fdt (self ) -> Optional [Fdt ]:
223+ # At most 2 FDTs can be found in a firmware, and usually only one.
224+ # Most of the time it's in the fdt section or in the decompressed kernel.
225+ # Hardware versions starting with IPC_30 or IPC_32 have 1 FDT
226+ # in the decompressed kernel.
227+ # Hardware versions starting with IPC_35, IPC_36 or IPC_38 have no FDT.
228+ # HI3536CV100 -> only firmwares with FDT in kernel_section
229+ # Some firmwares with one FDT in the fdt section have a 2nd FDT
230+ # in the U-Boot section with no model.
231+ match = data = None
232+ if "fdt" in self ._sdict :
233+ # Reolink Duo 1: 2 FDTs, section starts with header FKLR (Reolink FDT?)
234+ # Some NVRs: 2 FDTs
235+ data = self .extract_section (self ["fdt" ])
236+ match = RE_FDT_HEADER .search (data )
237+ elif (match := RE_FDT_HEADER .search (self .kernel_section )) is not None :
238+ data = self .kernel_section
239+ elif (match := RE_FDT_HEADER .search (self .kernel )) is not None :
240+ data = self .kernel
241+ if match is not None and data is not None :
242+ start = match .start ()
243+ hdr = FDTHeader .from_buffer_copy (data , start )
244+ end = start + hdr .totalsize
245+ return FdtBlobParse (io .BytesIO (data [start :end ])).to_fdt ()
246+ return None
247+
203248 def open (self , section : Section ) -> SectionFile :
204249 self ._open_files += 1
205250 return SectionFile (self ._fd , section , self ._fdclose )
@@ -338,6 +383,9 @@ def extract(self, dest: Optional[Path] = None, force: bool = False) -> None:
338383 if (kcfg := self .get_kernel_config ()) is not None :
339384 with open (dest / ".config" , mode ) as f :
340385 f .write (kcfg )
386+ if self .fdt is not None :
387+ with open (dest / "camera.dts" , 'w' if force else 'x' , encoding = "utf8" ) as f :
388+ f .write (self .fdt .to_dts ())
341389
342390
343391async def download (url : StrOrURL ) -> Union [bytes , int ]:
0 commit comments