Port securesystemslib.hash module #2815
Changes from 1 commit
@@ -8,8 +8,10 @@
 import abc
 import fnmatch
 import hashlib
 import io
 import logging
 import sys
 from dataclasses import dataclass
 from datetime import datetime, timezone
 from typing import (
@@ -21,7 +23,6 @@
 )

 from securesystemslib import exceptions as sslib_exceptions
-from securesystemslib import hash as sslib_hash
 from securesystemslib.signer import Key, Signature

 from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError
@@ -34,6 +35,9 @@
 _TARGETS = "targets"
 _TIMESTAMP = "timestamp"

+_DEFAULT_HASH_ALGORITHM = "sha256"
+_BLAKE_HASH_ALGORITHM = "blake2b-256"
+
 # We aim to support SPECIFICATION_VERSION and require the input metadata
 # files to have the same major version (the first number) as ours.
 SPECIFICATION_VERSION = ["1", "0", "31"]
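For context: the separate `_BLAKE_HASH_ALGORITHM` constant is needed because "blake2b-256" is the algorithm name used in TUF metadata, not a name the standard library registers. `hashlib` only knows the full `blake2b`, which has to be constrained to a 32-byte digest. A minimal illustration (sample data is made up):

```python
import hashlib

# "blake2b-256" is not a registered hashlib algorithm name ...
try:
    hashlib.new("blake2b-256")
except ValueError as e:
    print(e)  # e.g. "unsupported hash type blake2b-256"

# ... so the 256-bit variant is obtained by fixing blake2b's digest size to 32 bytes.
digest = hashlib.blake2b(b"example metadata bytes", digest_size=32)
print(len(digest.digest()))  # 32
print(digest.hexdigest())    # 64 hex characters
```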
@@ -45,6 +49,30 @@
 T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets")

+
+def _hash(algo: str) -> Any:  # noqa: ANN401
+    """Returns new hash object, supporting custom "blake2b-256" algo name."""
+    if algo == _BLAKE_HASH_ALGORITHM:
+        return hashlib.blake2b(digest_size=32)
+
+    return hashlib.new(algo)
+
+
+def _file_hash(f: IO[bytes], algo: str) -> Any:  # noqa: ANN401
+    """Returns hashed file."""
+    f.seek(0)
+    if sys.version_info >= (3, 11):
+        digest = hashlib.file_digest(f, lambda: _hash(algo))  # type: ignore[arg-type]
+    else:
+        # Fallback for older Pythons. Chunk size is taken from the previously
+        # used and now deprecated `securesystemslib.hash.digest_fileobject`.
+        digest = _hash(algo)
+        for chunk in iter(lambda: f.read(4096), b""):
+            digest.update(chunk)
+
+    return digest
+
+
 class Signed(metaclass=abc.ABCMeta):
     """A base class for the signed part of TUF metadata.
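For readers unfamiliar with `hashlib.file_digest`: on Python 3.11 or newer it consumes a binary file object in chunks internally, and the pre-3.11 fallback above does the same with an explicit 4096-byte read loop. The sketch below (made-up sample data, requires Python 3.11+ for the `file_digest` call) shows that both paths produce the digest you would get from hashing the bytes directly, which is what lets `_file_hash` switch on `sys.version_info`:

```python
import hashlib
import io

data = b"opaque target file contents " * 1000
f = io.BytesIO(data)

# Python 3.11+: hashlib.file_digest reads the file object in chunks internally.
fast = hashlib.file_digest(f, "sha256")

# Pre-3.11 style fallback: the same 4096-byte read loop used in _file_hash above.
f.seek(0)
slow = hashlib.sha256()
for chunk in iter(lambda: f.read(4096), b""):
    slow.update(chunk)

assert fast.hexdigest() == slow.hexdigest() == hashlib.sha256(data).hexdigest()
print(fast.hexdigest())
```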
@@ -664,19 +692,15 @@ def _verify_hashes(
     data: bytes | IO[bytes], expected_hashes: dict[str, str]
 ) -> None:
     """Verify that the hash of ``data`` matches ``expected_hashes``."""
-    is_bytes = isinstance(data, bytes)
-
     for algo, exp_hash in expected_hashes.items():
         try:
-            if is_bytes:
-                digest_object = sslib_hash.digest(algo)
+            if isinstance(data, bytes):
+                digest_object = _hash(algo)
                 digest_object.update(data)
             else:
-                # if data is not bytes, assume it is a file object
-                digest_object = sslib_hash.digest_fileobject(data, algo)
-        except (
-            sslib_exceptions.UnsupportedAlgorithmError,
-            sslib_exceptions.FormatError,
-        ) as e:
+                digest_object = _file_hash(data, algo)
+        except (ValueError, TypeError) as e:
             raise LengthOrHashMismatchError(
                 f"Unsupported algorithm '{algo}'"
             ) from e
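The exception mapping is the main behavioural detail here: securesystemslib's `UnsupportedAlgorithmError`/`FormatError` are replaced by the `ValueError`/`TypeError` that `hashlib` itself raises, and callers still see a `LengthOrHashMismatchError`. A small sketch of the errors being caught (algorithm names are made up; the exact exception for a non-string name can vary by hashlib backend, hence the broad catch):

```python
import hashlib

# An unknown algorithm name is reported by hashlib as ValueError; _verify_hashes
# re-raises it as LengthOrHashMismatchError, so callers keep the same failure mode.
try:
    hashlib.new("sha513")
except ValueError as e:
    print(f"unsupported algorithm: {e}")

# A non-string algorithm name typically fails with TypeError, the second caught type.
try:
    hashlib.new(42)  # type: ignore[arg-type]
except (TypeError, ValueError) as e:
    print(f"bad algorithm name: {e!r}")
```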
@@ -731,21 +755,16 @@ def _get_length_and_hashes(
     hashes = {}

     if hash_algorithms is None:
-        hash_algorithms = [sslib_hash.DEFAULT_HASH_ALGORITHM]
+        hash_algorithms = [_DEFAULT_HASH_ALGORITHM]

     for algorithm in hash_algorithms:
         try:
             if isinstance(data, bytes):
-                digest_object = sslib_hash.digest(algorithm)
+                digest_object = _hash(algorithm)
                 digest_object.update(data)
             else:
-                digest_object = sslib_hash.digest_fileobject(
-                    data, algorithm
-                )
-        except (
-            sslib_exceptions.UnsupportedAlgorithmError,
-            sslib_exceptions.FormatError,
-        ) as e:
+                digest_object = _file_hash(data, algorithm)
+        except (ValueError, TypeError) as e:
            raise ValueError(f"Unsupported algorithm '{algorithm}'") from e

         hashes[algorithm] = digest_object.hexdigest()
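As a rough illustration of what this helper produces for bytes input, here is a simplified, bytes-only sketch; the function name and signature below are hypothetical (the real helper also accepts file objects and lives in `_payload.py`):

```python
import hashlib

def length_and_hashes(data: bytes, hash_algorithms=None):
    """Bytes-only sketch of the length/hashes computation."""
    if hash_algorithms is None:
        hash_algorithms = ["sha256"]  # mirrors _DEFAULT_HASH_ALGORITHM
    hashes = {}
    for algorithm in hash_algorithms:
        if algorithm == "blake2b-256":  # mirrors the _hash() special case
            digest_object = hashlib.blake2b(digest_size=32)
        else:
            digest_object = hashlib.new(algorithm)
        digest_object.update(data)
        hashes[algorithm] = digest_object.hexdigest()
    return len(data), hashes

print(length_and_hashes(b'{"signed": {"_type": "targets"}}', ["sha256", "blake2b-256"]))
```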
@@ -1150,7 +1169,7 @@ def is_delegated_path(self, target_filepath: str) -> bool:
         if self.path_hash_prefixes is not None:
             # Calculate the hash of the filepath
             # to determine in which bin to find the target.
-            digest_object = sslib_hash.digest(algorithm="sha256")
+            digest_object = hashlib.new(name="sha256")
             digest_object.update(target_filepath.encode("utf-8"))
             target_filepath_hash = digest_object.hexdigest()
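The surrounding method matches the hex digest of the target path against the delegation's `path_hash_prefixes`. A small sketch of that check, with an invented prefix list for illustration:

```python
import hashlib

target_filepath = "files/file1.txt"

digest_object = hashlib.new("sha256")
digest_object.update(target_filepath.encode("utf-8"))
target_filepath_hash = digest_object.hexdigest()

# A delegation with path_hash_prefixes covers the target if the hex digest
# starts with one of the listed prefixes (hypothetical prefixes here).
path_hash_prefixes = ["0", "1", "2", "3", "4", "5", "6", "7"]
delegated = any(target_filepath_hash.startswith(p) for p in path_hash_prefixes)
print(target_filepath_hash, delegated)
```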
@@ -1269,7 +1288,7 @@ def get_role_for_target(self, target_filepath: str) -> str:
             target_filepath: URL path to a target file, relative to a base
                 targets URL.
         """
-        hasher = sslib_hash.digest(algorithm="sha256")
+        hasher = hashlib.new(name="sha256")
         hasher.update(target_filepath.encode("utf-8"))

         # We can't ever need more than 4 bytes (32 bits).
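`get_role_for_target` is part of the succinct hash bin delegation: only the first 4 bytes (32 bits) of the SHA-256 digest are ever needed because the configured bit length is at most 32. A hedged sketch of the bin selection, where the helper name and the example bit length are assumptions for illustration:

```python
import hashlib

def bin_number_for(target_filepath: str, bit_length: int = 8) -> int:
    """Pick a hash bin index from the first 32 bits of the path's SHA-256."""
    hasher = hashlib.new(name="sha256")
    hasher.update(target_filepath.encode("utf-8"))
    # We can't ever need more than 4 bytes (32 bits): shift away the low bits
    # so only the leftmost bit_length bits select the bin.
    hash_bytes = hasher.digest()[:4]
    return int.from_bytes(hash_bytes, byteorder="big") >> (32 - bit_length)

print(bin_number_for("files/file1.txt"))  # an index in range(2 ** 8)
```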
IMO it's okay to make this a local default, and not re-use the one newly added to _payload.py, so that we can keep the latter internal.
sure, I'd be fine with just a magic value as well: it's not really even a default, it's just what this repository decided to use