22import io
33import posixpath
44import re
5- from collections .abc import Iterable , Mapping
5+ from collections .abc import Iterable , Iterator , Mapping
66from pathlib import Path
77from typing import IO , Any , BinaryIO , Optional , Union
88from urllib .parse import parse_qsl , urlparse
2222from reolinkfw .uboot import get_arch_name , get_uboot_version , get_uimage_header
2323from reolinkfw .util import (
2424 FileType ,
25+ SectionFile ,
2526 get_cache_file ,
2627 get_fs_from_ubi ,
2728 has_cache ,
3435FILES = ("version_file" , "version.json" , "dvr.xml" , "dvr" , "router" )
3536INFO_KEYS = ("firmware_version_prefix" , "board_type" , "board_name" , "build_date" , "display_type_info" , "detail_machine_type" , "type" )
3637
37- ROOTFS_SECTIONS = ["fs" , "rootfs" ]
38- FS_SECTIONS = ROOTFS_SECTIONS + ["app" ]
38+ UBOOT_SECTIONS = ("uboot" , "uboot1" , "BOOT" )
39+ KERNEL_SECTIONS = ("kernel" , "KERNEL" )
40+ ROOTFS_SECTIONS = ("fs" , "rootfs" )
41+ FS_SECTIONS = ROOTFS_SECTIONS + ("app" ,)
42+
43+
44+ class ReolinkFirmware (PAK ):
45+
46+ def __init__ (self , fd : BinaryIO , offset : int = 0 , closefd : bool = True ) -> None :
47+ super ().__init__ (fd , offset , closefd )
48+ self ._uboot_section_name = self ._get_uboot_section_name ()
49+ self ._kernel_section_name = self ._get_kernel_section_name ()
50+ self ._sdict = {s .name : s for s in self }
51+ self ._open_files = 1
52+
53+ def __del__ (self ) -> None :
54+ self .close ()
55+
56+ def __getitem__ (self , key : Union [int , str ]) -> Section :
57+ if isinstance (key , int ):
58+ return self .sections [key ]
59+ if isinstance (key , str ):
60+ if key .lower () == "uboot" :
61+ key = self ._uboot_section_name
62+ elif key .lower () == "kernel" :
63+ key = self ._kernel_section_name
64+ return self ._sdict [key ]
65+ raise TypeError
66+
67+ def __iter__ (self ) -> Iterator [Section ]:
68+ yield from self .sections
69+
70+ def _fdclose (self , fd : BinaryIO ) -> None :
71+ self ._open_files -= 1
72+ if self ._closefd and not self ._open_files :
73+ fd .close ()
74+
75+ def _get_uboot_section_name (self ) -> str :
76+ for section in self :
77+ if section .len and section .name in UBOOT_SECTIONS :
78+ return section .name
79+ raise Exception ("U-Boot section not found" )
80+
81+ def _get_kernel_section_name (self ) -> str :
82+ for section in self :
83+ if section .len and section .name in KERNEL_SECTIONS :
84+ return section .name
85+ raise Exception ("Kernel section not found" )
86+
87+ def open (self , section : Section ) -> SectionFile :
88+ self ._open_files += 1
89+ return SectionFile (self ._fd , section , self ._fdclose )
90+
91+ def close (self ) -> None :
92+ if self ._fd is not None :
93+ self ._fdclose (self ._fd )
94+ self ._fd = None
3995
4096
4197async def download (url : StrOrURL ) -> Union [bytes , int ]:
@@ -48,7 +104,7 @@ async def download(url: StrOrURL) -> Union[bytes, int]:
48104 return await resp .read () if resp .status == 200 else resp .status
49105
50106
51- def extract_paks (zip : Union [StrPath , IO [bytes ]]) -> list [tuple [str , PAK ]]:
107+ def extract_paks (zip : Union [StrPath , IO [bytes ]]) -> list [tuple [str , ReolinkFirmware ]]:
52108 """Return a list of tuples, one for each PAK file found in the ZIP.
53109
54110 It is the caller's responsibility to close the PAK files.
@@ -58,7 +114,7 @@ def extract_paks(zip: Union[StrPath, IO[bytes]]) -> list[tuple[str, PAK]]:
58114 for name in myzip .namelist ():
59115 file = myzip .open (name )
60116 if is_pak_file (file ):
61- paks .append ((file .name , PAK .from_fd (file )))
117+ paks .append ((file .name , ReolinkFirmware .from_fd (file )))
62118 else :
63119 file .close ()
64120 return paks
@@ -93,8 +149,8 @@ def get_files_from_ubifs(binbytes: Buffer) -> dict[Files, Optional[bytes]]:
93149 # The interesting files are in the root directory of the "app" one.
94150 # Using select() with a relative path is enough.
95151 files = dict .fromkeys (FILES )
96- with TempFile (binbytes ) as file :
97- with UBIFS .from_file (file ) as image :
152+ with TempFile (binbytes ) as tempfile :
153+ with UBIFS .from_file (tempfile ) as image :
98154 for name in files :
99155 if (file := image .select (name )) is not None :
100156 files [name ] = file .read_bytes ()
@@ -131,43 +187,43 @@ def is_local_file(string: StrPath) -> bool:
131187 return Path (string ).is_file ()
132188
133189
134- def get_fs_info (pak : PAK , fs_sections : Iterable [Section ]) -> list [dict [str , str ]]:
190+ def get_fs_info (fw : ReolinkFirmware , fs_sections : Iterable [Section ]) -> list [dict [str , str ]]:
135191 result = []
136192 for section in fs_sections :
137- pak . _fd . seek (section . start )
138- fs = FileType .from_magic (pak . _fd .read (4 ))
139- if fs == FileType .UBI :
140- pak . _fd . seek (section . start + 266240 )
141- fs = FileType .from_magic (pak . _fd .read (4 ))
193+ with fw . open (section ) as f :
194+ fs = FileType .from_magic (f .read (4 ))
195+ if fs == FileType .UBI :
196+ f . seek (266240 )
197+ fs = FileType .from_magic (f .read (4 ))
142198 result .append ({
143199 "name" : section .name ,
144200 "type" : fs .name .lower () if fs is not None else "unknown"
145201 })
146202 return result
147203
148204
149- async def get_info_from_pak (pak : PAK ) -> dict [str , Any ]:
150- ha = await asyncio .to_thread (sha256_pak , pak )
151- fs_sections = [s for s in pak . sections if s .name in FS_SECTIONS ]
205+ async def get_info_from_pak (fw : ReolinkFirmware ) -> dict [str , Any ]:
206+ ha = await asyncio .to_thread (sha256_pak , fw )
207+ fs_sections = [s for s in fw if s .name in FS_SECTIONS ]
152208 app = fs_sections [- 1 ]
153- pak . _fd . seek (app . start )
154- fs = FileType .from_magic (pak . _fd .read (4 ))
155- if fs == FileType .CRAMFS :
156- files = await asyncio .to_thread (get_files_from_cramfs , pak . _fd , app . start , False )
157- elif fs == FileType .UBI :
158- files = await asyncio .to_thread (get_files_from_ubi , pak . _fd , app .len , app . start )
159- elif fs == FileType .SQUASHFS :
160- files = await asyncio .to_thread (get_files_from_squashfs , pak . _fd , app . start , False )
161- else :
162- return {"error" : "Unrecognized image type" , "sha256" : ha }
163- uimage = get_uimage_header (pak )
209+ with fw . open (app ) as f :
210+ fs = FileType .from_magic (f .read (4 ))
211+ if fs == FileType .CRAMFS :
212+ files = await asyncio .to_thread (get_files_from_cramfs , f , 0 , False )
213+ elif fs == FileType .UBI :
214+ files = await asyncio .to_thread (get_files_from_ubi , f , app .len , 0 )
215+ elif fs == FileType .SQUASHFS :
216+ files = await asyncio .to_thread (get_files_from_squashfs , f , 0 , False )
217+ else :
218+ return {"error" : "Unrecognized image type" , "sha256" : ha }
219+ uimage = get_uimage_header (fw )
164220 return {
165221 ** get_info_from_files (files ),
166222 "os" : "Linux" if uimage .os == 5 else "Unknown" ,
167223 "architecture" : get_arch_name (uimage .arch ),
168224 "kernel_image_name" : uimage .name ,
169- "uboot_version" : get_uboot_version (pak ),
170- "filesystems" : get_fs_info (pak , fs_sections ),
225+ "uboot_version" : get_uboot_version (fw ),
226+ "filesystems" : get_fs_info (fw , fs_sections ),
171227 "sha256" : ha
172228 }
173229
@@ -185,7 +241,7 @@ async def direct_download_url(url: str) -> str:
185241 return url
186242
187243
188- async def get_paks (file_or_url : StrPathURL , use_cache : bool = True ) -> list [tuple [Optional [str ], PAK ]]:
244+ async def get_paks (file_or_url : StrPathURL , use_cache : bool = True ) -> list [tuple [Optional [str ], ReolinkFirmware ]]:
189245 """Return PAK files read from an on-disk file or a URL.
190246
191247 The file or resource may be a ZIP or a PAK. On success return a
@@ -206,7 +262,7 @@ async def get_paks(file_or_url: StrPathURL, use_cache: bool = True) -> list[tupl
206262 if use_cache :
207263 make_cache_file (file_or_url , zip_or_pak_bytes , pakname )
208264 if is_pak_file (zip_or_pak_bytes ):
209- return [(pakname , PAK .from_bytes (zip_or_pak_bytes ))]
265+ return [(pakname , ReolinkFirmware .from_bytes (zip_or_pak_bytes ))]
210266 else :
211267 zipfile = io .BytesIO (zip_or_pak_bytes )
212268 if is_zipfile (zipfile ):
@@ -218,7 +274,7 @@ async def get_paks(file_or_url: StrPathURL, use_cache: bool = True) -> list[tupl
218274 if is_zipfile (file_or_url ):
219275 return await asyncio .to_thread (extract_paks , file_or_url )
220276 elif is_pak_file (file_or_url ):
221- return [(file_or_url .name , PAK .from_file (file_or_url ))]
277+ return [(file_or_url .name , ReolinkFirmware .from_file (file_or_url ))]
222278 raise Exception ("Not a ZIP or a PAK file" )
223279 raise Exception ("Not a URL or file" )
224280
0 commit comments