diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index 637ba42a54..d0c50bc424 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -45,6 +45,7 @@ from __future__ import annotations import datetime +import hashlib import logging import os import tempfile @@ -52,7 +53,6 @@ from typing import TYPE_CHECKING from urllib import parse -import securesystemslib.hash as sslib_hash from securesystemslib.signer import CryptoSigner, Signer from tuf.api.exceptions import DownloadHTTPError @@ -80,6 +80,8 @@ SPEC_VER = ".".join(SPECIFICATION_VERSION) +_HASH_ALGORITHM = "sha256" + @dataclass class FetchTracker: @@ -292,9 +294,9 @@ def _compute_hashes_and_length( self, role: str ) -> tuple[dict[str, str], int]: data = self.fetch_metadata(role) - digest_object = sslib_hash.digest(sslib_hash.DEFAULT_HASH_ALGORITHM) + digest_object = hashlib.new(_HASH_ALGORITHM) digest_object.update(data) - hashes = {sslib_hash.DEFAULT_HASH_ALGORITHM: digest_object.hexdigest()} + hashes = {_HASH_ALGORITHM: digest_object.hexdigest()} return hashes, len(data) def update_timestamp(self) -> None: diff --git a/tests/test_api.py b/tests/test_api.py index 7b80d36041..8006cd48e7 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -17,7 +17,6 @@ from typing import ClassVar from securesystemslib import exceptions as sslib_exceptions -from securesystemslib import hash as sslib_hash from securesystemslib.signer import ( CryptoSigner, Key, @@ -896,6 +895,12 @@ def test_length_and_hash_validation(self) -> None: # test with data as bytes snapshot_metafile.verify_length_and_hashes(data) + # test with custom blake algorithm + snapshot_metafile.hashes = { + "blake2b-256": "963a3c31aad8e2a91cfc603fdba12555e48dd0312674ac48cce2c19c243236a1" + } + snapshot_metafile.verify_length_and_hashes(data) + # test exceptions expected_length = snapshot_metafile.length snapshot_metafile.length = 2345 @@ -958,9 +963,7 @@ def test_targetfile_from_file(self) -> None: # Test with a non-existing file file_path = os.path.join(self.repo_dir, Targets.type, "file123.txt") with self.assertRaises(FileNotFoundError): - TargetFile.from_file( - file_path, file_path, [sslib_hash.DEFAULT_HASH_ALGORITHM] - ) + TargetFile.from_file(file_path, file_path, ["sha256"]) # Test with an unsupported algorithm file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt") @@ -990,6 +993,12 @@ def test_targetfile_from_data(self) -> None: targetfile_from_data = TargetFile.from_data(target_file_path, data) targetfile_from_data.verify_length_and_hashes(data) + # Test with custom blake hash algorithm + targetfile_from_data = TargetFile.from_data( + target_file_path, data, ["blake2b-256"] + ) + targetfile_from_data.verify_length_and_hashes(data) + def test_metafile_from_data(self) -> None: data = b"Inline test content" @@ -1013,6 +1022,10 @@ def test_metafile_from_data(self) -> None: ), ) + # Test with custom blake hash algorithm + metafile = MetaFile.from_data(1, data, ["blake2b-256"]) + metafile.verify_length_and_hashes(data) + def test_targetfile_get_prefixed_paths(self) -> None: target = TargetFile(100, {"sha256": "abc", "md5": "def"}, "a/b/f.ext") self.assertEqual( diff --git a/tuf/api/_payload.py b/tuf/api/_payload.py index 56852082ea..89fcbfe812 100644 --- a/tuf/api/_payload.py +++ b/tuf/api/_payload.py @@ -8,8 +8,10 @@ import abc import fnmatch +import hashlib import io import logging +import sys from dataclasses import dataclass from datetime import datetime, timezone from typing import ( @@ -21,7 +23,6 @@ ) from securesystemslib import exceptions as sslib_exceptions -from securesystemslib import hash as sslib_hash from securesystemslib.signer import Key, Signature from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError @@ -34,6 +35,9 @@ _TARGETS = "targets" _TIMESTAMP = "timestamp" +_DEFAULT_HASH_ALGORITHM = "sha256" +_BLAKE_HASH_ALGORITHM = "blake2b-256" + # We aim to support SPECIFICATION_VERSION and require the input metadata # files to have the same major version (the first number) as ours. SPECIFICATION_VERSION = ["1", "0", "31"] @@ -45,6 +49,38 @@ T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets") +def _get_digest(algo: str) -> Any: # noqa: ANN401 + """New digest helper to support custom "blake2b-256" algo name.""" + if algo == _BLAKE_HASH_ALGORITHM: + return hashlib.blake2b(digest_size=32) + + return hashlib.new(algo) + + +def _hash_bytes(data: bytes, algo: str) -> str: + """Returns hexdigest for data using algo.""" + digest = _get_digest(algo) + digest.update(data) + + return digest.hexdigest() + + +def _hash_file(f: IO[bytes], algo: str) -> str: + """Returns hexdigest for file using algo.""" + f.seek(0) + if sys.version_info >= (3, 11): + digest = hashlib.file_digest(f, lambda: _get_digest(algo)) # type: ignore[arg-type] + + else: + # Fallback for older Pythons. Chunk size is taken from the previously + # used and now deprecated `securesystemslib.hash.digest_fileobject`. + digest = _get_digest(algo) + for chunk in iter(lambda: f.read(4096), b""): + digest.update(chunk) + + return digest.hexdigest() + + class Signed(metaclass=abc.ABCMeta): """A base class for the signed part of TUF metadata. @@ -664,24 +700,18 @@ def _verify_hashes( data: bytes | IO[bytes], expected_hashes: dict[str, str] ) -> None: """Verify that the hash of ``data`` matches ``expected_hashes``.""" - is_bytes = isinstance(data, bytes) for algo, exp_hash in expected_hashes.items(): try: - if is_bytes: - digest_object = sslib_hash.digest(algo) - digest_object.update(data) + if isinstance(data, bytes): + observed_hash = _hash_bytes(data, algo) else: # if data is not bytes, assume it is a file object - digest_object = sslib_hash.digest_fileobject(data, algo) - except ( - sslib_exceptions.UnsupportedAlgorithmError, - sslib_exceptions.FormatError, - ) as e: + observed_hash = _hash_file(data, algo) + except (ValueError, TypeError) as e: raise LengthOrHashMismatchError( f"Unsupported algorithm '{algo}'" ) from e - observed_hash = digest_object.hexdigest() if observed_hash != exp_hash: raise LengthOrHashMismatchError( f"Observed hash {observed_hash} does not match " @@ -731,25 +761,17 @@ def _get_length_and_hashes( hashes = {} if hash_algorithms is None: - hash_algorithms = [sslib_hash.DEFAULT_HASH_ALGORITHM] + hash_algorithms = [_DEFAULT_HASH_ALGORITHM] for algorithm in hash_algorithms: try: if isinstance(data, bytes): - digest_object = sslib_hash.digest(algorithm) - digest_object.update(data) + hashes[algorithm] = _hash_bytes(data, algorithm) else: - digest_object = sslib_hash.digest_fileobject( - data, algorithm - ) - except ( - sslib_exceptions.UnsupportedAlgorithmError, - sslib_exceptions.FormatError, - ) as e: + hashes[algorithm] = _hash_file(data, algorithm) + except (ValueError, TypeError) as e: raise ValueError(f"Unsupported algorithm '{algorithm}'") from e - hashes[algorithm] = digest_object.hexdigest() - return (length, hashes) @@ -832,7 +854,7 @@ def from_data( version: Version of the metadata file. data: Metadata bytes that the metafile represents. hash_algorithms: Hash algorithms to create the hashes with. If not - specified, the securesystemslib default hash algorithm is used. + specified, "sha256" is used. Raises: ValueError: The hash algorithms list contains an unsupported @@ -1150,7 +1172,7 @@ def is_delegated_path(self, target_filepath: str) -> bool: if self.path_hash_prefixes is not None: # Calculate the hash of the filepath # to determine in which bin to find the target. - digest_object = sslib_hash.digest(algorithm="sha256") + digest_object = hashlib.new(name="sha256") digest_object.update(target_filepath.encode("utf-8")) target_filepath_hash = digest_object.hexdigest() @@ -1269,7 +1291,7 @@ def get_role_for_target(self, target_filepath: str) -> str: target_filepath: URL path to a target file, relative to a base targets URL. """ - hasher = sslib_hash.digest(algorithm="sha256") + hasher = hashlib.new(name="sha256") hasher.update(target_filepath.encode("utf-8")) # We can't ever need more than 4 bytes (32 bits). @@ -1542,7 +1564,7 @@ def from_file( targets URL. local_path: Local path to target file content. hash_algorithms: Hash algorithms to calculate hashes with. If not - specified the securesystemslib default hash algorithm is used. + specified, "sha256" is used. Raises: FileNotFoundError: The file doesn't exist. @@ -1566,7 +1588,7 @@ def from_data( targets URL. data: Target file content. hash_algorithms: Hash algorithms to create the hashes with. If not - specified the securesystemslib default hash algorithm is used. + specified, "sha256" is used. Raises: ValueError: The hash algorithms list contains an unsupported