11import logging
2- from abc import ABC , abstractmethod
2+ import re
3+ from abc import abstractmethod
34from datetime import datetime
45from typing import Any , Union
56
67import requests
8+ from typing_extensions import override
79
8- from twyn .base .utils import normalize_packages
910from twyn .trusted_packages .cache_handler import CacheEntry , CacheHandler
1011from twyn .trusted_packages .exceptions import (
1112 EmptyPackagesListError ,
1213 InvalidJSONError ,
1314 InvalidPyPiFormatError ,
15+ PackageNormalizingError ,
1416)
1517
1618logger = logging .getLogger ("twyn" )
1719
1820
19- class AbstractPackageReference (ABC ):
20- """Represents a reference from where to retrieve trusted packages."""
21+ class AbstractPackageReference :
22+ """Represents a reference from where to retrieve trusted packages.
23+
24+ It abstracts all the package-retrieval and caching logic.
25+
26+ It defines the `_parse` abstract method, so each subclass defines how to handle the fetched data.
27+ It defines the `normalize_packages` abstract method, so each subclass validates that the package names are correct.
28+ """
2129
def __init__(self, source: str, cache_handler: Union[CacheHandler, None] = None) -> None:
    """Keep the given source and the optional cache handler on the instance.

    Args:
        source: Stored as `self.source`.
        cache_handler: Stored as `self.cache_handler`; defaults to `None`.
    """
    self.cache_handler = cache_handler
    self.source = source
2533
34+ @staticmethod
2635 @abstractmethod
27- def get_packages (self ) -> set [str ]:
28- """Return the names of the trusted packages available in the reference."""
29-
30-
31- class TopPyPiReference (AbstractPackageReference ):
32- """Top PyPi packages retrieved from an online source."""
33-
34- def get_packages (self ) -> set [str ]:
35- """Download and parse online source of top Python Package Index packages."""
36- packages_to_use = set ()
37- packages_to_use = self ._get_packages_from_cache_if_enabled ()
38- # we don't save the cache here, we keep it as it is so the date remains the original one.
39-
40- if not packages_to_use :
41- # no cache usage, no cache hit (non-existent or outdated) or cache was empty.
42- logger .info ("Fetching trusted packages from PyPI reference..." )
43- packages_to_use = self ._parse (self ._download ())
36+ def _parse (packages_json : dict [str , Any ]) -> set [str ]:
37+ """Parse and retrieve the packages within the given json structure."""
4438
45- # New packages were downloaded, we create a new entry updating all values.
46- self ._save_trusted_packages_to_cache_if_enabled (packages_to_use )
39+ @staticmethod
40+ @abstractmethod
def normalize_packages(packages: set[str]) -> set[str]:
    """Turn package names into their valid form for the package manager in use.

    This is only the abstract declaration; it carries no implementation of its own.
    """
4743
48- normalized_packages = normalize_packages (packages_to_use )
49- return normalized_packages
def _download(self) -> dict[str, Any]:
    """Fetch `self.source` with an HTTP GET and return the decoded JSON body.

    Returns:
        The JSON document returned by the source.

    Raises:
        requests.HTTPError: Raised by `raise_for_status` for an error status code.
        requests.Timeout: If the server does not answer within the timeout.
        InvalidJSONError: If the response body cannot be decoded as JSON.
    """
    # Without an explicit timeout `requests.get` can wait forever on an unresponsive server.
    response = requests.get(self.source, timeout=10)
    response.raise_for_status()
    try:
        packages_json: dict[str, Any] = response.json()
    except requests.exceptions.JSONDecodeError as err:
        raise InvalidJSONError from err
    else:
        logger.debug("Successfully downloaded trusted packages list from %s", self.source)
        return packages_json
5054
5155 def _save_trusted_packages_to_cache_if_enabled (self , packages : set [str ]) -> None :
5256 """Save trusted packages using CacheHandler."""
@@ -67,18 +71,28 @@ def _get_packages_from_cache_if_enabled(self) -> set[str]:
6771
6872 return cache_entry .packages
6973
70- def _download (self ) -> dict [str , Any ]:
71- packages = requests .get (self .source )
72- packages .raise_for_status ()
73- try :
74- packages_json : dict [str , Any ] = packages .json ()
75- except requests .exceptions .JSONDecodeError as err :
76- raise InvalidJSONError from err
def get_packages(self) -> set[str]:
    """Return the normalized names of the trusted packages of this reference.

    The packages returned by `_get_packages_from_cache_if_enabled` are used when that
    set is non-empty. Otherwise the result of `_download` is handed to `_parse` and the
    parsed names are passed to `_save_trusted_packages_to_cache_if_enabled`.

    Returns:
        The package names after going through `normalize_packages`.
    """
    packages_to_use = self._get_packages_from_cache_if_enabled()
    # we don't save the cache here, we keep it as it is so the date remains the original one.

    if not packages_to_use:
        # no cache usage, no cache hit (non-existent or outdated) or cache was empty.
        logger.info("Fetching trusted packages from PyPI reference...")
        packages_to_use = self._parse(self._download())

        # New packages were downloaded, we create a new entry updating all values.
        self._save_trusted_packages_to_cache_if_enabled(packages_to_use)

    return self.normalize_packages(packages_to_use)
7990
80- return packages_json
8191
92+ class TopPyPiReference (AbstractPackageReference ):
93+ """Top PyPi packages retrieved from an online source."""
94+
95+ @override
8296 @staticmethod
8397 def _parse (packages_info : dict [str , Any ]) -> set [str ]:
8498 try :
@@ -90,5 +104,17 @@ def _parse(packages_info: dict[str, Any]) -> set[str]:
90104 raise EmptyPackagesListError
91105
92106 logger .debug ("Successfully parsed trusted packages list" )
93-
94107 return names
108+
109+ @override
110+ @staticmethod
def normalize_packages(packages: set[str]) -> set[str]:
    """Normalize dependency names according to PyPi https://packaging.python.org/en/latest/specifications/name-normalization/.

    Every run of `-`, `_` and `.` is collapsed into a single `-` and the name is
    lower-cased. Each normalized name is then checked against the allowed-name pattern.

    Args:
        packages: The raw package names.

    Returns:
        The set of normalized names (names that normalize to the same value collapse).

    Raises:
        PackageNormalizingError: If a normalized name does not match the pattern.
    """
    renamed_packages = {re.sub(r"[-_.]+", "-", name).lower() for name in packages}

    pattern = re.compile(r"^([a-z0-9]|[a-z0-9][a-z0-9._-]*[a-z0-9])\Z")
    # Sorted so that, when several names are invalid, the reported one does not depend on set ordering.
    for package in sorted(renamed_packages):
        if not pattern.match(package):
            raise PackageNormalizingError(f"Package name '{package}' does not match required pattern")

    return renamed_packages
0 commit comments