diff --git a/examples/manual_repo/basic_repo.py b/examples/manual_repo/basic_repo.py
index e9ccc8c429..e619c190af 100644
--- a/examples/manual_repo/basic_repo.py
+++ b/examples/manual_repo/basic_repo.py
@@ -21,6 +21,8 @@
 """
 
+from __future__ import annotations
+
 import os
 import tempfile
 from datetime import datetime, timedelta, timezone
diff --git a/examples/manual_repo/hashed_bin_delegation.py b/examples/manual_repo/hashed_bin_delegation.py
index 420f46c8a9..0c90651fad 100644
--- a/examples/manual_repo/hashed_bin_delegation.py
+++ b/examples/manual_repo/hashed_bin_delegation.py
@@ -16,12 +16,14 @@
 """
 
+from __future__ import annotations
+
 import hashlib
 import os
 import tempfile
-from collections.abc import Iterator
 from datetime import datetime, timedelta, timezone
 from pathlib import Path
+from typing import TYPE_CHECKING
 
 from securesystemslib.signer import CryptoSigner, Signer
 
@@ -34,6 +36,9 @@
 )
 from tuf.api.serialization.json import JSONSerializer
 
+if TYPE_CHECKING:
+    from collections.abc import Iterator
+
 
 def _in(days: float) -> datetime:
     """Adds 'days' to now and returns datetime object w/o microseconds."""
diff --git a/examples/manual_repo/succinct_hash_bin_delegations.py b/examples/manual_repo/succinct_hash_bin_delegations.py
index 40a71486d6..3923a97d16 100644
--- a/examples/manual_repo/succinct_hash_bin_delegations.py
+++ b/examples/manual_repo/succinct_hash_bin_delegations.py
@@ -18,6 +18,8 @@
 NOTE: Metadata files will be written to a 'tmp*'-directory in CWD.
 """
 
+from __future__ import annotations
+
 import math
 import os
 import tempfile
diff --git a/examples/repository/_simplerepo.py b/examples/repository/_simplerepo.py
index 8b1904503a..3d19c8de83 100644
--- a/examples/repository/_simplerepo.py
+++ b/examples/repository/_simplerepo.py
@@ -3,12 +3,13 @@
 
 """Simple example of using the repository library to build a repository"""
 
+from __future__ import annotations
+
 import copy
 import json
 import logging
 from collections import defaultdict
 from datetime import datetime, timedelta, timezone
-from typing import Union
 
 from securesystemslib.signer import CryptoSigner, Key, Signer
 
@@ -93,7 +94,7 @@ def snapshot_info(self) -> MetaFile:
 
     def _get_verification_result(
         self, role: str, md: Metadata
-    ) -> Union[VerificationResult, RootVerificationResult]:
+    ) -> VerificationResult | RootVerificationResult:
         """Verify roles metadata using the existing repository metadata"""
         if role == Root.type:
             assert isinstance(md.signed, Root)
diff --git a/examples/uploader/_localrepo.py b/examples/uploader/_localrepo.py
index a27658c487..edae65821b 100644
--- a/examples/uploader/_localrepo.py
+++ b/examples/uploader/_localrepo.py
@@ -3,6 +3,8 @@
 
 """A Repository implementation for maintainer and developer tools"""
 
+from __future__ import annotations
+
 import contextlib
 import copy
 import json
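Note: every example script above gains the same two-line preamble. Under `from __future__ import annotations` (PEP 563) annotations are stored as strings and never evaluated at runtime, which is what lets the rest of this patch use PEP 604 unions (`str | None`) and built-in generics (`dict[str, Any]`) while lowering `requires-python` back to 3.8. A minimal sketch of the effect (names are illustrative, not from this patch):

```python
from __future__ import annotations  # must precede all other imports


def find(needle: str, haystack: list[str]) -> int | None:
    """3.10-style annotation syntax, accepted even by Python 3.8."""
    try:
        return haystack.index(needle)
    except ValueError:
        return None


# Annotations stay unevaluated strings:
print(find.__annotations__["return"])  # prints the string: int | None
```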
diff --git a/pyproject.toml b/pyproject.toml
index 9a6cc3e313..f4e19f4236 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -10,7 +10,7 @@ name = "tuf"
 description = "A secure updater framework for Python"
 readme = "README.md"
 license = { text = "MIT OR Apache-2.0" }
-requires-python = ">=3.9"
+requires-python = ">=3.8"
 authors = [
   { email = "theupdateframework@googlegroups.com" },
 ]
@@ -96,7 +96,6 @@ ignore = [
   "TRY",
 
   # Individual rules that have been disabled
-  "ANN101", "ANN102",  # nonsense, deprecated in ruff
   "D400", "D415", "D213", "D205", "D202", "D107", "D407", "D413", "D212", "D104", "D406", "D105", "D411", "D401", "D200", "D203",
   "ISC001",  # incompatible with ruff formatter
   "PLR0913", "PLR2004",
@@ -150,3 +149,13 @@ module = [
   "securesystemslib.*",
 ]
 ignore_missing_imports = "True"
+
+[tool.coverage.report]
+exclude_also = [
+  # abstract class method definition
+  "raise NotImplementedError",
+  # defensive programming: these cannot happen
+  "raise AssertionError",
+  # imports for mypy only
+  "if TYPE_CHECKING",
+]
\ No newline at end of file
diff --git a/requirements/test.txt b/requirements/test.txt
index 66856203ad..69df33451b 100644
--- a/requirements/test.txt
+++ b/requirements/test.txt
@@ -4,5 +4,5 @@
 -r pinned.txt
 
 # coverage measurement
-coverage==7.6.8
+coverage[toml]==7.6.8
 freezegun==1.5.1
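The new `[tool.coverage.report]` table relies on `exclude_also` (added in coverage.py 7.2), which appends regexes to coverage's default exclusion patterns; the `[toml]` extra ensures coverage can parse `pyproject.toml` on interpreters without a built-in TOML reader (`tomllib` is 3.11+). Roughly, lines like these stop counting against `--fail-under 97` (hypothetical module, for illustration only):

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # matched by "if TYPE_CHECKING": never runs, so never covered
    from collections.abc import Iterator


class FetcherInterface:
    def fetch(self, url: str) -> "Iterator[bytes]":
        raise NotImplementedError  # matched by "raise NotImplementedError"
```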
diff --git a/tests/generated_data/generate_md.py b/tests/generated_data/generate_md.py
index 4af8aab493..6a820fa154 100644
--- a/tests/generated_data/generate_md.py
+++ b/tests/generated_data/generate_md.py
@@ -3,10 +3,11 @@
 # Copyright New York University and the TUF contributors
 # SPDX-License-Identifier: MIT OR Apache-2.0
 
+from __future__ import annotations
+
 import os
 import sys
 from datetime import datetime, timezone
-from typing import Optional
 
 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
 from securesystemslib.signer import CryptoSigner, Signer, SSlibKey
@@ -80,7 +81,7 @@ def verify_generation(md: Metadata, path: str) -> None:
 
 
 def generate_all_files(
-    dump: Optional[bool] = False, verify: Optional[bool] = False
+    dump: bool | None = False, verify: bool | None = False
 ) -> None:
     """Generate a new repository and optionally verify it.
diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py
index 4cd3ba56ea..637ba42a54 100644
--- a/tests/repository_simulator.py
+++ b/tests/repository_simulator.py
@@ -42,13 +42,14 @@
     updater.refresh()
 """
 
+from __future__ import annotations
+
 import datetime
 import logging
 import os
 import tempfile
-from collections.abc import Iterator
 from dataclasses import dataclass, field
-from typing import Optional
+from typing import TYPE_CHECKING
 from urllib import parse
 
 import securesystemslib.hash as sslib_hash
@@ -72,6 +73,9 @@
 from tuf.api.serialization.json import JSONSerializer
 from tuf.ngclient.fetcher import FetcherInterface
 
+if TYPE_CHECKING:
+    from collections.abc import Iterator
+
 logger = logging.getLogger(__name__)
 
 SPEC_VER = ".".join(SPECIFICATION_VERSION)
@@ -81,8 +85,8 @@
 class FetchTracker:
     """Fetcher counter for metadata and targets."""
 
-    metadata: list[tuple[str, Optional[int]]] = field(default_factory=list)
-    targets: list[tuple[str, Optional[str]]] = field(default_factory=list)
+    metadata: list[tuple[str, int | None]] = field(default_factory=list)
+    targets: list[tuple[str, str | None]] = field(default_factory=list)
 
 
 @dataclass
@@ -116,7 +120,7 @@ def __init__(self) -> None:
         # Enable hash-prefixed target file names
         self.prefix_targets_with_hash = True
 
-        self.dump_dir: Optional[str] = None
+        self.dump_dir: str | None = None
         self.dump_version = 0
 
         self.fetch_tracker = FetchTracker()
@@ -201,7 +205,7 @@ def _fetch(self, url: str) -> Iterator[bytes]:
             if role == Root.type or (
                 self.root.consistent_snapshot and ver_and_name != Timestamp.type
             ):
-                version: Optional[int] = int(version_str)
+                version: int | None = int(version_str)
             else:
                 # the file is not version-prefixed
                 role = ver_and_name
@@ -213,7 +217,7 @@ def _fetch(self, url: str) -> Iterator[bytes]:
             target_path = path[len("/targets/") :]
             dir_parts, sep, prefixed_filename = target_path.rpartition("/")
             # extract the hash prefix, if any
-            prefix: Optional[str] = None
+            prefix: str | None = None
             filename = prefixed_filename
             if self.root.consistent_snapshot and self.prefix_targets_with_hash:
                 prefix, _, filename = prefixed_filename.partition(".")
@@ -223,9 +227,7 @@ def _fetch(self, url: str) -> Iterator[bytes]:
         else:
             raise DownloadHTTPError(f"Unknown path '{path}'", 404)
 
-    def fetch_target(
-        self, target_path: str, target_hash: Optional[str]
-    ) -> bytes:
+    def fetch_target(self, target_path: str, target_hash: str | None) -> bytes:
         """Return data for 'target_path', checking 'target_hash' if it is given.
 
         If hash is None, then consistent_snapshot is not used.
@@ -244,7 +246,7 @@ def fetch_target(
         logger.debug("fetched target %s", target_path)
         return repo_target.data
 
-    def fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes:
+    def fetch_metadata(self, role: str, version: int | None = None) -> bytes:
         """Return signed metadata for 'role', using 'version' if it is given.
 
         If version is None, non-versioned metadata is being requested.
@@ -261,7 +263,7 @@ def fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes:
             return self.signed_roots[version - 1]
 
         # sign and serialize the requested metadata
-        md: Optional[Metadata]
+        md: Metadata | None
         if role == Timestamp.type:
             md = self.md_timestamp
         elif role == Snapshot.type:
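The `if TYPE_CHECKING:` blocks introduced above are the standard companion to lazy annotations: `typing.TYPE_CHECKING` is `False` at runtime, so imports under it cost nothing and cannot form import cycles, while type checkers still resolve the names. This only works for names used exclusively in annotations; anything called or instantiated at runtime must keep a real import. Sketch:

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from collections.abc import Iterator  # resolved only by mypy/ruff


def _fetch(data: bytes) -> Iterator[bytes]:
    # Fine at runtime: "Iterator[bytes]" stays an unevaluated string.
    yield data


print(list(_fetch(b"ok")))  # [b'ok']
```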
diff --git a/tests/test_api.py b/tests/test_api.py
index 8ef614604a..5f2e7f8c98 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -2,6 +2,8 @@
 # SPDX-License-Identifier: MIT OR Apache-2.0
 """Unit tests for api/metadata.py"""
 
+from __future__ import annotations
+
 import json
 import logging
 import os
@@ -12,7 +14,7 @@
 from copy import copy, deepcopy
 from datetime import datetime, timedelta, timezone
 from pathlib import Path
-from typing import ClassVar, Optional
+from typing import ClassVar
 
 from securesystemslib import exceptions as sslib_exceptions
 from securesystemslib import hash as sslib_hash
@@ -245,8 +247,8 @@ def from_priv_key_uri(
         cls,
         priv_key_uri: str,
         public_key: Key,
-        secrets_handler: Optional[SecretsHandler] = None,
-    ) -> "Signer":
+        secrets_handler: SecretsHandler | None = None,
+    ) -> Signer:
         pass
 
     @property
diff --git a/tests/test_examples.py b/tests/test_examples.py
index 7cb5f827fa..208603ff64 100644
--- a/tests/test_examples.py
+++ b/tests/test_examples.py
@@ -2,6 +2,8 @@
 # SPDX-License-Identifier: MIT OR Apache-2.0
 """Unit tests for 'examples' scripts."""
 
+from __future__ import annotations
+
 import glob
 import os
 import shutil
diff --git a/tests/test_metadata_eq_.py b/tests/test_metadata_eq_.py
index cf51f6e4e3..428c5ed590 100644
--- a/tests/test_metadata_eq_.py
+++ b/tests/test_metadata_eq_.py
@@ -3,6 +3,8 @@
 
 """Test __eq__ implementations of classes inside tuf/api/metadata.py."""
 
+from __future__ import annotations
+
 import copy
 import os
 import sys
@@ -63,7 +65,7 @@ def setUpClass(cls) -> None:
 
     # Keys are class names.
     # Values are dictionaries containing attribute names and their new values.
-    classes_attributes_modifications: utils.DataSet = {
+    classes_attributes_modifications = {
        "Metadata": {"signed": None, "signatures": None},
        "Signed": {"version": -1, "spec_version": "0.0.0"},
        "Key": {"keyid": "a", "keytype": "foo", "scheme": "b", "keyval": "b"},
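Lazy annotations also retire the quoted "forward reference" idiom: `-> "Signer"` becomes `-> Signer` here, and the same mechanical change recurs below for `-> Root`, `-> MetaFile`, `-> Metadata[T]` and friends. The quotes were only ever needed because a class name is not bound while its own body is being evaluated:

```python
from __future__ import annotations


class Node:
    def clone(self) -> Node:  # pre-PEP 563 this had to be -> "Node"
        return Node()


print(Node().clone())  # works: the annotation is just the string "Node"
```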
diff --git a/tests/test_metadata_serialization.py b/tests/test_metadata_serialization.py
index 2aeadf1d09..7d1099fcb9 100644
--- a/tests/test_metadata_serialization.py
+++ b/tests/test_metadata_serialization.py
@@ -37,7 +37,7 @@ class TestSerialization(unittest.TestCase):
     """Test serialization for all classes in 'tuf/api/metadata.py'."""
 
-    invalid_metadata: utils.DataSet = {
+    invalid_metadata = {
        "no signatures field": b'{"signed": \
            { "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
            "meta": {"snapshot.json": {"hashes": {"sha256" : "abc"}, "version": 1}}} \
@@ -55,7 +55,7 @@ def test_invalid_metadata_serialization(self, test_data: bytes) -> None:
         with self.assertRaises(DeserializationError):
             Metadata.from_bytes(test_data)
 
-    valid_metadata: utils.DataSet = {
+    valid_metadata = {
        "multiple signatures": b'{ \
            "signed": \
            { "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
@@ -90,7 +90,7 @@ def test_valid_metadata_serialization(self, test_case_data: bytes) -> None:
 
         self.assertEqual(test_bytes, md.to_bytes())
 
-    invalid_signatures: utils.DataSet = {
+    invalid_signatures = {
        "missing keyid attribute in a signature": '{ "sig": "abc" }',
        "missing sig attribute in a signature": '{ "keyid": "id" }',
     }
@@ -101,7 +101,7 @@ def test_invalid_signature_serialization(self, test_data: str) -> None:
         with self.assertRaises(KeyError):
             Signature.from_dict(case_dict)
 
-    valid_signatures: utils.DataSet = {
+    valid_signatures = {
        "all": '{ "keyid": "id", "sig": "b"}',
        "unrecognized fields": '{ "keyid": "id", "sig": "b", "foo": "bar"}',
     }
@@ -114,7 +114,7 @@ def test_signature_serialization(self, test_case_data: str) -> None:
 
     # Snapshot instances with meta = {} are valid, but for a full valid
     # repository it's required that meta has at least one element inside it.
-    invalid_signed: utils.DataSet = {
+    invalid_signed = {
        "no _type": '{"spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}',
        "no spec_version": '{"_type": "snapshot", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}',
        "no version": '{"_type": "snapshot", "spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}',
@@ -138,7 +138,7 @@ def test_invalid_signed_serialization(self, test_case_data: str) -> None:
         with self.assertRaises((KeyError, ValueError, TypeError)):
             Snapshot.from_dict(case_dict)
 
-    valid_keys: utils.DataSet = {
+    valid_keys = {
        "all": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \
            "keyval": {"public": "foo"}}',
        "unrecognized field": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \
@@ -153,7 +153,7 @@ def test_valid_key_serialization(self, test_case_data: str) -> None:
         key = Key.from_dict("id", copy.copy(case_dict))
         self.assertDictEqual(case_dict, key.to_dict())
 
-    invalid_keys: utils.DataSet = {
+    invalid_keys = {
        "no keyid": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "abc"}}',
        "no keytype": '{"keyid": "id", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}',
        "no scheme": '{"keyid": "id", "keytype": "rsa", "keyval": {"public": "foo"}}',
@@ -171,7 +171,7 @@ def test_invalid_key_serialization(self, test_case_data: str) -> None:
             keyid = case_dict.pop("keyid")
             Key.from_dict(keyid, case_dict)
 
-    invalid_roles: utils.DataSet = {
+    invalid_roles = {
        "no threshold": '{"keyids": ["keyid"]}',
        "no keyids": '{"threshold": 3}',
        "wrong threshold type": '{"keyids": ["keyid"], "threshold": "a"}',
@@ -186,7 +186,7 @@ def test_invalid_role_serialization(self, test_case_data: str) -> None:
         with self.assertRaises((KeyError, TypeError, ValueError)):
             Role.from_dict(case_dict)
 
-    valid_roles: utils.DataSet = {
+    valid_roles = {
        "all": '{"keyids": ["keyid"], "threshold": 3}',
        "many keyids": '{"keyids": ["a", "b", "c", "d", "e"], "threshold": 1}',
        "ordered keyids": '{"keyids": ["c", "b", "a"], "threshold": 1}',
@@ -200,7 +200,7 @@ def test_role_serialization(self, test_case_data: str) -> None:
         role = Role.from_dict(copy.deepcopy(case_dict))
         self.assertDictEqual(case_dict, role.to_dict())
 
-    valid_roots: utils.DataSet = {
+    valid_roots = {
        "all": '{"_type": "root", "spec_version": "1.0.0", "version": 1, \
            "expires": "2030-01-01T00:00:00Z", "consistent_snapshot": false, \
            "keys": { \
@@ -248,7 +248,7 @@ def test_root_serialization(self, test_case_data: str) -> None:
         root = Root.from_dict(copy.deepcopy(case_dict))
         self.assertDictEqual(case_dict, root.to_dict())
 
-    invalid_roots: utils.DataSet = {
+    invalid_roots = {
        "invalid role name": '{"_type": "root", "spec_version": "1.0.0", "version": 1, \
            "expires": "2030-01-01T00:00:00Z", "consistent_snapshot": false, \
            "keys": { \
@@ -293,7 +293,7 @@ def test_invalid_root_serialization(self, test_case_data: str) -> None:
         with self.assertRaises(ValueError):
             Root.from_dict(case_dict)
 
-    invalid_metafiles: utils.DataSet = {
+    invalid_metafiles = {
        "wrong length type": '{"version": 1, "length": "a", "hashes": {"sha256" : "abc"}}',
        "version 0": '{"version": 0, "length": 1, "hashes": {"sha256" : "abc"}}',
        "length below 0": '{"version": 1, "length": -1, "hashes": {"sha256" : "abc"}}',
@@ -308,7 +308,7 @@ def test_invalid_metafile_serialization(self, test_case_data: str) -> None:
         with self.assertRaises((TypeError, ValueError, AttributeError)):
             MetaFile.from_dict(case_dict)
 
-    valid_metafiles: utils.DataSet = {
+    valid_metafiles = {
        "all": '{"hashes": {"sha256" : "abc"}, "length": 12, "version": 1}',
        "no length": '{"hashes": {"sha256" : "abc"}, "version": 1 }',
        "length 0": '{"version": 1, "length": 0, "hashes": {"sha256" : "abc"}}',
@@ -323,7 +323,7 @@ def test_metafile_serialization(self, test_case_data: str) -> None:
         metafile = MetaFile.from_dict(copy.copy(case_dict))
         self.assertDictEqual(case_dict, metafile.to_dict())
 
-    invalid_timestamps: utils.DataSet = {
+    invalid_timestamps = {
        "no metafile": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z"}',
     }
 
@@ -333,7 +333,7 @@ def test_invalid_timestamp_serialization(self, test_case_data: str) -> None:
         with self.assertRaises((ValueError, KeyError)):
             Timestamp.from_dict(case_dict)
 
-    valid_timestamps: utils.DataSet = {
+    valid_timestamps = {
        "all": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
            "meta": {"snapshot.json": {"hashes": {"sha256" : "abc"}, "version": 1}}}',
        "legacy spec_version": '{ "_type": "timestamp", "spec_version": "1.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
@@ -348,7 +348,7 @@ def test_timestamp_serialization(self, test_case_data: str) -> None:
         timestamp = Timestamp.from_dict(copy.deepcopy(case_dict))
         self.assertDictEqual(case_dict, timestamp.to_dict())
 
-    valid_snapshots: utils.DataSet = {
+    valid_snapshots = {
        "all": '{ "_type": "snapshot", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
            "meta": { \
                "file1.txt": {"hashes": {"sha256" : "abc"}, "version": 1}, \
@@ -367,7 +367,7 @@ def test_snapshot_serialization(self, test_case_data: str) -> None:
         snapshot = Snapshot.from_dict(copy.deepcopy(case_dict))
         self.assertDictEqual(case_dict, snapshot.to_dict())
 
-    valid_delegated_roles: utils.DataSet = {
+    valid_delegated_roles = {
        # DelegatedRole inherits Role and some use cases can be found in the valid_roles.
        "no hash prefix attribute": '{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], \
            "terminating": false, "threshold": 1}',
@@ -390,7 +390,7 @@ def test_delegated_role_serialization(self, test_case_data: str) -> None:
         deserialized_role = DelegatedRole.from_dict(copy.copy(case_dict))
         self.assertDictEqual(case_dict, deserialized_role.to_dict())
 
-    invalid_delegated_roles: utils.DataSet = {
+    invalid_delegated_roles = {
        # DelegatedRole inherits Role and some use cases can be found in the invalid_roles.
        "missing hash prefixes and paths": '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false}',
        "both hash prefixes and paths": '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false, \
@@ -409,7 +409,7 @@ def test_invalid_delegated_role_serialization(
         with self.assertRaises(ValueError):
             DelegatedRole.from_dict(case_dict)
 
-    valid_succinct_roles: utils.DataSet = {
+    valid_succinct_roles = {
        # SuccinctRoles inherits Role and some use cases can be found in the valid_roles.
        "standard succinct_roles information": '{"keyids": ["keyid"], "threshold": 1, \
            "bit_length": 8, "name_prefix": "foo"}',
@@ -423,7 +423,7 @@ def test_succinct_roles_serialization(self, test_case_data: str) -> None:
         succinct_roles = SuccinctRoles.from_dict(copy.copy(case_dict))
         self.assertDictEqual(case_dict, succinct_roles.to_dict())
 
-    invalid_succinct_roles: utils.DataSet = {
+    invalid_succinct_roles = {
        # SuccinctRoles inherits Role and some use cases can be found in the invalid_roles.
        "missing bit_length from succinct_roles": '{"keyids": ["keyid"], "threshold": 1, "name_prefix": "foo"}',
        "missing name_prefix from succinct_roles": '{"keyids": ["keyid"], "threshold": 1, "bit_length": 8}',
@@ -439,7 +439,7 @@ def test_invalid_succinct_roles_serialization(self, test_data: str) -> None:
         with self.assertRaises((ValueError, KeyError, TypeError)):
             SuccinctRoles.from_dict(case_dict)
 
-    invalid_delegations: utils.DataSet = {
+    invalid_delegations = {
        "empty delegations": "{}",
        "missing keys": '{ "roles": [ \
            {"keyids": ["keyid"], "name": "a", "terminating": true, "paths": ["fn1"], "threshold": 3}, \
@@ -507,7 +507,7 @@ def test_invalid_delegation_serialization(
         with self.assertRaises((ValueError, KeyError, AttributeError)):
             Delegations.from_dict(case_dict)
 
-    valid_delegations: utils.DataSet = {
+    valid_delegations = {
        "with roles": '{"keys": { \
            "keyid1" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}, \
            "keyid2" : {"keytype": "ed25519", "scheme": "ed25519", "keyval": {"public": "bar"}}}, \
@@ -533,7 +533,7 @@ def test_delegation_serialization(self, test_case_data: str) -> None:
         delegation = Delegations.from_dict(copy.deepcopy(case_dict))
         self.assertDictEqual(case_dict, delegation.to_dict())
 
-    invalid_targetfiles: utils.DataSet = {
+    invalid_targetfiles = {
        "no hashes": '{"length": 1}',
        "no length": '{"hashes": {"sha256": "abc"}}',
        # The remaining cases are the same as for invalid_hashes and
@@ -548,7 +548,7 @@ def test_invalid_targetfile_serialization(
         with self.assertRaises(KeyError):
             TargetFile.from_dict(case_dict, "file1.txt")
 
-    valid_targetfiles: utils.DataSet = {
+    valid_targetfiles = {
        "all": '{"length": 12, "hashes": {"sha256" : "abc"}, \
            "custom" : {"foo": "bar"} }',
        "no custom": '{"length": 12, "hashes": {"sha256" : "abc"}}',
@@ -562,7 +562,7 @@ def test_targetfile_serialization(self, test_case_data: str) -> None:
         target_file = TargetFile.from_dict(copy.copy(case_dict), "file1.txt")
         self.assertDictEqual(case_dict, target_file.to_dict())
 
-    valid_targets: utils.DataSet = {
+    valid_targets = {
        "all attributes": '{"_type": "targets", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
            "targets": { \
                "file.txt": {"length": 12, "hashes": {"sha256" : "abc"} }, \
diff --git a/tests/test_repository.py b/tests/test_repository.py
index 977f381d53..f5179e52fd 100644
--- a/tests/test_repository.py
+++ b/tests/test_repository.py
@@ -3,6 +3,8 @@
 
 """Tests for tuf.repository module"""
 
+from __future__ import annotations
+
 import copy
 import logging
 import sys
diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py
index 3dc2437c5b..076a205cc2 100644
--- a/tests/test_trusted_metadata_set.py
+++ b/tests/test_trusted_metadata_set.py
@@ -1,11 +1,13 @@
 """Unit tests for 'tuf/ngclient/_internal/trusted_metadata_set.py'."""
 
+from __future__ import annotations
+
 import logging
 import os
 import sys
 import unittest
 from datetime import datetime, timezone
-from typing import Callable, ClassVar, Optional
+from typing import Callable, ClassVar
 
 from securesystemslib.signer import Signer
 
@@ -104,8 +106,8 @@ def setUp(self) -> None:
 
     def _update_all_besides_targets(
         self,
-        timestamp_bytes: Optional[bytes] = None,
-        snapshot_bytes: Optional[bytes] = None,
+        timestamp_bytes: bytes | None = None,
+        snapshot_bytes: bytes | None = None,
     ) -> None:
         """Update all metadata roles besides targets.
diff --git a/tests/test_updater_consistent_snapshot.py b/tests/test_updater_consistent_snapshot.py
index 998d852296..35497864f9 100644
--- a/tests/test_updater_consistent_snapshot.py
+++ b/tests/test_updater_consistent_snapshot.py
@@ -3,12 +3,13 @@
 
 """Test ngclient Updater toggling consistent snapshot"""
 
+from __future__ import annotations
+
 import os
 import sys
 import tempfile
 import unittest
-from collections.abc import Iterable
-from typing import Any, Optional
+from typing import TYPE_CHECKING, Any
 
 from tests import utils
 from tests.repository_simulator import RepositorySimulator
@@ -21,6 +22,9 @@
 )
 from tuf.ngclient import Updater
 
+if TYPE_CHECKING:
+    from collections.abc import Iterable
+
 
 class TestConsistentSnapshot(unittest.TestCase):
     """Test different combinations of 'consistent_snapshot' and
@@ -28,7 +32,7 @@ class TestConsistentSnapshot(unittest.TestCase):
     are formed for each combination"""
 
     # set dump_dir to trigger repository state dumps
-    dump_dir: Optional[str] = None
+    dump_dir: str | None = None
 
     def setUp(self) -> None:
         self.subtest_count = 0
@@ -98,7 +102,7 @@ def _assert_targets_files_exist(self, filenames: Iterable[str]) -> None:
         for filename in filenames:
             self.assertIn(filename, local_target_files)
 
-    top_level_roles_data: utils.DataSet = {
+    top_level_roles_data = {
        "consistent_snaphot disabled": {
            "consistent_snapshot": False,
            "calls": [
@@ -143,7 +147,7 @@ def test_top_level_roles_update(
         finally:
             self.teardown_subtest()
 
-    delegated_roles_data: utils.DataSet = {
+    delegated_roles_data = {
        "consistent_snaphot disabled": {
            "consistent_snapshot": False,
            "expected_version": None,
@@ -162,7 +166,7 @@ def test_delegated_roles_update(
         # the correct version prefix, depending on 'consistent_snapshot' config
         try:
             consistent_snapshot: bool = test_case_data["consistent_snapshot"]
-            exp_version: Optional[int] = test_case_data["expected_version"]
+            exp_version: int | None = test_case_data["expected_version"]
             rolenames = ["role1", "..", "."]
             exp_calls = [(role, exp_version) for role in rolenames]
 
@@ -190,7 +194,7 @@ def test_delegated_roles_update(
         finally:
             self.teardown_subtest()
 
-    targets_download_data: utils.DataSet = {
+    targets_download_data = {
        "consistent_snaphot disabled": {
            "consistent_snapshot": False,
            "prefix_targets": True,
@@ -219,7 +223,7 @@ def test_download_targets(self, test_case_data: dict[str, Any]) -> None:
         try:
             consistent_snapshot: bool = test_case_data["consistent_snapshot"]
             prefix_targets_with_hash: bool = test_case_data["prefix_targets"]
-            hash_algo: Optional[str] = test_case_data["hash_algo"]
+            hash_algo: str | None = test_case_data["hash_algo"]
             targetpaths: list[str] = test_case_data["targetpaths"]
 
             self.setup_subtest(consistent_snapshot, prefix_targets_with_hash)
diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py
index f801cbffd5..dbdd16fb79 100644
--- a/tests/test_updater_delegation_graphs.py
+++ b/tests/test_updater_delegation_graphs.py
@@ -4,13 +4,14 @@
 """Test updating delegated targets roles and searching for
 target files with various delegation graphs"""
 
+from __future__ import annotations
+
 import os
 import sys
 import tempfile
 import unittest
-from collections.abc import Iterable
 from dataclasses import astuple, dataclass, field
-from typing import Optional
+from typing import TYPE_CHECKING
 
 from tests import utils
 from tests.repository_simulator import RepositorySimulator
@@ -23,6 +24,9 @@
 )
 from tuf.ngclient import Updater
 
+if TYPE_CHECKING:
+    from collections.abc import Iterable
+
 
 @dataclass
 class TestDelegation:
@@ -31,8 +35,8 @@ class TestDelegation:
     keyids: list[str] = field(default_factory=list)
     threshold: int = 1
     terminating: bool = False
-    paths: Optional[list[str]] = field(default_factory=lambda: ["*"])
-    path_hash_prefixes: Optional[list[str]] = None
+    paths: list[str] | None = field(default_factory=lambda: ["*"])
+    path_hash_prefixes: list[str] | None = None
 
 
 @dataclass
@@ -63,7 +67,7 @@ class TestDelegations(unittest.TestCase):
     """Base class for delegation tests"""
 
     # set dump_dir to trigger repository state dumps
-    dump_dir: Optional[str] = None
+    dump_dir: str | None = None
 
     def setUp(self) -> None:
         self.subtest_count = 0
@@ -139,7 +143,7 @@ class TestDelegationsGraphs(TestDelegations):
     """Test creating delegations graphs with different complexity
     and successfully updating the delegated roles metadata"""
 
-    graphs: utils.DataSet = {
+    graphs = {
        "basic delegation": DelegationsTestCase(
            delegations=[TestDelegation("targets", "A")],
            visited_order=["A"],
@@ -287,7 +291,7 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None:
         finally:
             self.teardown_subtest()
 
-    invalid_metadata: utils.DataSet = {
+    invalid_metadata = {
        "unsigned delegated role": DelegationsTestCase(
            delegations=[
                TestDelegation("targets", "invalid"),
@@ -360,7 +364,7 @@ def test_safely_encoded_rolenames(self) -> None:
         exp_calls = [(quoted[:-5], 1) for quoted in roles_to_filenames.values()]
         self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
 
-    hash_bins_graph: utils.DataSet = {
+    hash_bins_graph = {
        "delegations": DelegationsTestCase(
            delegations=[
                TestDelegation(
@@ -432,7 +436,7 @@ class SuccinctRolesTestCase:
     # By setting the bit_length the total number of bins is 2^bit_length.
     # In each test case target_path is a path to a random target we want to
     # fetch and expected_target_bin is the bin we are expecting to visit.
-    succinct_bins_graph: utils.DataSet = {
+    succinct_bins_graph = {
        "bin amount = 2, taget bin index 0": SuccinctRolesTestCase(
            bit_length=1,
            target_path="boo",
@@ -544,7 +548,7 @@ def setUp(self) -> None:
         self._init_repo(self.delegations_tree)
 
     # fmt: off
-    targets: utils.DataSet = {
+    targets = {
        "no delegations":
            TargetTestCase("targetfile", True, []),
        "targetpath matches wildcard":
diff --git a/tests/test_updater_fetch_target.py b/tests/test_updater_fetch_target.py
index 612f8131e0..5304843fab 100644
--- a/tests/test_updater_fetch_target.py
+++ b/tests/test_updater_fetch_target.py
@@ -66,7 +66,7 @@ def _init_updater(self) -> Updater:
             self.sim,
         )
 
-    targets: utils.DataSet = {
+    targets = {
        "standard case": TestTarget(
            path="targetpath",
            content=b"target content",
diff --git a/tests/test_updater_key_rotations.py b/tests/test_updater_key_rotations.py
index c0831dc042..b78113d67d 100644
--- a/tests/test_updater_key_rotations.py
+++ b/tests/test_updater_key_rotations.py
@@ -3,12 +3,14 @@
 
 """Test ngclient Updater key rotation handling"""
 
+from __future__ import annotations
+
 import os
 import sys
 import tempfile
 import unittest
 from dataclasses import dataclass
-from typing import ClassVar, Optional
+from typing import ClassVar
 
 from securesystemslib.signer import CryptoSigner, Signer
 
@@ -25,14 +27,14 @@ class MdVersion:
     keys: list[int]
     threshold: int
     sigs: list[int]
-    res: Optional[type[Exception]] = None
+    res: type[Exception] | None = None
 
 
 class TestUpdaterKeyRotations(unittest.TestCase):
     """Test ngclient root rotation handling"""
 
     # set dump_dir to trigger repository state dumps
-    dump_dir: Optional[str] = None
+    dump_dir: str | None = None
 
     temp_dir: ClassVar[tempfile.TemporaryDirectory]
     keys: ClassVar[list[Key]]
     signers: ClassVar[list[Signer]]
diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py
index 6f24dfd810..f42e510b1e 100644
--- a/tests/test_updater_ng.py
+++ b/tests/test_updater_ng.py
@@ -3,6 +3,8 @@
 
 """Test Updater class"""
 
+from __future__ import annotations
+
 import logging
 import os
 import shutil
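The dataclasses in these test modules (`FetchTracker`, `TestDelegation`, `MdVersion`) confirm that `@dataclass` is compatible with stringified annotations: the decorator reads `__annotations__` but never evaluates the strings, so `list[str] | None` is accepted on 3.8. Only runtime hint resolution (e.g. `typing.get_type_hints()`) would fail there. A trimmed analogue (a simplified stand-in, not the real helper):

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Delegation:  # simplified stand-in for TestDelegation above
    rolename: str
    keyids: list[str] = field(default_factory=list)
    paths: list[str] | None = field(default_factory=lambda: ["*"])


print(Delegation("A"))  # Delegation(rolename='A', keyids=[], paths=['*'])
```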
diff --git a/tests/utils.py b/tests/utils.py
index 26774b6ee0..e020684d49 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -18,6 +18,8 @@
 Provide common utilities for TUF tests
 """
 
+from __future__ import annotations
+
 import argparse
 import errno
 import logging
@@ -28,11 +30,13 @@
 import sys
 import threading
 import time
-import unittest
 import warnings
-from collections.abc import Iterator
 from contextlib import contextmanager
-from typing import IO, Any, Callable, Optional
+from typing import IO, TYPE_CHECKING, Any, Callable
+
+if TYPE_CHECKING:
+    import unittest
+    from collections.abc import Iterator
 
 logger = logging.getLogger(__name__)
 
@@ -42,15 +46,12 @@
 # Used when forming URLs on the client side
 TEST_HOST_ADDRESS = "127.0.0.1"
 
-# DataSet is only here so type hints can be used.
-DataSet = dict[str, Any]
-
 
 # Test runner decorator: Runs the test as a set of N SubTests,
 # (where N is number of items in dataset), feeding the actual test
 # function one test case at a time
 def run_sub_tests_with_dataset(
-    dataset: DataSet,
+    dataset: dict[str, Any],
 ) -> Callable[[Callable], Callable]:
     """Decorator starting a unittest.TestCase.subtest() for each of
     the cases in dataset"""
@@ -103,7 +104,7 @@ def wait_for_server(
     succeeded = False
     while not succeeded and remaining_timeout > 0:
         try:
-            sock: Optional[socket.socket] = socket.socket(
+            sock: socket.socket | None = socket.socket(
                 socket.AF_INET, socket.SOCK_STREAM
             )
             assert sock is not None
@@ -185,14 +186,14 @@ def __init__(
         server: str = os.path.join(TESTS_DIR, "simple_server.py"),
         timeout: int = 10,
         popen_cwd: str = ".",
-        extra_cmd_args: Optional[list[str]] = None,
+        extra_cmd_args: list[str] | None = None,
     ):
         self.server = server
         self.__logger = log
         # Stores popped messages from the queue.
         self.__logged_messages: list[str] = []
-        self.__server_process: Optional[subprocess.Popen] = None
-        self._log_queue: Optional[queue.Queue] = None
+        self.__server_process: subprocess.Popen | None = None
+        self._log_queue: queue.Queue | None = None
         self.port = -1
         if extra_cmd_args is None:
             extra_cmd_args = []
diff --git a/tox.ini b/tox.ini
index 9d4679749f..03dd2324e8 100644
--- a/tox.ini
+++ b/tox.ini
@@ -17,7 +17,7 @@ changedir = tests
 commands =
     python3 --version
     python3 -m coverage run aggregate_tests.py
-    python3 -m coverage report -m --fail-under 97
+    python3 -m coverage report --rcfile {toxinidir}/pyproject.toml -m --fail-under 97
 
 deps =
     -r{toxinidir}/requirements/test.txt
@@ -38,7 +38,7 @@ commands_pre =
 
 commands =
     python3 -m coverage run aggregate_tests.py
-    python3 -m coverage report -m
+    python3 -m coverage report --rcfile {toxinidir}/pyproject.toml -m
 
 [testenv:lint]
 changedir = {toxinidir}
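The deleted `DataSet = dict[str, Any]` alias explains all the `: utils.DataSet` removals earlier: it existed only to give the case tables a type, and `run_sub_tests_with_dataset` now spells the type inline. Per its docstring, the decorator's contract is one `subTest` per case; a simplified stand-in (not the real implementation):

```python
import unittest
from typing import Any, Callable


def run_sub_tests_with_dataset(
    dataset: dict[str, Any],
) -> Callable[[Callable], Callable]:
    """Simplified: run the decorated test once per named case."""

    def decorator(test: Callable) -> Callable:
        def wrapper(self: unittest.TestCase) -> None:
            for name, case in dataset.items():
                with self.subTest(name=name):
                    test(self, case)

        return wrapper

    return decorator
```

The tox change is related plumbing: tox runs coverage from `tests/`, where it would presumably not find the new `[tool.coverage.report]` table in the repository root on its own, hence the explicit `--rcfile {toxinidir}/pyproject.toml`.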
""" @@ -198,7 +201,7 @@ def _common_fields_to_dict(self) -> dict[str, Any]: **self.unrecognized_fields, } - def is_expired(self, reference_time: Optional[datetime] = None) -> bool: + def is_expired(self, reference_time: datetime | None = None) -> bool: """Check metadata expiration against a reference time. Args: @@ -237,7 +240,7 @@ def __init__( self, keyids: list[str], threshold: int, - unrecognized_fields: Optional[dict[str, Any]] = None, + unrecognized_fields: dict[str, Any] | None = None, ): if len(set(keyids)) != len(keyids): raise ValueError(f"Nonunique keyids: {keyids}") @@ -261,7 +264,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, role_dict: dict[str, Any]) -> "Role": + def from_dict(cls, role_dict: dict[str, Any]) -> Role: """Create ``Role`` object from its json/dict representation. Raises: @@ -343,17 +346,19 @@ def verified(self) -> bool: def signed(self) -> dict[str, Key]: """Dictionary of all signing keys that have signed, from both VerificationResults. - return a union of all signed. + return a union of all signed (in python<3.9 this requires + dict unpacking) """ - return self.first.signed | self.second.signed + return {**self.first.signed, **self.second.signed} @property def unsigned(self) -> dict[str, Key]: """Dictionary of all signing keys that have not signed, from both VerificationResults. - return a union of all unsigned. + return a union of all unsigned (in python<3.9 this requires + dict unpacking) """ - return self.first.unsigned | self.second.unsigned + return {**self.first.unsigned, **self.second.unsigned} class _DelegatorMixin(metaclass=abc.ABCMeta): @@ -481,13 +486,13 @@ class Root(Signed, _DelegatorMixin): def __init__( self, - version: Optional[int] = None, - spec_version: Optional[str] = None, - expires: Optional[datetime] = None, - keys: Optional[dict[str, Key]] = None, - roles: Optional[dict[str, Role]] = None, - consistent_snapshot: Optional[bool] = True, - unrecognized_fields: Optional[dict[str, Any]] = None, + version: int | None = None, + spec_version: str | None = None, + expires: datetime | None = None, + keys: dict[str, Key] | None = None, + roles: dict[str, Role] | None = None, + consistent_snapshot: bool | None = True, + unrecognized_fields: dict[str, Any] | None = None, ): super().__init__(version, spec_version, expires, unrecognized_fields) self.consistent_snapshot = consistent_snapshot @@ -511,7 +516,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, signed_dict: dict[str, Any]) -> "Root": + def from_dict(cls, signed_dict: dict[str, Any]) -> Root: """Create ``Root`` object from its json/dict representation. 
Raises: @@ -609,7 +614,7 @@ def get_key(self, keyid: str) -> Key: def get_root_verification_result( self, - previous: Optional["Root"], + previous: Root | None, payload: bytes, signatures: dict[str, Signature], ) -> RootVerificationResult: @@ -656,7 +661,7 @@ class BaseFile: @staticmethod def _verify_hashes( - data: Union[bytes, IO[bytes]], expected_hashes: dict[str, str] + data: bytes | IO[bytes], expected_hashes: dict[str, str] ) -> None: """Verify that the hash of ``data`` matches ``expected_hashes``.""" is_bytes = isinstance(data, bytes) @@ -684,9 +689,7 @@ def _verify_hashes( ) @staticmethod - def _verify_length( - data: Union[bytes, IO[bytes]], expected_length: int - ) -> None: + def _verify_length(data: bytes | IO[bytes], expected_length: int) -> None: """Verify that the length of ``data`` matches ``expected_length``.""" if isinstance(data, bytes): observed_length = len(data) @@ -716,7 +719,7 @@ def _validate_length(length: int) -> None: @staticmethod def _get_length_and_hashes( - data: Union[bytes, IO[bytes]], hash_algorithms: Optional[list[str]] + data: bytes | IO[bytes], hash_algorithms: list[str] | None ) -> tuple[int, dict[str, str]]: """Calculate length and hashes of ``data``.""" if isinstance(data, bytes): @@ -771,9 +774,9 @@ class MetaFile(BaseFile): def __init__( self, version: int = 1, - length: Optional[int] = None, - hashes: Optional[dict[str, str]] = None, - unrecognized_fields: Optional[dict[str, Any]] = None, + length: int | None = None, + hashes: dict[str, str] | None = None, + unrecognized_fields: dict[str, Any] | None = None, ): if version <= 0: raise ValueError(f"Metafile version must be > 0, got {version}") @@ -802,7 +805,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, meta_dict: dict[str, Any]) -> "MetaFile": + def from_dict(cls, meta_dict: dict[str, Any]) -> MetaFile: """Create ``MetaFile`` object from its json/dict representation. Raises: @@ -819,9 +822,9 @@ def from_dict(cls, meta_dict: dict[str, Any]) -> "MetaFile": def from_data( cls, version: int, - data: Union[bytes, IO[bytes]], + data: bytes | IO[bytes], hash_algorithms: list[str], - ) -> "MetaFile": + ) -> MetaFile: """Creates MetaFile object from bytes. This constructor should only be used if hashes are wanted. By default, MetaFile(ver) should be used. @@ -853,7 +856,7 @@ def to_dict(self) -> dict[str, Any]: return res_dict - def verify_length_and_hashes(self, data: Union[bytes, IO[bytes]]) -> None: + def verify_length_and_hashes(self, data: bytes | IO[bytes]) -> None: """Verify that the length and hashes of ``data`` match expected values. Args: @@ -898,11 +901,11 @@ class Timestamp(Signed): def __init__( self, - version: Optional[int] = None, - spec_version: Optional[str] = None, - expires: Optional[datetime] = None, - snapshot_meta: Optional[MetaFile] = None, - unrecognized_fields: Optional[dict[str, Any]] = None, + version: int | None = None, + spec_version: str | None = None, + expires: datetime | None = None, + snapshot_meta: MetaFile | None = None, + unrecognized_fields: dict[str, Any] | None = None, ): super().__init__(version, spec_version, expires, unrecognized_fields) self.snapshot_meta = snapshot_meta or MetaFile(1) @@ -916,7 +919,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, signed_dict: dict[str, Any]) -> "Timestamp": + def from_dict(cls, signed_dict: dict[str, Any]) -> Timestamp: """Create ``Timestamp`` object from its json/dict representation. 
Raises: @@ -961,11 +964,11 @@ class Snapshot(Signed): def __init__( self, - version: Optional[int] = None, - spec_version: Optional[str] = None, - expires: Optional[datetime] = None, - meta: Optional[dict[str, MetaFile]] = None, - unrecognized_fields: Optional[dict[str, Any]] = None, + version: int | None = None, + spec_version: str | None = None, + expires: datetime | None = None, + meta: dict[str, MetaFile] | None = None, + unrecognized_fields: dict[str, Any] | None = None, ): super().__init__(version, spec_version, expires, unrecognized_fields) self.meta = meta if meta is not None else {"targets.json": MetaFile(1)} @@ -977,7 +980,7 @@ def __eq__(self, other: object) -> bool: return super().__eq__(other) and self.meta == other.meta @classmethod - def from_dict(cls, signed_dict: dict[str, Any]) -> "Snapshot": + def from_dict(cls, signed_dict: dict[str, Any]) -> Snapshot: """Create ``Snapshot`` object from its json/dict representation. Raises: @@ -1038,9 +1041,9 @@ def __init__( keyids: list[str], threshold: int, terminating: bool, - paths: Optional[list[str]] = None, - path_hash_prefixes: Optional[list[str]] = None, - unrecognized_fields: Optional[dict[str, Any]] = None, + paths: list[str] | None = None, + path_hash_prefixes: list[str] | None = None, + unrecognized_fields: dict[str, Any] | None = None, ): super().__init__(keyids, threshold, unrecognized_fields) self.name = name @@ -1074,7 +1077,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, role_dict: dict[str, Any]) -> "DelegatedRole": + def from_dict(cls, role_dict: dict[str, Any]) -> DelegatedRole: """Create ``DelegatedRole`` object from its json/dict representation. Raises: @@ -1200,7 +1203,7 @@ def __init__( threshold: int, bit_length: int, name_prefix: str, - unrecognized_fields: Optional[dict[str, Any]] = None, + unrecognized_fields: dict[str, Any] | None = None, ) -> None: super().__init__(keyids, threshold, unrecognized_fields) @@ -1232,7 +1235,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, role_dict: dict[str, Any]) -> "SuccinctRoles": + def from_dict(cls, role_dict: dict[str, Any]) -> SuccinctRoles: """Create ``SuccinctRoles`` object from its json/dict representation. Raises: @@ -1340,9 +1343,9 @@ class Delegations: def __init__( self, keys: dict[str, Key], - roles: Optional[dict[str, DelegatedRole]] = None, - succinct_roles: Optional[SuccinctRoles] = None, - unrecognized_fields: Optional[dict[str, Any]] = None, + roles: dict[str, DelegatedRole] | None = None, + succinct_roles: SuccinctRoles | None = None, + unrecognized_fields: dict[str, Any] | None = None, ): self.keys = keys if sum(1 for v in [roles, succinct_roles] if v is not None) != 1: @@ -1384,7 +1387,7 @@ def __eq__(self, other: object) -> bool: return all_attributes_check @classmethod - def from_dict(cls, delegations_dict: dict[str, Any]) -> "Delegations": + def from_dict(cls, delegations_dict: dict[str, Any]) -> Delegations: """Create ``Delegations`` object from its json/dict representation. 
Raises: @@ -1395,7 +1398,7 @@ def from_dict(cls, delegations_dict: dict[str, Any]) -> "Delegations": for keyid, key_dict in keys.items(): keys_res[keyid] = Key.from_dict(keyid, key_dict) roles = delegations_dict.pop("roles", None) - roles_res: Optional[dict[str, DelegatedRole]] = None + roles_res: dict[str, DelegatedRole] | None = None if roles is not None: roles_res = {} @@ -1472,7 +1475,7 @@ def __init__( length: int, hashes: dict[str, str], path: str, - unrecognized_fields: Optional[dict[str, Any]] = None, + unrecognized_fields: dict[str, Any] | None = None, ): self._validate_length(length) self._validate_hashes(hashes) @@ -1505,7 +1508,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, target_dict: dict[str, Any], path: str) -> "TargetFile": + def from_dict(cls, target_dict: dict[str, Any], path: str) -> TargetFile: """Create ``TargetFile`` object from its json/dict representation. Raises: @@ -1530,8 +1533,8 @@ def from_file( cls, target_file_path: str, local_path: str, - hash_algorithms: Optional[list[str]] = None, - ) -> "TargetFile": + hash_algorithms: list[str] | None = None, + ) -> TargetFile: """Create ``TargetFile`` object from a file. Args: @@ -1553,9 +1556,9 @@ def from_file( def from_data( cls, target_file_path: str, - data: Union[bytes, IO[bytes]], - hash_algorithms: Optional[list[str]] = None, - ) -> "TargetFile": + data: bytes | IO[bytes], + hash_algorithms: list[str] | None = None, + ) -> TargetFile: """Create ``TargetFile`` object from bytes. Args: @@ -1572,7 +1575,7 @@ def from_data( length, hashes = cls._get_length_and_hashes(data, hash_algorithms) return cls(length, hashes, target_file_path) - def verify_length_and_hashes(self, data: Union[bytes, IO[bytes]]) -> None: + def verify_length_and_hashes(self, data: bytes | IO[bytes]) -> None: """Verify that length and hashes of ``data`` match expected values. Args: @@ -1626,12 +1629,12 @@ class Targets(Signed, _DelegatorMixin): def __init__( self, - version: Optional[int] = None, - spec_version: Optional[str] = None, - expires: Optional[datetime] = None, - targets: Optional[dict[str, TargetFile]] = None, - delegations: Optional[Delegations] = None, - unrecognized_fields: Optional[dict[str, Any]] = None, + version: int | None = None, + spec_version: str | None = None, + expires: datetime | None = None, + targets: dict[str, TargetFile] | None = None, + delegations: Delegations | None = None, + unrecognized_fields: dict[str, Any] | None = None, ) -> None: super().__init__(version, spec_version, expires, unrecognized_fields) self.targets = targets if targets is not None else {} @@ -1648,7 +1651,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, signed_dict: dict[str, Any]) -> "Targets": + def from_dict(cls, signed_dict: dict[str, Any]) -> Targets: """Create ``Targets`` object from its json/dict representation. Raises: @@ -1681,7 +1684,7 @@ def to_dict(self) -> dict[str, Any]: targets_dict["delegations"] = self.delegations.to_dict() return targets_dict - def add_key(self, key: Key, role: Optional[str] = None) -> None: + def add_key(self, key: Key, role: str | None = None) -> None: """Add new signing key for delegated role ``role``. If succinct_roles is used then the ``role`` argument is not required. 
@@ -1713,7 +1716,7 @@ def add_key(self, key: Key, role: Optional[str] = None) -> None: self.delegations.keys[key.keyid] = key - def revoke_key(self, keyid: str, role: Optional[str] = None) -> None: + def revoke_key(self, keyid: str, role: str | None = None) -> None: """Revokes key from delegated role ``role`` and updates the delegations key store. @@ -1760,7 +1763,7 @@ def get_delegated_role(self, delegated_role: str) -> Role: if self.delegations is None: raise ValueError("No delegations found") - role: Optional[Role] = None + role: Role | None = None if self.delegations.roles is not None: role = self.delegations.roles.get(delegated_role) elif self.delegations.succinct_roles is not None: diff --git a/tuf/api/dsse.py b/tuf/api/dsse.py index 7834798e14..d027d14013 100644 --- a/tuf/api/dsse.py +++ b/tuf/api/dsse.py @@ -1,5 +1,7 @@ """Low-level TUF DSSE API. (experimental!)""" +from __future__ import annotations + import json from typing import Generic, cast @@ -55,7 +57,7 @@ class SimpleEnvelope(Generic[T], BaseSimpleEnvelope): DEFAULT_PAYLOAD_TYPE = "application/vnd.tuf+json" @classmethod - def from_bytes(cls, data: bytes) -> "SimpleEnvelope[T]": + def from_bytes(cls, data: bytes) -> SimpleEnvelope[T]: """Load envelope from JSON bytes. NOTE: Unlike ``tuf.api.metadata.Metadata.from_bytes``, this method @@ -102,7 +104,7 @@ def to_bytes(self) -> bytes: return json_bytes @classmethod - def from_signed(cls, signed: T) -> "SimpleEnvelope[T]": + def from_signed(cls, signed: T) -> SimpleEnvelope[T]: """Serialize payload as JSON bytes and wrap in envelope. Args: diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index ed54230dab..76b5ce0fde 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -30,9 +30,11 @@ `examples/repository `_. """ +from __future__ import annotations + import logging import tempfile -from typing import Any, Generic, Optional, cast +from typing import TYPE_CHECKING, Any, Generic, cast from securesystemslib.signer import Signature, Signer from securesystemslib.storage import FilesystemBackend, StorageBackendInterface @@ -65,11 +67,13 @@ VerificationResult, ) from tuf.api.exceptions import UnsignedMetadataError -from tuf.api.serialization import ( - MetadataDeserializer, - MetadataSerializer, - SignedSerializer, -) + +if TYPE_CHECKING: + from tuf.api.serialization import ( + MetadataDeserializer, + MetadataSerializer, + SignedSerializer, + ) logger = logging.getLogger(__name__) @@ -121,8 +125,8 @@ class Metadata(Generic[T]): def __init__( self, signed: T, - signatures: Optional[dict[str, Signature]] = None, - unrecognized_fields: Optional[dict[str, Any]] = None, + signatures: dict[str, Signature] | None = None, + unrecognized_fields: dict[str, Any] | None = None, ): self.signed: T = signed self.signatures = signatures if signatures is not None else {} @@ -153,7 +157,7 @@ def signed_bytes(self) -> bytes: return CanonicalJSONSerializer().serialize(self.signed) @classmethod - def from_dict(cls, metadata: dict[str, Any]) -> "Metadata[T]": + def from_dict(cls, metadata: dict[str, Any]) -> Metadata[T]: """Create ``Metadata`` object from its json/dict representation. 
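One hunk in `tuf/api/_payload.py` above is behavioral rather than cosmetic: `dict | dict` (PEP 584) only exists on Python 3.9+, and the future import cannot help because these unions are evaluated expressions, not annotations. Dict unpacking is the 3.8-safe equivalent, with the same right-operand-wins semantics:

```python
a = {"keyid1": "ok", "shared": "from-a"}
b = {"keyid2": "ok", "shared": "from-b"}

merged = {**a, **b}  # Python 3.8-compatible
assert merged == {"keyid1": "ok", "shared": "from-b", "keyid2": "ok"}
# On 3.9+ this is identical to: a | b
```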
diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py
index ed54230dab..76b5ce0fde 100644
--- a/tuf/api/metadata.py
+++ b/tuf/api/metadata.py
@@ -30,9 +30,11 @@
 `examples/repository `_.
 """
 
+from __future__ import annotations
+
 import logging
 import tempfile
-from typing import Any, Generic, Optional, cast
+from typing import TYPE_CHECKING, Any, Generic, cast
 
 from securesystemslib.signer import Signature, Signer
 from securesystemslib.storage import FilesystemBackend, StorageBackendInterface
@@ -65,11 +67,13 @@
     VerificationResult,
 )
 from tuf.api.exceptions import UnsignedMetadataError
-from tuf.api.serialization import (
-    MetadataDeserializer,
-    MetadataSerializer,
-    SignedSerializer,
-)
+
+if TYPE_CHECKING:
+    from tuf.api.serialization import (
+        MetadataDeserializer,
+        MetadataSerializer,
+        SignedSerializer,
+    )
 
 logger = logging.getLogger(__name__)
 
@@ -121,8 +125,8 @@ class Metadata(Generic[T]):
     def __init__(
         self,
         signed: T,
-        signatures: Optional[dict[str, Signature]] = None,
-        unrecognized_fields: Optional[dict[str, Any]] = None,
+        signatures: dict[str, Signature] | None = None,
+        unrecognized_fields: dict[str, Any] | None = None,
     ):
         self.signed: T = signed
         self.signatures = signatures if signatures is not None else {}
@@ -153,7 +157,7 @@ def signed_bytes(self) -> bytes:
         return CanonicalJSONSerializer().serialize(self.signed)
 
     @classmethod
-    def from_dict(cls, metadata: dict[str, Any]) -> "Metadata[T]":
+    def from_dict(cls, metadata: dict[str, Any]) -> Metadata[T]:
         """Create ``Metadata`` object from its json/dict representation.
 
         Args:
@@ -205,9 +209,9 @@ def from_dict(cls, metadata: dict[str, Any]) -> "Metadata[T]":
     def from_file(
         cls,
         filename: str,
-        deserializer: Optional[MetadataDeserializer] = None,
-        storage_backend: Optional[StorageBackendInterface] = None,
-    ) -> "Metadata[T]":
+        deserializer: MetadataDeserializer | None = None,
+        storage_backend: StorageBackendInterface | None = None,
+    ) -> Metadata[T]:
         """Load TUF metadata from file storage.
 
         Args:
@@ -238,8 +242,8 @@ def from_file(
     def from_bytes(
         cls,
         data: bytes,
-        deserializer: Optional[MetadataDeserializer] = None,
-    ) -> "Metadata[T]":
+        deserializer: MetadataDeserializer | None = None,
+    ) -> Metadata[T]:
         """Load TUF metadata from raw data.
 
         Args:
@@ -263,9 +267,7 @@ def from_bytes(
 
         return deserializer.deserialize(data)
 
-    def to_bytes(
-        self, serializer: Optional[MetadataSerializer] = None
-    ) -> bytes:
+    def to_bytes(self, serializer: MetadataSerializer | None = None) -> bytes:
         """Return the serialized TUF file format as bytes.
 
         Note that if bytes are first deserialized into ``Metadata`` and then
@@ -306,8 +308,8 @@ def to_dict(self) -> dict[str, Any]:
     def to_file(
         self,
         filename: str,
-        serializer: Optional[MetadataSerializer] = None,
-        storage_backend: Optional[StorageBackendInterface] = None,
+        serializer: MetadataSerializer | None = None,
+        storage_backend: StorageBackendInterface | None = None,
     ) -> None:
         """Write TUF metadata to file storage.
 
@@ -345,7 +347,7 @@ def sign(
         self,
         signer: Signer,
         append: bool = False,
-        signed_serializer: Optional[SignedSerializer] = None,
+        signed_serializer: SignedSerializer | None = None,
     ) -> Signature:
         """Create signature over ``signed`` and assigns it to ``signatures``.
 
@@ -388,8 +390,8 @@ def sign(
     def verify_delegate(
         self,
         delegated_role: str,
-        delegated_metadata: "Metadata",
-        signed_serializer: Optional[SignedSerializer] = None,
+        delegated_metadata: Metadata,
+        signed_serializer: SignedSerializer | None = None,
     ) -> None:
         """Verify that ``delegated_metadata`` is signed with the required
         threshold of keys for ``delegated_role``.
diff --git a/tuf/ngclient/_internal/requests_fetcher.py b/tuf/ngclient/_internal/requests_fetcher.py
index 72269aa4ea..2f89e47ab4 100644
--- a/tuf/ngclient/_internal/requests_fetcher.py
+++ b/tuf/ngclient/_internal/requests_fetcher.py
@@ -9,9 +9,10 @@
 # sigstore-python 1.0 still uses the module from there). requests_fetcher
 # can be moved out of _internal once sigstore-python 1.0 is not relevant.
 
+from __future__ import annotations
+
 import logging
-from collections.abc import Iterator
-from typing import Optional
+from typing import TYPE_CHECKING
 from urllib import parse
 
 # Imports
@@ -21,6 +22,9 @@
 from tuf.api import exceptions
 from tuf.ngclient.fetcher import FetcherInterface
 
+if TYPE_CHECKING:
+    from collections.abc import Iterator
+
 # Globals
 logger = logging.getLogger(__name__)
 
@@ -39,7 +43,7 @@ def __init__(
         self,
         socket_timeout: int = 30,
         chunk_size: int = 400000,
-        app_user_agent: Optional[str] = None,
+        app_user_agent: str | None = None,
     ) -> None:
         # http://docs.python-requests.org/en/master/user/advanced/#session-objects:
         #
@@ -103,7 +107,7 @@ def _fetch(self, url: str) -> Iterator[bytes]:
 
         return self._chunks(response)
 
-    def _chunks(self, response: "requests.Response") -> Iterator[bytes]:
+    def _chunks(self, response: requests.Response) -> Iterator[bytes]:
         """A generator function to be returned by fetch.
         This way the caller of fetch can differentiate between connection
diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py
index a178b318b6..3678ddf3a1 100644
--- a/tuf/ngclient/_internal/trusted_metadata_set.py
+++ b/tuf/ngclient/_internal/trusted_metadata_set.py
@@ -61,13 +61,12 @@
 >>> trusted_set.update_snapshot(f.read())
 """
 
+from __future__ import annotations
+
 import datetime
 import logging
 from collections import abc
-from collections.abc import Iterator
-from typing import Optional, Union, cast
-
-from securesystemslib.signer import Signature
+from typing import TYPE_CHECKING, Union, cast
 
 from tuf.api import exceptions
 from tuf.api.dsse import SimpleEnvelope
@@ -82,6 +81,11 @@
 )
 from tuf.ngclient.config import EnvelopeType
 
+if TYPE_CHECKING:
+    from collections.abc import Iterator
+
+    from securesystemslib.signer import Signature
+
 logger = logging.getLogger(__name__)
 
 Delegator = Union[Root, Targets]
@@ -270,7 +274,7 @@ def _check_final_timestamp(self) -> None:
             raise exceptions.ExpiredMetadataError("timestamp.json is expired")
 
     def update_snapshot(
-        self, data: bytes, trusted: Optional[bool] = False
+        self, data: bytes, trusted: bool | None = False
     ) -> Snapshot:
         """Verify and load ``data`` as new snapshot metadata.
 
@@ -402,7 +406,7 @@ def update_delegated_targets(
         # does not match meta version in timestamp
         self._check_final_snapshot()
 
-        delegator: Optional[Delegator] = self.get(delegator_name)
+        delegator: Delegator | None = self.get(delegator_name)
         if delegator is None:
             raise RuntimeError("Cannot load targets before delegator")
 
@@ -453,8 +457,8 @@ def _load_trusted_root(self, data: bytes) -> None:
     def _load_from_metadata(
         role: type[T],
         data: bytes,
-        delegator: Optional[Delegator] = None,
-        role_name: Optional[str] = None,
+        delegator: Delegator | None = None,
+        role_name: str | None = None,
     ) -> tuple[T, bytes, dict[str, Signature]]:
         """Load traditional metadata bytes, and extract and verify payload.
 
@@ -480,8 +484,8 @@ def _load_from_metadata(
     def _load_from_simple_envelope(
         role: type[T],
         data: bytes,
-        delegator: Optional[Delegator] = None,
-        role_name: Optional[str] = None,
+        delegator: Delegator | None = None,
+        role_name: str | None = None,
     ) -> tuple[T, bytes, dict[str, Signature]]:
         """Load simple envelope bytes, and extract and verify payload.
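Note what did not change here: `Delegator = Union[Root, Targets]` keeps the `typing.Union` spelling (and its import), because a type-alias assignment is an ordinary runtime expression that PEP 563 cannot defer; `Root | Targets` as an expression would raise `TypeError` before Python 3.10. Illustration:

```python
from __future__ import annotations

from typing import Union


class Root: ...
class Targets: ...


current: Root | Targets | None = None  # annotation: lazily a string, fine on 3.8

Delegator = Union[Root, Targets]  # runtime expression: must stay typing.Union
# Delegator = Root | Targets      # TypeError on Python < 3.10
```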
""" +from __future__ import annotations + import contextlib import logging import os import shutil import tempfile -from typing import Optional, cast +from typing import TYPE_CHECKING, cast from urllib import parse from tuf.api import exceptions from tuf.api.metadata import Root, Snapshot, TargetFile, Targets, Timestamp from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set from tuf.ngclient.config import EnvelopeType, UpdaterConfig -from tuf.ngclient.fetcher import FetcherInterface + +if TYPE_CHECKING: + from tuf.ngclient.fetcher import FetcherInterface logger = logging.getLogger(__name__) @@ -80,10 +84,10 @@ def __init__( self, metadata_dir: str, metadata_base_url: str, - target_dir: Optional[str] = None, - target_base_url: Optional[str] = None, - fetcher: Optional[FetcherInterface] = None, - config: Optional[UpdaterConfig] = None, + target_dir: str | None = None, + target_base_url: str | None = None, + fetcher: FetcherInterface | None = None, + config: UpdaterConfig | None = None, ): self._dir = metadata_dir self._metadata_base_url = _ensure_trailing_slash(metadata_base_url) @@ -153,7 +157,7 @@ def _generate_target_file_path(self, targetinfo: TargetFile) -> str: filename = parse.quote(targetinfo.path, "") return os.path.join(self.target_dir, filename) - def get_targetinfo(self, target_path: str) -> Optional[TargetFile]: + def get_targetinfo(self, target_path: str) -> TargetFile | None: """Return ``TargetFile`` instance with information for ``target_path``. The return value can be used as an argument to @@ -186,8 +190,8 @@ def get_targetinfo(self, target_path: str) -> Optional[TargetFile]: def find_cached_target( self, targetinfo: TargetFile, - filepath: Optional[str] = None, - ) -> Optional[str]: + filepath: str | None = None, + ) -> str | None: """Check whether a local file is an up to date target. Args: @@ -216,8 +220,8 @@ def find_cached_target( def download_target( self, targetinfo: TargetFile, - filepath: Optional[str] = None, - target_base_url: Optional[str] = None, + filepath: str | None = None, + target_base_url: str | None = None, ) -> str: """Download the target file specified by ``targetinfo``. @@ -275,7 +279,7 @@ def download_target( return filepath def _download_metadata( - self, rolename: str, length: int, version: Optional[int] = None + self, rolename: str, length: int, version: int | None = None ) -> bytes: """Download a metadata file and return it as bytes.""" encoded_name = parse.quote(rolename, "") @@ -292,7 +296,7 @@ def _load_local_metadata(self, rolename: str) -> bytes: def _persist_metadata(self, rolename: str, data: bytes) -> None: """Write metadata to disk atomically to avoid data loss.""" - temp_file_name: Optional[str] = None + temp_file_name: str | None = None try: # encode the rolename to avoid issues with e.g. 
path separators encoded_name = parse.quote(rolename, "") @@ -420,7 +424,7 @@ def _load_targets(self, role: str, parent_role: str) -> Targets: def _preorder_depth_first_walk( self, target_filepath: str - ) -> Optional[TargetFile]: + ) -> TargetFile | None: """ Interrogates the tree of target delegations in order of appearance (which implicitly order trustworthiness), and returns the matching diff --git a/tuf/repository/_repository.py b/tuf/repository/_repository.py index 82a75c7c31..a6c5de1ea4 100644 --- a/tuf/repository/_repository.py +++ b/tuf/repository/_repository.py @@ -3,12 +3,13 @@ """Repository Abstraction for metadata management""" +from __future__ import annotations + import logging from abc import ABC, abstractmethod -from collections.abc import Generator from contextlib import contextmanager, suppress from copy import deepcopy -from typing import Optional +from typing import TYPE_CHECKING from tuf.api.exceptions import UnsignedMetadataError from tuf.api.metadata import ( @@ -21,6 +22,9 @@ Timestamp, ) +if TYPE_CHECKING: + from collections.abc import Generator + logger = logging.getLogger(__name__) @@ -110,7 +114,7 @@ def edit_root(self) -> Generator[Root, None, None]: """Context manager for editing root metadata. See edit()""" with self.edit(Root.type) as root: if not isinstance(root, Root): - raise RuntimeError("Unexpected root type") + raise AssertionError("Unexpected root type") yield root @contextmanager @@ -118,7 +122,7 @@ def edit_timestamp(self) -> Generator[Timestamp, None, None]: """Context manager for editing timestamp metadata. See edit()""" with self.edit(Timestamp.type) as timestamp: if not isinstance(timestamp, Timestamp): - raise RuntimeError("Unexpected timestamp type") + raise AssertionError("Unexpected timestamp type") yield timestamp @contextmanager @@ -126,7 +130,7 @@ def edit_snapshot(self) -> Generator[Snapshot, None, None]: """Context manager for editing snapshot metadata. See edit()""" with self.edit(Snapshot.type) as snapshot: if not isinstance(snapshot, Snapshot): - raise RuntimeError("Unexpected snapshot type") + raise AssertionError("Unexpected snapshot type") yield snapshot @contextmanager @@ -136,35 +140,35 @@ def edit_targets( """Context manager for editing targets metadata. 
See edit()""" with self.edit(rolename) as targets: if not isinstance(targets, Targets): - raise RuntimeError(f"Unexpected targets ({rolename}) type") + raise AssertionError(f"Unexpected targets ({rolename}) type") yield targets def root(self) -> Root: """Read current root metadata""" root = self.open(Root.type).signed if not isinstance(root, Root): - raise RuntimeError("Unexpected root type") + raise AssertionError("Unexpected root type") return root def timestamp(self) -> Timestamp: """Read current timestamp metadata""" timestamp = self.open(Timestamp.type).signed if not isinstance(timestamp, Timestamp): - raise RuntimeError("Unexpected timestamp type") + raise AssertionError("Unexpected timestamp type") return timestamp def snapshot(self) -> Snapshot: """Read current snapshot metadata""" snapshot = self.open(Snapshot.type).signed if not isinstance(snapshot, Snapshot): - raise RuntimeError("Unexpected snapshot type") + raise AssertionError("Unexpected snapshot type") return snapshot def targets(self, rolename: str = Targets.type) -> Targets: """Read current targets metadata""" targets = self.open(rolename).signed if not isinstance(targets, Targets): - raise RuntimeError("Unexpected targets type") + raise AssertionError("Unexpected targets type") return targets def do_snapshot( @@ -229,9 +233,7 @@ def do_snapshot( return update_version, removed - def do_timestamp( - self, force: bool = False - ) -> tuple[bool, Optional[MetaFile]]: + def do_timestamp(self, force: bool = False) -> tuple[bool, MetaFile | None]: """Update timestamp meta information Updates timestamp according to current snapshot state