Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- bump: patch
changes:
fixed:
- Added more log messages.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
SingleEconomy,
)
from typing import List, Dict, Optional
import logging

logger = logging.getLogger(__file__)


class BudgetaryImpact(BaseModel):
Expand Down Expand Up @@ -809,7 +812,9 @@ def calculate_economy_comparison(
if not simulation.is_comparison:
raise ValueError("Simulation must be a comparison simulation.")

logging.info("Calculating baseline econonmy")
baseline: SingleEconomy = simulation.calculate_single_economy(reform=False)
logging.info("Calculating reform economy")
reform: SingleEconomy = simulation.calculate_single_economy(reform=True)
options = simulation.options
country_id = options.country
Expand All @@ -835,6 +840,7 @@ def calculate_economy_comparison(
)

if simulation.options.include_cliffs:
logging.info("Calculating cliff impacts")
cliff_impact = CliffImpact(
baseline=CliffImpactInSimulation(
cliff_gap=baseline.cliff_gap,
Expand All @@ -848,6 +854,7 @@ def calculate_economy_comparison(
else:
cliff_impact = None

logging.info("Comparison complete")
return EconomyComparison(
model_version=simulation.model_version,
data_version=simulation.data_version,
Expand Down
7 changes: 7 additions & 0 deletions policyengine/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
from typing import Callable
import importlib
from policyengine.utils.data_download import download
import logging

logger = logging.getLogger(__file__)

CountryType = Literal["uk", "us"]
ScopeType = Literal["household", "macro"]
Expand Down Expand Up @@ -102,10 +105,14 @@ def __init__(self, **options: SimulationOptions):
if not isinstance(self.options.data, dict) and not isinstance(
self.options.data, Dataset
):
logging.debug("Loading data")
self._set_data(self.options.data)
logging.info("Data loaded")
self._initialise_simulations()
logging.info("Simulations initialised")
self.check_data_version()
self._add_output_functions()
logging.info("Output functions loaded")

def _add_output_functions(self):
folder = Path(__file__).parent / "outputs"
Expand Down
23 changes: 15 additions & 8 deletions policyengine/utils/data/caching_google_storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,15 @@ def download(
f"Copying downloaded data for {bucket}, {key} to {target}"
)
atomic_write(target, data)
logger.info(
f"Data successfully copied to {target} with version {version}"
)
if return_version:
return version
return
logger.error(
f"Cached data for {bucket}, {key}{', ' + version if version is not None else ''} is not bytes."
)
raise Exception("Expected data for blob to be cached as bytes")

# If the crc has changed from what we downloaded last time download it again.
Expand All @@ -68,22 +74,22 @@ def sync(
crckey = f"{bucket}.{key}.{version}.crc"

crc = self.client.crc32c(bucket, key, version=version)
id_string = (
f"{bucket}, {key}{', ' + version if version is not None else ''}"
)
if crc is None:
raise Exception(f"Unable to find {key} in bucket {bucket}")
raise Exception(f"Unable to find {id_string}")

prev_crc = self.cache.get(crckey, default=None)
logger.debug(f"Previous crc for {bucket}, {key} was {prev_crc}")
logger.debug(f"Previous crc for {id_string} was {prev_crc}")
if prev_crc == crc:
logger.info(
f"Cache exists and crc is unchanged for {bucket}, {key}."
)
logger.info(f"Cache exists and crc is unchanged for {id_string} .")
return

[content, downloaded_crc] = self.client.download(
bucket, key, version=version
)
logger.debug(
f"Downloaded new version of {bucket}, {key} with crc {downloaded_crc}"
logger.info(
f"Downloaded new version of {id_string} with crc {downloaded_crc}"
)

# atomic transaction to update both the data and the metadata
Expand All @@ -94,6 +100,7 @@ def sync(
# Whatever the CRC was before we downloaded, we set the cache CRC
# to the CRC reported by the download itself to avoid race conditions.
self.cache.set(crckey, downloaded_crc)
logger.info("Cache updated for {id_string}")

def clear(self):
self.cache.clear()
Expand Down
61 changes: 42 additions & 19 deletions policyengine/utils/data/simplified_google_storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,31 @@ def __init__(self):
self.client = Client()

def get_versioned_blob(
self, bucket: str, key: str, version: Optional[str] = None
self, bucket_name: str, key: str, version: Optional[str] = None
) -> Blob:
"""
Get a versioned blob from the specified bucket and key.
If version is None, returns the latest version of the blob.
"""
bucket = self.client.bucket(bucket)
bucket = self.client.bucket(bucket_name)
if version is None:
return bucket.blob(key)
else:
versions: Iterable[Blob] = bucket.list_blobs(
prefix=key, versions=True
)
for v in versions:
if v.metadata is None:
continue # Skip blobs without metadata
if v.metadata.get("version") == version:
return v
raise ValueError(
f"Could not find version {version} of blob {key} in bucket {bucket.name}"
)
logging.debug(
"Searching {bucket_name}, {prefix}* for version {version}"
)
versions: Iterable[Blob] = bucket.list_blobs(prefix=key, versions=True)
for v in versions:
if v.metadata is None:
continue # Skip blobs without metadata
if v.metadata.get("version") == version:
logging.info(
f"Blob {bucket_name}, {v.path} has version {version}"
)
return v
logging.info(f"No version {version} found for {bucket_name}, {key}")
raise ValueError(
f"Could not find version {version} of blob {key} in bucket {bucket.name}"
)

def crc32c(
self, bucket_name: str, key: str, version: Optional[str] = None
Expand All @@ -51,7 +55,7 @@ def crc32c(
blob = self.get_versioned_blob(bucket_name, key, version)

blob.reload()
logger.debug(f"Crc is {blob.crc32c}")
logger.info(f"Crc for {bucket_name}, {key} is {blob.crc32c}")
return blob.crc32c

def download(
Expand All @@ -60,10 +64,15 @@ def download(
"""
get the blob content and associated CRC from google storage.
"""
logger.info(f"Downloading {bucket}, {key}")
logger.debug(
f"Downloading {bucket}, {key}{ ', version:' + version if version is not None else ''}"
)
blob = self.get_versioned_blob(bucket, key, version)

result = blob.download_as_bytes()
logger.info(
f"Downloaded {bucket}, {key}{ ', version:' + version if version is not None else ''}"
)
# According to documentation blob.crc32c is updated as a side effect of
# downloading the content. As a result this should now be the crc of the downloaded
# content (i.e. there is not a race condition where it's getting the CRC from the cloud)
Expand All @@ -74,11 +83,25 @@ def _get_latest_version(self, bucket: str, key: str) -> Optional[str]:
Get the latest version of a blob in the specified bucket and key.
If no version is specified, return None.
"""
logger.debug(f"Getting latest version of {bucket}, {key}")
blob = self.client.get_bucket(bucket).get_blob(key)
if blob is None:
logging.warning(f"No blob found in bucket {bucket} with key {key}")
return None

if blob.metadata is None:
logging.warning(
"No metadata found for blob, so it has no version attached."
f"No metadata found for blob {bucket}, {key}, so it has no version attached."
)
return None

version = blob.metadata.get("version")
if version is None:
logging.warning(
f"Blob {bucket}, {key} does not have a version in its metadata"
)
return None
else:
return blob.metadata.get("version")
logging.info(
f"Metadata for blob {bucket}, {key} has version: {version}"
)
return blob.metadata.get("version")
6 changes: 3 additions & 3 deletions policyengine/utils/data_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ def download(
gcs_bucket: str,
version: Optional[str] = None,
return_version: bool = False,
) -> Tuple[str, Optional[str]]:
) -> Tuple[str, str] | str:
logging.info("Using Google Cloud Storage for download.")
version = download_file_from_gcs(
downloaded_version = download_file_from_gcs(
bucket_name=gcs_bucket,
file_name=filepath,
destination_path=filepath,
version=version,
)
if return_version:
return filepath, version
return filepath, downloaded_version
return filepath
9 changes: 5 additions & 4 deletions policyengine/utils/google_cloud_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import asyncio
from pathlib import Path
from google.cloud.storage import Blob
from typing import Iterable
from typing import Iterable, Optional

_caching_client: CachingGoogleStorageClient | None = None

Expand All @@ -24,8 +24,8 @@ def download_file_from_gcs(
bucket_name: str,
file_name: str,
destination_path: str,
version: str = None,
) -> str:
version: Optional[str] = None,
) -> str | None:
"""
Download a file from Google Cloud Storage to a local path.

Expand All @@ -38,10 +38,11 @@ def download_file_from_gcs(
version (str): The version of the file that was downloaded, if available.
"""

return _get_client().download(
version = _get_client().download(
bucket_name,
file_name,
Path(destination_path),
version=version,
return_version=True,
)
return version
Loading