Skip to content

Commit 6aa257c

Browse files
committed
UBIFS changes
- Add (incomplete) API for easier interaction with UBIFS - DummyLEB is basically the same as pre-existing ubi_file - Using leb_virtual_file in get_fs_from_ubi was not necessary
1 parent 3530887 commit 6aa257c

File tree

4 files changed

+242
-55
lines changed

4 files changed

+242
-55
lines changed

reolinkfw/__init__.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,11 @@
1313
from pakler import PAK, Section, is_pak_file
1414
from pycramfs import Cramfs
1515
from PySquashfsImage import SquashFsImage
16-
from ubireader.ubifs import ubifs, walk
17-
from ubireader.ubifs.output import _process_reg_file
1816

17+
from reolinkfw.tmpfile import TempFile
18+
from reolinkfw.ubifs import UBIFS
1919
from reolinkfw.uboot import get_arch_name, get_uboot_version, get_uimage_header
2020
from reolinkfw.util import (
21-
DummyLEB,
2221
FileType,
2322
get_cache_file,
2423
get_fs_from_ubi,
@@ -87,15 +86,15 @@ def get_files_from_squashfs(fd, offset=0, closefd=True):
8786

8887

8988
def get_files_from_ubifs(binbytes):
89+
# For now all firmwares using ubifs have two file system sections.
90+
# The interesting files are in the root directory of the "app" one.
91+
# Using select() with a relative path is enough.
9092
files = dict.fromkeys(FILES)
91-
with DummyLEB.from_bytes(binbytes) as leb:
92-
image = ubifs(leb)
93-
inodes = {}
94-
bad_blocks = []
95-
walk.index(image, image.master_node.root_lnum, image.master_node.root_offs, inodes, bad_blocks)
96-
for dent in inodes[1]['dent']:
97-
if dent.name in files:
98-
files[dent.name] = _process_reg_file(image, inodes[dent.inum], None)
93+
with TempFile(binbytes) as file:
94+
with UBIFS.from_file(file) as image:
95+
for name in files:
96+
if (file := image.select(name)) is not None:
97+
files[name] = file.read_bytes()
9998
return files
10099

101100

reolinkfw/extract.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@
99
from PySquashfsImage.extract import extract_dir as extract_squashfs
1010
from ubireader.ubifs import ubifs
1111
from ubireader.ubifs.output import extract_files as extract_ubifs
12+
from ubireader.ubi_io import ubi_file
13+
from ubireader.utils import guess_leb_size
1214

1315
from reolinkfw import FS_SECTIONS, ROOTFS_SECTIONS
14-
from reolinkfw.util import DummyLEB, FileType, get_fs_from_ubi
16+
from reolinkfw.tmpfile import TempFile
17+
from reolinkfw.util import FileType, closing_ubifile, get_fs_from_ubi
1518

1619

1720
def extract_file_system(pak: PAK, section: Section, dest: Path = None):
@@ -23,10 +26,12 @@ def extract_file_system(pak: PAK, section: Section, dest: Path = None):
2326
fs_bytes = get_fs_from_ubi(pak._fd, section.len, section.start)
2427
fs = FileType.from_magic(fs_bytes[:4])
2528
if fs == FileType.UBIFS:
26-
with DummyLEB.from_bytes(fs_bytes) as leb:
27-
with redirect_stdout(StringIO()):
28-
# If files already exist they are not written again.
29-
extract_ubifs(ubifs(leb), dest)
29+
with TempFile(fs_bytes) as file:
30+
block_size = guess_leb_size(file)
31+
with closing_ubifile(ubi_file(file, block_size)) as ubifile:
32+
with redirect_stdout(StringIO()):
33+
# Files that already exist are not written again.
34+
extract_ubifs(ubifs(ubifile), dest)
3035
elif fs == FileType.SQUASHFS:
3136
with SquashFsImage.from_bytes(fs_bytes) as image:
3237
extract_squashfs(image.root, dest, True)

reolinkfw/ubifs.py

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
from __future__ import annotations
2+
3+
from pathlib import PurePosixPath
4+
from stat import filemode
5+
from typing import Iterator, Literal, Optional
6+
7+
from ubireader.ubifs import ubifs, walk
8+
from ubireader.ubifs.defines import (
9+
UBIFS_COMMON_HDR_SZ,
10+
UBIFS_ITYPE_BLK,
11+
UBIFS_ITYPE_CHR,
12+
UBIFS_ITYPE_DIR,
13+
UBIFS_ITYPE_FIFO,
14+
UBIFS_ITYPE_LNK,
15+
UBIFS_ITYPE_REG,
16+
UBIFS_ITYPE_SOCK,
17+
UBIFS_NODE_MAGIC,
18+
UBIFS_ROOT_INO,
19+
UBIFS_SB_NODE,
20+
UBIFS_SB_NODE_SZ,
21+
)
22+
from ubireader.ubifs.nodes import common_hdr, ino_node, sb_node
23+
from ubireader.ubifs.output import _process_reg_file
24+
from ubireader.ubi_io import ubi_file
25+
from ubireader.utils import guess_leb_size
26+
27+
from reolinkfw.tmpfile import TempFile
28+
29+
30+
class File:
31+
32+
def __init__(self, image: UBIFS, nodes: dict, name: str = '', parent: Optional[Directory] = None) -> None:
33+
self._image = image
34+
self._nodes = nodes
35+
self._name = name
36+
self._parent = parent
37+
38+
@property
39+
def name(self) -> str:
40+
return self._name
41+
42+
@property
43+
def parent(self) -> Optional[Directory]:
44+
return self._parent
45+
46+
@property
47+
def path(self) -> PurePosixPath:
48+
if self._parent is None:
49+
return PurePosixPath('/')
50+
return self._parent.path / self._name
51+
52+
@property
53+
def inode(self) -> ino_node:
54+
return self._nodes["ino"]
55+
56+
@property
57+
def mode(self) -> int:
58+
return self.inode.mode
59+
60+
@property
61+
def filemode(self) -> str:
62+
return filemode(self.mode)
63+
64+
@property
65+
def is_dir(self) -> bool:
66+
return False
67+
68+
@property
69+
def is_file(self) -> bool:
70+
return False
71+
72+
@property
73+
def is_symlink(self) -> bool:
74+
return False
75+
76+
77+
class DataFile(File):
78+
79+
def read_bytes(self) -> bytes:
80+
...
81+
82+
def read_text(self, encoding: str = "utf8", errors: str = "strict") -> str:
83+
return self.read_bytes().decode(encoding, errors)
84+
85+
86+
class RegularFile(DataFile):
87+
88+
@property
89+
def is_file(self) -> Literal[True]:
90+
return True
91+
92+
def read_bytes(self) -> bytes:
93+
return _process_reg_file(self._image._ubifs, self._nodes, None)
94+
95+
96+
class Symlink(DataFile):
97+
98+
@property
99+
def is_symlink(self) -> Literal[True]:
100+
return True
101+
102+
def read_bytes(self) -> bytes:
103+
return self.inode.data
104+
105+
def readlink(self) -> PurePosixPath:
106+
return PurePosixPath(self.read_text())
107+
108+
109+
class Directory(File):
110+
111+
def __init__(self, image: UBIFS, nodes: dict, name: str = '', parent: Optional[Directory] = None) -> None:
112+
super().__init__(image, nodes, name, parent)
113+
self._children = {}
114+
for dent in nodes.get("dent", []):
115+
cls = filetype[dent.type]
116+
self._children[dent.name] = cls(image, image.inodes[dent.inum], dent.name, self)
117+
118+
def __iter__(self) -> Iterator[File]:
119+
yield from self._children.values()
120+
121+
@property
122+
def is_dir(self) -> Literal[True]:
123+
return True
124+
125+
@property
126+
def children(self) -> dict[str, File]:
127+
return self._children
128+
129+
def select(self, path) -> Optional[File]:
130+
"""Select a file of any kind by path.
131+
132+
The path can be absolute or relative.
133+
Special entries `'.'` and `'..'` are supported.
134+
"""
135+
path = PurePosixPath(path)
136+
if str(path) == "..":
137+
return self.parent if self.parent is not None else self
138+
if path.root == '/':
139+
if str(self.path) == '/':
140+
path = path.relative_to('/')
141+
else:
142+
return self._image.root.select(path)
143+
if str(path) == '.':
144+
return self
145+
child, *descendants = path.parts
146+
if (file := self._children.get(child)) is not None:
147+
if isinstance(file, Directory) and descendants:
148+
return file.select(PurePosixPath(*descendants))
149+
elif not descendants:
150+
return file
151+
return None
152+
153+
def riter(self) -> Iterator[File]:
154+
yield self
155+
for file in self:
156+
if isinstance(file, Directory):
157+
yield from file.riter()
158+
else:
159+
yield file
160+
161+
162+
class UBIFS:
163+
164+
def __init__(self, ubifs: ubifs) -> None:
165+
self._ubifs = ubifs
166+
self._inodes = {}
167+
self._bad_blocks = []
168+
walk.index(ubifs, ubifs.master_node.root_lnum, ubifs.master_node.root_offs, self._inodes, self._bad_blocks)
169+
self._root = Directory(self, self._inodes[UBIFS_ROOT_INO])
170+
171+
def __enter__(self) -> UBIFS:
172+
return self
173+
174+
def __exit__(self, exc_type, exc_value, traceback) -> None:
175+
self.close()
176+
177+
def __iter__(self) -> Iterator[File]:
178+
yield from self._root.riter()
179+
180+
@property
181+
def inodes(self) -> dict:
182+
return self._inodes
183+
184+
@property
185+
def root(self) -> Directory:
186+
return self._root
187+
188+
def close(self) -> None:
189+
self._ubifs._file._fhandle.close()
190+
191+
def select(self, path) -> Optional[File]:
192+
return self._root.select(path)
193+
194+
@classmethod
195+
def from_file(cls, path) -> UBIFS:
196+
return cls(ubifs(ubi_file(path, guess_leb_size(path))))
197+
198+
@classmethod
199+
def from_bytes(cls, bytes_, offset: int = 0) -> UBIFS:
200+
chdr = common_hdr(bytes_[offset:offset+UBIFS_COMMON_HDR_SZ])
201+
if chdr.magic != int.from_bytes(UBIFS_NODE_MAGIC, "little") or chdr.node_type != UBIFS_SB_NODE:
202+
raise Exception("Not UBIFS")
203+
sb_start = offset + UBIFS_COMMON_HDR_SZ
204+
sb_end = sb_start + UBIFS_SB_NODE_SZ
205+
sblk = sb_node(bytes_[sb_start:sb_end])
206+
tmpfile = TempFile(bytes_[offset:])
207+
tmpfile.open() # tmpfile's close() will not be called.
208+
return cls(ubifs(ubi_file(tmpfile, sblk.leb_size)))
209+
210+
211+
filetype = {
212+
UBIFS_ITYPE_REG: RegularFile,
213+
UBIFS_ITYPE_DIR: Directory,
214+
UBIFS_ITYPE_LNK: Symlink,
215+
UBIFS_ITYPE_BLK: File,
216+
UBIFS_ITYPE_CHR: File,
217+
UBIFS_ITYPE_FIFO: File,
218+
UBIFS_ITYPE_SOCK: File,
219+
}

reolinkfw/util.py

Lines changed: 3 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import hashlib
2-
import io
32
from contextlib import contextmanager
43
from enum import Enum
54
from functools import partial
@@ -14,7 +13,7 @@
1413
from PySquashfsImage.const import SQUASHFS_MAGIC
1514
from ubireader.ubi import ubi
1615
from ubireader.ubi.defines import UBI_EC_HDR_MAGIC as UBI_MAGIC
17-
from ubireader.ubi_io import ubi_file, leb_virtual_file
16+
from ubireader.ubi_io import ubi_file
1817
from ubireader.ubifs.defines import UBIFS_NODE_MAGIC as UBIFS_MAGIC
1918
from ubireader.utils import guess_peb_size
2019

@@ -39,41 +38,6 @@ def from_magic(cls, key, default=None):
3938
return default
4039

4140

42-
class DummyLEB:
43-
"""A class that emulates ubireader's `leb_virtual_file`."""
44-
45-
def __init__(self, fd):
46-
self._fd = fd
47-
self._last_read_addr = 0
48-
49-
def __enter__(self):
50-
return self
51-
52-
def __exit__(self, exc_type, exc_value, traceback):
53-
self.close()
54-
55-
def close(self):
56-
self._fd.close()
57-
58-
def read(self, size):
59-
self._last_read_addr = self._fd.tell()
60-
return self._fd.read(size)
61-
62-
def reset(self):
63-
return self._fd.seek(0)
64-
65-
def seek(self, offset):
66-
return self._fd.seek(offset)
67-
68-
def last_read_addr(self):
69-
"""Start address of last physical file read."""
70-
return self._last_read_addr
71-
72-
@classmethod
73-
def from_bytes(cls, bytes_):
74-
return cls(io.BytesIO(bytes_))
75-
76-
7741
@contextmanager
7842
def closing_ubifile(ubifile):
7943
try:
@@ -90,8 +54,8 @@ def get_fs_from_ubi(fd, size, offset=0) -> bytes:
9054
block_size = guess_peb_size(t)
9155
with closing_ubifile(ubi_file(t, block_size)) as ubifile:
9256
ubi_obj = ubi(ubifile)
93-
vol_blocks = ubi_obj.images[0].volumes.popitem()[1].get_blocks(ubi_obj.blocks)
94-
return b''.join(leb_virtual_file(ubi_obj, vol_blocks).reader())
57+
volume = ubi_obj.images[0].volumes.popitem()[1]
58+
return b''.join(volume.reader(ubi_obj))
9559

9660

9761
def sha256_pak(pak: PAK) -> str:

0 commit comments

Comments
 (0)