From a9dff131b93bd2897b8d42ae0f833679decddf13 Mon Sep 17 00:00:00 2001 From: David Klement Date: Tue, 21 Oct 2025 13:14:57 +0200 Subject: [PATCH] Add some type annotations Changes in .pyi files are to make mypy happy. --- src/borg/archive.py | 10 +- src/borg/cache.py | 162 +++++++++++++++++-------------- src/borg/hashindex.pyi | 17 +++- src/borg/helpers/checks.py | 6 +- src/borg/helpers/datastruct.py | 42 +++++--- src/borg/helpers/shellpattern.py | 8 +- src/borg/item.pyi | 4 + src/borg/repository.py | 101 ++++++++++--------- 8 files changed, 205 insertions(+), 145 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index e2e7cf000f..2e9c10a94a 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -756,16 +756,16 @@ def extract_helper(self, item, path, hlm, *, dry_run=False): def extract_item( self, - item, + item: Item, *, restore_attrs=True, dry_run=False, stdout=False, sparse=False, - hlm=None, - pi=None, + hlm: HardLinkManager | None = None, + pi: ProgressIndicatorPercent | None = None, continue_extraction=False, - ): + ) -> None: """ Extract archive item. @@ -779,7 +779,7 @@ def extract_item( :param continue_extraction: continue a previously interrupted extraction of the same archive """ - def same_item(item, st): + def same_item(item: Item, st: os.stat_result) -> bool: """Is the archived item the same as the filesystem item at the same path with stat st?""" if not stat.S_ISREG(st.st_mode): # we only "optimize" for regular files. diff --git a/src/borg/cache.py b/src/borg/cache.py index 777c5514b8..aa3dd304a4 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -31,7 +31,7 @@ from .helpers import msgpack from .helpers.msgpack import int_to_timestamp, timestamp_to_int from .item import ChunkListEntry -from .crypto.key import PlaintextKey +from .crypto.key import PlaintextKey, KeyBase from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError from .manifest import Manifest from .platform import SaveFile @@ -39,7 +39,7 @@ from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound, repo_lister -def files_cache_name(archive_name, files_cache_name="files"): +def files_cache_name(archive_name: str, files_cache_name: str = "files") -> str: """ Return the name of the files cache file for the given archive name. @@ -56,7 +56,7 @@ def files_cache_name(archive_name, files_cache_name="files"): return files_cache_name + "." + suffix -def discover_files_cache_names(path, files_cache_name="files"): +def discover_files_cache_names(path: Path, files_cache_name: str = "files") -> list[str]: """ Return a list of all files cache file names in the given directory. @@ -91,7 +91,7 @@ class SecurityManager: be reconciled, and also with no cache existing but a security database entry. 
""" - def __init__(self, repository): + def __init__(self, repository: Repository | RemoteRepository) -> None: self.repository = repository self.dir = Path(get_security_dir(repository.id_str, legacy=(repository.version == 1))) self.key_type_file = self.dir / "key-type" @@ -99,16 +99,16 @@ def __init__(self, repository): self.manifest_ts_file = self.dir / "manifest-timestamp" @staticmethod - def destroy(repository, path=None): + def destroy(repository: Repository | RemoteRepository, path: str | None = None) -> None: """Destroys the security directory for ``repository`` or at ``path``.""" path = path or get_security_dir(repository.id_str, legacy=(repository.version == 1)) if Path(path).exists(): shutil.rmtree(path) - def known(self): + def known(self) -> bool: return all(f.exists() for f in (self.key_type_file, self.location_file, self.manifest_ts_file)) - def key_matches(self, key): + def key_matches(self, key: KeyBase) -> bool: if not self.known(): return False try: @@ -117,8 +117,9 @@ def key_matches(self, key): return type == str(key.TYPE) except OSError as exc: logger.warning("Could not read/parse key type file: %s", exc) + return False - def save(self, manifest, key): + def save(self, manifest: Manifest, key: KeyBase) -> None: logger.debug("security: saving state for %s to %s", self.repository.id_str, str(self.dir)) current_location = self.repository._location.canonical_path() logger.debug("security: current location %s", current_location) @@ -131,7 +132,7 @@ def save(self, manifest, key): with SaveFile(self.manifest_ts_file) as fd: fd.write(manifest.timestamp) - def assert_location_matches(self): + def assert_location_matches(self) -> None: # Warn user before sending data to a relocated repository try: with self.location_file.open() as fd: @@ -165,7 +166,7 @@ def assert_location_matches(self): with SaveFile(self.location_file) as fd: fd.write(repository_location) - def assert_no_manifest_replay(self, manifest, key): + def assert_no_manifest_replay(self, manifest: Manifest, key: KeyBase) -> None: try: with self.manifest_ts_file.open() as fd: timestamp = fd.read() @@ -184,19 +185,19 @@ def assert_no_manifest_replay(self, manifest, key): else: raise Cache.RepositoryReplay() - def assert_key_type(self, key): + def assert_key_type(self, key: KeyBase) -> None: # Make sure an encrypted repository has not been swapped for an unencrypted repository if self.known() and not self.key_matches(key): raise Cache.EncryptionMethodMismatch() - def assert_secure(self, manifest, key, *, warn_if_unencrypted=True): + def assert_secure(self, manifest: Manifest, key: KeyBase, *, warn_if_unencrypted=True) -> None: # warn_if_unencrypted=False is only used for initializing a new repository. # Thus, avoiding asking about a repository that's currently initializing. 
self.assert_access_unknown(warn_if_unencrypted, manifest, key) self._assert_secure(manifest, key) logger.debug("security: repository checks ok, allowing access") - def _assert_secure(self, manifest, key): + def _assert_secure(self, manifest: Manifest, key: KeyBase) -> None: self.assert_location_matches() self.assert_key_type(key) self.assert_no_manifest_replay(manifest, key) @@ -204,7 +205,7 @@ def _assert_secure(self, manifest, key): logger.debug("security: remembering previously unknown repository") self.save(manifest, key) - def assert_access_unknown(self, warn_if_unencrypted, manifest, key): + def assert_access_unknown(self, warn_if_unencrypted: bool, manifest: Manifest, key: KeyBase) -> None: # warn_if_unencrypted=False is only used for initializing a new repository. # Thus, avoiding asking about a repository that's currently initializing. if not key.logically_encrypted and not self.known(): @@ -229,33 +230,33 @@ def assert_access_unknown(self, warn_if_unencrypted, manifest, key): raise Cache.CacheInitAbortedError() -def assert_secure(repository, manifest): +def assert_secure(repository: Repository | RemoteRepository, manifest: Manifest) -> None: sm = SecurityManager(repository) sm.assert_secure(manifest, manifest.key) -def cache_dir(repository, path=None): +def cache_dir(repository: Repository | RemoteRepository, path: Path | str | None = None) -> Path: return Path(path) if path else Path(get_cache_dir()) / repository.id_str class CacheConfig: - def __init__(self, repository, path=None): + def __init__(self, repository: Repository | RemoteRepository, path: Path | str | None = None) -> None: self.repository = repository self.path = cache_dir(repository, path) logger.debug("Using %s as cache", self.path) self.config_path = self.path / "config" - def __enter__(self): + def __enter__(self) -> "CacheConfig": self.open() return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.close() - def exists(self): + def exists(self) -> bool: return self.config_path.exists() - def create(self): + def create(self) -> None: assert not self.exists() config = configparser.ConfigParser(interpolation=None) config.add_section("cache") @@ -267,10 +268,10 @@ def create(self): with SaveFile(self.config_path) as fd: config.write(fd) - def open(self): + def open(self) -> None: self.load() - def load(self): + def load(self) -> None: self._config = configparser.ConfigParser(interpolation=None) with self.config_path.open() as fd: self._config.read_file(fd) @@ -296,7 +297,7 @@ def load(self): logger.debug("Cache integrity: No integrity data found (files, chunks). 
Cache is from old version.") self.integrity = {} - def save(self, manifest=None): + def save(self, manifest: Manifest | None = None) -> None: if manifest: self._config.set("cache", "manifest", manifest.id_str) self._config.set("cache", "ignored_features", ",".join(self.ignored_features)) @@ -309,10 +310,10 @@ def save(self, manifest=None): with SaveFile(self.config_path) as fd: self._config.write(fd) - def close(self): + def close(self) -> None: pass - def _check_upgrade(self, config_path): + def _check_upgrade(self, config_path: Path) -> None: try: cache_version = self._config.getint("cache", "version") wanted_version = 1 @@ -359,7 +360,7 @@ def break_lock(repository, path=None): pass @staticmethod - def destroy(repository, path=None): + def destroy(repository: Repository | RemoteRepository, path: Path | None = None) -> None: """destroy the cache for ``repository`` or at ``path``""" path = cache_dir(repository, path) config = path / "config" @@ -370,15 +371,15 @@ def destroy(repository, path=None): def __new__( cls, repository, - manifest, - path=None, + manifest: Manifest, + path: Path | str | None = None, sync=True, warn_if_unencrypted=True, progress=False, - cache_mode=FILES_CACHE_MODE_DISABLED, + cache_mode: str = FILES_CACHE_MODE_DISABLED, iec=False, - archive_name=None, - start_backup=None, + archive_name: str | None = None, + start_backup: int | None = None, ): return AdHocWithFilesCache( manifest=manifest, @@ -403,18 +404,23 @@ class FilesCacheMixin: """ FILES_CACHE_NAME = "files" + chunks: ChunkIndex + manifest: Manifest + key: KeyBase + path: Path + cache_config: CacheConfig - def __init__(self, cache_mode, archive_name=None, start_backup=None): + def __init__(self, cache_mode: str, archive_name: str | None = None, start_backup: int | None = None) -> None: self.archive_name = archive_name # ideally a SERIES name assert not ("c" in cache_mode and "m" in cache_mode) assert "d" in cache_mode or "c" in cache_mode or "m" in cache_mode self.cache_mode = cache_mode - self._files = None + self._files: dict | None = None self._newest_cmtime = 0 - self._newest_path_hashes = set() + self._newest_path_hashes: set[bytes] = set() self.start_backup = start_backup - def compress_entry(self, entry): + def compress_entry(self, entry: FileCacheEntry) -> bytes: """ compress a files cache entry: @@ -437,7 +443,7 @@ def compress_entry(self, entry): entry = entry._replace(chunks=compressed_chunks) return msgpack.packb(entry) - def decompress_entry(self, entry_packed): + def decompress_entry(self, entry_packed: bytes) -> FileCacheEntry: """reverse operation of compress_entry""" assert isinstance(self.chunks, ChunkIndex), f"{self.chunks} is not a ChunkIndex" assert isinstance(entry_packed, bytes) @@ -454,7 +460,7 @@ def decompress_entry(self, entry_packed): return entry @property - def files(self): + def files(self) -> dict: if self._files is None: self._files = self._read_files_cache() # try loading from cache dir if self._files is None: @@ -463,13 +469,13 @@ def files(self): self._files = {} # start from scratch return self._files - def _build_files_cache(self): + def _build_files_cache(self) -> dict | None: """rebuild the files cache by reading previous archive from repository""" if "d" in self.cache_mode: # d(isabled) - return + return None if not self.archive_name: - return + return None from .archive import Archive @@ -480,7 +486,7 @@ def _build_files_cache(self): archives = None if not archives: # nothing found - return + return None prev_archive = archives[0] files = {} @@ -523,16 +529,16 @@ def 
_build_files_cache(self): files_cache_logger.debug("FILES-CACHE-BUILD: finished, %d entries loaded.", len(files)) return files - def files_cache_name(self): + def files_cache_name(self) -> str: return files_cache_name(self.archive_name, self.FILES_CACHE_NAME) - def discover_files_cache_names(self, path): + def discover_files_cache_names(self, path: Path) -> list[str]: return discover_files_cache_names(path, self.FILES_CACHE_NAME) - def _read_files_cache(self): + def _read_files_cache(self) -> dict | None: """read files cache from cache directory""" if "d" in self.cache_mode: # d(isabled) - return + return None files = {} logger.debug("Reading files cache ...") @@ -572,7 +578,7 @@ def _read_files_cache(self): files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(files or {})) return files - def _write_files_cache(self, files): + def _write_files_cache(self, files: dict) -> str: """write files cache to cache directory""" max_time_ns = 2**63 - 1 # nanoseconds, good until y2262 # _self._newest_cmtime might be None if it was never set because no files were modified/added. @@ -613,7 +619,7 @@ def _write_files_cache(self, files): files_cache_logger.debug(f"FILES-CACHE-SAVE: finished, {entries} remaining entries saved.") return fd.integrity_data - def file_known_and_unchanged(self, hashed_path, path_hash, st): + def file_known_and_unchanged(self, hashed_path: bytes, path_hash: bytes, st: os.stat_result): """ Check if we know the file that has this path_hash (know == it is in our files cache) and whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode). @@ -705,7 +711,7 @@ def try_upgrade_to_b14(repository): pass # likely already upgraded -def list_chunkindex_hashes(repository): +def list_chunkindex_hashes(repository: Repository | RemoteRepository) -> list[str]: hashes = [] for info in repository.store_list("cache"): info = ItemInfo(*info) # RPC does not give namedtuple @@ -717,7 +723,7 @@ def list_chunkindex_hashes(repository): return hashes -def delete_chunkindex_cache(repository): +def delete_chunkindex_cache(repository: Repository | RemoteRepository) -> None: hashes = list_chunkindex_hashes(repository) for hash in hashes: cache_name = f"cache/chunks.{hash}" @@ -733,8 +739,15 @@ def delete_chunkindex_cache(repository): def write_chunkindex_to_repo_cache( - repository, chunks, *, incremental=True, clear=False, force_write=False, delete_other=False, delete_these=None -): + repository: Repository | RemoteRepository, + chunks: ChunkIndex, + *, + incremental=True, + clear=False, + force_write=False, + delete_other=False, + delete_these=None, +) -> str: # for now, we don't want to serialize the flags or the size, just the keys (chunk IDs): cleaned_value = ChunkIndexEntry(flags=ChunkIndex.F_NONE, size=0) chunks_to_write = ChunkIndex() @@ -788,7 +801,7 @@ def write_chunkindex_to_repo_cache( return new_hash -def read_chunkindex_from_repo_cache(repository, hash): +def read_chunkindex_from_repo_cache(repository: Repository | RemoteRepository, hash: str) -> ChunkIndex | None: cache_name = f"cache/chunks.{hash}" logger.debug(f"trying to load {cache_name} from the repo...") try: @@ -804,9 +817,12 @@ def read_chunkindex_from_repo_cache(repository, hash): return chunks else: logger.debug(f"{cache_name} is invalid.") + return None -def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False): +def build_chunkindex_from_repo( + repository: Repository | RemoteRepository, *, disable_caches=False, 
cache_immediately=False +): try_upgrade_to_b14(repository) # first, try to build a fresh, mostly complete chunk index from centrally cached chunk indexes: if not disable_caches: @@ -861,20 +877,22 @@ class ChunksMixin: Chunks index related code for misc. Cache implementations. """ - def __init__(self): - self._chunks = None + repository: Repository | RemoteRepository + + def __init__(self) -> None: + self._chunks: ChunkIndex | None = None self.last_refresh_dt = datetime.now(timezone.utc) self.refresh_td = timedelta(seconds=60) self.chunks_cache_last_write = datetime.now(timezone.utc) self.chunks_cache_write_td = timedelta(seconds=600) @property - def chunks(self): + def chunks(self) -> ChunkIndex: if self._chunks is None: self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True) return self._chunks - def seen_chunk(self, id, size=None): + def seen_chunk(self, id: bytes, size: int | None = None) -> bool: entry = self.chunks.get(id) entry_exists = entry is not None if entry_exists and size is not None: @@ -956,15 +974,15 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin): def __init__( self, - manifest, - path=None, + manifest: Manifest, + path: Path | str | None = None, warn_if_unencrypted=True, progress=False, - cache_mode=FILES_CACHE_MODE_DISABLED, + cache_mode: str = FILES_CACHE_MODE_DISABLED, iec=False, - archive_name=None, - start_backup=None, - ): + archive_name: str | None = None, + start_backup: int | None = None, + ) -> None: """ :param warn_if_unencrypted: print warning if accessing unknown unencrypted repository :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison @@ -999,28 +1017,28 @@ def __init__( self.close() raise - def __enter__(self): + def __enter__(self) -> "AdHocWithFilesCache": self._chunks = None return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.close() self._chunks = None - def create(self): + def create(self) -> None: """Create a new empty cache at `self.path`""" self.path.mkdir(parents=True, exist_ok=True) with open(self.path / "README", "w") as fd: fd.write(CACHE_README) self.cache_config.create() - def open(self): + def open(self) -> None: if not self.path.is_dir(): raise Exception("%s Does not look like a Borg cache" % self.path) self.cache_config.open() self.cache_config.load() - def close(self): + def close(self) -> None: self.security_manager.save(self.manifest, self.key) pi = ProgressIndicatorMessage(msgid="cache.close") if self._files is not None: @@ -1041,7 +1059,7 @@ def close(self): pi.finish() self.cache_config = None - def check_cache_compatibility(self): + def check_cache_compatibility(self) -> bool: my_features = Manifest.SUPPORTED_REPO_FEATURES if self.cache_config.ignored_features & my_features: # The cache might not contain references of chunks that need a feature that is mandatory for some operation @@ -1053,7 +1071,7 @@ def check_cache_compatibility(self): return False return True - def wipe_cache(self): + def wipe_cache(self) -> None: logger.warning("Discarding incompatible cache and forcing a cache rebuild") self._chunks = ChunkIndex() self.cache_config.manifest_id = "" @@ -1062,7 +1080,7 @@ def wipe_cache(self): self.cache_config.ignored_features = set() self.cache_config.mandatory_features = set() - def update_compatibility(self): + def update_compatibility(self) -> None: operation_to_features_map = self.manifest.get_all_mandatory_features() my_features = Manifest.SUPPORTED_REPO_FEATURES 
repo_features = set() diff --git a/src/borg/hashindex.pyi b/src/borg/hashindex.pyi index fb05aba86d..ca234958e2 100644 --- a/src/borg/hashindex.pyi +++ b/src/borg/hashindex.pyi @@ -1,4 +1,4 @@ -from typing import NamedTuple, Tuple, Type, Union, IO, Iterator, Any +from typing import BinaryIO, NamedTuple, Tuple, Union, IO, Iterator, Any API_VERSION: str @@ -8,7 +8,7 @@ class ChunkIndexEntry(NamedTuple): flags: int size: int -CIE = Union[Tuple[int, int], Type[ChunkIndexEntry]] +CIE = Union[Tuple[int, int], ChunkIndexEntry] class ChunkIndex: F_NONE: int @@ -20,9 +20,20 @@ class ChunkIndex: def add(self, key: bytes, size: int) -> None: ... def iteritems(self, *, only_new: bool = ...) -> Iterator: ... def clear_new(self) -> None: ... + @classmethod + def read(cls, path: BinaryIO | str | bytes) -> "ChunkIndex": ... + def write(self, path: BinaryIO | str | bytes) -> None: ... + @property + def stats(self) -> dict[str, int]: ... + def k_to_idx(self, key: bytes) -> int: ... + def idx_to_k(self, entry: int) -> bytes: ... def __contains__(self, key: bytes) -> bool: ... - def __getitem__(self, key: bytes) -> Type[ChunkIndexEntry]: ... + def __getitem__(self, key: bytes) -> ChunkIndexEntry: ... def __setitem__(self, key: bytes, value: CIE) -> None: ... + def clear(self) -> None: ... + def get(self, key: bytes, default: ChunkIndexEntry | None = ...) -> ChunkIndexEntry | None: ... + def items(self) -> Iterator[Tuple[bytes, ChunkIndexEntry]]: ... + def __len__(self) -> int: ... class NSIndex1Entry(NamedTuple): segment: int diff --git a/src/borg/helpers/checks.py b/src/borg/helpers/checks.py index b11f2230cf..18814c748d 100644 --- a/src/borg/helpers/checks.py +++ b/src/borg/helpers/checks.py @@ -1,10 +1,12 @@ import os +from typing import Any, Callable from .errors import RTError from ..platformflags import is_win32 -def check_python(): +def check_python() -> None: + required_funcs: set[Callable[..., Any]] if is_win32: required_funcs = {os.stat} else: @@ -13,7 +15,7 @@ def check_python(): raise RTError("""FATAL: This Python was compiled for a too old (g)libc and lacks required functionality.""") -def check_extension_modules(): +def check_extension_modules() -> None: from .. import platform, compress, crypto, item, hashindex, chunkers msg = """The Borg binary extension modules do not seem to be properly installed.""" diff --git a/src/borg/helpers/datastruct.py b/src/borg/helpers/datastruct.py index 44983f1dab..5c205db3dc 100644 --- a/src/borg/helpers/datastruct.py +++ b/src/borg/helpers/datastruct.py @@ -1,6 +1,22 @@ +from typing import Callable, Generic, Protocol, TypeVar + from .errors import Error +class TBufferProtocol(Protocol): + def __add__(self, other): ... + def __getitem__(self, index): ... + def __len__(self) -> int: ... + + +class TBufferCallableProtocol(TBufferProtocol): + def __call__(self): ... + + +TBuffer = TypeVar("TBuffer", bound=TBufferProtocol) +TBufferCallable = TypeVar("TBufferCallable", bound=TBufferCallableProtocol) + + class StableDict(dict): """A dict subclass with stable items() ordering.""" @@ -8,7 +24,7 @@ def items(self): return sorted(super().items()) -class Buffer: +class Buffer(Generic[TBuffer]): """ Provides a managed, resizable buffer. 
""" @@ -16,7 +32,7 @@ class Buffer: class MemoryLimitExceeded(Error, OSError): """Requested buffer size {} is above the limit of {}.""" - def __init__(self, allocator, size=4096, limit=None): + def __init__(self, allocator: Callable[[int], TBuffer], size: int = 4096, limit: int | None = None) -> None: """ Initialize the buffer by using allocator(size) to allocate a buffer. Optionally, set an upper limit for the buffer size. @@ -27,10 +43,10 @@ def __init__(self, allocator, size=4096, limit=None): self.limit = limit self.resize(size, init=True) - def __len__(self): + def __len__(self) -> int: return len(self.buffer) - def resize(self, size, init=False): + def resize(self, size: int, init=False) -> None: """ Resize the buffer. To avoid frequent reallocation, we usually grow (if needed). By giving init=True it is possible to initialize for the first time or shrink the buffer. @@ -42,7 +58,7 @@ def resize(self, size, init=False): if init or len(self) < size: self.buffer = self.allocator(size) - def get(self, size=None, init=False): + def get(self, size: int | None = None, init=False) -> TBuffer: """ Return a buffer of at least the requested size (None: any current size). init=True can be given to trigger shrinking of the buffer to the given size. @@ -52,7 +68,7 @@ def get(self, size=None, init=False): return self.buffer -class EfficientCollectionQueue: +class EfficientCollectionQueue(Generic[TBufferCallable]): """ An efficient FIFO queue that splits received elements into chunks. """ @@ -60,18 +76,18 @@ class EfficientCollectionQueue: class SizeUnderflow(Error): """Could not pop the first {} elements; collection only has {} elements.""" - def __init__(self, split_size, member_type): + def __init__(self, split_size: int, member_type: TBufferCallable) -> None: """ Initialize an empty queue. split_size defines the maximum chunk size. member_type is the type that defines what the base collection looks like. """ - self.buffers = [] + self.buffers: list[TBufferCallable] = [] self.size = 0 self.split_size = split_size self.member_type = member_type - def peek_front(self): + def peek_front(self) -> TBufferCallable: """ Return the first chunk from the queue without removing it. The returned collection will have between 1 and split_size elements. @@ -82,7 +98,7 @@ def peek_front(self): buffer = self.buffers[0] return buffer - def pop_front(self, size): + def pop_front(self, size: int) -> None: """ Remove the first `size` elements from the queue. Raises an error if the requested removal size is larger than the entire queue. @@ -100,7 +116,7 @@ def pop_front(self, size): size -= to_remove self.size -= to_remove - def push_back(self, data): + def push_back(self, data: bytes) -> None: """ Add data at the end of the queue. Chunks data into elements of size up to split_size. @@ -119,13 +135,13 @@ def push_back(self, data): self.buffers[-1] = buffer self.size += to_add - def __len__(self): + def __len__(self) -> int: """ Return the current queue length for all elements across all chunks. """ return self.size - def __bool__(self): + def __bool__(self) -> bool: """ Return True if the queue is not empty. """ diff --git a/src/borg/helpers/shellpattern.py b/src/borg/helpers/shellpattern.py index 00e9237bdd..9b2fb5afde 100644 --- a/src/borg/helpers/shellpattern.py +++ b/src/borg/helpers/shellpattern.py @@ -3,7 +3,7 @@ from queue import LifoQueue -def translate(pat, match_end=r"\Z"): +def translate(pat: str, match_end: str = r"\Z") -> str: """Translate a shell-style pattern to a regular expression. 
The pattern may include ``**`` ( stands for the platform-specific path separator; "/" on POSIX systems) @@ -68,7 +68,7 @@ def translate(pat, match_end=r"\Z"): return "(?ms)" + res + match_end -def _parse_braces(pat): +def _parse_braces(pat: str) -> list[tuple[int, int]]: """Return the index pairs of matched braces in `pat` as a list of tuples. The dictionary's keys are the indices corresponding to opening braces. Initially, @@ -84,7 +84,7 @@ def _parse_braces(pat): queue: pushing opening braces on and popping them off when finding a closing brace. """ - curly_q = LifoQueue() + curly_q: LifoQueue = LifoQueue() pairs: dict[int, int] = dict() for idx, c in enumerate(pat): @@ -105,7 +105,7 @@ def _parse_braces(pat): return [(opening, closing) for opening, closing in pairs.items() if closing is not None] -def _translate_alternatives(pat): +def _translate_alternatives(pat: str) -> str: """Translates the shell-style alternative portions of the pattern to regular expression groups. For example: {alt1,alt2} -> (alt1|alt2) diff --git a/src/borg/item.pyi b/src/borg/item.pyi index 97de37904b..b0059d6caa 100644 --- a/src/borg/item.pyi +++ b/src/borg/item.pyi @@ -116,6 +116,10 @@ class Item(PropDict): def source(self) -> str: ... @source.setter def source(self, val: str) -> None: ... + @property + def target(self) -> str: ... + @target.setter + def target(self, val: str) -> None: ... def is_dir(self) -> bool: ... def is_link(self) -> bool: ... def _is_type(self, typetest: Callable) -> bool: ... diff --git a/src/borg/repository.py b/src/borg/repository.py index 42405c09fa..5b9804102f 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -1,7 +1,9 @@ +import builtins import os import time +from typing import Callable, Iterable, Iterator -from borgstore.store import Store +from borgstore.store import ItemInfo, Store from borgstore.store import ObjectNotFound as StoreObjectNotFound from borgstore.backends.errors import BackendError as StoreBackendError from borgstore.backends.errors import BackendDoesNotExist as StoreBackendDoesNotExist @@ -21,7 +23,7 @@ logger = create_logger(__name__) -def repo_lister(repository, *, limit=None): +def repo_lister(repository, *, limit: int | None = None) -> Iterator[tuple[bytes, int]]: marker = None finished = False while not finished: @@ -94,14 +96,14 @@ class PathPermissionDenied(Error): def __init__( self, - path_or_location, + path_or_location: str | Location, create=False, exclusive=False, lock_wait=1.0, lock=True, - send_log_cb=None, - permissions=None, - ): + send_log_cb: Callable[[], None] | None = None, + permissions: str | None = None, + ) -> None: if isinstance(path_or_location, Location): location = path_or_location if location.proto == "file": @@ -126,10 +128,11 @@ def __init__( # Get permissions from parameter or environment variable permissions = permissions if permissions is not None else os.environ.get("BORG_REPO_PERMISSIONS", "all") + permissions_dict: dict[str, str] | None if permissions == "all": - permissions = None # permissions system will not be used + permissions_dict = None # permissions system will not be used elif permissions == "no-delete": # mostly no delete, no overwrite - permissions = { + permissions_dict = { "": "lr", "archives": "lrw", "cache": "lrwWD", # WD for chunks., last-key-checked, ... 
@@ -139,7 +142,7 @@ def __init__( "locks": "lrwD", # borg needs to create/delete a shared lock here } elif permissions == "write-only": # mostly no reading - permissions = { + permissions_dict = { "": "l", "archives": "lw", "cache": "lrwWD", # read allowed, e.g. for chunks. cache @@ -149,7 +152,7 @@ def __init__( "locks": "lrwD", # borg needs to create/delete a shared lock here } elif permissions == "read-only": # mostly r/o - permissions = {"": "lr", "locks": "lrwD"} + permissions_dict = {"": "lr", "locks": "lrwD"} else: raise Error( f"Invalid BORG_REPO_PERMISSIONS value: {permissions}, should be one of: " @@ -157,11 +160,11 @@ def __init__( ) try: - self.store = Store(url, levels=levels_config, permissions=permissions) + self.store = Store(url, levels=levels_config, permissions=permissions_dict) except StoreBackendError as e: raise Error(str(e)) self.store_opened = False - self.version = None + self.version: int | None = None # long-running repository methods which emit log or progress output are responsible for calling # the ._send_log method periodically to get log and progress output transferred to the borg client # in a timely manner, in case we have a RemoteRepository. @@ -176,10 +179,10 @@ def __init__( self.lock_wait = lock_wait self.exclusive = exclusive - def __repr__(self): + def __repr__(self) -> str: return f"<{self.__class__.__name__} {self._location}>" - def __enter__(self): + def __enter__(self) -> "Repository": if self.do_create: self.do_create = False self.create() @@ -191,14 +194,14 @@ def __enter__(self): raise return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.close() @property - def id_str(self): + def id_str(self) -> str: return bin_to_hex(self.id) - def create(self): + def create(self) -> None: """Create a new empty repository""" try: self.store.create() @@ -221,32 +224,32 @@ def create(self): finally: self.store.close() - def _set_id(self, id): + def _set_id(self, id: bytes) -> None: # for testing: change the id of an existing repository assert self.opened assert isinstance(id, bytes) and len(id) == 32 self.id = id self.store.store("config/id", bin_to_hex(id).encode()) - def _lock_refresh(self): + def _lock_refresh(self) -> None: if self.lock is not None: self.lock.refresh() - def save_key(self, keydata): + def save_key(self, keydata: bytes) -> None: # note: saving an empty key means that there is no repokey anymore self.store.store("keys/repokey", keydata) - def load_key(self): + def load_key(self) -> bytes: keydata = self.store.load("keys/repokey") # note: if we return an empty string, it means there is no repo key return keydata - def destroy(self): + def destroy(self) -> None: """Destroy the repository""" self.close() self.store.destroy() - def open(self, *, exclusive, lock_wait=None, lock=True): + def open(self, *, exclusive: bool, lock_wait: float, lock=True) -> None: assert lock_wait is not None try: self.store.open() @@ -272,7 +275,7 @@ def open(self, *, exclusive, lock_wait=None, lock=True): self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() self.opened = True - def close(self): + def close(self) -> None: if self.lock: self.lock.release() self.lock = None @@ -281,22 +284,24 @@ def close(self): self.store_opened = False self.opened = False - def info(self): + def info(self) -> dict: """return some infos about the repo (must be opened first)""" # note: don't do anything expensive here or separate the lock refresh into a separate method. 
self._lock_refresh() # do not remove, see do_with_lock() info = dict(id=self.id, version=self.version) return info - def check(self, repair=False, max_duration=0): + def check(self, repair=False, max_duration=0) -> bool: """Check repository consistency""" - def log_error(msg): + obj_corrupted = False + + def log_error(msg: str) -> None: nonlocal obj_corrupted obj_corrupted = True logger.error(f"Repo object {info.name} is corrupted: {msg}") - def check_object(obj): + def check_object(obj: bytes) -> None: """Check if obj looks valid.""" hdr_size = RepoObj.obj_header.size obj_size = len(obj) @@ -420,13 +425,13 @@ def check_object(obj): logger.error(f"Finished {mode} repository check, errors found.") return objs_errors == 0 or repair - def list(self, limit=None, marker=None): + def list(self, limit: int | None = None, marker: bytes | None = None) -> builtins.list[tuple[bytes, int]]: """ list infos starting from after id . each info is a tuple (id, storage_size). """ collect = True if marker is None else False - result = [] + result: builtins.list[tuple[bytes, int]] = [] infos = self.store.list("data") # generator yielding ItemInfos while True: self._lock_refresh() @@ -447,7 +452,7 @@ def list(self, limit=None, marker=None): # note: do not collect the marker id return result - def get(self, id, read_data=True, raise_missing=True): + def get(self, id: bytes, read_data=True, raise_missing=True) -> bytes | None: self._lock_refresh() id_hex = bin_to_hex(id) key = "data/" + id_hex @@ -479,11 +484,13 @@ def get(self, id, read_data=True, raise_missing=True): else: return None - def get_many(self, ids, read_data=True, is_preloaded=False, raise_missing=True): + def get_many( + self, ids: Iterable[bytes], read_data=True, is_preloaded=False, raise_missing=True + ) -> Iterator[bytes | None]: for id_ in ids: yield self.get(id_, read_data=read_data, raise_missing=raise_missing) - def put(self, id, data, wait=True): + def put(self, id: bytes, data: bytes, wait=True) -> None: """put a repo object Note: when doing calls with wait=False this gets async and caller must @@ -497,7 +504,7 @@ def put(self, id, data, wait=True): key = "data/" + bin_to_hex(id) self.store.store(key, data) - def delete(self, id, wait=True): + def delete(self, id: bytes, wait=True) -> None: """delete a repo object Note: when doing calls with wait=False this gets async and caller must @@ -524,7 +531,7 @@ def async_response(self, wait=True): def preload(self, ids): """Preload objects (only applies to remote repositories)""" - def break_lock(self): + def break_lock(self) -> None: Lock(self.store).break_lock() def migrate_lock(self, old_id, new_id): @@ -532,36 +539,38 @@ def migrate_lock(self, old_id, new_id): if self.lock is not None: self.lock.migrate_lock(old_id, new_id) - def get_manifest(self): + def get_manifest(self) -> bytes: self._lock_refresh() try: return self.store.load("config/manifest") except StoreObjectNotFound: raise NoManifestError - def put_manifest(self, data): + def put_manifest(self, data: bytes) -> None: self._lock_refresh() - return self.store.store("config/manifest", data) + self.store.store("config/manifest", data) - def store_list(self, name, *, deleted=False): + def store_list(self, name: str, *, deleted=False) -> builtins.list[ItemInfo]: self._lock_refresh() try: return list(self.store.list(name, deleted=deleted)) except StoreObjectNotFound: return [] - def store_load(self, name): + def store_load(self, name: str) -> bytes: self._lock_refresh() return self.store.load(name) - def store_store(self, name, value): + 
def store_store(self, name: str, value: bytes) -> None: self._lock_refresh() - return self.store.store(name, value) + self.store.store(name, value) - def store_delete(self, name, *, deleted=False): + def store_delete(self, name: str, *, deleted=False) -> None: self._lock_refresh() - return self.store.delete(name, deleted=deleted) + self.store.delete(name, deleted=deleted) - def store_move(self, name, new_name=None, *, delete=False, undelete=False, deleted=False): + def store_move( + self, name: str, new_name: str | None = None, *, delete=False, undelete=False, deleted=False + ) -> None: self._lock_refresh() - return self.store.move(name, new_name, delete=delete, undelete=undelete, deleted=deleted) + self.store.move(name, new_name, delete=delete, undelete=undelete, deleted=deleted)
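As a rough, illustrative sketch (not part of the diff above): the kind of call sites the newly generic Buffer and EfficientCollectionQueue annotations in src/borg/helpers/datastruct.py are meant to describe. This assumes borg is importable as a package; the concrete sizes and payloads below are made up for the example, and no claim is made about mypy's verdict on these exact calls.

    from borg.helpers.datastruct import Buffer, EfficientCollectionQueue

    # bytearray fits TBufferProtocol (__add__, __getitem__, __len__), and the
    # bytearray type itself acts as the Callable[[int], TBuffer] allocator.
    buf = Buffer(bytearray, size=64, limit=1024 * 1024)
    data = buf.get(128)        # grows the buffer; returns at least 128 bytes
    assert len(buf) >= 128

    # bytes is the member_type here: calling it yields an empty collection
    # supporting "+", indexing and len(), as TBufferCallableProtocol expects.
    queue = EfficientCollectionQueue(4096, bytes)
    queue.push_back(b"some payload")     # split into chunks of <= 4096 bytes
    head = queue.peek_front()            # first chunk, left on the queue
    queue.pop_front(len(head))           # remove it; queue is empty again
    assert not queue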