|
2 | 2 | import io |
3 | 3 | import posixpath |
4 | 4 | import re |
| 5 | +from enum import Enum |
5 | 6 | from pathlib import Path |
6 | 7 | from typing import Optional |
7 | 8 | from urllib.parse import parse_qsl, urlparse |
|
12 | 13 | from lxml.html import document_fromstring |
13 | 14 | from pakler import PAK, is_pak_file |
14 | 15 | from pycramfs import Cramfs |
| 16 | +from pycramfs.const import MAGIC_BYTES as CRAMFS_MAGIC |
15 | 17 | from PySquashfsImage import SquashFsImage |
| 18 | +from PySquashfsImage.const import SQUASHFS_MAGIC |
| 19 | +from ubireader.ubi.defines import UBI_EC_HDR_MAGIC as UBI_MAGIC |
16 | 20 | from ubireader.ubifs import ubifs, walk |
| 21 | +from ubireader.ubifs.defines import UBIFS_NODE_MAGIC as UBIFS_MAGIC |
17 | 22 | from ubireader.ubifs.output import _process_reg_file |
18 | 23 |
|
19 | | -from reolinkfw.util import ( |
20 | | - DummyLEB, |
21 | | - get_fs_from_ubi, |
22 | | - is_cramfs, |
23 | | - is_squashfs, |
24 | | - is_ubi, |
25 | | - is_ubifs, |
26 | | - sha256_pak |
27 | | -) |
| 24 | +from reolinkfw.util import DummyLEB, get_fs_from_ubi, sha256_pak |
28 | 25 |
|
29 | 26 | __version__ = "1.1.0" |
30 | 27 |
|
|
35 | 32 | FS_SECTIONS = ROOTFS_SECTIONS + ["app"] |
36 | 33 |
|
37 | 34 |
|
| 35 | +class FileSystem(Enum): |
| 36 | + CRAMFS = CRAMFS_MAGIC |
| 37 | + SQUASHFS = SQUASHFS_MAGIC.to_bytes(4, "little") |
| 38 | + UBI = UBI_MAGIC # Not a file system |
| 39 | + UBIFS = UBIFS_MAGIC |
| 40 | + |
| 41 | + @classmethod |
| 42 | + def from_magic(cls, key, default=None): |
| 43 | + try: |
| 44 | + return cls(key) |
| 45 | + except ValueError: |
| 46 | + return default |
| 47 | + |
| 48 | + |
38 | 49 | async def download(url): |
39 | 50 | """Return resource as bytes. |
40 | 51 |
|
@@ -111,12 +122,12 @@ def get_files_from_ubifs(binbytes): |
111 | 122 |
|
112 | 123 | def get_files_from_ubi(binbytes): |
113 | 124 | fsbytes = get_fs_from_ubi(binbytes) |
114 | | - if is_ubifs(fsbytes): |
| 125 | + fs = FileSystem.from_magic(fsbytes[:4]) |
| 126 | + if fs == FileSystem.UBIFS: |
115 | 127 | return get_files_from_ubifs(fsbytes) |
116 | | - elif is_squashfs(fsbytes): |
| 128 | + elif fs == FileSystem.SQUASHFS: |
117 | 129 | return get_files_from_squashfs(fsbytes) |
118 | | - else: |
119 | | - raise Exception("unknown file system in UBI") |
| 130 | + raise Exception("Unknown file system in UBI") |
120 | 131 |
|
121 | 132 |
|
122 | 133 | def get_files_from_cramfs(binbytes): |
@@ -144,13 +155,12 @@ async def get_info_from_pak(pak: PAK): |
144 | 155 | binbytes = await asyncio.to_thread(extract_fs, pak) |
145 | 156 | if isinstance(binbytes, str): |
146 | 157 | return {"error": binbytes, "sha256": ha} |
147 | | - if is_cramfs(binbytes): |
148 | | - func = get_files_from_cramfs |
149 | | - elif is_ubi(binbytes): |
150 | | - func = get_files_from_ubi |
151 | | - elif is_squashfs(binbytes): |
152 | | - func = get_files_from_squashfs |
153 | | - else: |
| 158 | + func = { |
| 159 | + FileSystem.CRAMFS: get_files_from_cramfs, |
| 160 | + FileSystem.UBI: get_files_from_ubi, |
| 161 | + FileSystem.SQUASHFS: get_files_from_squashfs, |
| 162 | + }.get(FileSystem.from_magic(binbytes[:4])) |
| 163 | + if func is None: |
154 | 164 | return {"error": "Unrecognized image type", "sha256": ha} |
155 | 165 | files = await asyncio.to_thread(func, binbytes) |
156 | 166 | info = get_info_from_files(files) |
|
0 commit comments