diff --git a/changelog_entry.yaml b/changelog_entry.yaml
index e69de29b..347758cf 100644
--- a/changelog_entry.yaml
+++ b/changelog_entry.yaml
@@ -0,0 +1,4 @@
+- bump: patch
+  changes:
+    added:
+    - Versioning to dataset uploads.
diff --git a/policyengine_us_data/storage/upload_completed_datasets.py b/policyengine_us_data/storage/upload_completed_datasets.py
index 84f267d8..943e5ebb 100644
--- a/policyengine_us_data/storage/upload_completed_datasets.py
+++ b/policyengine_us_data/storage/upload_completed_datasets.py
@@ -4,43 +4,30 @@
     CPS_2023,
 )
 from policyengine_us_data.storage import STORAGE_FOLDER
-from policyengine_us_data.utils.huggingface import upload
+from policyengine_us_data.utils.data_upload import upload_data_files
 from google.cloud import storage
 import google.auth
 
 
 def upload_datasets():
-    credentials, project_id = google.auth.default()
-    storage_client = storage.Client(
-        credentials=credentials, project=project_id
-    )
-    bucket = storage_client.bucket("policyengine-us-data")
-
-    datasets_to_upload = [
-        EnhancedCPS_2024,
-        Pooled_3_Year_CPS_2023,
-        CPS_2023,
+    """Upload the completed datasets to Hugging Face and GCS."""
+    dataset_files = [
+        EnhancedCPS_2024.file_path,
+        Pooled_3_Year_CPS_2023.file_path,
+        CPS_2023.file_path,
     ]
 
-    for dataset in datasets_to_upload:
-        dataset = dataset()
-        if not dataset.exists:
-            raise ValueError(
-                f"Dataset {dataset.name} does not exist at {dataset.file_path}."
-            )
+    # Fail fast if any dataset file is missing before uploading anything.
+    for file_path in dataset_files:
+        if not file_path.exists():
+            raise ValueError(f"File {file_path} does not exist.")
 
-        upload(
-            dataset.file_path,
-            "policyengine/policyengine-us-data",
-            dataset.file_path.name,
-        )
-
-        blob = dataset.file_path.name
-        blob = bucket.blob(blob)
-        blob.upload_from_filename(dataset.file_path)
-        print(
-            f"Uploaded {dataset.file_path.name} to GCS bucket policyengine-us-data."
-        )
+    upload_data_files(
+        files=dataset_files,
+        hf_repo_name="policyengine/policyengine-us-data",
+        hf_repo_type="model",
+        gcs_bucket_name="policyengine-us-data",
+    )
 
 
 if __name__ == "__main__":
diff --git a/policyengine_us_data/utils/data_upload.py b/policyengine_us_data/utils/data_upload.py
new file mode 100644
index 00000000..721c4c77
--- /dev/null
+++ b/policyengine_us_data/utils/data_upload.py
@@ -0,0 +1,109 @@
+from typing import List, Optional
+from huggingface_hub import HfApi, CommitOperationAdd
+from google.cloud import storage
+from pathlib import Path
+from importlib import metadata
+import google.auth
+import logging
+
+
+def upload_data_files(
+    files: List[str],
+    gcs_bucket_name: str = "policyengine-us-data",
+    hf_repo_name: str = "policyengine/policyengine-us-data",
+    hf_repo_type: str = "model",
+    version: Optional[str] = None,
+):
+    """
+    Upload data files to Hugging Face and Google Cloud Storage, recording
+    the package version against each upload.
+
+    If no version is given, the installed policyengine-us-data package
+    version is used.
+    """
+    if version is None:
+        version = metadata.version("policyengine-us-data")
+
+    upload_files_to_hf(
+        files=files,
+        version=version,
+        hf_repo_name=hf_repo_name,
+        hf_repo_type=hf_repo_type,
+    )
+
+    upload_files_to_gcs(
+        files=files,
+        version=version,
+        gcs_bucket_name=gcs_bucket_name,
+    )
+
+
+def upload_files_to_hf(
+    files: List[str],
+    version: str,
+    hf_repo_name: str = "policyengine/policyengine-us-data",
+    hf_repo_type: str = "model",
+):
+    """
+    Upload files to a Hugging Face repository in a single commit and tag
+    that commit with the version.
+    """
+    api = HfApi()
+    hf_operations = []
+
+    for file_path in files:
+        file_path = Path(file_path)
+        if not file_path.exists():
+            raise ValueError(f"File {file_path} does not exist.")
+        hf_operations.append(
+            CommitOperationAdd(
+                path_in_repo=file_path.name,
+                path_or_fileobj=str(file_path),
+            )
+        )
+
+    # A single commit covers every file, so one tag pins them all.
+    commit_info = api.create_commit(
+        repo_id=hf_repo_name,
+        operations=hf_operations,
+        repo_type=hf_repo_type,
+        commit_message=f"Upload data files for version {version}",
+    )
+    logging.info(f"Uploaded files to Hugging Face repository {hf_repo_name}.")
+
+    # Tag commit with version
+    api.create_tag(
+        repo_id=hf_repo_name,
+        tag=version,
+        revision=commit_info.oid,
+        repo_type=hf_repo_type,
+    )
+    logging.info(
+        f"Tagged commit with {version} in Hugging Face repository {hf_repo_name}."
+    )
+
+
+def upload_files_to_gcs(
+    files: List[str],
+    version: str,
+    gcs_bucket_name: str = "policyengine-us-data",
+):
+    """
+    Upload files to Google Cloud Storage with version metadata.
+    """
+    credentials, project_id = google.auth.default()
+    storage_client = storage.Client(
+        credentials=credentials, project=project_id
+    )
+    bucket = storage_client.bucket(gcs_bucket_name)
+
+    for file_path in files:
+        file_path = Path(file_path)
+        blob = bucket.blob(file_path.name)
+        # Attach the version before uploading so the metadata is stored
+        # with the object itself, rather than in a separate patch() call.
+        blob.metadata = {"version": version}
+        blob.upload_from_filename(file_path)
+        logging.info(
+            f"Uploaded {file_path.name} to GCS bucket {gcs_bucket_name}."
+        )