Skip to content

Commit 95e52d2

Browse files
committed
More type hints
1 parent 6442ae4 commit 95e52d2

File tree

8 files changed

+99
-51
lines changed

8 files changed

+99
-51
lines changed

reolinkfw/__init__.py

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,22 @@
22
import io
33
import posixpath
44
import re
5+
from collections.abc import Iterable, Mapping
56
from pathlib import Path
6-
from typing import Optional
7+
from typing import IO, Any, BinaryIO, Optional, Union
78
from urllib.parse import parse_qsl, urlparse
89
from zipfile import ZipFile, is_zipfile
910

1011
import aiohttp
12+
from aiohttp.typedefs import StrOrURL
1113
from lxml.etree import fromstring
1214
from lxml.html import document_fromstring
1315
from pakler import PAK, Section, is_pak_file
1416
from pycramfs import Cramfs
1517
from PySquashfsImage import SquashFsImage
1618

1719
from reolinkfw.tmpfile import TempFile
20+
from reolinkfw.typedefs import Buffer, Files, StrPath, StrPathURL
1821
from reolinkfw.ubifs import UBIFS
1922
from reolinkfw.uboot import get_arch_name, get_uboot_version, get_uimage_header
2023
from reolinkfw.util import (
@@ -35,7 +38,7 @@
3538
FS_SECTIONS = ROOTFS_SECTIONS + ["app"]
3639

3740

38-
async def download(url):
41+
async def download(url: StrOrURL) -> Union[bytes, int]:
3942
"""Return resource as bytes.
4043
4144
Return the status code of the request if it is not 200.
@@ -45,7 +48,7 @@ async def download(url):
4548
return await resp.read() if resp.status == 200 else resp.status
4649

4750

48-
def extract_paks(zip) -> list[tuple[str, PAK]]:
51+
def extract_paks(zip: Union[StrPath, IO[bytes]]) -> list[tuple[str, PAK]]:
4952
"""Return a list of tuples, one for each PAK file found in the ZIP.
5053
5154
It is the caller's responsibility to close the PAK files.
@@ -61,8 +64,8 @@ def extract_paks(zip) -> list[tuple[str, PAK]]:
6164
return paks
6265

6366

64-
def get_info_from_files(files):
65-
xml = dict(fromstring(files["dvr.xml"]).items())
67+
def get_info_from_files(files: Mapping[Files, Optional[bytes]]) -> dict[str, Optional[str]]:
68+
xml: dict[str, str] = dict(fromstring(files["dvr.xml"]).items())
6669
info = {k: xml.get(k) for k in INFO_KEYS}
6770
info["version_file"] = files["version_file"].decode().strip()
6871
if not info.get("firmware_version_prefix"):
@@ -72,7 +75,7 @@ def get_info_from_files(files):
7275
return info
7376

7477

75-
def get_files_from_squashfs(fd, offset=0, closefd=True):
78+
def get_files_from_squashfs(fd: BinaryIO, offset: int = 0, closefd: bool = True) -> dict[Files, Optional[bytes]]:
7679
# Firmwares using squashfs have either one or two file system
7780
# sections. When there is only one, the app directory is located at
7881
# /mnt/app. Otherwise it's the same as with cramfs and ubifs.
@@ -85,7 +88,7 @@ def get_files_from_squashfs(fd, offset=0, closefd=True):
8588
return files
8689

8790

88-
def get_files_from_ubifs(binbytes):
91+
def get_files_from_ubifs(binbytes: Buffer) -> dict[Files, Optional[bytes]]:
8992
# For now all firmwares using ubifs have two file system sections.
9093
# The interesting files are in the root directory of the "app" one.
9194
# Using select() with a relative path is enough.
@@ -98,7 +101,7 @@ def get_files_from_ubifs(binbytes):
98101
return files
99102

100103

101-
def get_files_from_ubi(fd, size, offset=0):
104+
def get_files_from_ubi(fd: BinaryIO, size: int, offset: int = 0) -> dict[Files, Optional[bytes]]:
102105
fsbytes = get_fs_from_ubi(fd, size, offset)
103106
fs = FileType.from_magic(fsbytes[:4])
104107
if fs == FileType.UBIFS:
@@ -108,7 +111,7 @@ def get_files_from_ubi(fd, size, offset=0):
108111
raise Exception("Unknown file system in UBI")
109112

110113

111-
def get_files_from_cramfs(fd, offset=0, closefd=True):
114+
def get_files_from_cramfs(fd: BinaryIO, offset: int = 0, closefd: bool = True) -> dict[Files, Optional[bytes]]:
112115
# For now all firmwares using cramfs have two file system sections.
113116
# The interesting files are in the root directory of the "app" one.
114117
# Using select() with a relative path is enough.
@@ -120,15 +123,15 @@ def get_files_from_cramfs(fd, offset=0, closefd=True):
120123
return files
121124

122125

123-
def is_url(string):
126+
def is_url(string: StrOrURL) -> bool:
124127
return str(string).startswith("http")
125128

126129

127-
def is_local_file(string):
130+
def is_local_file(string: StrPath) -> bool:
128131
return Path(string).is_file()
129132

130133

131-
def get_fs_info(pak: PAK, fs_sections: list[Section]) -> list[dict[str, str]]:
134+
def get_fs_info(pak: PAK, fs_sections: Iterable[Section]) -> list[dict[str, str]]:
132135
result = []
133136
for section in fs_sections:
134137
pak._fd.seek(section.start)
@@ -143,7 +146,7 @@ def get_fs_info(pak: PAK, fs_sections: list[Section]) -> list[dict[str, str]]:
143146
return result
144147

145148

146-
async def get_info_from_pak(pak: PAK):
149+
async def get_info_from_pak(pak: PAK) -> dict[str, Any]:
147150
ha = await asyncio.to_thread(sha256_pak, pak)
148151
fs_sections = [s for s in pak.sections if s.name in FS_SECTIONS]
149152
app = fs_sections[-1]
@@ -169,7 +172,7 @@ async def get_info_from_pak(pak: PAK):
169172
}
170173

