1+ import json
12import logging
23from abc import ABC , abstractmethod
4+ from datetime import date , datetime
35from typing import Any
46
57import requests
68
9+ from twyn .base .utils import _normalize_packages
10+ from twyn .file_handler .file_handler import FileHandler
11+ from twyn .trusted_packages .constants import TRUSTED_PACKAGES_FILE_PATH , TRUSTED_PACKAGES_MAX_RETENTION_DAYS
712from twyn .trusted_packages .exceptions import (
813 EmptyPackagesListError ,
14+ InvalidCacheError ,
915 InvalidJSONError ,
1016 InvalidPyPiFormatError ,
1117)
@@ -20,17 +26,95 @@ def __init__(self, source: str) -> None:
2026 self .source = source
2127
2228 @abstractmethod
23- def get_packages (self ) -> set [str ]:
29+ def get_packages (self , use_cache : bool = True ) -> set [str ]:
2430 """Return the names of the trusted packages available in the reference."""
2531
2632
2733class TopPyPiReference (AbstractPackageReference ):
2834 """Top PyPi packages retrieved from an online source."""
2935
30- def get_packages (self ) -> set [str ]:
36+ def get_packages (self , use_cache : bool = True ) -> set [str ]:
3137 """Download and parse online source of top Python Package Index packages."""
32- packages_info = self ._download ()
33- return self ._parse (packages_info )
38+ packages_to_use = set ()
39+ if use_cache :
40+ trusted_packages_file = FileHandler (TRUSTED_PACKAGES_FILE_PATH )
41+ packages_to_use = self ._get_packages_from_cache (trusted_packages_file )
42+ # we don't save the cache here, we keep it as it is so the date remains the original one.
43+
44+ if not packages_to_use :
45+ # no cache usage, no cache hit (non-existent or outdated) or cache was empty.
46+ logger .info ("Fetching trusted packages from PyPI reference..." )
47+ packages_to_use = self ._parse (self ._download ())
48+ if use_cache :
49+ self ._save_trusted_packages_to_file (packages_to_use , trusted_packages_file , self .source )
50+
51+ normalized_packages = _normalize_packages (packages_to_use )
52+ return normalized_packages
53+
54+ def _is_content_outdated (self , content_date : date ) -> bool :
55+ """Check if cached content is outdated based on retention days."""
56+ days_diff = (datetime .today ().date () - content_date ).days
57+ return days_diff > TRUSTED_PACKAGES_MAX_RETENTION_DAYS
58+
59+ def _save_trusted_packages_to_file (self , packages : set [str ], file_handler : FileHandler , source : str ) -> None :
60+ """Save trusted packages to JSON file with timestamp."""
61+ trusted_data = {
62+ "source" : source ,
63+ "data" : {
64+ "packages" : list (packages ),
65+ "count" : len (packages ),
66+ "saved_date" : datetime .now ().date ().isoformat (),
67+ },
68+ }
69+ file_handler .file_path .parent .mkdir (parents = True , exist_ok = True )
70+ file_handler .write (json .dumps (trusted_data ))
71+ logger .debug ("Saved %d trusted packages to %s" , len (packages ), file_handler .file_path )
72+
73+ def _load_trusted_packages_from_file (self , file_handler : FileHandler ) -> tuple [set [str ], bool ]:
74+ """Load trusted packages from JSON file and check if it's outdated."""
75+ try :
76+ try :
77+ trusted_packages_raw_content = json .loads (file_handler .read ())
78+ except json .JSONDecodeError as e :
79+ raise InvalidCacheError ("Could not decode cache." ) from e
80+
81+ try :
82+ data = trusted_packages_raw_content ["data" ]
83+ saved_date_str = data ["saved_date" ]
84+ except KeyError as e :
85+ raise InvalidCacheError ("Invalid cache format." ) from e
86+
87+ try :
88+ saved_date = datetime .fromisoformat (saved_date_str ).date ()
89+ except ValueError as e :
90+ raise InvalidCacheError ("Cache saved date is invalid." ) from e
91+
92+ try :
93+ packages = set (data ["packages" ])
94+ except TypeError as e :
95+ raise InvalidCacheError ("Invalid format in cached packages" ) from e
96+
97+ is_outdated = self ._is_content_outdated (saved_date )
98+
99+ except InvalidCacheError as e :
100+ logger .warning ("Error reading cached trusted packages: %s" , e )
101+ return set (), True
102+ else :
103+ if is_outdated :
104+ logger .info ("Cached trusted packages are outdated (saved: %s)" , saved_date )
105+ else :
106+ logger .debug ("Using cached trusted packages from %s" , saved_date )
107+
108+ return packages , is_outdated
109+
110+ def _get_packages_from_cache (self , trusted_packages_file : FileHandler ) -> set [str ]:
111+ """Get packages from cache file if it's present and up to date."""
112+ if trusted_packages_file .exists ():
113+ packages_from_cache , is_outdated = self ._load_trusted_packages_from_file (trusted_packages_file )
114+ if not is_outdated :
115+ return packages_from_cache
116+
117+ return set ()
34118
35119 def _download (self ) -> dict [str , Any ]:
36120 packages = requests .get (self .source )
0 commit comments