diff --git a/changelog_entry.yaml b/changelog_entry.yaml
index e69de29b..d231ead8 100644
--- a/changelog_entry.yaml
+++ b/changelog_entry.yaml
@@ -0,0 +1,4 @@
+- bump: patch
+  changes:
+    added:
+      - Log messages covering economy comparisons, simulation setup, and Google Cloud Storage downloads.
diff --git a/policyengine/outputs/macro/comparison/calculate_economy_comparison.py b/policyengine/outputs/macro/comparison/calculate_economy_comparison.py
index bc53044c..7e6457da 100644
--- a/policyengine/outputs/macro/comparison/calculate_economy_comparison.py
+++ b/policyengine/outputs/macro/comparison/calculate_economy_comparison.py
@@ -11,6 +11,9 @@
     SingleEconomy,
 )
 from typing import List, Dict, Optional
+import logging
+
+logger = logging.getLogger(__name__)
 
 
 class BudgetaryImpact(BaseModel):
@@ -809,7 +812,9 @@ def calculate_economy_comparison(
     if not simulation.is_comparison:
         raise ValueError("Simulation must be a comparison simulation.")
 
+    logger.info("Calculating baseline economy")
     baseline: SingleEconomy = simulation.calculate_single_economy(reform=False)
+    logger.info("Calculating reform economy")
     reform: SingleEconomy = simulation.calculate_single_economy(reform=True)
     options = simulation.options
     country_id = options.country
@@ -835,6 +840,7 @@ def calculate_economy_comparison(
     )
 
     if simulation.options.include_cliffs:
+        logger.info("Calculating cliff impacts")
         cliff_impact = CliffImpact(
             baseline=CliffImpactInSimulation(
                 cliff_gap=baseline.cliff_gap,
@@ -848,6 +854,7 @@
     else:
         cliff_impact = None
 
+    logger.info("Comparison complete")
     return EconomyComparison(
         model_version=simulation.model_version,
         data_version=simulation.data_version,
diff --git a/policyengine/simulation.py b/policyengine/simulation.py
index 551bae3a..a22038fd 100644
--- a/policyengine/simulation.py
+++ b/policyengine/simulation.py
@@ -33,6 +33,9 @@
 from typing import Callable
 import importlib
 from policyengine.utils.data_download import download
+import logging
+
+logger = logging.getLogger(__name__)
 
 CountryType = Literal["uk", "us"]
 ScopeType = Literal["household", "macro"]
@@ -102,10 +105,14 @@ def __init__(self, **options: SimulationOptions):
         if not isinstance(self.options.data, dict) and not isinstance(
             self.options.data, Dataset
         ):
+            logger.debug("Loading data")
             self._set_data(self.options.data)
+            logger.info("Data loaded")
         self._initialise_simulations()
+        logger.info("Simulations initialised")
         self.check_data_version()
         self._add_output_functions()
+        logger.info("Output functions loaded")
 
     def _add_output_functions(self):
         folder = Path(__file__).parent / "outputs"
diff --git a/policyengine/utils/data/caching_google_storage_client.py b/policyengine/utils/data/caching_google_storage_client.py
index bfb1b1ff..dc9dd8f5 100644
--- a/policyengine/utils/data/caching_google_storage_client.py
+++ b/policyengine/utils/data/caching_google_storage_client.py
@@ -50,9 +50,15 @@ def download(
                 f"Copying downloaded data for {bucket}, {key} to {target}"
             )
             atomic_write(target, data)
+            logger.info(
+                f"Data successfully copied to {target} with version {version}"
+            )
             if return_version:
                 return version
             return
+        logger.error(
+            f"Cached data for {bucket}, {key}{', ' + version if version is not None else ''} is not bytes."
+        )
         raise Exception("Expected data for blob to be cached as bytes")
 
     # If the crc has changed from what we downloaded last time download it again.
@@ -68,22 +74,22 @@ def sync(
         crckey = f"{bucket}.{key}.{version}.crc"
         crc = self.client.crc32c(bucket, key, version=version)
+        id_string = (
+            f"{bucket}, {key}{', ' + version if version is not None else ''}"
+        )
         if crc is None:
-            raise Exception(f"Unable to find {key} in bucket {bucket}")
+            raise Exception(f"Unable to find {id_string}")
         prev_crc = self.cache.get(crckey, default=None)
-        logger.debug(f"Previous crc for {bucket}, {key} was {prev_crc}")
+        logger.debug(f"Previous crc for {id_string} was {prev_crc}")
         if prev_crc == crc:
-            logger.info(
-                f"Cache exists and crc is unchanged for {bucket}, {key}."
-            )
+            logger.info(f"Cache exists and crc is unchanged for {id_string}.")
             return
-
         [content, downloaded_crc] = self.client.download(
             bucket, key, version=version
         )
-        logger.debug(
-            f"Downloaded new version of {bucket}, {key} with crc {downloaded_crc}"
+        logger.info(
+            f"Downloaded new version of {id_string} with crc {downloaded_crc}"
         )
 
         # atomic transaction to update both the data and the metadata
@@ -94,6 +100,7 @@ def sync(
             # Whatever the CRC was before we downloaded, we set the cache CRC
             # to the CRC reported by the download itself to avoid race conditions.
             self.cache.set(crckey, downloaded_crc)
+            logger.info(f"Cache updated for {id_string}")
 
     def clear(self):
         self.cache.clear()
diff --git a/policyengine/utils/data/simplified_google_storage_client.py b/policyengine/utils/data/simplified_google_storage_client.py
index f15f337b..b7d80df0 100644
--- a/policyengine/utils/data/simplified_google_storage_client.py
+++ b/policyengine/utils/data/simplified_google_storage_client.py
@@ -19,27 +19,31 @@ def __init__(self):
         self.client = Client()
 
     def get_versioned_blob(
-        self, bucket: str, key: str, version: Optional[str] = None
+        self, bucket_name: str, key: str, version: Optional[str] = None
     ) -> Blob:
         """
         Get a versioned blob from the specified bucket and key.
         If version is None, returns the latest version of the blob.
         """
-        bucket = self.client.bucket(bucket)
+        bucket = self.client.bucket(bucket_name)
         if version is None:
             return bucket.blob(key)
-        else:
-            versions: Iterable[Blob] = bucket.list_blobs(
-                prefix=key, versions=True
-            )
-            for v in versions:
-                if v.metadata is None:
-                    continue  # Skip blobs without metadata
-                if v.metadata.get("version") == version:
-                    return v
-            raise ValueError(
-                f"Could not find version {version} of blob {key} in bucket {bucket.name}"
-            )
+        logger.debug(
+            f"Searching {bucket_name}, {key}* for version {version}"
+        )
+        versions: Iterable[Blob] = bucket.list_blobs(prefix=key, versions=True)
+        for v in versions:
+            if v.metadata is None:
+                continue  # Skip blobs without metadata
+            if v.metadata.get("version") == version:
+                logger.info(
+                    f"Blob {bucket_name}, {v.path} has version {version}"
+                )
+                return v
+        logger.info(f"No version {version} found for {bucket_name}, {key}")
+        raise ValueError(
+            f"Could not find version {version} of blob {key} in bucket {bucket.name}"
+        )
 
     def crc32c(
         self, bucket_name: str, key: str, version: Optional[str] = None
@@ -51,7 +55,7 @@ def crc32c(
         blob = self.get_versioned_blob(bucket_name, key, version)
         blob.reload()
-        logger.debug(f"Crc is {blob.crc32c}")
+        logger.info(f"Crc for {bucket_name}, {key} is {blob.crc32c}")
         return blob.crc32c
 
     def download(
@@ -60,10 +64,15 @@ def download(
         """
         get the blob content and associated CRC from google storage.
""" - logger.info(f"Downloading {bucket}, {key}") + logger.debug( + f"Downloading {bucket}, {key}{ ', version:' + version if version is not None else ''}" + ) blob = self.get_versioned_blob(bucket, key, version) result = blob.download_as_bytes() + logger.info( + f"Downloaded {bucket}, {key}{ ', version:' + version if version is not None else ''}" + ) # According to documentation blob.crc32c is updated as a side effect of # downloading the content. As a result this should now be the crc of the downloaded # content (i.e. there is not a race condition where it's getting the CRC from the cloud) @@ -74,11 +83,25 @@ def _get_latest_version(self, bucket: str, key: str) -> Optional[str]: Get the latest version of a blob in the specified bucket and key. If no version is specified, return None. """ + logger.debug(f"Getting latest version of {bucket}, {key}") blob = self.client.get_bucket(bucket).get_blob(key) + if blob is None: + logging.warning(f"No blob found in bucket {bucket} with key {key}") + return None + if blob.metadata is None: logging.warning( - "No metadata found for blob, so it has no version attached." + f"No metadata found for blob {bucket}, {key}, so it has no version attached." + ) + return None + + version = blob.metadata.get("version") + if version is None: + logging.warning( + f"Blob {bucket}, {key} does not have a version in its metadata" ) return None - else: - return blob.metadata.get("version") + logging.info( + f"Metadata for blob {bucket}, {key} has version: {version}" + ) + return blob.metadata.get("version") diff --git a/policyengine/utils/data_download.py b/policyengine/utils/data_download.py index 2bf60387..3f33fbf7 100644 --- a/policyengine/utils/data_download.py +++ b/policyengine/utils/data_download.py @@ -12,14 +12,14 @@ def download( gcs_bucket: str, version: Optional[str] = None, return_version: bool = False, -) -> Tuple[str, Optional[str]]: +) -> Tuple[str, str] | str: logging.info("Using Google Cloud Storage for download.") - version = download_file_from_gcs( + downloaded_version = download_file_from_gcs( bucket_name=gcs_bucket, file_name=filepath, destination_path=filepath, version=version, ) if return_version: - return filepath, version + return filepath, downloaded_version return filepath diff --git a/policyengine/utils/google_cloud_bucket.py b/policyengine/utils/google_cloud_bucket.py index 19f2f5ac..a7cc6842 100644 --- a/policyengine/utils/google_cloud_bucket.py +++ b/policyengine/utils/google_cloud_bucket.py @@ -2,7 +2,7 @@ import asyncio from pathlib import Path from google.cloud.storage import Blob -from typing import Iterable +from typing import Iterable, Optional _caching_client: CachingGoogleStorageClient | None = None @@ -24,8 +24,8 @@ def download_file_from_gcs( bucket_name: str, file_name: str, destination_path: str, - version: str = None, -) -> str: + version: Optional[str] = None, +) -> str | None: """ Download a file from Google Cloud Storage to a local path. @@ -38,10 +38,11 @@ def download_file_from_gcs( version (str): The version of the file that was downloaded, if available. """ - return _get_client().download( + version = _get_client().download( bucket_name, file_name, Path(destination_path), version=version, return_version=True, ) + return version