171174

172-
async def direct_download_url(url):
175+
async def direct_download_url(url: str) -> str:
173176
if url.startswith("https://drive.google.com/file/d/"):
174177
return f"https://drive.google.com/uc?id={url.split('/')[5]}&confirm=t"
175178
elif url.startswith("https://www.mediafire.com/file/"):
@@ -182,7 +185,7 @@ async def direct_download_url(url):
182185
return url
183186

184187

185-
async def get_paks(file_or_url, use_cache: bool = True) -> list[tuple[Optional[str], PAK]]:
188+
async def get_paks(file_or_url: StrPathURL, use_cache: bool = True) -> list[tuple[Optional[str], PAK]]:
186189
"""Return PAK files read from an on-disk file or a URL.
187190
188191
The file or resource may be a ZIP or a PAK. On success return a
@@ -191,6 +194,7 @@ async def get_paks(file_or_url, use_cache: bool = True) -> list[tuple[Optional[s
191194
be None. If the file is a ZIP the list might be empty.
192195
It is the caller's responsibility to close the PAK files.
193196
"""
197+
file_or_url = str(file_or_url)
194198
if is_url(file_or_url):
195199
if use_cache and has_cache(file_or_url):
196200
return await get_paks(get_cache_file(file_or_url))
@@ -219,7 +223,7 @@ async def get_paks(file_or_url, use_cache: bool = True) -> list[tuple[Optional[s
219223
raise Exception("Not a URL or file")
220224

221225

222-
async def get_info(file_or_url, use_cache: bool = True):
226+
async def get_info(file_or_url: StrPathURL, use_cache: bool = True) -> list[dict[str, Any]]:
223227
"""Retrieve firmware info from an on-disk file or a URL.
224228
225229
The file or resource may be a ZIP or a PAK.

reolinkfw/__main__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ async def extract(args: Namespace) -> None:
5555
pakfile.close()
5656

5757

58-
def main():
58+
def main() -> None:
5959
parser = ArgumentParser(description="Extract information and files from Reolink firmwares")
6060
parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {__version__}")
6161
subparsers = parser.add_subparsers(required=True)

reolinkfw/extract.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from contextlib import redirect_stdout
22
from io import StringIO
33
from pathlib import Path
4+
from typing import Optional
45

56
from pakler import PAK, Section
67
from pycramfs import Cramfs
@@ -17,7 +18,7 @@
1718
from reolinkfw.util import FileType, closing_ubifile, get_fs_from_ubi
1819

1920

20-
def extract_file_system(pak: PAK, section: Section, dest: Path = None):
21+
def extract_file_system(pak: PAK, section: Section, dest: Optional[Path] = None) -> None:
2122
dest = (Path.cwd() / "reolink_fs") if dest is None else dest
2223
dest.mkdir(parents=True, exist_ok=True)
2324
pak._fd.seek(section.start)
@@ -47,7 +48,7 @@ def extract_file_system(pak: PAK, section: Section, dest: Path = None):
4748
raise Exception("Unknown file system")
4849

4950

50-
def extract_pak(pak: PAK, dest: Path = None, force: bool = False):
51+
def extract_pak(pak: PAK, dest: Optional[Path] = None, force: bool = False) -> None:
5152
dest = (Path.cwd() / "reolink_firmware") if dest is None else dest
5253
dest.mkdir(parents=True, exist_ok=force)
5354
rootfsdir = [s.name for s in pak.sections if s.name in ROOTFS_SECTIONS][0]

reolinkfw/tmpfile.py

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,62 @@
1+
from __future__ import annotations
2+
13
import os
24
import platform
35
import sys
46
import tempfile
57
from abc import abstractmethod
68

9+
from reolinkfw.typedefs import Buffer
10+
711

812
class TempFileFromBytes(os.PathLike):
913

10-
def __init__(self, filebytes):
14+
def __init__(self, filebytes: Buffer) -> None:
1115
self._filebytes = filebytes
1216
self._fd = -1
1317
self._path = None
1418

15-
def __enter__(self):
19+
def __enter__(self) -> TempFileFromBytes:
1620
self.open()
1721
return self
18-
19-
def __exit__(self, exc_type, exc_value, traceback):
22+
23+
def __exit__(self, exc_type, exc_value, traceback) -> None:
2024
self.close()
21-
22-
def __fspath__(self):
25+
26+
def __fspath__(self) -> str:
2327
return self._path
24-
28+
2529
@abstractmethod
26-
def open(self):
30+
def open(self) -> int:
2731
...
28-
32+
2933
@abstractmethod
30-
def close(self):
34+
def close(self) -> None:
3135
...
3236

3337

3438
class LinuxInMemoryFile(TempFileFromBytes):
3539

36-
def open(self):
40+
def open(self) -> int:
3741
self._fd = os.memfd_create("temp")
3842
os.write(self._fd, self._filebytes)
3943
self._path = f"/proc/self/fd/{self._fd}"
4044
return self._fd
41-
42-
def close(self):
45+
46+
def close(self) -> None:
4347
os.close(self._fd)
4448
self._fd = -1
4549
self._path = None
4650

4751

4852
class OnDiskTempFile(TempFileFromBytes):
49-
50-
def open(self):
53+
54+
def open(self) -> int:
5155
self._fd, self._path = tempfile.mkstemp()
5256
os.write(self._fd, self._filebytes)
5357
return self._fd
5458

55-
def close(self):
59+
def close(self) -> None:
5660
os.close(self._fd)
5761
os.unlink(self._path)
5862
self._fd = -1

reolinkfw/typedefs.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import collections.abc
2+
from os import PathLike
3+
from typing import AnyStr, Literal, Union
4+
5+
from aiohttp.typedefs import StrOrURL
6+
7+
if hasattr(collections.abc, "Buffer"):
8+
Buffer = collections.abc.Buffer
9+
else:
10+
Buffer = Union[bytes, bytearray, memoryview]
11+
12+
Files = Literal["version_file", "version.json", "dvr.xml", "dvr", "router"]
13+
GenericPath = Union[AnyStr, PathLike[AnyStr]]
14+
StrPath = Union[str, PathLike[str]]
15+
StrPathURL = Union[StrPath, StrOrURL]
16+
StrOrBytesPath = Union[StrPath, bytes, PathLike[bytes]]
17+
FileDescriptorOrPath = Union[int, StrOrBytesPath]

reolinkfw/ubifs.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
from __future__ import annotations
22

3+
from collections.abc import Mapping
34
from pathlib import PurePosixPath
45
from stat import filemode
5-
from typing import Iterator, Literal, Optional
6+
from typing import Any, Iterator, Literal, Optional
67

78
from ubireader.ubifs import ubifs, walk
89
from ubireader.ubifs.defines import (
@@ -25,11 +26,12 @@
2526
from ubireader.utils import guess_leb_size
2627

2728
from reolinkfw.tmpfile import TempFile
29+
from reolinkfw.typedefs import Buffer, FileDescriptorOrPath, StrPath
2830

2931

3032
class File:
3133

32-
def __init__(self, image: UBIFS, nodes: dict, name: str = '', parent: Optional[Directory] = None) -> None:
34+
def __init__(self, image: UBIFS, nodes: Mapping[str, Any], name: str = '', parent: Optional[Directory] = None) -> None:
3335
self._image = image
3436
self._nodes = nodes
3537
self._name = name
@@ -108,9 +110,9 @@ def readlink(self) -> PurePosixPath:
108110

109111
class Directory(File):
110112

111-
def __init__(self, image: UBIFS, nodes: dict, name: str = '', parent: Optional[Directory] = None) -> None:
113+
def __init__(self, image: UBIFS, nodes: Mapping[str, Any], name: str = '', parent: Optional[Directory] = None) -> None:
112114
super().__init__(image, nodes, name, parent)
113-
self._children = {}
115+
self._children: dict[str, File] = {}
114116
for dent in nodes.get("dent", []):
115117
cls = filetype[dent.type]
116118
self._children[dent.name] = cls(image, image.inodes[dent.inum], dent.name, self)
@@ -126,7 +128,7 @@ def is_dir(self) -> Literal[True]:
126128
def children(self) -> dict[str, File]:
127129
return self._children
128130

129-
def select(self, path) -> Optional[File]:
131+
def select(self, path: StrPath) -> Optional[File]:
130132
"""Select a file of any kind by path.
131133
132134
The path can be absolute or relative.
@@ -163,7 +165,7 @@ class UBIFS:
163165

164166
def __init__(self, ubifs: ubifs) -> None:
165167
self._ubifs = ubifs
166-
self._inodes = {}
168+
self._inodes: dict[int, dict[str, Any]] = {}
167169
self._bad_blocks = []
168170
walk.index(ubifs, ubifs.master_node.root_lnum, ubifs.master_node.root_offs, self._inodes, self._bad_blocks)
169171
self._root = Directory(self, self._inodes[UBIFS_ROOT_INO])
@@ -178,7 +180,7 @@ def __iter__(self) -> Iterator[File]:
178180
yield from self._root.riter()
179181

180182
@property
181-
def inodes(self) -> dict:
183+
def inodes(self) -> dict[int, dict[str, Any]]:
182184
return self._inodes
183185

184186
@property
@@ -188,15 +190,15 @@ def root(self) -> Directory:
188190
def close(self) -> None:
189191
self._ubifs._file._fhandle.close()
190192

191-
def select(self, path) -> Optional[File]:
193+
def select(self, path: StrPath) -> Optional[File]:
192194
return self._root.select(path)
193195

194196
@classmethod
195-
def from_file(cls, path) -> UBIFS:
197+
def from_file(cls, path: FileDescriptorOrPath) -> UBIFS:
196198
return cls(ubifs(ubi_file(path, guess_leb_size(path))))
197199

198200
@classmethod
199-
def from_bytes(cls, bytes_, offset: int = 0) -> UBIFS:
201+
def from_bytes(cls, bytes_: Buffer, offset: int = 0) -> UBIFS:
200202
chdr = common_hdr(bytes_[offset:offset+UBIFS_COMMON_HDR_SZ])
201203
if chdr.magic != int.from_bytes(UBIFS_NODE_MAGIC, "little") or chdr.node_type != UBIFS_SB_NODE:
202204
raise Exception("Not UBIFS")

0 commit comments

Comments
 (0)