|
2 | 2 | import io |
3 | 3 | from contextlib import contextmanager |
4 | 4 | from functools import partial |
| 5 | +from os import scandir |
| 6 | +from pathlib import Path |
| 7 | +from shutil import disk_usage |
| 8 | +from tempfile import gettempdir as _gettempdir |
| 9 | +from zipfile import is_zipfile |
5 | 10 |
|
6 | | -from pakler import PAK |
| 11 | +from pakler import PAK, is_pak_file |
7 | 12 | from ubireader.ubi import ubi |
8 | 13 | from ubireader.ubi_io import ubi_file, leb_virtual_file |
9 | 14 | from ubireader.utils import guess_peb_size |
10 | 15 |
|
11 | 16 | from reolinkfw.tmpfile import TempFile |
12 | 17 |
|
| 18 | +ONEMIB = 1024**2 |
| 19 | +ONEGIB = 1024**3 |
| 20 | + |
13 | 21 |
|
14 | 22 | class DummyLEB: |
15 | 23 | """A class that emulates ubireader's `leb_virtual_file`.""" |
@@ -69,6 +77,57 @@ def get_fs_from_ubi(fd, size, offset=0) -> bytes: |
69 | 77 | def sha256_pak(pak: PAK) -> str: |
70 | 78 | sha = hashlib.sha256() |
71 | 79 | pak._fd.seek(0) |
72 | | - for block in iter(partial(pak._fd.read, 1024**2), b''): |
| 80 | + for block in iter(partial(pak._fd.read, ONEMIB), b''): |
73 | 81 | sha.update(block) |
74 | 82 | return sha.hexdigest() |
| 83 | + |
| 84 | + |
| 85 | +def dir_size(path): |
| 86 | + size = 0 |
| 87 | + try: |
| 88 | + with scandir(path) as it: |
| 89 | + for entry in it: |
| 90 | + if entry.is_dir(follow_symlinks=False): |
| 91 | + size += dir_size(entry.path) |
| 92 | + elif entry.is_file(follow_symlinks=False): |
| 93 | + size += entry.stat().st_size |
| 94 | + except OSError: |
| 95 | + pass |
| 96 | + return size |
| 97 | + |
| 98 | + |
| 99 | +def gettempdir() -> Path: |
| 100 | + return Path(_gettempdir()) / "reolinkfwcache" |
| 101 | + |
| 102 | + |
| 103 | +def get_cache_file(url: str) -> Path: |
| 104 | + file = gettempdir() / hashlib.sha256(url.encode("utf8")).hexdigest() |
| 105 | + if is_zipfile(file) or is_pak_file(file): |
| 106 | + return file |
| 107 | + try: |
| 108 | + with open(file, 'r', encoding="utf8") as f: |
| 109 | + return gettempdir() / f.read(256) |
| 110 | + except (OSError, UnicodeDecodeError): |
| 111 | + return file |
| 112 | + |
| 113 | + |
| 114 | +def has_cache(url: str) -> bool: |
| 115 | + return get_cache_file(url).is_file() |
| 116 | + |
| 117 | + |
| 118 | +def make_cache_file(url: str, filebytes, name=None) -> bool: |
| 119 | + tempdir = gettempdir() |
| 120 | + tempdir.mkdir(exist_ok=True) |
| 121 | + if disk_usage(tempdir).free < ONEGIB or dir_size(tempdir) > ONEGIB: |
| 122 | + return False |
| 123 | + sha = hashlib.sha256(url.encode("utf8")).hexdigest() |
| 124 | + name = sha if not isinstance(name, str) else name |
| 125 | + try: |
| 126 | + with open(tempdir / name, "wb") as f: |
| 127 | + f.write(filebytes) |
| 128 | + if name != sha: |
| 129 | + with open(tempdir / sha, 'w', encoding="utf8") as f: |
| 130 | + f.write(name) |
| 131 | + except OSError: |
| 132 | + return False |
| 133 | + return True |
0 commit comments