Skip to content

Commit da5f3ca

Browse files
committed
Work with file objects instead of bytes
When dealing with cramfs and squashfs we can use the PAK's file object directly instead of loading the section in memory. Unfortunately UBI is a bit harder to work with.
1 parent 9275b30 commit da5f3ca

File tree

3 files changed

+40
-41
lines changed

3 files changed

+40
-41
lines changed

reolinkfw/__init__.py

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -56,17 +56,6 @@ async def download(url):
5656
return await resp.read() if resp.status == 200 else resp.status
5757

5858

59-
def extract_fs(pak: PAK):
60-
"""Return the fs.bin, app.bin or rootfs.bin file as bytes."""
61-
sections = {s.name: s for s in pak.sections if s.name in FS_SECTIONS}
62-
if len(sections) == 2:
63-
return pak.extract_section(sections["app"])
64-
elif len(sections) == 1:
65-
return pak.extract_section(sections.popitem()[1])
66-
else:
67-
return "No section found"
68-
69-
7059
def extract_paks(zip) -> list[tuple[str, PAK]]:
7160
"""Return a list of tuples, one for each PAK file found in the ZIP.
7261
@@ -94,12 +83,12 @@ def get_info_from_files(files):
9483
return info
9584

9685

97-
def get_files_from_squashfs(binbytes):
86+
def get_files_from_squashfs(fd, offset=0, closefd=True):
9887
# Firmwares using squashfs have either one or two file system
9988
# sections. When there is only one, the app directory is located at
10089
# /mnt/app. Otherwise it's the same as with cramfs and ubifs.
10190
files = dict.fromkeys(FILES)
102-
with SquashFsImage.from_bytes(binbytes) as image:
91+
with SquashFsImage(fd, offset, closefd) as image:
10392
for name in files:
10493
path2 = posixpath.join("/mnt/app", name)
10594
if (file := (image.select(name) or image.select(path2))) is not None:
@@ -120,22 +109,22 @@ def get_files_from_ubifs(binbytes):
120109
return files
121110

122111

123-
def get_files_from_ubi(binbytes):
124-
fsbytes = get_fs_from_ubi(binbytes)
112+
def get_files_from_ubi(fd, size, offset=0):
113+
fsbytes = get_fs_from_ubi(fd, size, offset)
125114
fs = FileSystem.from_magic(fsbytes[:4])
126115
if fs == FileSystem.UBIFS:
127116
return get_files_from_ubifs(fsbytes)
128117
elif fs == FileSystem.SQUASHFS:
129-
return get_files_from_squashfs(fsbytes)
118+
return get_files_from_squashfs(io.BytesIO(fsbytes))
130119
raise Exception("Unknown file system in UBI")
131120

132121

133-
def get_files_from_cramfs(binbytes):
122+
def get_files_from_cramfs(fd, offset=0, closefd=True):
134123
# For now all firmwares using cramfs have two file system sections.
135124
# The interesting files are in the root directory of the "app" one.
136125
# Using select() with a relative path is enough.
137126
files = dict.fromkeys(FILES)
138-
with Cramfs.from_bytes(binbytes) as cramfs:
127+
with Cramfs.from_fd(fd, offset, closefd) as cramfs:
139128
for name in files:
140129
if (file := cramfs.select(name)) is not None:
141130
files[name] = file.read_bytes()
@@ -152,17 +141,18 @@ def is_local_file(string):
152141

153142
async def get_info_from_pak(pak: PAK):
154143
ha = await asyncio.to_thread(sha256_pak, pak)
155-
binbytes = await asyncio.to_thread(extract_fs, pak)
156-
if isinstance(binbytes, str):
157-
return {"error": binbytes, "sha256": ha}
158-
func = {
159-
FileSystem.CRAMFS: get_files_from_cramfs,
160-
FileSystem.UBI: get_files_from_ubi,
161-
FileSystem.SQUASHFS: get_files_from_squashfs,
162-
}.get(FileSystem.from_magic(binbytes[:4]))
163-
if func is None:
144+
fs_sections = [s for s in pak.sections if s.name in FS_SECTIONS]
145+
app = fs_sections[-1]
146+
pak._fd.seek(app.start)
147+
fs = FileSystem.from_magic(pak._fd.read(4))
148+
if fs == FileSystem.CRAMFS:
149+
files = await asyncio.to_thread(get_files_from_cramfs, pak._fd, app.start, False)
150+
elif fs == FileSystem.UBI:
151+
files = await asyncio.to_thread(get_files_from_ubi, pak._fd, app.len, app.start)
152+
elif fs == FileSystem.SQUASHFS:
153+
files = await asyncio.to_thread(get_files_from_squashfs, pak._fd, app.start, False)
154+
else:
164155
return {"error": "Unrecognized image type", "sha256": ha}
165-
files = await asyncio.to_thread(func, binbytes)
166156
info = get_info_from_files(files)
167157
return {**info, "sha256": ha}
168158

reolinkfw/extract.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from io import StringIO
33
from pathlib import Path
44

5-
from pakler import PAK
5+
from pakler import PAK, Section
66
from pycramfs import Cramfs
77
from pycramfs.extract import extract_dir as extract_cramfs
88
from PySquashfsImage import SquashFsImage
@@ -14,22 +14,29 @@
1414
from reolinkfw.util import DummyLEB, get_fs_from_ubi
1515

1616

17-
def extract_file_system(fs_bytes, dest: Path = None):
17+
def extract_file_system(pak: PAK, section: Section, dest: Path = None):
1818
dest = (Path.cwd() / "reolink_fs") if dest is None else dest
1919
dest.mkdir(parents=True, exist_ok=True)
20-
fs = FileSystem.from_magic(fs_bytes[:4])
20+
pak._fd.seek(section.start)
21+
fs = FileSystem.from_magic(pak._fd.read(4))
2122
if fs == FileSystem.UBI:
22-
extract_file_system(get_fs_from_ubi(fs_bytes), dest)
23-
elif fs == FileSystem.UBIFS:
24-
with DummyLEB.from_bytes(fs_bytes) as leb:
25-
with redirect_stdout(StringIO()):
26-
# If files already exist they are not written again.
27-
extract_ubifs(ubifs(leb), dest)
23+
fs_bytes = get_fs_from_ubi(pak._fd, section.len, section.start)
24+
fs = FileSystem.from_magic(fs_bytes[:4])
25+
if fs == FileSystem.UBIFS:
26+
with DummyLEB.from_bytes(fs_bytes) as leb:
27+
with redirect_stdout(StringIO()):
28+
# If files already exist they are not written again.
29+
extract_ubifs(ubifs(leb), dest)
30+
elif fs == FileSystem.SQUASHFS:
31+
with SquashFsImage.from_bytes(fs_bytes) as image:
32+
extract_squashfs(image.root, dest, True)
33+
else:
34+
raise Exception("Unknown file system in UBI")
2835
elif fs == FileSystem.SQUASHFS:
29-
with SquashFsImage.from_bytes(fs_bytes) as image:
36+
with SquashFsImage(pak._fd, section.start, False) as image:
3037
extract_squashfs(image.root, dest, True)
3138
elif fs == FileSystem.CRAMFS:
32-
with Cramfs.from_bytes(fs_bytes) as image:
39+
with Cramfs.from_fd(pak._fd, section.start, False) as image:
3340
extract_cramfs(image.rootdir, dest, True)
3441
else:
3542
raise Exception("Unknown file system")
@@ -45,4 +52,4 @@ def extract_pak(pak: PAK, dest: Path = None, force: bool = False):
4552
outpath = dest / rootfsdir / "mnt" / "app"
4653
else:
4754
outpath = dest / rootfsdir
48-
extract_file_system(pak.extract_section(section), outpath)
55+
extract_file_system(pak, section, outpath)

reolinkfw/util.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,10 @@ def closing_ubifile(ubifile):
5454
ubifile._fhandle.close()
5555

5656

57-
def get_fs_from_ubi(binbytes):
57+
def get_fs_from_ubi(fd, size, offset=0) -> bytes:
5858
"""Return the first file system that sits on top of the UBI volume."""
59+
fd.seek(offset)
60+
binbytes = fd.read(size)
5961
with TempFile(binbytes) as t:
6062
block_size = guess_peb_size(t)
6163
with closing_ubifile(ubi_file(t, block_size)) as ubifile:

0 commit comments

Comments
 (0)